mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-02-11 03:32:25 +00:00
Refactoring so that no file needs to be introduced
This commit is contained in:
@@ -7,7 +7,7 @@ import json
|
||||
import string
|
||||
import glob
|
||||
from Whatsapp_Chat_Exporter import extract_exported, extract_iphone
|
||||
from Whatsapp_Chat_Exporter import extract, extract_iphone_media, extract_iphone_media_smb
|
||||
from Whatsapp_Chat_Exporter import extract, extract_iphone_media
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore
|
||||
from Whatsapp_Chat_Exporter.utility import Crypt, check_update, import_from_json
|
||||
from argparse import ArgumentParser, SUPPRESS
|
||||
@@ -274,24 +274,19 @@ def main():
|
||||
media = extract_iphone.media
|
||||
vcard = extract_iphone.vcard
|
||||
create_html = extract.create_html
|
||||
if args.business:
|
||||
from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers
|
||||
else:
|
||||
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers
|
||||
if args.media is None:
|
||||
if args.smb:
|
||||
args.media = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
|
||||
else:
|
||||
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
||||
args.media = identifiers.DOMAIN
|
||||
if args.backup is not None:
|
||||
if not os.path.isdir(args.media):
|
||||
if args.smb:
|
||||
extract_iphone_media_smb.extract_media(args.backup)
|
||||
else:
|
||||
extract_iphone_media.extract_media(args.backup)
|
||||
extract_iphone_media.extract_media(args.backup, identifiers)
|
||||
else:
|
||||
print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
|
||||
if args.db is None:
|
||||
if args.smb:
|
||||
msg_db = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
|
||||
else:
|
||||
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
|
||||
msg_db = identifiers.MESSAGE
|
||||
else:
|
||||
msg_db = args.db
|
||||
if args.wa is None:
|
||||
|
||||
@@ -6,6 +6,7 @@ import os
|
||||
import time
|
||||
import getpass
|
||||
import threading
|
||||
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier
|
||||
try:
|
||||
from iphone_backup_decrypt import EncryptedBackup, RelativePath
|
||||
from iphone_backup_decrypt import FailedToDecryptError, Domain
|
||||
@@ -15,7 +16,7 @@ else:
|
||||
support_encrypted = True
|
||||
|
||||
|
||||
def extract_encrypted(base_dir, password):
|
||||
def extract_encrypted(base_dir, password, identifiers):
|
||||
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
|
||||
print("Decrypting WhatsApp database...")
|
||||
try:
|
||||
@@ -61,7 +62,7 @@ def is_encrypted(base_dir):
|
||||
return False
|
||||
|
||||
|
||||
def extract_media(base_dir):
|
||||
def extract_media(base_dir, identifiers):
|
||||
if is_encrypted(base_dir):
|
||||
if not support_encrypted:
|
||||
print("You don't have the dependencies to handle encrypted backup.")
|
||||
@@ -70,20 +71,23 @@ def extract_media(base_dir):
|
||||
return False
|
||||
print("Encryption detected on the backup!")
|
||||
password = getpass.getpass("Enter the password for the backup:")
|
||||
extract_encrypted(base_dir, password)
|
||||
extract_encrypted(base_dir, password, identifiers)
|
||||
else:
|
||||
wts_db = os.path.join(base_dir, "7c/7c7fba66680ef796b916b067077cc246adacf01d")
|
||||
contact_db = os.path.join(base_dir, "b8/b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
|
||||
wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE)
|
||||
contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT)
|
||||
if not os.path.isfile(wts_db):
|
||||
print("WhatsApp database not found.")
|
||||
if identifiers is WhatsAppIdentifier:
|
||||
print("WhatsApp database not found.")
|
||||
else:
|
||||
print("WhatsApp Business database not found.")
|
||||
exit()
|
||||
else:
|
||||
shutil.copyfile(wts_db, "7c7fba66680ef796b916b067077cc246adacf01d")
|
||||
shutil.copyfile(wts_db, identifiers.MESSAGE)
|
||||
if not os.path.isfile(contact_db):
|
||||
print("Contact database not found.")
|
||||
exit()
|
||||
else:
|
||||
shutil.copyfile(contact_db, "b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
|
||||
shutil.copyfile(contact_db, identifiers.CONTACT)
|
||||
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
||||
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
|
||||
manifest.row_factory = sqlite3.Row
|
||||
|
||||
@@ -1,126 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import shutil
|
||||
import sqlite3
|
||||
import os
|
||||
import time
|
||||
import getpass
|
||||
import threading
|
||||
try:
|
||||
from iphone_backup_decrypt import EncryptedBackup, RelativePath
|
||||
from iphone_backup_decrypt import FailedToDecryptError, Domain
|
||||
except ModuleNotFoundError:
|
||||
support_encrypted = False
|
||||
else:
|
||||
support_encrypted = True
|
||||
|
||||
|
||||
def extract_encrypted(base_dir, password):
|
||||
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
|
||||
print("Decrypting WhatsApp database...")
|
||||
try:
|
||||
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES,
|
||||
output_filename="724bd3b98b18518b455a87c1f3ac3a0d189c4466")
|
||||
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS,
|
||||
output_filename="d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
|
||||
except FailedToDecryptError:
|
||||
print("Failed to decrypt backup: incorrect password?")
|
||||
exit()
|
||||
extract_thread = threading.Thread(
|
||||
target=backup.extract_files_by_domain,
|
||||
args=(Domain.WHATSAPP, Domain.WHATSAPP)
|
||||
)
|
||||
extract_thread.daemon = True
|
||||
extract_thread.start()
|
||||
dot = 0
|
||||
while extract_thread.is_alive():
|
||||
print(f"Decrypting and extracting files{'.' * dot}{' ' * (3 - dot)}", end="\r")
|
||||
if dot < 3:
|
||||
dot += 1
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
dot = 0
|
||||
time.sleep(0.4)
|
||||
print(f"All required files decrypted and extracted.", end="\n")
|
||||
extract_thread.handled = True
|
||||
return backup
|
||||
|
||||
|
||||
def is_encrypted(base_dir):
|
||||
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f:
|
||||
c = f.cursor()
|
||||
try:
|
||||
c.execute("""SELECT count()
|
||||
FROM Files
|
||||
""")
|
||||
except sqlite3.OperationalError as e:
|
||||
raise e # These error cannot be used to determine if the backup is encrypted
|
||||
except sqlite3.DatabaseError:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def extract_media(base_dir):
|
||||
if is_encrypted(base_dir):
|
||||
if not support_encrypted:
|
||||
print("You don't have the dependencies to handle encrypted backup.")
|
||||
print("Read more on how to deal with encrypted backup:")
|
||||
print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
|
||||
return False
|
||||
print("Encryption detected on the backup!")
|
||||
password = getpass.getpass("Enter the password for the backup:")
|
||||
extract_encrypted(base_dir, password)
|
||||
else:
|
||||
wts_db = os.path.join(base_dir, "72/724bd3b98b18518b455a87c1f3ac3a0d189c4466")
|
||||
contact_db = os.path.join(base_dir, "d7/d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
|
||||
if not os.path.isfile(wts_db):
|
||||
print("WhatsApp database not found.")
|
||||
exit()
|
||||
else:
|
||||
shutil.copyfile(wts_db, "724bd3b98b18518b455a87c1f3ac3a0d189c4466")
|
||||
if not os.path.isfile(contact_db):
|
||||
print("Contact database not found.")
|
||||
exit()
|
||||
else:
|
||||
shutil.copyfile(contact_db, "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
|
||||
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
|
||||
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
|
||||
manifest.row_factory = sqlite3.Row
|
||||
c = manifest.cursor()
|
||||
c.execute(
|
||||
f"""SELECT count()
|
||||
FROM Files
|
||||
WHERE domain = '{_wts_id}'"""
|
||||
)
|
||||
total_row_number = c.fetchone()[0]
|
||||
print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
|
||||
c.execute(f"""SELECT fileID,
|
||||
relativePath,
|
||||
flags,
|
||||
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
|
||||
FROM Files
|
||||
WHERE domain = '{_wts_id}'
|
||||
ORDER BY relativePath""")
|
||||
if not os.path.isdir(_wts_id):
|
||||
os.mkdir(_wts_id)
|
||||
row = c.fetchone()
|
||||
while row is not None:
|
||||
if row["relativePath"] == "":
|
||||
row = c.fetchone()
|
||||
continue
|
||||
destination = os.path.join(_wts_id, row["relativePath"])
|
||||
hashes = row["fileID"]
|
||||
folder = hashes[:2]
|
||||
flags = row["flags"]
|
||||
if flags == 2:
|
||||
try:
|
||||
os.mkdir(destination)
|
||||
except FileExistsError:
|
||||
pass
|
||||
elif flags == 1:
|
||||
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
|
||||
if row["_index"] % 100 == 0:
|
||||
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
|
||||
row = c.fetchone()
|
||||
print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")
|
||||
@@ -290,3 +290,15 @@ def setup_template(template, no_avatar):
|
||||
|
||||
# iOS Specific
|
||||
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
|
||||
|
||||
|
||||
class WhatsAppIdentifier(StrEnum):
|
||||
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
|
||||
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
|
||||
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
||||
|
||||
|
||||
class WhatsAppBusinessIdentifier(StrEnum):
|
||||
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
|
||||
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
|
||||
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
|
||||
|
||||
Reference in New Issue
Block a user