# Post-image of the patch "Refactor ios_media_handler" (commit 6a67f72,
# Whatsapp_Chat_Exporter/ios_media_handler.py), reconstructed from the added
# and context lines of the diff. The module's import block and the
# module-level `support_encrypted` flag lie outside the hunk and are expected
# to be defined above this point.


class BackupExtractor:
    """
    A class to handle the extraction of WhatsApp data from iOS backups,
    including encrypted and unencrypted backups.
    """

    def __init__(self, base_dir, identifiers, decrypt_chunk_size):
        """
        Store the extraction settings.

        Args:
            base_dir (str): The path to the iOS backup directory.
            identifiers: Object exposing the DOMAIN, MESSAGE, CONTACT and CALL
                attributes read by the extraction methods.
            decrypt_chunk_size (int): Passed through to EncryptedBackup.
        """
        self.base_dir = base_dir
        self.identifiers = identifiers
        self.decrypt_chunk_size = decrypt_chunk_size
        # Populated by _decrypt_backup(); stays None for unencrypted backups.
        self.backup = None

    def extract(self):
        """
        Extracts WhatsApp data from the backup based on whether it's encrypted or not.
        """
        if self._is_encrypted():
            self._extract_encrypted_backup()
        else:
            self._extract_unencrypted_backup()

    def _backup_path(self, file_id):
        """Return the path of a backup file, stored under a folder named after its first two characters."""
        return os.path.join(self.base_dir, file_id[:2], file_id)

    def _is_encrypted(self):
        """
        Checks if the iOS backup is encrypted.

        Returns:
            bool: True if querying Manifest.db fails with a database error,
                False if the query succeeds.
        """
        db = sqlite3.connect(os.path.join(self.base_dir, "Manifest.db"))
        try:
            # Execute and fetch to trigger potential errors.
            db.execute("SELECT count() FROM Files").fetchone()
        except sqlite3.DatabaseError:
            # sqlite3.OperationalError is a subclass of DatabaseError, so this
            # single clause covers both.
            # NOTE(review): the pre-refactor function re-raised
            # OperationalError because it "cannot be used to determine if the
            # backup is encrypted"; confirm that treating it as "encrypted"
            # is intended.
            return True
        else:
            return False
        finally:
            # The connection's own context manager only handles the
            # transaction; it does not close the connection.
            db.close()

    def _extract_encrypted_backup(self):
        """
        Handles the extraction of data from an encrypted iOS backup.

        Prints guidance and returns without extracting anything when the
        optional dependencies for encrypted backups are unavailable.
        """
        if not support_encrypted:
            print("You don't have the dependencies to handle encrypted backup.")
            print("Read more on how to deal with encrypted backup:")
            print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
            return

        print("Encryption detected on the backup!")
        password = getpass.getpass("Enter the password for the backup:")
        self._decrypt_backup(password)
        self._extract_decrypted_files()

    def _decrypt_backup(self, password):
        """
        Decrypts the iOS backup using the provided password.

        Exits the process with status 7 on ValueError (reported as a probable
        wrong password) and with status 6 on FileNotFoundError.

        Args:
            password (str): The password for the encrypted backup.
        """
        print("Trying to decrypt the iOS backup...", end="")
        self.backup = EncryptedBackup(
            backup_directory=self.base_dir,
            passphrase=password,
            cleanup=False,
            check_same_thread=False,
            decrypt_chunk_size=self.decrypt_chunk_size,
        )
        print("Done\nDecrypting WhatsApp database...", end="")
        databases = (
            (RelativePath.WHATSAPP_MESSAGES, self.identifiers.MESSAGE),
            (RelativePath.WHATSAPP_CONTACTS, self.identifiers.CONTACT),
            (RelativePath.WHATSAPP_CALLS, self.identifiers.CALL),
        )
        try:
            for relative_path, output_filename in databases:
                self.backup.extract_file(
                    relative_path=relative_path,
                    domain_like=self.identifiers.DOMAIN,
                    output_filename=output_filename,
                )
        except ValueError:
            print("Failed to decrypt backup: incorrect password?")
            exit(7)
        except FileNotFoundError:
            print("Essential WhatsApp files are missing from the iOS backup.")
            exit(6)
        else:
            print("Done")

    def _extract_decrypted_files(self):
        """Extract all WhatsApp files after decryption"""
        def extract_progress_handler(file_id, domain, relative_path, n, total_files):
            # Report progress every 100 files; returning True accepts every file.
            if n % 100 == 0:
                print(f"Decrypting and extracting files...({n}/{total_files})", end="\r")
            return True

        self.backup.extract_files(
            domain_like=self.identifiers.DOMAIN,
            output_folder=self.identifiers.DOMAIN,
            preserve_folders=True,
            filter_callback=extract_progress_handler,
        )
        # NOTE(review): whitespace inside this literal may have been collapsed
        # in the patch text; confirm the trailing padding is wide enough to
        # overwrite the "\r" progress line.
        print("All required files are decrypted and extracted. ")

    def _extract_unencrypted_backup(self):
        """
        Handles the extraction of data from an unencrypted iOS backup.
        """
        self._copy_whatsapp_databases()
        self._extract_media_files()

    def _copy_whatsapp_databases(self):
        """
        Copies the WhatsApp message, contact, and call databases to the working directory.

        The message database is mandatory: the process exits if it is missing.
        The contact and call databases are skipped with a notice when absent.
        """
        wts_db_path = self._backup_path(self.identifiers.MESSAGE)
        if not os.path.isfile(wts_db_path):
            if self.identifiers is WhatsAppIdentifier:
                print("WhatsApp database not found.")
            else:
                print("WhatsApp Business database not found.")
            exit()
        shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)

        optional_databases = (
            ("Contact", self.identifiers.CONTACT),
            ("Call", self.identifiers.CALL),
        )
        for label, file_id in optional_databases:
            source = self._backup_path(file_id)
            if os.path.isfile(source):
                shutil.copyfile(source, file_id)
            else:
                print(f"{label} database not found. Skipping...")

    def _extract_media_files(self):
        """
        Extracts media files from the unencrypted backup.

        Reads the rows of the Files table in Manifest.db that belong to the
        configured domain and recreates them under a folder named after that
        domain in the current working directory.
        """
        domain = self.identifiers.DOMAIN
        manifest = sqlite3.connect(os.path.join(self.base_dir, "Manifest.db"))
        try:
            manifest.row_factory = sqlite3.Row
            c = manifest.cursor()
            # Bound parameters instead of string interpolation, so a domain
            # containing quotes cannot break or alter the statement.
            c.execute("SELECT count() FROM Files WHERE domain = ?", (domain,))
            total_row_number = c.fetchone()[0]
            print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
            c.execute(
                """
                SELECT fileID, relativePath, flags, file AS metadata,
                       ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
                FROM Files
                WHERE domain = ?
                ORDER BY relativePath
                """,
                (domain,),
            )
            if not os.path.isdir(domain):
                os.mkdir(domain)

            for row in c:
                if not row["relativePath"]:  # Skip empty relative paths
                    continue

                destination = os.path.join(domain, row["relativePath"])
                flags = row["flags"]

                if flags == 2:  # Directory
                    try:
                        os.mkdir(destination)
                    except FileExistsError:
                        pass
                elif flags == 1:  # File
                    shutil.copyfile(self._backup_path(row["fileID"]), destination)
                    metadata = BPListReader(row["metadata"]).parse()
                    modification = metadata["$objects"][1]["LastModified"]
                    # Access and modification times are both set to LastModified.
                    os.utime(destination, (modification, modification))

                if row["_index"] % 100 == 0:
                    print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
            print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")
        finally:
            # The connection's own context manager does not close it.
            manifest.close()


def extract_media(base_dir, identifiers, decrypt_chunk_size):
    """
    Extracts WhatsApp data (media, messages, contacts, calls) from an iOS backup.

    Args:
        base_dir (str): The path to the iOS backup directory.
        identifiers (WhatsAppIdentifier): An object containing WhatsApp file identifiers.
        decrypt_chunk_size (int): The chunk size for decryption.
    """
    extractor = BackupExtractor(base_dir, identifiers, decrypt_chunk_size)
    extractor.extract()