From 2944d00ca29296a67a533e5744e9d7794b69995d Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Tue, 15 Aug 2023 17:40:20 +0800 Subject: [PATCH] Refactoring so that no file needs to be introduced --- Whatsapp_Chat_Exporter/__main__.py | 21 ++- .../extract_iphone_media.py | 20 +-- .../extract_iphone_media_smb.py | 126 ------------------ Whatsapp_Chat_Exporter/utility.py | 12 ++ 4 files changed, 32 insertions(+), 147 deletions(-) delete mode 100644 Whatsapp_Chat_Exporter/extract_iphone_media_smb.py diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 45f708a..c20d8bd 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -7,7 +7,7 @@ import json import string import glob from Whatsapp_Chat_Exporter import extract_exported, extract_iphone -from Whatsapp_Chat_Exporter import extract, extract_iphone_media, extract_iphone_media_smb +from Whatsapp_Chat_Exporter import extract, extract_iphone_media from Whatsapp_Chat_Exporter.data_model import ChatStore from Whatsapp_Chat_Exporter.utility import Crypt, check_update, import_from_json from argparse import ArgumentParser, SUPPRESS @@ -274,24 +274,19 @@ def main(): media = extract_iphone.media vcard = extract_iphone.vcard create_html = extract.create_html + if args.business: + from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers + else: + from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers if args.media is None: - if args.smb: - args.media = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" - else: - args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" + args.media = identifiers.DOMAIN if args.backup is not None: if not os.path.isdir(args.media): - if args.smb: - extract_iphone_media_smb.extract_media(args.backup) - else: - extract_iphone_media.extract_media(args.backup) + extract_iphone_media.extract_media(args.backup, identifiers) else: print("WhatsApp directory already exists, skipping WhatsApp file extraction.") if args.db is None: - if args.smb: - msg_db = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" - else: - msg_db = "7c7fba66680ef796b916b067077cc246adacf01d" + msg_db = identifiers.MESSAGE else: msg_db = args.db if args.wa is None: diff --git a/Whatsapp_Chat_Exporter/extract_iphone_media.py b/Whatsapp_Chat_Exporter/extract_iphone_media.py index b293ea7..250d4d6 100644 --- a/Whatsapp_Chat_Exporter/extract_iphone_media.py +++ b/Whatsapp_Chat_Exporter/extract_iphone_media.py @@ -6,6 +6,7 @@ import os import time import getpass import threading +from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier try: from iphone_backup_decrypt import EncryptedBackup, RelativePath from iphone_backup_decrypt import FailedToDecryptError, Domain @@ -15,7 +16,7 @@ else: support_encrypted = True -def extract_encrypted(base_dir, password): +def extract_encrypted(base_dir, password, identifiers): backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False) print("Decrypting WhatsApp database...") try: @@ -61,7 +62,7 @@ def is_encrypted(base_dir): return False -def extract_media(base_dir): +def extract_media(base_dir, identifiers): if is_encrypted(base_dir): if not support_encrypted: print("You don't have the dependencies to handle encrypted backup.") @@ -70,20 +71,23 @@ def extract_media(base_dir): return False print("Encryption detected on the backup!") password = getpass.getpass("Enter the password for the backup:") - extract_encrypted(base_dir, password) + extract_encrypted(base_dir, password, identifiers) else: - wts_db = os.path.join(base_dir, "7c/7c7fba66680ef796b916b067077cc246adacf01d") - contact_db = os.path.join(base_dir, "b8/b8548dc30aa1030df0ce18ef08b882cf7ab5212f") + wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE) + contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT) if not os.path.isfile(wts_db): - print("WhatsApp database not found.") + if identifiers is WhatsAppIdentifier: + print("WhatsApp database not found.") + else: + print("WhatsApp Business database not found.") exit() else: - shutil.copyfile(wts_db, "7c7fba66680ef796b916b067077cc246adacf01d") + shutil.copyfile(wts_db, identifiers.MESSAGE) if not os.path.isfile(contact_db): print("Contact database not found.") exit() else: - shutil.copyfile(contact_db, "b8548dc30aa1030df0ce18ef08b882cf7ab5212f") + shutil.copyfile(contact_db, identifiers.CONTACT) _wts_id = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest: manifest.row_factory = sqlite3.Row diff --git a/Whatsapp_Chat_Exporter/extract_iphone_media_smb.py b/Whatsapp_Chat_Exporter/extract_iphone_media_smb.py deleted file mode 100644 index 459344a..0000000 --- a/Whatsapp_Chat_Exporter/extract_iphone_media_smb.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/python3 - -import shutil -import sqlite3 -import os -import time -import getpass -import threading -try: - from iphone_backup_decrypt import EncryptedBackup, RelativePath - from iphone_backup_decrypt import FailedToDecryptError, Domain -except ModuleNotFoundError: - support_encrypted = False -else: - support_encrypted = True - - -def extract_encrypted(base_dir, password): - backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False) - print("Decrypting WhatsApp database...") - try: - backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES, - output_filename="724bd3b98b18518b455a87c1f3ac3a0d189c4466") - backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS, - output_filename="d7246a707f51ddf8b17ee2dddabd9e0a4da5c552") - except FailedToDecryptError: - print("Failed to decrypt backup: incorrect password?") - exit() - extract_thread = threading.Thread( - target=backup.extract_files_by_domain, - args=(Domain.WHATSAPP, Domain.WHATSAPP) - ) - extract_thread.daemon = True - extract_thread.start() - dot = 0 - while extract_thread.is_alive(): - print(f"Decrypting and extracting files{'.' * dot}{' ' * (3 - dot)}", end="\r") - if dot < 3: - dot += 1 - time.sleep(0.5) - else: - dot = 0 - time.sleep(0.4) - print(f"All required files decrypted and extracted.", end="\n") - extract_thread.handled = True - return backup - - -def is_encrypted(base_dir): - with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f: - c = f.cursor() - try: - c.execute("""SELECT count() - FROM Files - """) - except sqlite3.OperationalError as e: - raise e # These error cannot be used to determine if the backup is encrypted - except sqlite3.DatabaseError: - return True - else: - return False - - -def extract_media(base_dir): - if is_encrypted(base_dir): - if not support_encrypted: - print("You don't have the dependencies to handle encrypted backup.") - print("Read more on how to deal with encrypted backup:") - print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage") - return False - print("Encryption detected on the backup!") - password = getpass.getpass("Enter the password for the backup:") - extract_encrypted(base_dir, password) - else: - wts_db = os.path.join(base_dir, "72/724bd3b98b18518b455a87c1f3ac3a0d189c4466") - contact_db = os.path.join(base_dir, "d7/d7246a707f51ddf8b17ee2dddabd9e0a4da5c552") - if not os.path.isfile(wts_db): - print("WhatsApp database not found.") - exit() - else: - shutil.copyfile(wts_db, "724bd3b98b18518b455a87c1f3ac3a0d189c4466") - if not os.path.isfile(contact_db): - print("Contact database not found.") - exit() - else: - shutil.copyfile(contact_db, "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552") - _wts_id = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" - with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest: - manifest.row_factory = sqlite3.Row - c = manifest.cursor() - c.execute( - f"""SELECT count() - FROM Files - WHERE domain = '{_wts_id}'""" - ) - total_row_number = c.fetchone()[0] - print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r") - c.execute(f"""SELECT fileID, - relativePath, - flags, - ROW_NUMBER() OVER(ORDER BY relativePath) AS _index - FROM Files - WHERE domain = '{_wts_id}' - ORDER BY relativePath""") - if not os.path.isdir(_wts_id): - os.mkdir(_wts_id) - row = c.fetchone() - while row is not None: - if row["relativePath"] == "": - row = c.fetchone() - continue - destination = os.path.join(_wts_id, row["relativePath"]) - hashes = row["fileID"] - folder = hashes[:2] - flags = row["flags"] - if flags == 2: - try: - os.mkdir(destination) - except FileExistsError: - pass - elif flags == 1: - shutil.copyfile(os.path.join(base_dir, folder, hashes), destination) - if row["_index"] % 100 == 0: - print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r") - row = c.fetchone() - print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n") diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 62350d0..26f1df1 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -290,3 +290,15 @@ def setup_template(template, no_avatar): # iOS Specific APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1)) + + +class WhatsAppIdentifier(StrEnum): + MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" + CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" + DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" + + +class WhatsAppBusinessIdentifier(StrEnum): + MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" + CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" + DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"