From b2bcf366223d43ff9a3220a3445cce5f1aec3cdc Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:20:11 +0800 Subject: [PATCH] Move Android backup decryption to a standalone module --- Whatsapp_Chat_Exporter/__main__.py | 25 +-- Whatsapp_Chat_Exporter/android_crypt.py | 211 ++++++++++++++++++++++ Whatsapp_Chat_Exporter/android_handler.py | 197 +------------------- Whatsapp_Chat_Exporter/utility.py | 6 - 4 files changed, 228 insertions(+), 211 deletions(-) create mode 100644 Whatsapp_Chat_Exporter/android_crypt.py diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 611df91..9fc9378 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -7,6 +7,17 @@ import shutil import json import string import glob +import importlib.metadata +from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler +from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler +from Whatsapp_Chat_Exporter.data_model import ChatStore +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, check_update, DbType +from Whatsapp_Chat_Exporter.utility import readable_to_bytes, sanitize_filename +from Whatsapp_Chat_Exporter.utility import import_from_json, bytes_to_readable +from argparse import ArgumentParser, SUPPRESS +from datetime import datetime +from getpass import getpass +from sys import exit try: import vobject except ModuleNotFoundError: @@ -14,16 +25,6 @@ except ModuleNotFoundError: else: from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards vcards_deps_installed = True -from Whatsapp_Chat_Exporter import exported_handler, android_handler -from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler -from Whatsapp_Chat_Exporter.data_model import ChatStore -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, DbType, readable_to_bytes, check_update -from Whatsapp_Chat_Exporter.utility import import_from_json, sanitize_filename, bytes_to_readable -from argparse import ArgumentParser, SUPPRESS -from datetime import datetime -from getpass import getpass -from sys import exit -import importlib.metadata def main(): @@ -448,12 +449,12 @@ def main(): db = open(args.backup, "rb").read() if args.wab: wab = open(args.wab, "rb").read() - error_wa = android_handler.decrypt_backup(wab, key, contact_db, crypt, args.showkey, DbType.CONTACT, key_stream) + error_wa = android_crypt.decrypt_backup(wab, key, contact_db, crypt, args.showkey, DbType.CONTACT, key_stream) if isinstance(key, io.IOBase): key.seek(0) else: error_wa = 0 - error_message = android_handler.decrypt_backup(db, key, msg_db, crypt, args.showkey, DbType.MESSAGE, key_stream) + error_message = android_crypt.decrypt_backup(db, key, msg_db, crypt, args.showkey, DbType.MESSAGE, key_stream) if error_wa != 0: error = error_wa elif error_message != 0: diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py new file mode 100644 index 0000000..45a807b --- /dev/null +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -0,0 +1,211 @@ +import hmac +import io +import zlib +import javaobj +from typing import Tuple, Union +from hashlib import sha256 +from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType +try: + import zlib + from Crypto.Cipher import AES +except ModuleNotFoundError: + support_backup = False +else: + support_backup = True +try: + import javaobj +except ModuleNotFoundError: + support_crypt15 = False +else: + support_crypt15 = True + + +def _derive_main_enc_key(key_stream: bytes) -> Tuple[bytes, bytes]: + """ + Derive the main encryption key for the given key stream. The key is derived using HMAC of HMAC of the provided key stream. + + Args: + key_stream (bytes): The key stream to generate HMAC of HMAC. + + Returns: + Tuple[bytes, bytes]: A tuple containing the main encryption key and the original key stream. + """ + key = hmac.new( + hmac.new( + b'\x00' * 32, + key_stream, + sha256 + ).digest(), + b"backup encryption\x01", + sha256 + ) + return key.digest(), key_stream + + +def _extract_enc_key(keyfile: bytes) -> Tuple[bytes, bytes]: + """ + Extract the encryption key from the keyfile. + + Args: + keyfile (bytes): The keyfile containing the encrypted key. + + Returns: + Tuple[bytes, bytes]: values from _derive_main_enc_key() + """ + key_stream = b"" + for byte in javaobj.loads(keyfile): + key_stream += byte.to_bytes(1, "big", signed=True) + + return _derive_main_enc_key(key_stream) + + +def brute_force_offset(max_iv: int = 200, max_db: int = 200): + """ + Brute force the offsets for IV and database start position in WhatsApp backup files. + Used when common offsets are not applicable to a backup file. + + Args: + max_iv (int, optional): Maximum value to try for IV offset. Defaults to 200. + max_db (int, optional): Maximum value to try for database start offset. Defaults to 200. + + Yields: + tuple: A tuple containing: + - int: Start position of IV + - int: End position of IV (start + 16) + - int: Start position of database + """ + for iv in range(0, max_iv): + for db in range(0, max_db): + yield iv, iv + 16, db + + +def decrypt_backup( + database: bytes, + key: Union[str, io.IOBase], + output: str = None, + crypt: Crypt = Crypt.CRYPT14, + show_crypt15: bool = False, + db_type: DbType = DbType.MESSAGE, + dry_run: bool = False, + key_stream: bool = False +) -> int: + """ + Decrypt the WhatsApp backup database. + + Args: + database (bytes): The encrypted database file. + key (str or io.IOBase): The key to decrypt the database. The key should either be a string (32 bytes hex key) or a file object (encryption key file). + key_stream (bool, optional): Whether the key is a key stream. False for hex key. True for key stream. + output (str, optional): The path to save the decrypted database. Defaults to None. When dry_run is True, this parameter is ignored. + crypt (Crypt, optional): The encryption version of the database. Defaults to Crypt.CRYPT14. + show_crypt15 (bool, optional): Whether to show the HEX key of the crypt15 backup. Defaults to False. + db_type (DbType, optional): The type of database (MESSAGE or CONTACT). Defaults to DbType.MESSAGE. + dry_run (bool, optional): Whether to perform a dry run without saving the decrypted database. Defaults to False. + + Returns: + int: The status code of the decryption process. + - 0: The decryption process was successful. + - 1: The decryption process failed because the necessary dependencies for backup decryption are not available. + - 2: The decryption process failed because the common offsets for the IV and database are not applicable, and the brute force attempt to find the correct offsets also failed. + - 3: The decryption process failed due to unknown error + """ + if not support_backup: + return 1 + if not dry_run and output is None: + ValueError("The path to the decrypted database must be specified unless dry_run is true.") + if isinstance(key, io.IOBase): + key = key.read() + if crypt is not Crypt.CRYPT15: + t1 = key[30:62] + if crypt is not Crypt.CRYPT15 and len(key) != 158: + raise ValueError("The key file must be 158 bytes") + # Determine the IV and database offsets + if crypt == Crypt.CRYPT14: + if len(database) < 191: + raise ValueError("The crypt14 file must be at least 191 bytes") + current_try = 0 + offsets = CRYPT14_OFFSETS[current_try] + t2 = database[15:47] + iv = database[offsets["iv"]:offsets["iv"] + 16] + db_ciphertext = database[offsets["db"]:] + elif crypt == Crypt.CRYPT12: + if len(database) < 67: + raise ValueError("The crypt12 file must be at least 67 bytes") + t2 = database[3:35] + iv = database[51:67] + db_ciphertext = database[67:-20] + elif crypt == Crypt.CRYPT15: + if not support_crypt15: + return 1 + if len(database) < 131: + raise ValueError("The crypt15 file must be at least 131 bytes") + t1 = t2 = None + if db_type == DbType.MESSAGE: + iv = database[8:24] + db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type + elif db_type == DbType.CONTACT: + iv = database[7:23] + db_offset = database[0] + 1 # Skip protobuf + protobuf size + db_ciphertext = database[db_offset:] + + if t1 != t2: + raise ValueError("The signature of key file and backup file mismatch") + + if crypt == Crypt.CRYPT15: + if key_stream: + main_key, hex_key = _extract_enc_key(key) + else: + main_key, hex_key = _derive_main_enc_key(key) + if show_crypt15: + hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)] + print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key)) + else: + main_key = key[126:] + decompressed = False + while not decompressed: + cipher = AES.new(main_key, AES.MODE_GCM, iv) + db_compressed = cipher.decrypt(db_ciphertext) + try: + db = zlib.decompress(db_compressed) + except zlib.error: + if crypt == Crypt.CRYPT14: + current_try += 1 + if current_try < len(CRYPT14_OFFSETS): + offsets = CRYPT14_OFFSETS[current_try] + iv = database[offsets["iv"]:offsets["iv"] + 16] + db_ciphertext = database[offsets["db"]:] + continue + else: + print("Common offsets are not applicable to " + "your backup. Trying to brute force it...") + for start_iv, end_iv, start_db in brute_force_offset(): + iv = database[start_iv:end_iv] + db_ciphertext = database[start_db:] + cipher = AES.new(main_key, AES.MODE_GCM, iv) + db_compressed = cipher.decrypt(db_ciphertext) + try: + db = zlib.decompress(db_compressed) + except zlib.error: + continue + else: + decompressed = True + print( + f"The offsets of your IV and database are {start_iv} and " + f"{start_db}, respectively. To include your offsets in the " + "program, please report it by creating an issue on GitHub: " + "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47" + ) + break + if not decompressed: + return 2 + else: + return 3 + else: + decompressed = True + if db[0:6].upper() == b"SQLITE": + if not dry_run: + with open(output, "wb") as f: + f.write(db) + return 0 + else: + raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...") diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index c185034..30284a0 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -2,206 +2,17 @@ import sqlite3 import os -import io -import hmac import shutil from pathlib import Path from mimetypes import MimeTypes -from typing import Tuple, Union from markupsafe import escape as htmle -from hashlib import sha256 from base64 import b64decode, b64encode from datetime import datetime from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import CURRENT_TZ_OFFSET, MAX_SIZE, ROW_SIZE, DbType, convert_time_unit, determine_metadata, get_cond_for_empty -from Whatsapp_Chat_Exporter.utility import rendering, Crypt, Device, get_file_name, setup_template -from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS, get_status_location -from Whatsapp_Chat_Exporter.utility import get_chat_condition, slugify, bytes_to_readable, JidType - -try: - import zlib - from Crypto.Cipher import AES -except ModuleNotFoundError: - support_backup = False -else: - support_backup = True -try: - import javaobj -except ModuleNotFoundError: - support_crypt15 = False -else: - support_crypt15 = True - - -def _derive_main_enc_key(key_stream: bytes) -> Tuple[bytes, bytes]: - """ - Derive the main encryption key for the given key stream. The key is derived using HMAC of HMAC of the provided key stream. - - Args: - key_stream (bytes): The key stream to generate HMAC of HMAC. - - Returns: - Tuple[bytes, bytes]: A tuple containing the main encryption key and the original key stream. - """ - key = hmac.new( - hmac.new( - b'\x00' * 32, - key_stream, - sha256 - ).digest(), - b"backup encryption\x01", - sha256 - ) - return key.digest(), key_stream - - -def _extract_enc_key(keyfile: bytes) -> Tuple[bytes, bytes]: - """ - Extract the encryption key from the keyfile. - - Args: - keyfile (bytes): The keyfile containing the encrypted key. - - Returns: - Tuple[bytes, bytes]: values from _derive_main_enc_key() - """ - key_stream = b"" - for byte in javaobj.loads(keyfile): - key_stream += byte.to_bytes(1, "big", signed=True) - - return _derive_main_enc_key(key_stream) - - -def decrypt_backup( - database: bytes, - key: Union[str, io.IOBase], - output: str = None, - crypt: Crypt = Crypt.CRYPT14, - show_crypt15: bool = False, - db_type: DbType = DbType.MESSAGE, - dry_run: bool = False, - key_stream: bool = False -) -> int: - """ - Decrypt the WhatsApp backup database. - - Args: - database (bytes): The encrypted database file. - key (str or io.IOBase): The key to decrypt the database. The key should either be a string (32 bytes hex key) or a file object (encryption key file). - key_stream (bool, optional): Whether the key is a key stream. False for hex key. True for key stream. - output (str, optional): The path to save the decrypted database. Defaults to None. When dry_run is True, this parameter is ignored. - crypt (Crypt, optional): The encryption version of the database. Defaults to Crypt.CRYPT14. - show_crypt15 (bool, optional): Whether to show the HEX key of the crypt15 backup. Defaults to False. - db_type (DbType, optional): The type of database (MESSAGE or CONTACT). Defaults to DbType.MESSAGE. - dry_run (bool, optional): Whether to perform a dry run without saving the decrypted database. Defaults to False. - - Returns: - int: The status code of the decryption process. - - 0: The decryption process was successful. - - 1: The decryption process failed because the necessary dependencies for backup decryption are not available. - - 2: The decryption process failed because the common offsets for the IV and database are not applicable, and the brute force attempt to find the correct offsets also failed. - - 3: The decryption process failed due to unknown error - """ - if not support_backup: - return 1 - if not dry_run and output is None: - ValueError("The path to the decrypted database must be specified unless dry_run is true.") - if isinstance(key, io.IOBase): - key = key.read() - if crypt is not Crypt.CRYPT15: - t1 = key[30:62] - if crypt is not Crypt.CRYPT15 and len(key) != 158: - raise ValueError("The key file must be 158 bytes") - # Determine the IV and database offsets - if crypt == Crypt.CRYPT14: - if len(database) < 191: - raise ValueError("The crypt14 file must be at least 191 bytes") - current_try = 0 - offsets = CRYPT14_OFFSETS[current_try] - t2 = database[15:47] - iv = database[offsets["iv"]:offsets["iv"] + 16] - db_ciphertext = database[offsets["db"]:] - elif crypt == Crypt.CRYPT12: - if len(database) < 67: - raise ValueError("The crypt12 file must be at least 67 bytes") - t2 = database[3:35] - iv = database[51:67] - db_ciphertext = database[67:-20] - elif crypt == Crypt.CRYPT15: - if not support_crypt15: - return 1 - if len(database) < 131: - raise ValueError("The crypt15 file must be at least 131 bytes") - t1 = t2 = None - if db_type == DbType.MESSAGE: - iv = database[8:24] - db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type - elif db_type == DbType.CONTACT: - iv = database[7:23] - db_offset = database[0] + 1 # Skip protobuf + protobuf size - db_ciphertext = database[db_offset:] - - if t1 != t2: - raise ValueError("The signature of key file and backup file mismatch") - - if crypt == Crypt.CRYPT15: - if key_stream: - main_key, hex_key = _extract_enc_key(key) - else: - main_key, hex_key = _derive_main_enc_key(key) - if show_crypt15: - hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)] - print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key)) - else: - main_key = key[126:] - decompressed = False - while not decompressed: - cipher = AES.new(main_key, AES.MODE_GCM, iv) - db_compressed = cipher.decrypt(db_ciphertext) - try: - db = zlib.decompress(db_compressed) - except zlib.error: - if crypt == Crypt.CRYPT14: - current_try += 1 - if current_try < len(CRYPT14_OFFSETS): - offsets = CRYPT14_OFFSETS[current_try] - iv = database[offsets["iv"]:offsets["iv"] + 16] - db_ciphertext = database[offsets["db"]:] - continue - else: - print("Common offsets are not applicable to " - "your backup. Trying to brute force it...") - for start_iv, end_iv, start_db in brute_force_offset(): - iv = database[start_iv:end_iv] - db_ciphertext = database[start_db:] - cipher = AES.new(main_key, AES.MODE_GCM, iv) - db_compressed = cipher.decrypt(db_ciphertext) - try: - db = zlib.decompress(db_compressed) - except zlib.error: - continue - else: - decompressed = True - print( - f"The offsets of your IV and database are {start_iv} and " - f"{start_db}, respectively. To include your offsets in the " - "program, please report it by creating an issue on GitHub: " - "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47" - ) - break - if not decompressed: - return 2 - else: - return 3 - else: - decompressed = True - if db[0:6].upper() == b"SQLITE": - if not dry_run: - with open(output, "wb") as f: - f.write(db) - return 0 - else: - raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...") +from Whatsapp_Chat_Exporter.utility import CURRENT_TZ_OFFSET, MAX_SIZE, ROW_SIZE, JidType, Device +from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty +from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, determine_metadata +from Whatsapp_Chat_Exporter.utility import get_chat_condition, slugify, bytes_to_readable def contacts(db, data, enrich_from_vcards): diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 259c318..f51f21d 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -279,12 +279,6 @@ class DbType(StrEnum): CONTACT = "contact" -def brute_force_offset(max_iv=200, max_db=200): - for iv in range(0, max_iv): - for db in range(0, max_db): - yield iv, iv + 16, db - - def determine_metadata(content, init_msg): msg = init_msg if init_msg else "" if content["is_me_joined"] == 1: # Override