This commit is contained in:
KnugiHK
2025-05-05 17:13:43 +08:00
parent 3220ed2d3f
commit a58dd78be8
4 changed files with 224 additions and 171 deletions

View File

@@ -34,12 +34,12 @@ def setup_argument_parser() -> ArgumentParser:
"""Set up and return the argument parser with all options.""" """Set up and return the argument parser with all options."""
parser = ArgumentParser( parser = ArgumentParser(
description='A customizable Android and iOS/iPadOS WhatsApp database parser that ' description='A customizable Android and iOS/iPadOS WhatsApp database parser that '
'will give you the history of your WhatsApp conversations in HTML ' 'will give you the history of your WhatsApp conversations in HTML '
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See '
'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.'
) )
# Device type arguments # Device type arguments
device_group = parser.add_argument_group('Device Type') device_group = parser.add_argument_group('Device Type')
device_group.add_argument( device_group.add_argument(
@@ -54,7 +54,7 @@ def setup_argument_parser() -> ArgumentParser:
"-e", "--exported", dest="exported", default=None, "-e", "--exported", dest="exported", default=None,
help="Define the target as exported chat file and specify the path to the file" help="Define the target as exported chat file and specify the path to the file"
) )
# Input file paths # Input file paths
input_group = parser.add_argument_group('Input Files') input_group = parser.add_argument_group('Input Files')
input_group.add_argument( input_group.add_argument(
@@ -86,7 +86,7 @@ def setup_argument_parser() -> ArgumentParser:
"--wab", "--wa-backup", dest="wab", default=None, "--wab", "--wa-backup", dest="wab", default=None,
help="Path to contact database in crypt15 format" help="Path to contact database in crypt15 format"
) )
# Output options # Output options
output_group = parser.add_argument_group('Output Options') output_group = parser.add_argument_group('Output Options')
output_group.add_argument( output_group.add_argument(
@@ -109,7 +109,7 @@ def setup_argument_parser() -> ArgumentParser:
"--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None, "--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None,
help="Maximum (rough) size of a single output file in bytes, 0 for auto" help="Maximum (rough) size of a single output file in bytes, 0 for auto"
) )
# JSON formatting options # JSON formatting options
json_group = parser.add_argument_group('JSON Options') json_group = parser.add_argument_group('JSON Options')
json_group.add_argument( json_group.add_argument(
@@ -128,7 +128,7 @@ def setup_argument_parser() -> ArgumentParser:
"--import", dest="import_json", default=False, action='store_true', "--import", dest="import_json", default=False, action='store_true',
help="Import JSON file and convert to HTML output" help="Import JSON file and convert to HTML output"
) )
# HTML options # HTML options
html_group = parser.add_argument_group('HTML Options') html_group = parser.add_argument_group('HTML Options')
html_group.add_argument( html_group.add_argument(
@@ -155,7 +155,7 @@ def setup_argument_parser() -> ArgumentParser:
"--headline", dest="headline", default="Chat history with ??", "--headline", dest="headline", default="Chat history with ??",
help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name" help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name"
) )
# Media handling # Media handling
media_group = parser.add_argument_group('Media Handling') media_group = parser.add_argument_group('Media Handling')
media_group.add_argument( media_group.add_argument(
@@ -166,7 +166,7 @@ def setup_argument_parser() -> ArgumentParser:
"--create-separated-media", dest="separate_media", default=False, action='store_true', "--create-separated-media", dest="separate_media", default=False, action='store_true',
help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory" help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory"
) )
# Filtering options # Filtering options
filter_group = parser.add_argument_group('Filtering Options') filter_group = parser.add_argument_group('Filtering Options')
filter_group.add_argument( filter_group.add_argument(
@@ -195,7 +195,7 @@ def setup_argument_parser() -> ArgumentParser:
"Setting this flag will cause the exporter to render those. " "Setting this flag will cause the exporter to render those. "
"This is useful if chat(s) are missing from the output") "This is useful if chat(s) are missing from the output")
) )
# Contact enrichment # Contact enrichment
contact_group = parser.add_argument_group('Contact Enrichment') contact_group = parser.add_argument_group('Contact Enrichment')
contact_group.add_argument( contact_group.add_argument(
@@ -219,7 +219,7 @@ def setup_argument_parser() -> ArgumentParser:
"The chats (JSON files only) and media from the source directory will be merged into the target directory. " "The chats (JSON files only) and media from the source directory will be merged into the target directory. "
"No chat messages or media will be deleted from the target directory; only new chat messages and media will be added to it. " "No chat messages or media will be deleted from the target directory; only new chat messages and media will be added to it. "
"This enables chat messages and media to be deleted from the device to free up space, while ensuring they are preserved in the exported backups." "This enables chat messages and media to be deleted from the device to free up space, while ensuring they are preserved in the exported backups."
) )
) )
inc_merging_group.add_argument( inc_merging_group.add_argument(
"--source-dir", "--source-dir",
@@ -233,7 +233,7 @@ def setup_argument_parser() -> ArgumentParser:
default=None, default=None,
help="Sets the target directory. Used for performing incremental merges." help="Sets the target directory. Used for performing incremental merges."
) )
# Miscellaneous # Miscellaneous
misc_group = parser.add_argument_group('Miscellaneous') misc_group = parser.add_argument_group('Miscellaneous')
misc_group.add_argument( misc_group.add_argument(
@@ -260,7 +260,7 @@ def setup_argument_parser() -> ArgumentParser:
"--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int, "--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int,
help="Specify the maximum number of worker for bruteforce decryption." help="Specify the maximum number of worker for bruteforce decryption."
) )
return parser return parser
@@ -272,52 +272,59 @@ def validate_args(parser: ArgumentParser, args) -> None:
if not args.android and not args.ios and not args.exported and not args.import_json: if not args.android and not args.ios and not args.exported and not args.import_json:
parser.error("You must define the device type.") parser.error("You must define the device type.")
if args.no_html and not args.json and not args.text_format: if args.no_html and not args.json and not args.text_format:
parser.error("You must either specify a JSON output file, text file output directory or enable HTML output.") parser.error(
"You must either specify a JSON output file, text file output directory or enable HTML output.")
if args.import_json and (args.android or args.ios or args.exported or args.no_html): if args.import_json and (args.android or args.ios or args.exported or args.no_html):
parser.error("You can only use --import with -j and without --no-html, -a, -i, -e.") parser.error(
"You can only use --import with -j and without --no-html, -a, -i, -e.")
elif args.import_json and not os.path.isfile(args.json): elif args.import_json and not os.path.isfile(args.json):
parser.error("JSON file not found.") parser.error("JSON file not found.")
if args.incremental_merge and (args.source_dir is None or args.target_dir is None): if args.incremental_merge and (args.source_dir is None or args.target_dir is None):
parser.error("You must specify both --source-dir and --target-dir for incremental merge.") parser.error(
"You must specify both --source-dir and --target-dir for incremental merge.")
if args.android and args.business: if args.android and args.business:
parser.error("WhatsApp Business is only available on iOS for now.") parser.error("WhatsApp Business is only available on iOS for now.")
if "??" not in args.headline: if "??" not in args.headline:
parser.error("--headline must contain '??' for replacement.") parser.error("--headline must contain '??' for replacement.")
# JSON validation # JSON validation
if args.json_per_chat and args.json and ( if args.json_per_chat and args.json and (
(args.json.endswith(".json") and os.path.isfile(args.json)) or (args.json.endswith(".json") and os.path.isfile(args.json)) or
(not args.json.endswith(".json") and os.path.isfile(args.json)) (not args.json.endswith(".json") and os.path.isfile(args.json))
): ):
parser.error("When --per-chat is enabled, the destination of --json must be a directory.") parser.error(
"When --per-chat is enabled, the destination of --json must be a directory.")
# vCards validation # vCards validation
if args.enrich_from_vcards is not None and args.default_country_code is None: if args.enrich_from_vcards is not None and args.default_country_code is None:
parser.error("When --enrich-from-vcards is provided, you must also set --default-country-code") parser.error(
"When --enrich-from-vcards is provided, you must also set --default-country-code")
# Size validation # Size validation
if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric(): if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric():
try: try:
args.size = readable_to_bytes(args.size) args.size = readable_to_bytes(args.size)
except ValueError: except ValueError:
parser.error("The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") parser.error(
"The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)")
# Date filter validation and processing # Date filter validation and processing
if args.filter_date is not None: if args.filter_date is not None:
process_date_filter(parser, args) process_date_filter(parser, args)
# Crypt15 key validation # Crypt15 key validation
if args.key is None and args.backup is not None and args.backup.endswith("crypt15"): if args.key is None and args.backup is not None and args.backup.endswith("crypt15"):
args.key = getpass("Enter your encryption key: ") args.key = getpass("Enter your encryption key: ")
# Theme validation # Theme validation
if args.whatsapp_theme: if args.whatsapp_theme:
args.template = "whatsapp_new.html" args.template = "whatsapp_new.html"
# Chat filter validation # Chat filter validation
if args.filter_chat_include is not None and args.filter_chat_exclude is not None: if args.filter_chat_include is not None and args.filter_chat_exclude is not None:
parser.error("Chat inclusion and exclusion filters cannot be used together.") parser.error(
"Chat inclusion and exclusion filters cannot be used together.")
validate_chat_filters(parser, args.filter_chat_include) validate_chat_filters(parser, args.filter_chat_include)
validate_chat_filters(parser, args.filter_chat_exclude) validate_chat_filters(parser, args.filter_chat_exclude)
@@ -327,21 +334,24 @@ def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str
if chat_filter is not None: if chat_filter is not None:
for chat in chat_filter: for chat in chat_filter:
if not chat.isnumeric(): if not chat.isnumeric():
parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") parser.error(
"Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat")
def process_date_filter(parser: ArgumentParser, args) -> None: def process_date_filter(parser: ArgumentParser, args) -> None:
"""Process and validate date filter arguments.""" """Process and validate date filter arguments."""
if " - " in args.filter_date: if " - " in args.filter_date:
start, end = args.filter_date.split(" - ") start, end = args.filter_date.split(" - ")
start = int(datetime.strptime(start, args.filter_date_format).timestamp()) start = int(datetime.strptime(
start, args.filter_date_format).timestamp())
end = int(datetime.strptime(end, args.filter_date_format).timestamp()) end = int(datetime.strptime(end, args.filter_date_format).timestamp())
if start < 1009843200 or end < 1009843200: if start < 1009843200 or end < 1009843200:
parser.error("WhatsApp was first released in 2009...") parser.error("WhatsApp was first released in 2009...")
if start > end: if start > end:
parser.error("The start date cannot be a moment after the end date.") parser.error(
"The start date cannot be a moment after the end date.")
if args.android: if args.android:
args.filter_date = f"BETWEEN {start}000 AND {end}000" args.filter_date = f"BETWEEN {start}000 AND {end}000"
elif args.ios: elif args.ios:
@@ -353,13 +363,15 @@ def process_date_filter(parser: ArgumentParser, args) -> None:
def process_single_date_filter(parser: ArgumentParser, args) -> None: def process_single_date_filter(parser: ArgumentParser, args) -> None:
"""Process single date comparison filters.""" """Process single date comparison filters."""
if len(args.filter_date) < 3: if len(args.filter_date) < 3:
parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") parser.error(
"Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
_timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp())
_timestamp = int(datetime.strptime(
args.filter_date[2:], args.filter_date_format).timestamp())
if _timestamp < 1009843200: if _timestamp < 1009843200:
parser.error("WhatsApp was first released in 2009...") parser.error("WhatsApp was first released in 2009...")
if args.filter_date[:2] == "> ": if args.filter_date[:2] == "> ":
if args.android: if args.android:
args.filter_date = f">= {_timestamp}000" args.filter_date = f">= {_timestamp}000"
@@ -371,7 +383,8 @@ def process_single_date_filter(parser: ArgumentParser, args) -> None:
elif args.ios: elif args.ios:
args.filter_date = f"<= {_timestamp - APPLE_TIME}" args.filter_date = f"<= {_timestamp - APPLE_TIME}"
else: else:
parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") parser.error(
"Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
def setup_contact_store(args) -> Optional['ContactsFromVCards']: def setup_contact_store(args) -> Optional['ContactsFromVCards']:
@@ -385,7 +398,8 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']:
) )
exit(1) exit(1)
contact_store = ContactsFromVCards() contact_store = ContactsFromVCards()
contact_store.load_vcf_file(args.enrich_from_vcards, args.default_country_code) contact_store.load_vcf_file(
args.enrich_from_vcards, args.default_country_code)
return contact_store return contact_store
return None return None
@@ -395,9 +409,9 @@ def decrypt_android_backup(args) -> int:
if args.key is None or args.backup is None: if args.key is None or args.backup is None:
print("You must specify the backup file with -b and a key with -k") print("You must specify the backup file with -b and a key with -k")
return 1 return 1
print("Decryption key specified, decrypting WhatsApp backup...") print("Decryption key specified, decrypting WhatsApp backup...")
# Determine crypt type # Determine crypt type
if "crypt12" in args.backup: if "crypt12" in args.backup:
crypt = Crypt.CRYPT12 crypt = Crypt.CRYPT12
@@ -408,7 +422,7 @@ def decrypt_android_backup(args) -> int:
else: else:
print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.")
return 1 return 1
# Get key # Get key
keyfile_stream = False keyfile_stream = False
if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")): if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")):
@@ -416,10 +430,10 @@ def decrypt_android_backup(args) -> int:
else: else:
key = open(args.key, "rb") key = open(args.key, "rb")
keyfile_stream = True keyfile_stream = True
# Read backup # Read backup
db = open(args.backup, "rb").read() db = open(args.backup, "rb").read()
# Process WAB if provided # Process WAB if provided
error_wa = 0 error_wa = 0
if args.wab: if args.wab:
@@ -436,7 +450,7 @@ def decrypt_android_backup(args) -> int:
) )
if isinstance(key, io.IOBase): if isinstance(key, io.IOBase):
key.seek(0) key.seek(0)
# Decrypt message database # Decrypt message database
error_message = android_crypt.decrypt_backup( error_message = android_crypt.decrypt_backup(
db, db,
@@ -448,7 +462,7 @@ def decrypt_android_backup(args) -> int:
keyfile_stream=keyfile_stream, keyfile_stream=keyfile_stream,
max_worker=args.max_bruteforce_worker max_worker=args.max_bruteforce_worker
) )
# Handle errors # Handle errors
if error_wa != 0: if error_wa != 0:
return error_wa return error_wa
@@ -473,7 +487,7 @@ def handle_decrypt_error(error: int) -> None:
def process_contacts(args, data: ChatCollection, contact_store=None) -> None: def process_contacts(args, data: ChatCollection, contact_store=None) -> None:
"""Process contacts from the database.""" """Process contacts from the database."""
contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite" contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite"
if os.path.isfile(contact_db): if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db: with sqlite3.connect(contact_db) as db:
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
@@ -486,42 +500,42 @@ def process_contacts(args, data: ChatCollection, contact_store=None) -> None:
def process_messages(args, data: ChatCollection) -> None: def process_messages(args, data: ChatCollection) -> None:
"""Process messages, media and vcards from the database.""" """Process messages, media and vcards from the database."""
msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE
if not os.path.isfile(msg_db): if not os.path.isfile(msg_db):
print( print(
"The message database does not exist. You may specify the path " "The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path." "to database file with option -d or check your provided path."
) )
exit(6) exit(6)
filter_chat = (args.filter_chat_include, args.filter_chat_exclude) filter_chat = (args.filter_chat_include, args.filter_chat_exclude)
with sqlite3.connect(msg_db) as db: with sqlite3.connect(msg_db) as db:
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
# Process messages # Process messages
if args.android: if args.android:
message_handler = android_handler message_handler = android_handler
else: else:
message_handler = ios_handler message_handler = ios_handler
message_handler.messages( message_handler.messages(
db, data, args.media, args.timezone_offset, db, data, args.media, args.timezone_offset,
args.filter_date, filter_chat, args.filter_empty args.filter_date, filter_chat, args.filter_empty
) )
# Process media # Process media
message_handler.media( message_handler.media(
db, data, args.media, args.filter_date, db, data, args.media, args.filter_date,
filter_chat, args.filter_empty, args.separate_media filter_chat, args.filter_empty, args.separate_media
) )
# Process vcards # Process vcards
message_handler.vcard( message_handler.vcard(
db, data, args.media, args.filter_date, db, data, args.media, args.filter_date,
filter_chat, args.filter_empty filter_chat, args.filter_empty
) )
# Process calls # Process calls
process_calls(args, db, data, filter_chat) process_calls(args, db, data, filter_chat)
@@ -540,9 +554,10 @@ def handle_media_directory(args) -> None:
"""Handle media directory copying or moving.""" """Handle media directory copying or moving."""
if os.path.isdir(args.media): if os.path.isdir(args.media):
media_path = os.path.join(args.output, args.media) media_path = os.path.join(args.output, args.media)
if os.path.isdir(media_path): if os.path.isdir(media_path):
print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") print(
"\nWhatsApp directory already exists in output directory. Skipping...", end="\n")
else: else:
if args.move_media: if args.move_media:
try: try:
@@ -563,7 +578,7 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None:
# Enrich from vcards if available # Enrich from vcards if available
if contact_store and not contact_store.is_empty(): if contact_store and not contact_store.is_empty():
contact_store.enrich_from_vcards(data) contact_store.enrich_from_vcards(data)
android_handler.create_html( android_handler.create_html(
data, data,
args.output, args.output,
@@ -575,12 +590,12 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None:
args.whatsapp_theme, args.whatsapp_theme,
args.headline args.headline
) )
# Create text files if requested # Create text files if requested
if args.text_format: if args.text_format:
print("Writing text file...") print("Writing text file...")
android_handler.create_txt(data, args.text_format) android_handler.create_txt(data, args.text_format)
# Create JSON files if requested # Create JSON files if requested
if args.json and not args.import_json: if args.json and not args.import_json:
export_json(args, data, contact_store) export_json(args, data, contact_store)
@@ -591,11 +606,11 @@ def export_json(args, data: ChatCollection, contact_store=None) -> None:
# Enrich from vcards if available # Enrich from vcards if available
if contact_store and not contact_store.is_empty(): if contact_store and not contact_store.is_empty():
contact_store.enrich_from_vcards(data) contact_store.enrich_from_vcards(data)
# Convert ChatStore objects to JSON # Convert ChatStore objects to JSON
if isinstance(data.get(next(iter(data), None)), ChatStore): if isinstance(data.get(next(iter(data), None)), ChatStore):
data = {jik: chat.to_json() for jik, chat in data.items()} data = {jik: chat.to_json() for jik, chat in data.items()}
# Export as a single file or per chat # Export as a single file or per chat
if not args.json_per_chat: if not args.json_per_chat:
export_single_json(args, data) export_single_json(args, data)
@@ -619,11 +634,11 @@ def export_multiple_json(args, data: Dict) -> None:
"""Export data to multiple JSON files, one per chat.""" """Export data to multiple JSON files, one per chat."""
# Adjust output path if needed # Adjust output path if needed
json_path = args.json[:-5] if args.json.endswith(".json") else args.json json_path = args.json[:-5] if args.json.endswith(".json") else args.json
# Create directory if it doesn't exist # Create directory if it doesn't exist
if not os.path.isdir(json_path): if not os.path.isdir(json_path):
os.makedirs(json_path, exist_ok=True) os.makedirs(json_path, exist_ok=True)
# Export each chat # Export each chat
total = len(data.keys()) total = len(data.keys())
for index, jik in enumerate(data.keys()): for index, jik in enumerate(data.keys()):
@@ -631,11 +646,11 @@ def export_multiple_json(args, data: Dict) -> None:
contact = data[jik]["name"].replace('/', '') contact = data[jik]["name"].replace('/', '')
else: else:
contact = jik.replace('+', '') contact = jik.replace('+', '')
with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f: with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f:
file_content = json.dumps( file_content = json.dumps(
{jik: data[jik]}, {jik: data[jik]},
ensure_ascii=not args.avoid_encoding_json, ensure_ascii=not args.avoid_encoding_json,
indent=args.pretty_print_json indent=args.pretty_print_json
) )
f.write(file_content) f.write(file_content)
@@ -646,7 +661,7 @@ def export_multiple_json(args, data: Dict) -> None:
def process_exported_chat(args, data: ChatCollection) -> None: def process_exported_chat(args, data: ChatCollection) -> None:
"""Process an exported chat file.""" """Process an exported chat file."""
exported_handler.messages(args.exported, data, args.assume_first_as_me) exported_handler.messages(args.exported, data, args.assume_first_as_me)
if not args.no_html: if not args.no_html:
android_handler.create_html( android_handler.create_html(
data, data,
@@ -659,7 +674,7 @@ def process_exported_chat(args, data: ChatCollection) -> None:
args.whatsapp_theme, args.whatsapp_theme,
args.headline args.headline
) )
# Copy files to output directory # Copy files to output directory
for file in glob.glob(r'*.*'): for file in glob.glob(r'*.*'):
shutil.copy(file, args.output) shutil.copy(file, args.output)
@@ -670,23 +685,23 @@ def main():
# Set up and parse arguments # Set up and parse arguments
parser = setup_argument_parser() parser = setup_argument_parser()
args = parser.parse_args() args = parser.parse_args()
# Check for updates # Check for updates
if args.check_update: if args.check_update:
exit(check_update()) exit(check_update())
# Validate arguments # Validate arguments
validate_args(parser, args) validate_args(parser, args)
# Create output directory if it doesn't exist # Create output directory if it doesn't exist
os.makedirs(args.output, exist_ok=True) os.makedirs(args.output, exist_ok=True)
# Initialize data collection # Initialize data collection
data = ChatCollection() data = ChatCollection()
# Set up contact store for vCard enrichment if needed # Set up contact store for vCard enrichment if needed
contact_store = setup_contact_store(args) contact_store = setup_contact_store(args)
if args.import_json: if args.import_json:
# Import from JSON # Import from JSON
import_from_json(args.json, data) import_from_json(args.json, data)
@@ -710,13 +725,13 @@ def main():
# Set default media path if not provided # Set default media path if not provided
if args.media is None: if args.media is None:
args.media = "WhatsApp" args.media = "WhatsApp"
# Set default DB paths if not provided # Set default DB paths if not provided
if args.db is None: if args.db is None:
args.db = "msgstore.db" args.db = "msgstore.db"
if args.wa is None: if args.wa is None:
args.wa = "wa.db" args.wa = "wa.db"
# Decrypt backup if needed # Decrypt backup if needed
if args.key is not None: if args.key is not None:
error = decrypt_android_backup(args) error = decrypt_android_backup(args)
@@ -729,24 +744,26 @@ def main():
else: else:
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers
args.identifiers = identifiers args.identifiers = identifiers
# Set default media path if not provided # Set default media path if not provided
if args.media is None: if args.media is None:
args.media = identifiers.DOMAIN args.media = identifiers.DOMAIN
# Extract media from backup if needed # Extract media from backup if needed
if args.backup is not None: if args.backup is not None:
if not os.path.isdir(args.media): if not os.path.isdir(args.media):
ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size) ios_media_handler.extract_media(
args.backup, identifiers, args.decrypt_chunk_size)
else: else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.") print(
"WhatsApp directory already exists, skipping WhatsApp file extraction.")
# Set default DB paths if not provided # Set default DB paths if not provided
if args.db is None: if args.db is None:
args.db = identifiers.MESSAGE args.db = identifiers.MESSAGE
if args.wa is None: if args.wa is None:
args.wa = "ContactsV2.sqlite" args.wa = "ContactsV2.sqlite"
if args.incremental_merge: if args.incremental_merge:
incremental_merge( incremental_merge(
args.source_dir, args.source_dir,
@@ -756,16 +773,16 @@ def main():
args.avoid_encoding_json args.avoid_encoding_json
) )
print("Incremental merge completed successfully.") print("Incremental merge completed successfully.")
else: else:
# Process contacts # Process contacts
process_contacts(args, data, contact_store) process_contacts(args, data, contact_store)
# Process messages, media, and calls # Process messages, media, and calls
process_messages(args, data) process_messages(args, data)
# Create output files # Create output files
create_output_files(args, data, contact_store) create_output_files(args, data, contact_store)
# Handle media directory # Handle media directory
handle_media_directory(args) handle_media_directory(args)

