mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-02-03 08:09:09 +00:00
Refactor ios_media_handler
This commit is contained in:
@@ -14,143 +14,209 @@ else:
|
||||
support_encrypted = True
|
||||
|
||||
|
||||
def extract_encrypted(base_dir, password, identifiers, decrypt_chunk_size):
|
||||
print("Trying to decrypt the iOS backup...", end="")
|
||||
backup = EncryptedBackup(
|
||||
backup_directory=base_dir,
|
||||
passphrase=password,
|
||||
cleanup=False,
|
||||
check_same_thread=False,
|
||||
decrypt_chunk_size=decrypt_chunk_size
|
||||
)
|
||||
print("Done\nDecrypting WhatsApp database...", end="")
|
||||
try:
|
||||
backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_MESSAGES,
|
||||
domain_like=identifiers.DOMAIN,
|
||||
output_filename=identifiers.MESSAGE
|
||||
)
|
||||
backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_CONTACTS,
|
||||
domain_like=identifiers.DOMAIN,
|
||||
output_filename=identifiers.CONTACT
|
||||
)
|
||||
backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_CALLS,
|
||||
domain_like=identifiers.DOMAIN,
|
||||
output_filename=identifiers.CALL
|
||||
)
|
||||
except ValueError:
|
||||
print("Failed to decrypt backup: incorrect password?")
|
||||
exit(7)
|
||||
except FileNotFoundError:
|
||||
print("Essential WhatsApp files are missing from the iOS backup.")
|
||||
exit(6)
|
||||
else:
|
||||
print("Done")
|
||||
class BackupExtractor:
|
||||
"""
|
||||
A class to handle the extraction of WhatsApp data from iOS backups,
|
||||
including encrypted and unencrypted backups.
|
||||
"""
|
||||
|
||||
def extract_progress_handler(file_id, domain, relative_path, n, total_files):
|
||||
if n % 100 == 0:
|
||||
print(f"Decrypting and extracting files...({n}/{total_files})", end="\r")
|
||||
return True
|
||||
def __init__(self, base_dir, identifiers, decrypt_chunk_size):
|
||||
self.base_dir = base_dir
|
||||
self.identifiers = identifiers
|
||||
self.decrypt_chunk_size = decrypt_chunk_size
|
||||
|
||||
backup.extract_files(
|
||||
domain_like=identifiers.DOMAIN,
|
||||
output_folder=identifiers.DOMAIN,
|
||||
preserve_folders=True,
|
||||
filter_callback=extract_progress_handler
|
||||
)
|
||||
print(f"All required files are decrypted and extracted. ", end="\n")
|
||||
return backup
|
||||
|
||||
|
||||
def is_encrypted(base_dir):
|
||||
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f:
|
||||
c = f.cursor()
|
||||
try:
|
||||
c.execute("""SELECT count()
|
||||
FROM Files
|
||||
""")
|
||||
except sqlite3.OperationalError as e:
|
||||
raise e # These error cannot be used to determine if the backup is encrypted
|
||||
except sqlite3.DatabaseError:
|
||||
return True
|
||||
def extract(self):
|
||||
"""
|
||||
Extracts WhatsApp data from the backup based on whether it's encrypted or not.
|
||||
"""
|
||||
if self._is_encrypted():
|
||||
self._extract_encrypted_backup()
|
||||
else:
|
||||
return False
|
||||
self._extract_unencrypted_backup()
|
||||
|
||||
def _is_encrypted(self):
|
||||
"""
|
||||
Checks if the iOS backup is encrypted.
|
||||
|
||||
def extract_media(base_dir, identifiers, decrypt_chunk_size):
|
||||
if is_encrypted(base_dir):
|
||||
Returns:
|
||||
bool: True if encrypted, False otherwise.
|
||||
"""
|
||||
with sqlite3.connect(os.path.join(self.base_dir, "Manifest.db")) as db:
|
||||
c = db.cursor()
|
||||
try:
|
||||
c.execute("SELECT count() FROM Files")
|
||||
c.fetchone() # Execute and fetch to trigger potential errors
|
||||
except (sqlite3.OperationalError, sqlite3.DatabaseError):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _extract_encrypted_backup(self):
|
||||
"""
|
||||
Handles the extraction of data from an encrypted iOS backup.
|
||||
"""
|
||||
if not support_encrypted:
|
||||
print("You don't have the dependencies to handle encrypted backup.")
|
||||
print("Read more on how to deal with encrypted backup:")
|
||||
print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
|
||||
return False
|
||||
return
|
||||
|
||||
print("Encryption detected on the backup!")
|
||||
password = getpass.getpass("Enter the password for the backup:")
|
||||
extract_encrypted(base_dir, password, identifiers, decrypt_chunk_size)
|
||||
else:
|
||||
wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE)
|
||||
contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT)
|
||||
call_db = os.path.join(base_dir, identifiers.CALL[:2], identifiers.CALL)
|
||||
if not os.path.isfile(wts_db):
|
||||
if identifiers is WhatsAppIdentifier:
|
||||
self._decrypt_backup(password)
|
||||
self._extract_decrypted_files()
|
||||
|
||||
def _decrypt_backup(self, password):
|
||||
"""
|
||||
Decrypts the iOS backup using the provided password.
|
||||
|
||||
Args:
|
||||
password (str): The password for the encrypted backup.
|
||||
"""
|
||||
print("Trying to decrypt the iOS backup...", end="")
|
||||
self.backup = EncryptedBackup(
|
||||
backup_directory=self.base_dir,
|
||||
passphrase=password,
|
||||
cleanup=False,
|
||||
check_same_thread=False,
|
||||
decrypt_chunk_size=self.decrypt_chunk_size,
|
||||
)
|
||||
print("Done\nDecrypting WhatsApp database...", end="")
|
||||
try:
|
||||
self.backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_MESSAGES,
|
||||
domain_like=self.identifiers.DOMAIN,
|
||||
output_filename=self.identifiers.MESSAGE,
|
||||
)
|
||||
self.backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_CONTACTS,
|
||||
domain_like=self.identifiers.DOMAIN,
|
||||
output_filename=self.identifiers.CONTACT,
|
||||
)
|
||||
self.backup.extract_file(
|
||||
relative_path=RelativePath.WHATSAPP_CALLS,
|
||||
domain_like=self.identifiers.DOMAIN,
|
||||
output_filename=self.identifiers.CALL,
|
||||
)
|
||||
except ValueError:
|
||||
print("Failed to decrypt backup: incorrect password?")
|
||||
exit(7)
|
||||
except FileNotFoundError:
|
||||
print("Essential WhatsApp files are missing from the iOS backup.")
|
||||
exit(6)
|
||||
else:
|
||||
print("Done")
|
||||
|
||||
def _extract_decrypted_files(self):
|
||||
"""Extract all WhatsApp files after decryption"""
|
||||
def extract_progress_handler(file_id, domain, relative_path, n, total_files):
|
||||
if n % 100 == 0:
|
||||
print(f"Decrypting and extracting files...({n}/{total_files})", end="\r")
|
||||
return True
|
||||
|
||||
self.backup.extract_files(
|
||||
domain_like=self.identifiers.DOMAIN,
|
||||
output_folder=self.identifiers.DOMAIN,
|
||||
preserve_folders=True,
|
||||
filter_callback=extract_progress_handler
|
||||
)
|
||||
print(f"All required files are decrypted and extracted. ", end="\n")
|
||||
|
||||
def _extract_unencrypted_backup(self):
|
||||
"""
|
||||
Handles the extraction of data from an unencrypted iOS backup.
|
||||
"""
|
||||
self._copy_whatsapp_databases()
|
||||
self._extract_media_files()
|
||||
|
||||
def _copy_whatsapp_databases(self):
|
||||
"""
|
||||
Copies the WhatsApp message, contact, and call databases to the working directory.
|
||||
"""
|
||||
wts_db_path = os.path.join(self.base_dir, self.identifiers.MESSAGE[:2], self.identifiers.MESSAGE)
|
||||
contact_db_path = os.path.join(self.base_dir, self.identifiers.CONTACT[:2], self.identifiers.CONTACT)
|
||||
call_db_path = os.path.join(self.base_dir, self.identifiers.CALL[:2], self.identifiers.CALL)
|
||||
|
||||
if not os.path.isfile(wts_db_path):
|
||||
if self.identifiers is WhatsAppIdentifier:
|
||||
print("WhatsApp database not found.")
|
||||
else:
|
||||
print("WhatsApp Business database not found.")
|
||||
exit()
|
||||
else:
|
||||
shutil.copyfile(wts_db, identifiers.MESSAGE)
|
||||
if not os.path.isfile(contact_db):
|
||||
shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)
|
||||
|
||||
if not os.path.isfile(contact_db_path):
|
||||
print("Contact database not found. Skipping...")
|
||||
else:
|
||||
shutil.copyfile(contact_db, identifiers.CONTACT)
|
||||
if not os.path.isfile(call_db):
|
||||
shutil.copyfile(contact_db_path, self.identifiers.CONTACT)
|
||||
|
||||
if not os.path.isfile(call_db_path):
|
||||
print("Call database not found. Skipping...")
|
||||
else:
|
||||
shutil.copyfile(call_db, identifiers.CALL)
|
||||
_wts_id = identifiers.DOMAIN
|
||||
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
|
||||
shutil.copyfile(call_db_path, self.identifiers.CALL)
|
||||
|
||||
def _extract_media_files(self):
|
||||
"""
|
||||
Extracts media files from the unencrypted backup.
|
||||
"""
|
||||
_wts_id = self.identifiers.DOMAIN
|
||||
with sqlite3.connect(os.path.join(self.base_dir, "Manifest.db")) as manifest:
|
||||
manifest.row_factory = sqlite3.Row
|
||||
c = manifest.cursor()
|
||||
c.execute(
|
||||
f"""SELECT count()
|
||||
FROM Files
|
||||
WHERE domain = '{_wts_id}'"""
|
||||
)
|
||||
c.execute(f"SELECT count() FROM Files WHERE domain = '{_wts_id}'")
|
||||
total_row_number = c.fetchone()[0]
|
||||
print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
|
||||
c.execute(f"""SELECT fileID,
|
||||
relativePath,
|
||||
flags,
|
||||
file AS metadata,
|
||||
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
|
||||
FROM Files
|
||||
WHERE domain = '{_wts_id}'
|
||||
ORDER BY relativePath""")
|
||||
c.execute(
|
||||
f"""
|
||||
SELECT fileID, relativePath, flags, file AS metadata,
|
||||
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
|
||||
FROM Files
|
||||
WHERE domain = '{_wts_id}'
|
||||
ORDER BY relativePath
|
||||
"""
|
||||
)
|
||||
if not os.path.isdir(_wts_id):
|
||||
os.mkdir(_wts_id)
|
||||
|
||||
row = c.fetchone()
|
||||
while row is not None:
|
||||
if row["relativePath"] == "":
|
||||
if not row["relativePath"]: # Skip empty relative paths
|
||||
row = c.fetchone()
|
||||
continue
|
||||
|
||||
destination = os.path.join(_wts_id, row["relativePath"])
|
||||
hashes = row["fileID"]
|
||||
folder = hashes[:2]
|
||||
flags = row["flags"]
|
||||
if flags == 2:
|
||||
|
||||
if flags == 2: # Directory
|
||||
try:
|
||||
os.mkdir(destination)
|
||||
except FileExistsError:
|
||||
pass
|
||||
elif flags == 1:
|
||||
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
|
||||
elif flags == 1: # File
|
||||
shutil.copyfile(os.path.join(self.base_dir, folder, hashes), destination)
|
||||
metadata = BPListReader(row["metadata"]).parse()
|
||||
creation = metadata["$objects"][1]["Birth"]
|
||||
modification = metadata["$objects"][1]["LastModified"]
|
||||
os.utime(destination, (modification, modification))
|
||||
|
||||
if row["_index"] % 100 == 0:
|
||||
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
|
||||
row = c.fetchone()
|
||||
print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")
|
||||
|
||||
|
||||
def extract_media(base_dir, identifiers, decrypt_chunk_size):
|
||||
"""
|
||||
Extracts WhatsApp data (media, messages, contacts, calls) from an iOS backup.
|
||||
|
||||
Args:
|
||||
base_dir (str): The path to the iOS backup directory.
|
||||
identifiers (WhatsAppIdentifier): An object containing WhatsApp file identifiers.
|
||||
decrypt_chunk_size (int): The chunk size for decryption.
|
||||
"""
|
||||
extractor = BackupExtractor(base_dir, identifiers, decrypt_chunk_size)
|
||||
extractor.extract()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user