import hmac
import io
import zlib
import concurrent.futures
from typing import Tuple, Union
from hashlib import sha256
from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType

# Optional dependency: PyCryptodome is required for any backup decryption.
try:
    import zlib
    from Crypto.Cipher import AES
except ModuleNotFoundError:
    support_backup = False
else:
    support_backup = True

# Optional dependency: javaobj is required to read crypt15 key files.
try:
    import javaobj
except ModuleNotFoundError:
    # NOTE(review): this line sits in elided hunk context; reconstructed as
    # the mirror image of the `else` branch -- confirm against the real file.
    support_crypt15 = False
else:
    support_crypt15 = True


class DecryptionError(Exception):
    """Base class for decryption-related exceptions."""


class InvalidKeyError(DecryptionError):
    """Raised when the provided key is invalid."""


class InvalidFileFormatError(DecryptionError):
    """Raised when the input file format is invalid."""


class OffsetNotFoundError(DecryptionError):
    """Raised when the correct offsets for decryption cannot be found."""


def _derive_main_enc_key(key_stream: bytes) -> Tuple[bytes, bytes]:
    """
    Derive the main encryption key for the given key stream.

    The key is HMAC-SHA256 of the constant ``b"backup encryption\\x01"``,
    keyed with HMAC-SHA256 of the key stream (itself keyed with 32 zero
    bytes).

    Args:
        key_stream (bytes): The key stream to derive the key from.

    Returns:
        Tuple[bytes, bytes]: A tuple containing the main encryption key and
            the original key stream.
    """
    intermediate_hmac = hmac.new(b'\x00' * 32, key_stream, sha256).digest()
    key = hmac.new(intermediate_hmac, b"backup encryption\x01", sha256).digest()
    return key, key_stream


def _extract_enc_key(keyfile: bytes) -> Tuple[bytes, bytes]:
    """
    Extract the key stream from a serialized key file and derive the main key.

    Every element produced by ``javaobj.loads(keyfile)`` is encoded as one
    signed byte and the bytes are concatenated into the key stream.

    Args:
        keyfile (bytes): The raw content of the key file.

    Returns:
        Tuple[bytes, bytes]: values from _derive_main_enc_key()

    Raises:
        RuntimeError: If javaobj is not installed.
    """
    # Without this guard a missing javaobj surfaces as a NameError below.
    if not support_crypt15:
        raise RuntimeError("Crypt15 is not supported")
    key_stream = b''.join(
        byte.to_bytes(1, "big", signed=True) for byte in javaobj.loads(keyfile)
    )
    return _derive_main_enc_key(key_stream)


def brute_force_offset(max_iv: int = 200, max_db: int = 200):
    """
    Brute force the offsets for IV and database start position in WhatsApp backup files.

    Args:
        max_iv (int, optional): Maximum value to try for IV offset. Defaults to 200.
        max_db (int, optional): Maximum value to try for database start offset.
            Defaults to 200.

    Yields:
        tuple: ``(iv_start, iv_end, db_start)`` where ``iv_end`` is
            ``iv_start + 16``.
    """
    # NOTE(review): only the `yield` line is visible in the reviewed diff; the
    # two loops are reconstructed from the parameter names and the names used
    # in the yield -- confirm against the real file.
    for iv in range(0, max_iv):
        for db in range(0, max_db):
            yield iv, iv + 16, db


def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes:
    """Decrypt and decompress a database chunk.

    Only ``cipher.decrypt`` is called, so no GCM tag is verified here; the
    zlib decompression and the SQLite header check are the only validation
    of the result.

    Args:
        db_ciphertext (bytes): The encrypted chunk of the database.
        main_key (bytes): The main decryption key.
        iv (bytes): The initialization vector.

    Returns:
        bytes: The decrypted and decompressed database.

    Raises:
        zlib.error: If decompression fails.
        ValueError: If the plaintext is not a SQLite database.
    """
    cipher = AES.new(main_key, AES.MODE_GCM, iv)
    db_compressed = cipher.decrypt(db_ciphertext)
    db = zlib.decompress(db_compressed)
    if db[0:6].upper() != b"SQLITE":
        raise ValueError(
            "The plaintext is not a SQLite database. Ensure you are using the correct key."
        )
    return db