View File

@@ -7,6 +7,7 @@ class Timing:
""" """
Handles timestamp formatting with timezone support. Handles timestamp formatting with timezone support.
""" """
def __init__(self, timezone_offset: Optional[int]) -> None: def __init__(self, timezone_offset: Optional[int]) -> None:
""" """
Initialize Timing object. Initialize Timing object.
@@ -37,6 +38,7 @@ class TimeZone(tzinfo):
""" """
Custom timezone class with fixed offset. Custom timezone class with fixed offset.
""" """
def __init__(self, offset: int) -> None: def __init__(self, offset: int) -> None:
""" """
Initialize TimeZone object. Initialize TimeZone object.
@@ -151,6 +153,7 @@ class ChatStore:
""" """
Stores chat information and messages. Stores chat information and messages.
""" """
def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None: def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None:
""" """
Initialize ChatStore object. Initialize ChatStore object.
@@ -159,7 +162,7 @@ class ChatStore:
type (str): Device type (IOS or ANDROID) type (str): Device type (IOS or ANDROID)
name (Optional[str]): Chat name name (Optional[str]): Chat name
media (Optional[str]): Path to media folder media (Optional[str]): Path to media folder
Raises: Raises:
TypeError: If name is not a string or None TypeError: If name is not a string or None
""" """
@@ -182,7 +185,7 @@ class ChatStore:
self.their_avatar_thumb = None self.their_avatar_thumb = None
self.status = None self.status = None
self.media_base = "" self.media_base = ""
def __len__(self) -> int: def __len__(self) -> int:
"""Get number of chats. Required for dict-like access.""" """Get number of chats. Required for dict-like access."""
return len(self._messages) return len(self._messages)
@@ -192,7 +195,7 @@ class ChatStore:
if not isinstance(message, Message): if not isinstance(message, Message):
raise TypeError("message must be a Message object") raise TypeError("message must be a Message object")
self._messages[id] = message self._messages[id] = message
def get_message(self, id: str) -> 'Message': def get_message(self, id: str) -> 'Message':
"""Get a message from the chat store.""" """Get a message from the chat store."""
return self._messages.get(id) return self._messages.get(id)
@@ -214,7 +217,7 @@ class ChatStore:
'media_base': self.media_base, 'media_base': self.media_base,
'messages': {id: msg.to_json() for id, msg in self._messages.items()} 'messages': {id: msg.to_json() for id, msg in self._messages.items()}
} }
@classmethod @classmethod
def from_json(cls, data: Dict) -> 'ChatStore': def from_json(cls, data: Dict) -> 'ChatStore':
"""Create a chat store from JSON data.""" """Create a chat store from JSON data."""
@@ -232,7 +235,7 @@ class ChatStore:
def get_last_message(self) -> 'Message':
    """Get the most recently inserted message in the chat.

    Returns:
        Message: The last value in the message mapping (insertion order).

    Raises:
        IndexError: If the chat holds no messages.
    """
    try:
        # Walk the values view backwards instead of copying every message
        # into a temporary tuple just to read the final element.
        return next(reversed(self._messages.values()))
    except StopIteration:
        # Keep the exception type callers saw from the tuple-indexing version.
        raise IndexError("chat has no messages") from None
