Change print to logger for better logging in the future

This commit also added --debug and --no-banner options, which will enable debug level of logging and supress the default banner
This commit is contained in:
KnugiHK
2025-05-11 16:53:46 +08:00
parent 0681661660
commit fa41572753
7 changed files with 226 additions and 110 deletions

View File

@@ -1,11 +1,14 @@
import time
import hmac
import io
import logging
import threading
import zlib
import concurrent.futures
from typing import Tuple, Union
from hashlib import sha256
from sys import exit
from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType
try:
import zlib
@@ -23,6 +26,9 @@ else:
support_crypt15 = True
logger = logging.getLogger(__name__)
class DecryptionError(Exception):
"""Base class for decryption-related exceptions."""
pass
@@ -138,11 +144,28 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
try:
return _decrypt_database(db_ciphertext, main_key, iv)
decrypted_db = _decrypt_database(db_ciphertext, main_key, iv)
except (zlib.error, ValueError):
pass # Try next offset
else:
logger.debug(
f"Decryption successful with known offsets: IV {offsets["iv"]}, DB {offsets["db"]}{CLEAR_LINE}"
)
return decrypted_db # Successful decryption
print("Common offsets failed. Initiating brute-force with multithreading...")
def animate_message(stop_event):
base_msg = "Common offsets failed. Initiating brute-force with multithreading"
dots = ["", ".", "..", "..."]
i = 0
while not stop_event.is_set():
logger.info(f"{base_msg}{dots[i % len(dots)]}\x1b[K\r")
time.sleep(0.3)
i += 1
logger.info(f"Common offsets failed but brute-forcing the offset works!{CLEAR_LINE}")
stop_event = threading.Event()
anim_thread = threading.Thread(target=animate_message, args=(stop_event,))
anim_thread.start()
# Convert brute force generator into a list for parallel processing
offset_combinations = list(brute_force_offset())
@@ -152,19 +175,23 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
start_iv, end_iv, start_db = offset_tuple
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
logger.debug(""f"Trying offsets: IV {start_iv}-{end_iv}, DB {start_db}{CLEAR_LINE}")
try:
db = _decrypt_database(db_ciphertext, main_key, iv)
print(
except (zlib.error, ValueError):
return None # Decryption failed, move to next
else:
stop_event.set()
anim_thread.join()
logger.info(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
"\nShutting down other threads..."
f"\nShutting down other threads...{CLEAR_LINE}"
)
return db
except (zlib.error, ValueError):
return None # Decryption failed, move to next
with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
future_to_offset = {executor.submit(attempt_decrypt, offset): offset for offset in offset_combinations}
@@ -178,9 +205,14 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
return result
except KeyboardInterrupt:
print("\nBrute force interrupted by user (Ctrl+C). Exiting gracefully...")
stop_event.set()
anim_thread.join()
logger.info(f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}")
executor.shutdown(wait=False, cancel_futures=True)
exit(1)
finally:
stop_event.set()
anim_thread.join()
raise OffsetNotFoundError("Could not find the correct offsets for decryption.")
@@ -305,7 +337,7 @@ def decrypt_backup(
main_key, hex_key = _derive_main_enc_key(key)
if show_crypt15:
hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)])
print(f"The HEX key of the crypt15 backup is: {hex_key_str}")
logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}{CLEAR_LINE}")
else:
main_key = key[126:]