def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> bytes:
    """Decrypt a crypt14 database, brute-forcing the offsets in threads if needed.

    The offsets listed in ``CRYPT14_OFFSETS`` are tried first. If none of
    them works, every combination from ``brute_force_offset()`` is tried on
    a thread pool.

    Args:
        database (bytes): The encrypted database.
        main_key (bytes): The decryption key.
        max_worker (int, optional): Number of worker threads used for the
            brute-force search. Defaults to 10.

    Returns:
        bytes: The decrypted database.

    Raises:
        InvalidFileFormatError: If the file is too small.
        OffsetNotFoundError: If no valid offsets are found.
        SystemExit: If the brute-force search is interrupted with Ctrl+C.
    """
    if len(database) < 191:
        raise InvalidFileFormatError("The crypt14 file must be at least 191 bytes")

    # Attempt known offsets first
    for offsets in CRYPT14_OFFSETS:
        iv = database[offsets["iv"]:offsets["iv"] + 16]
        db_ciphertext = database[offsets["db"]:]
        try:
            return _decrypt_database(db_ciphertext, main_key, iv)
        except (zlib.error, ValueError):
            continue  # Try next offset

    print("Common offsets failed. Initiating brute-force with multithreading...")

    def attempt_decrypt(offset_tuple):
        """Return the decrypted database for these offsets, or None on failure."""
        start_iv, end_iv, start_db = offset_tuple
        try:
            return _decrypt_database(
                database[start_db:], main_key, database[start_iv:end_iv]
            )
        except (zlib.error, ValueError):
            return None  # Decryption failed, move to next

    with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
        future_to_offset = {
            executor.submit(attempt_decrypt, offset): offset
            for offset in brute_force_offset()
        }

        def cancel_pending():
            # Future.cancel() works on every supported Python version, unlike
            # Executor.shutdown(cancel_futures=True), which needs 3.9+.
            for pending in future_to_offset:
                pending.cancel()

        try:
            for future in concurrent.futures.as_completed(future_to_offset):
                result = future.result()
                if result is None:
                    continue
                start_iv, _, start_db = future_to_offset[future]
                # Reported from the coordinating thread so the message cannot
                # interleave with output from other workers.
                print(
                    f"The offsets of your IV and database are {start_iv} and "
                    f"{start_db}, respectively. To include your offsets in the "
                    "program, please report it by creating an issue on GitHub: "
                    "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
                    "\nShutting down other threads..."
                )
                cancel_pending()
                return result
        except KeyboardInterrupt:
            print("\nBrute force interrupted by user (Ctrl+C). Exiting gracefully...")
            cancel_pending()
            # Same effect as exit(1) without depending on the `site` builtin.
            raise SystemExit(1)

    raise OffsetNotFoundError("Could not find the correct offsets for decryption.")


def _decrypt_crypt12(database: bytes, main_key: bytes) -> bytes:
    """Decrypt a crypt12 database.

    The IV is read from bytes 51-66; the ciphertext runs from byte 67 up to,
    but not including, the last 20 bytes of the file.

    Args:
        database (bytes): The encrypted database.
        main_key (bytes): The decryption key.

    Returns:
        bytes: The decrypted database.

    Raises:
        InvalidFileFormatError: If the file is shorter than 67 bytes.
        zlib.error: If decompression fails.
        ValueError: If the plaintext is not a SQLite database.
    """
    if len(database) < 67:
        raise InvalidFileFormatError("The crypt12 file must be at least 67 bytes")

    iv = database[51:67]
    db_ciphertext = database[67:-20]
    return _decrypt_database(db_ciphertext, main_key, iv)


def _decrypt_crypt15(database: bytes, main_key: bytes, db_type: DbType) -> bytes:
    """Decrypt a crypt15 database.

    Args:
        database (bytes): The encrypted database.
        main_key (bytes): The decryption key.
        db_type (DbType): The type of database (MESSAGE or CONTACT).

    Returns:
        bytes: The decrypted database.

    Raises:
        RuntimeError: If crypt15 support is unavailable.
        InvalidFileFormatError: If the file is shorter than 131 bytes.
        ValueError: If db_type is unknown or the plaintext is not a SQLite
            database.
        zlib.error: If decompression fails.
    """
    if not support_crypt15:
        raise RuntimeError("Crypt15 is not supported")
    if len(database) < 131:
        raise InvalidFileFormatError("The crypt15 file must be at least 131 bytes")

    if db_type == DbType.MESSAGE:
        iv = database[8:24]
        db_offset = database[0] + 2  # Skip protobuf + protobuf size and backup type
    elif db_type == DbType.CONTACT:
        iv = database[7:23]
        db_offset = database[0] + 1  # Skip protobuf + protobuf size
    else:
        raise ValueError(f"Invalid db_type: {db_type}")

    db_ciphertext = database[db_offset:]
    return _decrypt_database(db_ciphertext, main_key, iv)