def items(self):
    """Return a view of the ``(message id, message)`` pairs in this chat.

    The result is whatever ``self._messages.items()`` yields, so it mirrors
    the dict-style ``items()`` API.
    """
    return self._messages.items()
@@ -250,11 +253,11 @@ class ChatStore:
Args: Args:
other (ChatStore): The ChatStore to merge with other (ChatStore): The ChatStore to merge with
""" """
if not isinstance(other, ChatStore): if not isinstance(other, ChatStore):
raise TypeError("Can only merge with another ChatStore object") raise TypeError("Can only merge with another ChatStore object")
# Update fields if they are not None in the other ChatStore # Update fields if they are not None in the other ChatStore
self.name = other.name or self.name self.name = other.name or self.name
self.type = other.type or self.type self.type = other.type or self.type
@@ -262,14 +265,16 @@ class ChatStore:
self.their_avatar = other.their_avatar or self.their_avatar self.their_avatar = other.their_avatar or self.their_avatar
self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb
self.status = other.status or self.status self.status = other.status or self.status
# Merge messages # Merge messages
self._messages.update(other._messages) self._messages.update(other._messages)
class Message: class Message:
""" """
Represents a single message in a chat. Represents a single message in a chat.
""" """
def __init__( def __init__(
self, self,
*, *,
@@ -301,7 +306,7 @@ class Message:
self.from_me = bool(from_me) self.from_me = bool(from_me)
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
timing = Timing(timezone_offset) timing = Timing(timezone_offset)
if isinstance(time, (int, float)): if isinstance(time, (int, float)):
self.time = timing.format_timestamp(self.timestamp, "%H:%M") self.time = timing.format_timestamp(self.timestamp, "%H:%M")
elif isinstance(time, str): elif isinstance(time, str):
@@ -318,13 +323,15 @@ class Message:
self.mime = None self.mime = None
self.message_type = message_type self.message_type = message_type
if isinstance(received_timestamp, (int, float)): if isinstance(received_timestamp, (int, float)):
self.received_timestamp = timing.format_timestamp(received_timestamp, "%Y/%m/%d %H:%M") self.received_timestamp = timing.format_timestamp(
received_timestamp, "%Y/%m/%d %H:%M")
elif isinstance(received_timestamp, str): elif isinstance(received_timestamp, str):
self.received_timestamp = received_timestamp self.received_timestamp = received_timestamp
else: else:
self.received_timestamp = None self.received_timestamp = None
if isinstance(read_timestamp, (int, float)): if isinstance(read_timestamp, (int, float)):
self.read_timestamp = timing.format_timestamp(read_timestamp, "%Y/%m/%d %H:%M") self.read_timestamp = timing.format_timestamp(
read_timestamp, "%Y/%m/%d %H:%M")
elif isinstance(read_timestamp, str): elif isinstance(read_timestamp, str):
self.read_timestamp = read_timestamp self.read_timestamp = read_timestamp
else: else:
@@ -363,13 +370,13 @@ class Message:
@classmethod @classmethod
def from_json(cls, data: Dict) -> 'Message': def from_json(cls, data: Dict) -> 'Message':
message = cls( message = cls(
from_me = data["from_me"], from_me=data["from_me"],
timestamp = data["timestamp"], timestamp=data["timestamp"],
time = data["time"], time=data["time"],
key_id = data["key_id"], key_id=data["key_id"],
message_type = data.get("message_type"), message_type=data.get("message_type"),
received_timestamp = data.get("received_timestamp"), received_timestamp=data.get("received_timestamp"),
read_timestamp = data.get("read_timestamp") read_timestamp=data.get("read_timestamp")
) )
message.media = data.get("media") message.media = data.get("media")
message.meta = data.get("meta") message.meta = data.get("meta")

View File

@@ -18,6 +18,7 @@ except ImportError:
# < Python 3.11 # < Python 3.11
# This should be removed when the support for Python 3.10 ends. (31 Oct 2026) # This should be removed when the support for Python 3.10 ends. (31 Oct 2026)
from enum import Enum from enum import Enum
class StrEnum(str, Enum):
    """Fallback for ``enum.StrEnum`` on interpreters older than Python 3.11.

    Members are real ``str`` instances, so they compare equal to their
    values and can be used wherever a plain string is expected.
    """

    def __str__(self) -> str:
        # The stdlib StrEnum renders as its value; a bare (str, Enum) mixin
        # renders as "ClassName.MEMBER" on older interpreters. Match the
        # stdlib so str() behaves the same on every supported version.
        return str.__str__(self)
@@ -72,7 +73,7 @@ def bytes_to_readable(size_bytes: int) -> str:
A human-readable string representing the file size. A human-readable string representing the file size.
""" """
if size_bytes == 0: if size_bytes == 0:
return "0B" return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024))) i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i) p = math.pow(1024, i)
@@ -100,7 +101,7 @@ def readable_to_bytes(size_str: str) -> int:
'TB': 1024**4, 'TB': 1024**4,
'PB': 1024**5, 'PB': 1024**5,
'EB': 1024**6, 'EB': 1024**6,
'ZB': 1024**7, 'ZB': 1024**7,
'YB': 1024**8 'YB': 1024**8
} }
size_str = size_str.upper().strip() size_str = size_str.upper().strip()
@@ -155,7 +156,8 @@ def check_update():
else: else:
with raw: with raw:
package_info = json.load(raw) package_info = json.load(raw)
latest_version = tuple(map(int, package_info["info"]["version"].split("."))) latest_version = tuple(
map(int, package_info["info"]["version"].split(".")))
__version__ = importlib.metadata.version("whatsapp_chat_exporter") __version__ = importlib.metadata.version("whatsapp_chat_exporter")
current_version = tuple(map(int, __version__.split("."))) current_version = tuple(map(int, __version__.split(".")))
if current_version < latest_version: if current_version < latest_version:
@@ -174,17 +176,17 @@ def check_update():
def rendering( def rendering(
output_file_name, output_file_name,
template, template,
name, name,
msgs, msgs,
contact, contact,
w3css, w3css,
chat, chat,
headline, headline,
next=False, next=False,
previous=False previous=False
): ):
if chat.their_avatar_thumb is None and chat.their_avatar is not None: if chat.their_avatar_thumb is None and chat.their_avatar is not None:
their_avatar_thumb = chat.their_avatar their_avatar_thumb = chat.their_avatar
else: else:
@@ -256,7 +258,8 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]):
message.sticker = msg.get("sticker") message.sticker = msg.get("sticker")
chat.add_message(id, message) chat.add_message(id, message)
data[jid] = chat data[jid] = chat
print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") print(
f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool):
@@ -273,39 +276,44 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p
return return
print("JSON files found:", json_files) print("JSON files found:", json_files)
for json_file in json_files: for json_file in json_files:
source_path = os.path.join(source_dir, json_file) source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file) target_path = os.path.join(target_dir, json_file)
if not os.path.exists(target_path): if not os.path.exists(target_path):
print(f"Copying '{json_file}' to target directory...") print(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True) os.makedirs(target_dir, exist_ok=True)
with open(source_path, 'rb') as src, open(target_path, 'wb') as dst: with open(source_path, 'rb') as src, open(target_path, 'wb') as dst:
dst.write(src.read()) dst.write(src.read())
else: else:
print(f"Merging '{json_file}' with existing file in target directory...") print(
f"Merging '{json_file}' with existing file in target directory...")
with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
source_data = json.load(src_file) source_data = json.load(src_file)
target_data = json.load(tgt_file) target_data = json.load(tgt_file)
# Parse JSON into ChatStore objects using from_json() # Parse JSON into ChatStore objects using from_json()
source_chats = {jid: ChatStore.from_json(chat) for jid, chat in source_data.items()} source_chats = {jid: ChatStore.from_json(
target_chats = {jid: ChatStore.from_json(chat) for jid, chat in target_data.items()} chat) for jid, chat in source_data.items()}
target_chats = {jid: ChatStore.from_json(
chat) for jid, chat in target_data.items()}
# Merge chats using merge_with() # Merge chats using merge_with()
for jid, chat in source_chats.items(): for jid, chat in source_chats.items():
if jid in target_chats: if jid in target_chats:
target_chats[jid].merge_with(chat) target_chats[jid].merge_with(chat)
else: else:
target_chats[jid] = chat target_chats[jid] = chat
# Serialize merged data # Serialize merged data
merged_data = {jid: chat.to_json() for jid, chat in target_chats.items()} merged_data = {jid: chat.to_json()
for jid, chat in target_chats.items()}
# Check if the merged data differs from the original target data # Check if the merged data differs from the original target data
if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
print(f"Changes detected in '{json_file}', updating target file...") print(
f"Changes detected in '{json_file}', updating target file...")
with open(target_path, 'w') as merged_file: with open(target_path, 'w') as merged_file:
json.dump( json.dump(
merged_data, merged_data,
@@ -314,12 +322,14 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p
ensure_ascii=not avoid_encoding_json, ensure_ascii=not avoid_encoding_json,
) )
else: else:
print(f"No changes detected in '{json_file}', skipping update.") print(
f"No changes detected in '{json_file}', skipping update.")
# Merge media directories # Merge media directories
source_media_path = os.path.join(source_dir, media_dir) source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir)
print(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") print(
f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if os.path.exists(source_media_path): if os.path.exists(source_media_path):
for root, _, files in os.walk(source_media_path): for root, _, files in os.walk(source_media_path):
relative_path = os.path.relpath(root, source_media_path) relative_path = os.path.relpath(root, source_media_path)
@@ -411,23 +421,29 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List
if filter is not None: if filter is not None:
conditions = [] conditions = []
if len(columns) < 2 and jid is not None: if len(columns) < 2 and jid is not None:
raise ValueError("There must be at least two elements in argument columns if jid is not None") raise ValueError(
"There must be at least two elements in argument columns if jid is not None")
if jid is not None: if jid is not None:
if platform == "android": if platform == "android":
is_group = f"{jid}.type == 1" is_group = f"{jid}.type == 1"
elif platform == "ios": elif platform == "ios":
is_group = f"{jid} IS NOT NULL" is_group = f"{jid} IS NOT NULL"
else: else:
raise ValueError("Only android and ios are supported for argument platform if jid is not None") raise ValueError(
"Only android and ios are supported for argument platform if jid is not None")
for index, chat in enumerate(filter): for index, chat in enumerate(filter):
if include: if include:
conditions.append(f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") conditions.append(
f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'")
if len(columns) > 1: if len(columns) > 1:
conditions.append(f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") conditions.append(
f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})")
else: else:
conditions.append(f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'") conditions.append(
f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'")
if len(columns) > 1: if len(columns) > 1:
conditions.append(f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") conditions.append(
f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})")
return f"AND ({' '.join(conditions)})" return f"AND ({' '.join(conditions)})"
else: else:
return "" return ""
@@ -522,7 +538,7 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona
else: else:
msg = f"{old} changed their number to {new}" msg = f"{old} changed their number to {new}"
elif content["action_type"] == 46: elif content["action_type"] == 46:
return # Voice message in PM??? Seems no need to handle. return # Voice message in PM??? Seems no need to handle.
elif content["action_type"] == 47: elif content["action_type"] == 47:
msg = "The contact is an official business account" msg = "The contact is an official business account"
elif content["action_type"] == 50: elif content["action_type"] == 50:
@@ -539,7 +555,8 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona
elif content["action_type"] == 67: elif content["action_type"] == 67:
return # (PM) this contact use secure service from Facebook??? return # (PM) this contact use secure service from Facebook???
elif content["action_type"] == 69: elif content["action_type"] == 69:
return # (PM) this contact use secure service from Facebook??? What's the difference with 67???? # (PM) this contact use secure service from Facebook??? What's the difference with 67????
return
else: else:
return # Unsupported return # Unsupported
return msg return msg
@@ -566,7 +583,8 @@ def get_status_location(output_folder: str, offline_static: str) -> str:
w3css_path = os.path.join(static_folder, "w3.css") w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path): if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp: with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read()) with open(w3css_path, "wb") as f:
f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css") w3css = os.path.join(offline_static, "w3.css")
@@ -597,6 +615,7 @@ def setup_template(template: Optional[str], no_avatar: bool, experimental: bool
template_env.filters['sanitize_except'] = sanitize_except template_env.filters['sanitize_except'] = sanitize_except
return template_env.get_template(template_file) return template_env.get_template(template_file)
# iOS Specific # iOS Specific
APPLE_TIME = 978307200 APPLE_TIME = 978307200
@@ -617,23 +636,31 @@ def slugify(value: str, allow_unicode: bool = False) -> str:
if allow_unicode: if allow_unicode:
value = unicodedata.normalize('NFKC', value) value = unicodedata.normalize('NFKC', value)
else: else:
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii') value = unicodedata.normalize('NFKD', value).encode(
'ascii', 'ignore').decode('ascii')
value = re.sub(r'[^\w\s-]', '', value.lower()) value = re.sub(r'[^\w\s-]', '', value.lower())
return re.sub(r'[-\s]+', '-', value).strip('-_') return re.sub(r'[-\s]+', '-', value).strip('-_')
class WhatsAppIdentifier(StrEnum):
    """Identifiers for the regular WhatsApp app's databases.

    Each hash-like value is annotated with the domain-qualified database
    file it stands for (presumably the file ID inside an iOS backup --
    confirm against the extraction code). ``DOMAIN`` is the app-group
    domain those names are prefixed with.
    """

    # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite
    MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
    # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite
    CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
    # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite
    CALL = "1b432994e958845fffe8e2f190f26d1511534088"
    DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