# NOTE(review): the parameters after `key` are elided in the reviewed diff.
# They are reconstructed from the docstring (names, defaults) and from the
# names the body reads. All are left positional-or-keyword so both positional
# and keyword callers keep working -- confirm order against the real file.
def decrypt_backup(
    database: bytes,
    key: Union[str, io.IOBase],
    output: str = None,
    crypt: Crypt = Crypt.CRYPT14,
    show_crypt15: bool = False,
    db_type: DbType = DbType.MESSAGE,
    dry_run: bool = False,
    keyfile_stream: bool = False,
) -> int:
    """
    Decrypt the WhatsApp backup database.

    Args:
        database (bytes): The encrypted database file.
        key (str or io.IOBase): The key to decrypt the database. A file
            object is read in full before use.
        output (str, optional): The path to save the decrypted database.
            Defaults to None. Ignored when dry_run is True.
        crypt (Crypt, optional): The encryption version of the database.
            Defaults to Crypt.CRYPT14.
        show_crypt15 (bool, optional): Whether to show the HEX key of the
            crypt15 backup. Defaults to False.
        db_type (DbType, optional): The type of database (MESSAGE or
            CONTACT). Defaults to DbType.MESSAGE.
        dry_run (bool, optional): Whether to perform a dry run without
            saving the decrypted database. Defaults to False.
        keyfile_stream (bool, optional): Whether the crypt15 key is a
            serialized key file rather than a raw key. Defaults to False.

    Returns:
        int: The status code of the decryption process (0 for success).

    Raises:
        RuntimeError: If a required dependency is not available.
        ValueError: If output is missing while dry_run is False, or the
            signature of the key file and the backup file mismatch.
        InvalidKeyError: If a crypt12/crypt14 key is not 158 bytes long.
        DecryptionError: For errors during decryption. File-format and
            offset failures keep their specific subclass.
    """
    if not support_backup:
        raise RuntimeError("Dependencies for backup decryption are not available.")

    if not dry_run and output is None:
        raise ValueError(
            "The path to the decrypted database must be specified unless dry_run is true."
        )

    if isinstance(key, io.IOBase):
        key = key.read()

    if crypt == Crypt.CRYPT15:
        # Fail fast: _extract_enc_key needs javaobj, and without this check a
        # missing dependency would surface as a NameError.
        if not support_crypt15:
            raise RuntimeError("Crypt15 is not supported")
        if keyfile_stream:
            # NOTE(review): this call is elided in the reviewed diff;
            # reconstructed from the flag name and the helper -- confirm.
            main_key, hex_key = _extract_enc_key(key)
        else:
            main_key, hex_key = _derive_main_enc_key(key)
        if show_crypt15:
            hex_digits = hex_key.hex()
            hex_key_str = ' '.join(
                hex_digits[c:c+4] for c in range(0, len(hex_digits), 4)
            )
            print(f"The HEX key of the crypt15 backup is: {hex_key_str}")
    else:
        if len(key) != 158:
            raise InvalidKeyError("The key file must be 158 bytes")

        # Signature check, used for crypt12 and crypt14 only: bytes 30-61 of
        # the key file must equal the signature stored in the backup header.
        t1 = key[30:62]
        if crypt == Crypt.CRYPT14 and t1 != database[15:47]:
            raise ValueError("The signature of key file and backup file mismatch")
        if crypt == Crypt.CRYPT12 and t1 != database[3:35]:
            raise ValueError("The signature of key file and backup file mismatch")

        main_key = key[126:]

    try:
        if crypt == Crypt.CRYPT14:
            db = _decrypt_crypt14(database, main_key)
        elif crypt == Crypt.CRYPT12:
            db = _decrypt_crypt12(database, main_key)
        elif crypt == Crypt.CRYPT15:
            db = _decrypt_crypt15(database, main_key, db_type)
        else:
            raise ValueError(f"Unsupported crypt type: {crypt}")
    except DecryptionError as e:
        # Keep the specific subclass so callers can tell the causes apart.
        raise type(e)(f"Decryption failed: {e}") from e
    except (ValueError, zlib.error) as e:
        # zlib.error comes from the crypt12/crypt15 paths when the key or
        # offsets are wrong; report it through the documented hierarchy.
        raise DecryptionError(f"Decryption failed: {e}") from e

    if not dry_run:
        with open(output, "wb") as f:
            f.write(db)
    return 0