class WhatsAppBusinessIdentifier(StrEnum):
    """Identifiers for the WhatsApp Business (SMB) app's databases.

    Each hash-like value is annotated with the domain-qualified database
    file it stands for (presumably the file ID inside an iOS backup --
    confirm against the extraction code). ``DOMAIN`` is the app-group
    domain those names are prefixed with.
    """

    # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite
    MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
    # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite
    CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
    # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite
    CALL = "b463f7c4365eefc5a8723930d97928d4e907c603"
    DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
class JidType(IntEnum): class JidType(IntEnum):
PM = 0 PM = 0

View File

@@ -209,24 +209,24 @@ def test_incremental_merge_existing_file_with_changes(mock_filesystem):
source_dir = "/source" source_dir = "/source"
target_dir = "/target" target_dir = "/target"
media_dir = "media" media_dir = "media"
# Setup mock filesystem # Setup mock filesystem
mock_filesystem["exists"].side_effect = lambda x: True mock_filesystem["exists"].side_effect = lambda x: True
mock_filesystem["listdir"].return_value = ["chat.json"] mock_filesystem["listdir"].return_value = ["chat.json"]
# Mock file operations # Mock file operations
mock_file_content = { mock_file_content = {
"/source/chat.json": json.dumps(chat_data_2), "/source/chat.json": json.dumps(chat_data_2),
"/target/chat.json": json.dumps(chat_data_1), "/target/chat.json": json.dumps(chat_data_1),
} }
written_chunks = [] written_chunks = []
def mock_file_write(data): def mock_file_write(data):
written_chunks.append(data) written_chunks.append(data)
mock_write = MagicMock(side_effect=mock_file_write) mock_write = MagicMock(side_effect=mock_file_write)
with patch("builtins.open", mock_open()) as mock_file: with patch("builtins.open", mock_open()) as mock_file:
def mock_file_read(filename, mode="r"): def mock_file_read(filename, mode="r"):
content = mock_file_content.get(filename) content = mock_file_content.get(filename)
@@ -234,27 +234,27 @@ def test_incremental_merge_existing_file_with_changes(mock_filesystem):
if mode == 'w': if mode == 'w':
file_mock.write.side_effect = mock_write file_mock.write.side_effect = mock_write
return file_mock return file_mock
mock_file.side_effect = mock_file_read mock_file.side_effect = mock_file_read
# Run the function # Run the function
incremental_merge(source_dir, target_dir, media_dir, 2, True) incremental_merge(source_dir, target_dir, media_dir, 2, True)
# Verify file operations - both files opened in text mode when target exists # Verify file operations - both files opened in text mode when target exists
mock_file.assert_any_call("/source/chat.json", "r") mock_file.assert_any_call("/source/chat.json", "r")
mock_file.assert_any_call("/target/chat.json", "r") mock_file.assert_any_call("/target/chat.json", "r")
mock_file.assert_any_call("/target/chat.json", "w") mock_file.assert_any_call("/target/chat.json", "w")
# Verify write was called # Verify write was called
assert mock_write.called, "Write method was never called" assert mock_write.called, "Write method was never called"
# Combine chunks and parse JSON # Combine chunks and parse JSON
written_data = json.loads(''.join(written_chunks)) written_data = json.loads(''.join(written_chunks))
# Verify the merged data is correct # Verify the merged data is correct
assert written_data is not None, "No data was written" assert written_data is not None, "No data was written"
assert written_data == chat_data_merged, "Merged data does not match expected result" assert written_data == chat_data_merged, "Merged data does not match expected result"
# Verify specific message retention # Verify specific message retention
messages = written_data["12345678@s.whatsapp.net"]["messages"] messages = written_data["12345678@s.whatsapp.net"]["messages"]
assert "24690" in messages, "Common message should be present" assert "24690" in messages, "Common message should be present"
@@ -292,7 +292,8 @@ def test_incremental_merge_existing_file_no_changes(mock_filesystem):
incremental_merge(source_dir, target_dir, media_dir, 2, True) incremental_merge(source_dir, target_dir, media_dir, 2, True)
# Verify no write operations occurred on target file # Verify no write operations occurred on target file
write_calls = [call for call in mock_file.mock_calls if call[0] == "().write"] write_calls = [
call for call in mock_file.mock_calls if call[0] == "().write"]
assert len(write_calls) == 0 assert len(write_calls) == 0
@@ -333,4 +334,5 @@ def test_incremental_merge_media_copy(mock_filesystem):
assert ( assert (
mock_filesystem["makedirs"].call_count >= 2 mock_filesystem["makedirs"].call_count >= 2
) # At least target dir and media dir ) # At least target dir and media dir
assert mock_filesystem["copy2"].call_count == 2 # Two media files copied # Two media files copied
assert mock_filesystem["copy2"].call_count == 2