diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 051f166..1784723 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -18,6 +18,9 @@ from argparse import ArgumentParser, SUPPRESS from datetime import datetime from getpass import getpass from sys import exit +from typing import Tuple, Optional, List, Dict, Any, Union + +# Try to import vobject for contacts processing try: import vobject except ModuleNotFoundError: @@ -27,311 +30,212 @@ else: vcards_deps_installed = True -def main(): +def setup_argument_parser() -> ArgumentParser: + """Set up and return the argument parser with all options.""" parser = ArgumentParser( - description = 'A customizable Android and iOS/iPadOS WhatsApp database parser that ' - 'will give you the history of your WhatsApp conversations in HTML ' - 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', - epilog = f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' - 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' + description='A customizable Android and iOS/iPadOS WhatsApp database parser that ' + 'will give you the history of your WhatsApp conversations in HTML ' + 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', + epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' + 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' ) - parser.add_argument( - '-a', - '--android', - dest='android', - default=False, - action='store_true', - help="Define the target as Android") - parser.add_argument( - '-i', - '--ios', - dest='ios', - default=False, - action='store_true', - help="Define the target as iPhone/iPad") - parser.add_argument( - "-e", - "--exported", - dest="exported", - default=None, + + # Device type arguments + device_group = parser.add_argument_group('Device Type') + device_group.add_argument( + '-a', '--android', dest='android', default=False, action='store_true', + help="Define the target as Android" + ) + device_group.add_argument( + '-i', '--ios', dest='ios', default=False, action='store_true', + help="Define the target as iPhone/iPad" + ) + device_group.add_argument( + "-e", "--exported", dest="exported", default=None, help="Define the target as exported chat file and specify the path to the file" ) - parser.add_argument( - "-w", - "--wa", - dest="wa", - default=None, - help="Path to contact database (default: wa.db/ContactsV2.sqlite)") - parser.add_argument( - "-m", - "--media", - dest="media", - default=None, - help="Path to WhatsApp media folder (default: WhatsApp)") - parser.add_argument( - "-b", - "--backup", - dest="backup", - default=None, - help="Path to Android (must be used together " - "with -k)/iOS WhatsApp backup") - parser.add_argument( - "-o", - "--output", - dest="output", - default="result", - help="Output to specific directory (default: result)") - parser.add_argument( - '-j', - '--json', - dest='json', - nargs='?', - default=None, - type=str, - const="result.json", - help="Save the result to a single JSON file (default if present: result.json)") - parser.add_argument( - '--avoid-encoding-json', - dest='avoid_encoding_json', - default=False, - action='store_true', - help="Don't encode non-ascii characters in the output JSON files") - parser.add_argument( - '--pretty-print-json', - dest='pretty_print_json', - default=None, - nargs='?', - const=2, - type=int, - help="Pretty print the output JSON.") - parser.add_argument( - '-d', - '--db', - dest='db', - default=None, - help="Path to database file (default: msgstore.db/" - "7c7fba66680ef796b916b067077cc246adacf01d)") - parser.add_argument( - '-k', - '--key', - dest='key', - default=None, - nargs='?', + + # Input file paths + input_group = parser.add_argument_group('Input Files') + input_group.add_argument( + "-w", "--wa", dest="wa", default=None, + help="Path to contact database (default: wa.db/ContactsV2.sqlite)" + ) + input_group.add_argument( + "-m", "--media", dest="media", default=None, + help="Path to WhatsApp media folder (default: WhatsApp)" + ) + input_group.add_argument( + "-b", "--backup", dest="backup", default=None, + help="Path to Android (must be used together with -k)/iOS WhatsApp backup" + ) + input_group.add_argument( + "-d", "--db", dest="db", default=None, + help="Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)" + ) + input_group.add_argument( + "-k", "--key", dest="key", default=None, nargs='?', help="Path to key file. If this option is set for crypt15 backup but nothing is specified, you will be prompted to enter the key." ) - parser.add_argument( - "-t", - "--template", - dest="template", - default=None, - help="Path to custom HTML template" + input_group.add_argument( + "--call-db", dest="call_db_ios", nargs='?', default=None, type=str, + const="1b432994e958845fffe8e2f190f26d1511534088", + help="Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only" ) - parser.add_argument( - "--embedded", - dest="embedded", - default=False, - action='store_true', - help=SUPPRESS or "Embed media into HTML file (not yet implemented)" - ) - parser.add_argument( - "-s", - "--showkey", - dest="showkey", - default=False, - action='store_true', - help="Show the HEX key used to decrypt the database" - ) - parser.add_argument( - "-c", - "--move-media", - dest="move_media", - default=False, - action='store_true', - help="Move the media directory to output directory if the flag is set, otherwise copy it" - ) - parser.add_argument( - "--offline", - dest="offline", - default=None, - help="Relative path to offline static files" - ) - parser.add_argument( - "--size", - "--output-size", - "--split", - dest="size", - nargs='?', - const=0, - default=None, - help="Maximum (rough) size of a single output file in bytes, 0 for auto" - ) - parser.add_argument( - "--no-html", - dest="no_html", - default=False, - action='store_true', - help="Do not output html files" - ) - parser.add_argument( - "--check-update", - dest="check_update", - default=False, - action='store_true', - help="Check for updates (require Internet access)" - ) - parser.add_argument( - "--assume-first-as-me", - dest="assume_first_as_me", - default=False, - action='store_true', - help="Assume the first message in a chat as sent by me (must be used together with -e)" - ) - parser.add_argument( - "--no-avatar", - dest="no_avatar", - default=False, - action='store_true', - help="Do not render avatar in HTML output" - ) - parser.add_argument( - "--import", - dest="import_json", - default=False, - action='store_true', - help="Import JSON file and convert to HTML output" - ) - parser.add_argument( - "--business", - dest="business", - default=False, - action='store_true', - help="Use Whatsapp Business default files (iOS only)" - ) - parser.add_argument( - "--wab", - "--wa-backup", - dest="wab", - default=None, + input_group.add_argument( + "--wab", "--wa-backup", dest="wab", default=None, help="Path to contact database in crypt15 format" ) - parser.add_argument( - "--time-offset", - dest="timezone_offset", - default=0, - type=int, - choices=range(-12, 15), - metavar="{-12 to 14}", - help="Offset in hours (-12 to 14) for time displayed in the output" + + # Output options + output_group = parser.add_argument_group('Output Options') + output_group.add_argument( + "-o", "--output", dest="output", default="result", + help="Output to specific directory (default: result)" ) - parser.add_argument( - "--date", - dest="filter_date", - default=None, - metavar="DATE", + output_group.add_argument( + '-j', '--json', dest='json', nargs='?', default=None, type=str, const="result.json", + help="Save the result to a single JSON file (default if present: result.json)" + ) + output_group.add_argument( + "--txt", dest="text_format", nargs='?', default=None, type=str, const="result", + help="Export chats in text format similar to what WhatsApp officially provided (default if present: result/)" + ) + output_group.add_argument( + "--no-html", dest="no_html", default=False, action='store_true', + help="Do not output html files" + ) + output_group.add_argument( + "--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None, + help="Maximum (rough) size of a single output file in bytes, 0 for auto" + ) + + # JSON formatting options + json_group = parser.add_argument_group('JSON Options') + json_group.add_argument( + '--avoid-encoding-json', dest='avoid_encoding_json', default=False, action='store_true', + help="Don't encode non-ascii characters in the output JSON files" + ) + json_group.add_argument( + '--pretty-print-json', dest='pretty_print_json', default=None, nargs='?', const=2, type=int, + help="Pretty print the output JSON." + ) + json_group.add_argument( + "--per-chat", dest="json_per_chat", default=False, action='store_true', + help="Output the JSON file per chat" + ) + json_group.add_argument( + "--import", dest="import_json", default=False, action='store_true', + help="Import JSON file and convert to HTML output" + ) + + # HTML options + html_group = parser.add_argument_group('HTML Options') + html_group.add_argument( + "-t", "--template", dest="template", default=None, + help="Path to custom HTML template" + ) + html_group.add_argument( + "--embedded", dest="embedded", default=False, action='store_true', + help=SUPPRESS or "Embed media into HTML file (not yet implemented)" + ) + html_group.add_argument( + "--offline", dest="offline", default=None, + help="Relative path to offline static files" + ) + html_group.add_argument( + "--no-avatar", dest="no_avatar", default=False, action='store_true', + help="Do not render avatar in HTML output" + ) + html_group.add_argument( + "--experimental-new-theme", dest="whatsapp_theme", default=False, action='store_true', + help="Use the newly designed WhatsApp-alike theme" + ) + html_group.add_argument( + "--headline", dest="headline", default="Chat history with ??", + help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name" + ) + + # Media handling + media_group = parser.add_argument_group('Media Handling') + media_group.add_argument( + "-c", "--move-media", dest="move_media", default=False, action='store_true', + help="Move the media directory to output directory if the flag is set, otherwise copy it" + ) + media_group.add_argument( + "--create-separated-media", dest="separate_media", default=False, action='store_true', + help="Create a copy of the media seperated per chat in /separated/ directory" + ) + + # Filtering options + filter_group = parser.add_argument_group('Filtering Options') + filter_group.add_argument( + "--time-offset", dest="timezone_offset", default=0, type=int, choices=range(-12, 15), + metavar="{-12 to 14}", help="Offset in hours (-12 to 14) for time displayed in the output" + ) + filter_group.add_argument( + "--date", dest="filter_date", default=None, metavar="DATE", help="The date filter in specific format (inclusive)" ) - parser.add_argument( - "--date-format", - dest="filter_date_format", - default="%Y-%m-%d %H:%M", - metavar="FORMAT", + filter_group.add_argument( + "--date-format", dest="filter_date_format", default="%Y-%m-%d %H:%M", metavar="FORMAT", help="The date format for the date filter" ) - parser.add_argument( - "--include", - dest="filter_chat_include", - nargs='*', - metavar="phone number", + filter_group.add_argument( + "--include", dest="filter_chat_include", nargs='*', metavar="phone number", help="Include chats that match the supplied phone number" ) - parser.add_argument( - "--exclude", - dest="filter_chat_exclude", - nargs='*', - metavar="phone number", + filter_group.add_argument( + "--exclude", dest="filter_chat_exclude", nargs='*', metavar="phone number", help="Exclude chats that match the supplied phone number" ) - parser.add_argument( - "--dont-filter-empty", - dest="filter_empty", - default=True, - action='store_false', + filter_group.add_argument( + "--dont-filter-empty", dest="filter_empty", default=True, action='store_false', help=("By default, the exporter will not render chats with no valid message. " "Setting this flag will cause the exporter to render those. " "This is useful if chat(s) are missing from the output") ) - parser.add_argument( - "--per-chat", - dest="json_per_chat", - default=False, - action='store_true', - help="Output the JSON file per chat" - ) - parser.add_argument( - "--create-separated-media", - dest="separate_media", - default=False, - action='store_true', - help="Create a copy of the media seperated per chat in /separated/ directory" - ) - parser.add_argument( - "--decrypt-chunk-size", - dest="decrypt_chunk_size", - default=1 * 1024 * 1024, - type=int, - help="Specify the chunk size for decrypting iOS backup, which may affect the decryption speed." - ) - parser.add_argument( - "--enrich-from-vcards", - dest="enrich_from_vcards", - default=None, + + # Contact enrichment + contact_group = parser.add_argument_group('Contact Enrichment') + contact_group.add_argument( + "--enrich-from-vcards", dest="enrich_from_vcards", default=None, help="Path to an exported vcf file from Google contacts export. Add names missing from WhatsApp's default database" ) - parser.add_argument( - "--default-country-code", - dest="default_contry_code", - default=None, + contact_group.add_argument( + "--default-country-code", dest="default_country_code", default=None, help="Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country" ) - parser.add_argument( - "--txt", - dest="text_format", - nargs='?', - default=None, - type=str, - const="result", - help="Export chats in text format similar to what WhatsApp officially provided (default if present: result/)" + + # Miscellaneous + misc_group = parser.add_argument_group('Miscellaneous') + misc_group.add_argument( + "-s", "--showkey", dest="showkey", default=False, action='store_true', + help="Show the HEX key used to decrypt the database" ) - parser.add_argument( - "--experimental-new-theme", - dest="whatsapp_theme", - default=False, - action='store_true', - help="Use the newly designed WhatsApp-alike theme" + misc_group.add_argument( + "--check-update", dest="check_update", default=False, action='store_true', + help="Check for updates (require Internet access)" ) - parser.add_argument( - "--call-db", - dest="call_db_ios", - nargs='?', - default=None, - type=str, - const="1b432994e958845fffe8e2f190f26d1511534088", - help="Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only" + misc_group.add_argument( + "--assume-first-as-me", dest="assume_first_as_me", default=False, action='store_true', + help="Assume the first message in a chat as sent by me (must be used together with -e)" ) - parser.add_argument( - "--headline", - dest="headline", - default="Chat history with ??", - help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name" + misc_group.add_argument( + "--business", dest="business", default=False, action='store_true', + help="Use Whatsapp Business default files (iOS only)" ) + misc_group.add_argument( + "--decrypt-chunk-size", dest="decrypt_chunk_size", default=1 * 1024 * 1024, type=int, + help="Specify the chunk size for decrypting iOS backup, which may affect the decryption speed." + ) + + return parser - args = parser.parse_args() - # Check for updates - if args.check_update: - exit(check_update()) - - # Sanity checks +def validate_args(parser: ArgumentParser, args) -> None: + """Validate command line arguments and modify them if needed.""" + # Basic validation checks if args.android and args.ios and args.exported and args.import_json: parser.error("You must define only one device type.") if not args.android and not args.ios and not args.exported and not args.import_json: @@ -346,258 +250,410 @@ def main(): parser.error("WhatsApp Business is only available on iOS for now.") if "??" not in args.headline: parser.error("--headline must contain '??' for replacement.") - if args.json_per_chat and ( - (args.json[-5:] != ".json" and os.path.isfile(args.json)) or \ - (args.json[-5:] == ".json" and os.path.isfile(args.json[:-5])) + + # JSON validation + if args.json_per_chat and args.json and ( + (args.json.endswith(".json") and os.path.isfile(args.json)) or + (not args.json.endswith(".json") and os.path.isfile(args.json)) ): parser.error("When --per-chat is enabled, the destination of --json must be a directory.") - if args.enrich_from_vcards is not None and args.default_contry_code is None: + + # vCards validation + if args.enrich_from_vcards is not None and args.default_country_code is None: parser.error("When --enrich-from-vcards is provided, you must also set --default-country-code") + + # Size validation if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric(): try: args.size = readable_to_bytes(args.size) except ValueError: parser.error("The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") + + # Date filter validation and processing if args.filter_date is not None: - if " - " in args.filter_date: - start, end = args.filter_date.split(" - ") - start = int(datetime.strptime(start, args.filter_date_format).timestamp()) - end = int(datetime.strptime(end, args.filter_date_format).timestamp()) - if start < 1009843200 or end < 1009843200: - parser.error("WhatsApp was first released in 2009...") - if start > end: - parser.error("The start date cannot be a moment after the end date.") - if args.android: - args.filter_date = f"BETWEEN {start}000 AND {end}000" - elif args.ios: - args.filter_date = f"BETWEEN {start - APPLE_TIME} AND {end - APPLE_TIME}" - else: - _timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp()) - if _timestamp < 1009843200: - parser.error("WhatsApp was first released in 2009...") - if args.filter_date[:2] == "> ": - if args.android: - args.filter_date = f">= {_timestamp}000" - elif args.ios: - args.filter_date = f">= {_timestamp - APPLE_TIME}" - elif args.filter_date[:2] == "< ": - if args.android: - args.filter_date = f"<= {_timestamp}000" - elif args.ios: - args.filter_date = f"<= {_timestamp - APPLE_TIME}" - else: - parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") + process_date_filter(parser, args) + + # Crypt15 key validation if args.key is None and args.backup is not None and args.backup.endswith("crypt15"): args.key = getpass("Enter your encryption key: ") + + # Theme validation if args.whatsapp_theme: args.template = "whatsapp_new.html" + + # Chat filter validation if args.filter_chat_include is not None and args.filter_chat_exclude is not None: parser.error("Chat inclusion and exclusion filters cannot be used together.") - if args.filter_chat_include is not None: - for chat in args.filter_chat_include: + + validate_chat_filters(parser, args.filter_chat_include) + validate_chat_filters(parser, args.filter_chat_exclude) + + +def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str]]) -> None: + """Validate chat filters to ensure they contain only phone numbers.""" + if chat_filter is not None: + for chat in chat_filter: if not chat.isnumeric(): parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") - if args.filter_chat_exclude is not None: - for chat in args.filter_chat_exclude: - if not chat.isnumeric(): - parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") - filter_chat = (args.filter_chat_include, args.filter_chat_exclude) - data = ChatCollection() +def process_date_filter(parser: ArgumentParser, args) -> None: + """Process and validate date filter arguments.""" + if " - " in args.filter_date: + start, end = args.filter_date.split(" - ") + start = int(datetime.strptime(start, args.filter_date_format).timestamp()) + end = int(datetime.strptime(end, args.filter_date_format).timestamp()) + + if start < 1009843200 or end < 1009843200: + parser.error("WhatsApp was first released in 2009...") + if start > end: + parser.error("The start date cannot be a moment after the end date.") + + if args.android: + args.filter_date = f"BETWEEN {start}000 AND {end}000" + elif args.ios: + args.filter_date = f"BETWEEN {start - APPLE_TIME} AND {end - APPLE_TIME}" + else: + process_single_date_filter(parser, args) + + +def process_single_date_filter(parser: ArgumentParser, args) -> None: + """Process single date comparison filters.""" + if len(args.filter_date) < 3: + parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") + + _timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp()) + + if _timestamp < 1009843200: + parser.error("WhatsApp was first released in 2009...") + + if args.filter_date[:2] == "> ": + if args.android: + args.filter_date = f">= {_timestamp}000" + elif args.ios: + args.filter_date = f">= {_timestamp - APPLE_TIME}" + elif args.filter_date[:2] == "< ": + if args.android: + args.filter_date = f"<= {_timestamp}000" + elif args.ios: + args.filter_date = f"<= {_timestamp - APPLE_TIME}" + else: + parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") + + +def setup_contact_store(args) -> Optional['ContactsFromVCards']: + """Set up and return a contact store if needed.""" if args.enrich_from_vcards is not None: if not vcards_deps_installed: - parser.error( + print( "You don't have the dependency to enrich contacts with vCard.\n" "Read more on how to deal with enriching contacts:\n" "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" ) + exit(1) contact_store = ContactsFromVCards() - contact_store.load_vcf_file(args.enrich_from_vcards, args.default_contry_code) + contact_store.load_vcf_file(args.enrich_from_vcards, args.default_country_code) + return contact_store + return None + +def decrypt_android_backup(args) -> int: + """Decrypt Android backup files and return error code.""" + if args.key is None or args.backup is None: + print("You must specify the backup file with -b and a key with -k") + return 1 + + print("Decryption key specified, decrypting WhatsApp backup...") + + # Determine crypt type + if "crypt12" in args.backup: + crypt = Crypt.CRYPT12 + elif "crypt14" in args.backup: + crypt = Crypt.CRYPT14 + elif "crypt15" in args.backup: + crypt = Crypt.CRYPT15 + else: + print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") + return 1 + + # Get key + keyfile_stream = False + if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")): + key = bytes.fromhex(args.key.replace(" ", "")) + else: + key = open(args.key, "rb") + keyfile_stream = True + + # Read backup + db = open(args.backup, "rb").read() + + # Process WAB if provided + error_wa = 0 + if args.wab: + wab = open(args.wab, "rb").read() + error_wa = android_crypt.decrypt_backup( + wab, + key, + args.wa, + crypt, + args.showkey, + DbType.CONTACT, + keyfile_stream=keyfile_stream + ) + if isinstance(key, io.IOBase): + key.seek(0) + + # Decrypt message database + error_message = android_crypt.decrypt_backup( + db, + key, + args.db, + crypt, + args.showkey, + DbType.MESSAGE, + keyfile_stream=keyfile_stream + ) + + # Handle errors + if error_wa != 0: + return error_wa + return error_message + + +def handle_decrypt_error(error: int) -> None: + """Handle decryption errors with appropriate messages.""" + if error == 1: + print("Dependencies of decrypt_backup and/or extract_encrypted_key" + " are not present. For details, see README.md.") + exit(3) + elif error == 2: + print("Failed when decompressing the decrypted backup. " + "Possibly incorrect offsets used in decryption.") + exit(4) + else: + print("Unknown error occurred.", error) + exit(5) + + +def process_contacts(args, data: ChatCollection, contact_store=None) -> None: + """Process contacts from the database.""" + contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite" + + if os.path.isfile(contact_db): + with sqlite3.connect(contact_db) as db: + db.row_factory = sqlite3.Row + if args.android: + android_handler.contacts(db, data, args.enrich_from_vcards) + else: + ios_handler.contacts(db, data) + + +def process_messages(args, data: ChatCollection) -> None: + """Process messages, media and vcards from the database.""" + msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE + + if not os.path.isfile(msg_db): + print( + "The message database does not exist. You may specify the path " + "to database file with option -d or check your provided path." + ) + exit(6) + + filter_chat = (args.filter_chat_include, args.filter_chat_exclude) + + with sqlite3.connect(msg_db) as db: + db.row_factory = sqlite3.Row + + # Process messages + if args.android: + message_handler = android_handler + else: + message_handler = ios_handler + + message_handler.messages( + db, data, args.media, args.timezone_offset, + args.filter_date, filter_chat, args.filter_empty + ) + + # Process media + message_handler.media( + db, data, args.media, args.filter_date, + filter_chat, args.filter_empty, args.separate_media + ) + + # Process vcards + message_handler.vcard( + db, data, args.media, args.filter_date, + filter_chat, args.filter_empty + ) + + # Process calls + process_calls(args, db, data, filter_chat) + + +def process_calls(args, db, data: ChatCollection, filter_chat) -> None: + """Process call history if available.""" if args.android: - contacts = android_handler.contacts - messages = android_handler.messages - media = android_handler.media - vcard = android_handler.vcard - create_html = android_handler.create_html - if args.db is None: - msg_db = "msgstore.db" + android_handler.calls(db, data, args.timezone_offset, filter_chat) + elif args.ios and args.call_db_ios is not None: + with sqlite3.connect(args.call_db_ios) as cdb: + cdb.row_factory = sqlite3.Row + ios_handler.calls(cdb, data, args.timezone_offset, filter_chat) + + +def handle_media_directory(args) -> None: + """Handle media directory copying or moving.""" + if os.path.isdir(args.media): + media_path = os.path.join(args.output, args.media) + + if os.path.isdir(media_path): + print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") else: - msg_db = args.db - if args.wa is None: - contact_db = "wa.db" + if args.move_media: + try: + print("\nMoving media directory...", end="\n") + shutil.move(args.media, f"{args.output}/") + except PermissionError: + print("\nCannot remove original WhatsApp directory. " + "Perhaps the directory is opened?", end="\n") + else: + print("\nCopying media directory...", end="\n") + shutil.copytree(args.media, media_path) + + +def create_output_files(args, data: ChatCollection, contact_store=None) -> None: + """Create output files in the specified formats.""" + # Create HTML files if requested + if not args.no_html: + # Enrich from vcards if available + if contact_store and not contact_store.is_empty(): + contact_store.enrich_from_vcards(data) + + android_handler.create_html( + data, + args.output, + args.template, + args.embedded, + args.offline, + args.size, + args.no_avatar, + args.whatsapp_theme, + args.headline + ) + + # Create text files if requested + if args.text_format: + print("Writing text file...") + android_handler.create_txt(data, args.text_format) + + # Create JSON files if requested + if args.json and not args.import_json: + export_json(args, data, contact_store) + + +def export_json(args, data: ChatCollection, contact_store=None) -> None: + """Export data to JSON format.""" + # Enrich from vcards if available + if contact_store and not contact_store.is_empty(): + contact_store.enrich_from_vcards(data) + + # Convert ChatStore objects to JSON + if isinstance(data.get(next(iter(data), None)), ChatStore): + data = {jik: chat.to_json() for jik, chat in data.items()} + + # Export as a single file or per chat + if not args.json_per_chat: + export_single_json(args, data) + else: + export_multiple_json(args, data) + + +def export_single_json(args, data: Dict) -> None: + """Export data to a single JSON file.""" + with open(args.json, "w") as f: + json_data = json.dumps( + data, + ensure_ascii=not args.avoid_encoding_json, + indent=args.pretty_print_json + ) + print(f"\nWriting JSON file...({bytes_to_readable(len(json_data))})") + f.write(json_data) + + +def export_multiple_json(args, data: Dict) -> None: + """Export data to multiple JSON files, one per chat.""" + # Adjust output path if needed + json_path = args.json[:-5] if args.json.endswith(".json") else args.json + + # Create directory if it doesn't exist + if not os.path.isdir(json_path): + os.makedirs(json_path, exist_ok=True) + + # Export each chat + total = len(data.keys()) + for index, jik in enumerate(data.keys()): + if data[jik]["name"] is not None: + contact = data[jik]["name"].replace('/', '') else: - contact_db = args.wa - if args.key is not None: - if args.backup is None: - print("You must specify the backup file with -b") - exit(1) - print("Decryption key specified, decrypting WhatsApp backup...") - if "crypt12" in args.backup: - crypt = Crypt.CRYPT12 - elif "crypt14" in args.backup: - crypt = Crypt.CRYPT14 - elif "crypt15" in args.backup: - crypt = Crypt.CRYPT15 - else: - print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") - exit(1) - if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")): - key = bytes.fromhex(args.key.replace(" ", "")) - keyfile_stream = False - else: - key = open(args.key, "rb") - keyfile_stream = True - db = open(args.backup, "rb").read() - if args.wab: - wab = open(args.wab, "rb").read() - error_wa = android_crypt.decrypt_backup( - wab, - key, - contact_db, - crypt, - args.showkey, - DbType.CONTACT, - keyfile_stream=keyfile_stream - ) - if isinstance(key, io.IOBase): - key.seek(0) - else: - error_wa = 0 - error_message = android_crypt.decrypt_backup( - db, - key, - msg_db, - crypt, - args.showkey, - DbType.MESSAGE, - keyfile_stream=keyfile_stream + contact = jik.replace('+', '') + + with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f: + file_content = json.dumps( + {jik: data[jik]}, + ensure_ascii=not args.avoid_encoding_json, + indent=args.pretty_print_json ) - if error_wa != 0: - error = error_wa - elif error_message != 0: - error = error_message - else: - error = 0 - if error != 0: - if error == 1: - print("Dependencies of decrypt_backup and/or extract_encrypted_key" - " are not present. For details, see README.md.") - exit(3) - elif error == 2: - print("Failed when decompressing the decrypted backup. " - "Possibly incorrect offsets used in decryption.") - exit(4) - else: - print("Unknown error occurred.", error) - exit(5) - if args.media is None: - args.media = "WhatsApp" + f.write(file_content) + print(f"Writing JSON file...({index + 1}/{total})", end="\r") + print() - if os.path.isfile(contact_db): - with sqlite3.connect(contact_db) as db: - db.row_factory = sqlite3.Row - contacts(db, data, args.enrich_from_vcards) - elif args.ios: - contacts = ios_handler.contacts - messages = ios_handler.messages - media = ios_handler.media - vcard = ios_handler.vcard - create_html = android_handler.create_html - if args.business: - from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers - else: - from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers - if args.media is None: - args.media = identifiers.DOMAIN - if args.backup is not None: - if not os.path.isdir(args.media): - ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size) - else: - print("WhatsApp directory already exists, skipping WhatsApp file extraction.") - if args.db is None: - msg_db = identifiers.MESSAGE - else: - msg_db = args.db - if args.wa is None: - contact_db = "ContactsV2.sqlite" - else: - contact_db = args.wa - if os.path.isfile(contact_db): - with sqlite3.connect(contact_db) as db: - db.row_factory = sqlite3.Row - contacts(db, data) - if not args.exported and not args.import_json: - if os.path.isfile(msg_db): - with sqlite3.connect(msg_db) as db: - db.row_factory = sqlite3.Row - messages(db, data, args.media, args.timezone_offset, args.filter_date, filter_chat, args.filter_empty) - media(db, data, args.media, args.filter_date, filter_chat, args.filter_empty, args.separate_media) - vcard(db, data, args.media, args.filter_date, filter_chat, args.filter_empty) - if args.android: - android_handler.calls(db, data, args.timezone_offset, filter_chat) - elif args.ios and args.call_db_ios is not None: - with sqlite3.connect(args.call_db_ios) as cdb: - cdb.row_factory = sqlite3.Row - ios_handler.calls(cdb, data, args.timezone_offset, filter_chat) - if not args.no_html: - if args.enrich_from_vcards is not None and not contact_store.is_empty(): - contact_store.enrich_from_vcards(data) +def process_exported_chat(args, data: ChatCollection) -> None: + """Process an exported chat file.""" + exported_handler.messages(args.exported, data, args.assume_first_as_me) + + if not args.no_html: + android_handler.create_html( + data, + args.output, + args.template, + args.embedded, + args.offline, + args.size, + args.no_avatar, + args.whatsapp_theme, + args.headline + ) + + # Copy files to output directory + for file in glob.glob(r'*.*'): + shutil.copy(file, args.output) - create_html( - data, - args.output, - args.template, - args.embedded, - args.offline, - args.size, - args.no_avatar, - args.whatsapp_theme, - args.headline - ) - else: - print( - "The message database does not exist. You may specify the path " - "to database file with option -d or check your provided path." - ) - exit(6) - if os.path.isdir(args.media): - media_path = os.path.join(args.output, args.media) - if os.path.isdir(media_path): - print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") - else: - if not args.move_media: - if os.path.isdir(media_path): - print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") - else: - print("\nCopying media directory...", end="\n") - shutil.copytree(args.media, media_path) - else: - try: - shutil.move(args.media, f"{args.output}/") - except PermissionError: - print("\nCannot remove original WhatsApp directory. " - "Perhaps the directory is opened?", end="\n") - elif args.exported: - exported_handler.messages(args.exported, data, args.assume_first_as_me) - if not args.no_html: - android_handler.create_html( - data, - args.output, - args.template, - args.embedded, - args.offline, - args.size, - args.no_avatar, - args.whatsapp_theme, - args.headline - ) - for file in glob.glob(r'*.*'): - shutil.copy(file, args.output) - elif args.import_json: +def main(): + """Main function to run the WhatsApp Chat Exporter.""" + # Set up and parse arguments + parser = setup_argument_parser() + args = parser.parse_args() + + # Check for updates + if args.check_update: + exit(check_update()) + + # Validate arguments + validate_args(parser, args) + + # Create output directory if it doesn't exist + os.makedirs(args.output, exist_ok=True) + + # Initialize data collection + data = ChatCollection() + + # Set up contact store for vCard enrichment if needed + contact_store = setup_contact_store(args) + + if args.import_json: + # Import from JSON import_from_json(args.json, data) android_handler.create_html( data, @@ -610,48 +666,62 @@ def main(): args.whatsapp_theme, args.headline ) - - if args.text_format: - print("Writing text file...") - android_handler.create_txt(data, args.text_format) - - if args.json and not args.import_json: - if args.enrich_from_vcards is not None and not contact_store.is_empty(): - contact_store.enrich_from_vcards(data) - - if isinstance(data[next(iter(data))], ChatStore): - data = {jik: chat.to_json() for jik, chat in data.items()} - - if not args.json_per_chat: - with open(args.json, "w") as f: - data = json.dumps( - data, - ensure_ascii=not args.avoid_encoding_json, - indent=args.pretty_print_json - ) - print(f"\nWriting JSON file...({bytes_to_readable(len(data))})") - f.write(data) - else: - if args.json[-5:] == ".json": - args.json = args.json[:-5] - total = len(data.keys()) - if not os.path.isdir(args.json): - os.mkdir(args.json) - for index, jik in enumerate(data.keys()): - if data[jik]["name"] is not None: - contact = data[jik]["name"].replace('/', '') - else: - contact = jik.replace('+', '') - with open(f"{args.json}/{sanitize_filename(contact)}.json", "w") as f: - file_content_to_write = json.dumps({jik: data[jik]}, ensure_ascii=not args.avoid_encoding_json, indent=2 if args.pretty_print_json else None) - f.write(file_content_to_write) - print(f"Writing JSON file...({index + 1}/{total})", end="\r") - print() + elif args.exported: + # Process exported chat + process_exported_chat(args, data) else: - print() + # Process Android or iOS data + if args.android: + # Set default media path if not provided + if args.media is None: + args.media = "WhatsApp" + + # Set default DB paths if not provided + if args.db is None: + args.db = "msgstore.db" + if args.wa is None: + args.wa = "wa.db" + + # Decrypt backup if needed + if args.key is not None: + error = decrypt_android_backup(args) + if error != 0: + handle_decrypt_error(error) + elif args.ios: + # Set up identifiers based on business flag + if args.business: + from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers + else: + from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers + args.identifiers = identifiers + + # Set default media path if not provided + if args.media is None: + args.media = identifiers.DOMAIN + + # Extract media from backup if needed + if args.backup is not None: + if not os.path.isdir(args.media): + ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size) + else: + print("WhatsApp directory already exists, skipping WhatsApp file extraction.") + + # Set default DB paths if not provided + if args.db is None: + args.db = identifiers.MESSAGE + if args.wa is None: + args.wa = "ContactsV2.sqlite" + + # Process contacts + process_contacts(args, data, contact_store) + + # Process messages, media, and calls + process_messages(args, data) + + # Create output files + create_output_files(args, data, contact_store) + + # Handle media directory + handle_media_directory(args) - print("Everything is done!") - - -if __name__ == "__main__": - main() + print("Everything is done!") \ No newline at end of file diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 3374c8c..71b5dcd 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -670,7 +670,7 @@ def create_html( if maximum_size == 0: maximum_size = MAX_SIZE last_msg = current_chat.get_last_message().key_id - for message in current_chat.get_messages(): + for message in current_chat.values(): if message.data is not None and not message.meta and not message.media: current_size += len(message.data) + ROW_SIZE else: @@ -717,7 +717,7 @@ def create_html( output_file_name, template, name, - current_chat.get_messages(), + current_chat.values(), contact, w3css, current_chat, @@ -739,7 +739,7 @@ def create_txt(data, output): contact = jik.replace('+', '') output_file = os.path.join(output, f"{contact}.txt") with open(output_file, "w", encoding="utf8") as f: - for message in chat.get_messages(): + for message in chat.values(): date = datetime.fromtimestamp(message.timestamp).date() if message.meta and message.mime != "media": continue # Skip any metadata in text format diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index 578713a..8989369 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -213,11 +213,19 @@ class ChatStore: def get_last_message(self) -> 'Message': """Get the most recent message in the chat.""" return tuple(self._messages.values())[-1] + + def items(self): + """Get message items pairs.""" + return self._messages.items() - def get_messages(self) -> 'Message': + def values(self): """Get all messages in the chat.""" return self._messages.values() + def keys(self): + """Get all message keys in the chat.""" + return self._messages.keys() + class Message: """ diff --git a/Whatsapp_Chat_Exporter/exported_handler.py b/Whatsapp_Chat_Exporter/exported_handler.py index 0aae498..7215f6f 100644 --- a/Whatsapp_Chat_Exporter/exported_handler.py +++ b/Whatsapp_Chat_Exporter/exported_handler.py @@ -8,85 +8,174 @@ from Whatsapp_Chat_Exporter.utility import Device def messages(path, data, assume_first_as_me=False): - """Extracts messages from the exported file""" + """ + Extracts messages from an exported WhatsApp chat file. + + Args: + path: Path to the exported chat file + data: Data container object to store the parsed chat + assume_first_as_me: If True, assumes the first message is sent from the user without asking + + Returns: + Updated data container with extracted messages + """ + # Create a new chat in the data container + chat = data.add_chat("ExportedChat", ChatStore(Device.EXPORTED)) + you = "" # Will store the username of the current user + user_identification_done = False # Flag to track if user identification has been done + + # First pass: count total lines for progress reporting + with open(path, "r", encoding="utf8") as file: + total_row_number = sum(1 for _ in file) + + # Second pass: process the messages with open(path, "r", encoding="utf8") as file: - you = "" - data["ExportedChat"] = ChatStore(Device.EXPORTED) - chat = data["ExportedChat"] - total_row_number = len(file.readlines()) - file.seek(0) for index, line in enumerate(file): - if len(line.split(" - ")) > 1: - time = line.split(" - ")[0] - if ":" not in line.split(time)[1]: - msg.data = line.split(time)[1][3:] - msg.meta = True - else: - name = line.split(time)[1].split(":")[0] - message = line.split(time)[1].split(name + ":")[1].strip() - name = name[3:] - if you == "": - if chat.name is None: - if not assume_first_as_me: - while True: - ans = input(f"Is '{name}' you? (Y/N)").lower() - if ans == "y": - you = name - break - elif ans == "n": - chat.name = name - break - else: - you = name - else: - if name != chat.name: - you = name - elif chat.name is None: - if name != you: - chat.name = name - msg = Message( - you == name, - datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(), - time.split(", ")[1].strip(), - index - ) - if "" in message: - msg.data = "The media is omitted in the chat" - msg.mime = "media" - msg.meta = True - elif "(file attached)" in message: - mime = MimeTypes() - msg.media = True - file_path = os.path.join(os.path.dirname(path), message.split("(file attached)")[0].strip()) - if os.path.isfile(file_path): - msg.data = file_path - guess = mime.guess_type(file_path)[0] - if guess is not None: - msg.mime = guess - else: - msg.mime = "application/octet-stream" - else: - msg.data = "The media is missing" - msg.mime = "media" - msg.meta = True - else: - msg.data = message - if "\r\n" in message: - msg.data = message.replace("\r\n", "
") - if "\n" in message: - msg.data = message.replace("\n", "
") - chat.add_message(index, msg) - else: - lookback = index - 1 - while lookback not in chat.messages: - lookback -= 1 - msg = chat.messages[lookback] - if msg.media: - msg.caption = line.strip() - else: - msg.data += "
" + line.strip() - + you, user_identification_done = process_line( + line, index, chat, path, you, + assume_first_as_me, user_identification_done + ) + + # Show progress if index % 1000 == 0: print(f"Processing messages & media...({index}/{total_row_number})", end="\r") - print(f"Processing messages & media...({total_row_number}/{total_row_number})", end="\r") + + print(f"Processing messages & media...({total_row_number}/{total_row_number})") return data + + +def process_line(line, index, chat, file_path, you, assume_first_as_me, user_identification_done): + """ + Process a single line from the chat file + + Returns: + Tuple of (updated_you_value, updated_user_identification_done_flag) + """ + parts = line.split(" - ", 1) + + # Check if this is a new message (has timestamp format) + if len(parts) > 1: + time = parts[0] + you, user_identification_done = process_new_message( + time, parts[1], index, chat, you, file_path, + assume_first_as_me, user_identification_done + ) + else: + # This is a continuation of the previous message + process_message_continuation(line, index, chat) + + return you, user_identification_done + + +def process_new_message(time, content, index, chat, you, file_path, + assume_first_as_me, user_identification_done): + """ + Process a line that contains a new message + + Returns: + Tuple of (updated_you_value, updated_user_identification_done_flag) + """ + # Create a new message + msg = Message( + from_me=False, # Will be updated later if needed + timestamp=datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(), + time=time.split(", ")[1].strip(), + key_id=index, + received_timestamp=None, + read_timestamp=None + ) + + # Check if this is a system message (no name:message format) + if ":" not in content: + msg.data = content + msg.meta = True + else: + # Process user message + name, message = content.strip().split(":", 1) + + # Handle user identification + if you == "": + if chat.name is None: + # First sender identification + if not user_identification_done: + if not assume_first_as_me: + # Ask only once if this is the user + you = prompt_for_user_identification(name) + user_identification_done = True + else: + you = name + user_identification_done = True + else: + # If we know the chat name, anyone else must be "you" + if name != chat.name: + you = name + + # Set the chat name if needed + if chat.name is None and name != you: + chat.name = name + + # Determine if this message is from the current user + msg.from_me = (name == you) + + # Process message content + process_message_content(msg, message, file_path) + + chat.add_message(index, msg) + return you, user_identification_done + + +def process_message_content(msg, message, file_path): + """Process and set the content of a message based on its type""" + if "" in message: + msg.data = "The media is omitted in the chat" + msg.mime = "media" + msg.meta = True + elif "(file attached)" in message: + process_attached_file(msg, message, file_path) + else: + msg.data = message.replace("\r\n", "
").replace("\n", "
") + + +def process_attached_file(msg, message, file_path): + """Process an attached file in a message""" + mime = MimeTypes() + msg.media = True + + # Extract file path and check if it exists + file_name = message.split("(file attached)")[0].strip() + attached_file_path = os.path.join(os.path.dirname(file_path), file_name) + + if os.path.isfile(attached_file_path): + msg.data = attached_file_path + guess = mime.guess_type(attached_file_path)[0] + msg.mime = guess if guess is not None else "application/octet-stream" + else: + msg.data = "The media is missing" + msg.mime = "media" + msg.meta = True + + +def process_message_continuation(line, index, chat): + """Process a line that continues a previous message""" + # Find the previous message + lookback = index - 1 + while lookback not in chat.keys(): + lookback -= 1 + + msg = chat.get_message(lookback) + + # Add the continuation line to the message + if msg.media: + msg.caption = line.strip() + else: + msg.data += "
" + line.strip() + + +def prompt_for_user_identification(name): + """Ask the user if the given name is their username""" + while True: + ans = input(f"Is '{name}' you? (Y/N)").lower() + if ans == "y": + return name + elif ans == "n": + return "" \ No newline at end of file diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index 1232a3f..7a15835 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -12,144 +12,179 @@ from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, def contacts(db, data): + """Process WhatsApp contacts with status information.""" c = db.cursor() - # Get status only lol c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") total_row_number = c.fetchone()[0] print(f"Pre-processing contacts...({total_row_number})") + c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") content = c.fetchone() while content is not None: - if not content["ZWHATSAPPID"].endswith("@s.whatsapp.net"): - ZWHATSAPPID = content["ZWHATSAPPID"] + "@s.whatsapp.net" + zwhatsapp_id = content["ZWHATSAPPID"] + if not zwhatsapp_id.endswith("@s.whatsapp.net"): + zwhatsapp_id += "@s.whatsapp.net" + current_chat = ChatStore(Device.IOS) current_chat.status = content["ZABOUTTEXT"] - data.add_chat(ZWHATSAPPID, current_chat) + data.add_chat(zwhatsapp_id, current_chat) content = c.fetchone() +def process_contact_avatars(current_chat, media_folder, contact_id): + """Process and assign avatar images for a contact.""" + path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}' + avatars = glob(f"{path}*") + + if 0 < len(avatars) <= 1: + current_chat.their_avatar = avatars[0] + else: + for avatar in avatars: + if avatar.endswith(".thumb") and current_chat.their_avatar_thumb is None: + current_chat.their_avatar_thumb = avatar + elif avatar.endswith(".jpg") and current_chat.their_avatar is None: + current_chat.their_avatar = avatar + + +def get_contact_name(content): + """Determine the appropriate contact name based on push name and partner name.""" + is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit() + if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone): + return content["ZPARTNERNAME"] + else: + return content["ZPUSHNAME"] + + def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty): + """Process WhatsApp messages and contacts from the database.""" c = db.cursor() cursor2 = db.cursor() - # Get contacts - c.execute( - f"""SELECT count() - FROM (SELECT DISTINCT ZCONTACTJID, - ZPARTNERNAME, - ZWAPROFILEPUSHNAME.ZPUSHNAME - FROM ZWACHATSESSION - INNER JOIN ZWAMESSAGE - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAPROFILEPUSHNAME - ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE 1=1 - {get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - GROUP BY ZCONTACTJID);""" - ) + + # Build the chat filter conditions + chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else '' + + # Process contacts first + contact_query = f""" + SELECT count() + FROM (SELECT DISTINCT ZCONTACTJID, + ZPARTNERNAME, + ZWAPROFILEPUSHNAME.ZPUSHNAME + FROM ZWACHATSESSION + INNER JOIN ZWAMESSAGE + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAPROFILEPUSHNAME + ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE 1=1 + {chat_filter_include} + {chat_filter_exclude} + GROUP BY ZCONTACTJID); + """ + c.execute(contact_query) total_row_number = c.fetchone()[0] print(f"Processing contacts...({total_row_number})") - c.execute( - f"""SELECT DISTINCT ZCONTACTJID, - ZPARTNERNAME, - ZWAPROFILEPUSHNAME.ZPUSHNAME - FROM ZWACHATSESSION - INNER JOIN ZWAMESSAGE - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAPROFILEPUSHNAME - ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE 1=1 - {get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - GROUP BY ZCONTACTJID;""" - ) + # Get distinct contacts + contacts_query = f""" + SELECT DISTINCT ZCONTACTJID, + ZPARTNERNAME, + ZWAPROFILEPUSHNAME.ZPUSHNAME + FROM ZWACHATSESSION + INNER JOIN ZWAMESSAGE + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAPROFILEPUSHNAME + ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE 1=1 + {chat_filter_include} + {chat_filter_exclude} + GROUP BY ZCONTACTJID; + """ + c.execute(contacts_query) + + # Process each contact content = c.fetchone() while content is not None: - is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit() - if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone): - contact_name = content["ZPARTNERNAME"] - else: - contact_name = content["ZPUSHNAME"] + contact_name = get_contact_name(content) contact_id = content["ZCONTACTJID"] + + # Add or update chat if contact_id not in data: current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder)) else: current_chat = data.get_chat(contact_id) current_chat.name = contact_name current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg") - path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}' - avatars = glob(f"{path}*") - if 0 < len(avatars) <= 1: - current_chat.their_avatar = avatars[0] - else: - for avatar in avatars: - if avatar.endswith(".thumb") and current_chat.their_avatar_thumb is None: - current_chat.their_avatar_thumb = avatar - elif avatar.endswith(".jpg") and current_chat.their_avatar is None: - current_chat.their_avatar = avatar + + # Process avatar images + process_contact_avatars(current_chat, media_folder, contact_id) content = c.fetchone() - # Get message history - c.execute(f"""SELECT count() - FROM ZWAMESSAGE - INNER JOIN ZWACHATSESSION - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE 1=1 - {f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} - {get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}""") + # Get message count + message_count_query = f""" + SELECT count() + FROM ZWAMESSAGE + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE 1=1 + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(message_count_query) total_row_number = c.fetchone()[0] print(f"Processing messages...(0/{total_row_number})", end="\r") - c.execute(f"""SELECT ZCONTACTJID, - ZWAMESSAGE.Z_PK, - ZISFROMME, - ZMESSAGEDATE, - ZTEXT, - ZMESSAGETYPE, - ZWAGROUPMEMBER.ZMEMBERJID, - ZMETADATA, - ZSTANZAID, - ZGROUPINFO, - ZSENTDATE - FROM ZWAMESSAGE - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - LEFT JOIN ZWAMEDIAITEM - ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE - INNER JOIN ZWACHATSESSION - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - WHERE 1=1 - {f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} - {get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - ORDER BY ZMESSAGEDATE ASC;""") + + # Fetch messages + messages_query = f""" + SELECT ZCONTACTJID, + ZWAMESSAGE.Z_PK, + ZISFROMME, + ZMESSAGEDATE, + ZTEXT, + ZMESSAGETYPE, + ZWAGROUPMEMBER.ZMEMBERJID, + ZMETADATA, + ZSTANZAID, + ZGROUPINFO, + ZSENTDATE + FROM ZWAMESSAGE + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + LEFT JOIN ZWAMEDIAITEM + ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + WHERE 1=1 + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + ORDER BY ZMESSAGEDATE ASC; + """ + c.execute(messages_query) + + # Process each message i = 0 content = c.fetchone() while content is not None: - ZCONTACTJID = content["ZCONTACTJID"] - Z_PK = content["Z_PK"] + contact_id = content["ZCONTACTJID"] + message_pk = content["Z_PK"] is_group_message = content["ZGROUPINFO"] is not None - if ZCONTACTJID not in data: - current_chat = data.add_chat(ZCONTACTJID, ChatStore(Device.IOS)) - path = f'{media_folder}/Media/Profile/{ZCONTACTJID.split("@")[0]}' - avatars = glob(f"{path}*") - if 0 < len(avatars) <= 1: - current_chat.their_avatar = avatars[0] - else: - for avatar in avatars: - if avatar.endswith(".thumb"): - current_chat.their_avatar_thumb = avatar - elif avatar.endswith(".jpg"): - current_chat.their_avatar = avatar + + # Ensure chat exists + if contact_id not in data: + current_chat = data.add_chat(contact_id, ChatStore(Device.IOS)) + process_contact_avatars(current_chat, media_folder, contact_id) else: - current_chat = data.get_chat(ZCONTACTJID) + current_chat = data.get_chat(contact_id) + + # Create message object ts = APPLE_TIME + content["ZMESSAGEDATE"] message = Message( from_me=content["ZISFROMME"], @@ -159,290 +194,409 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET, message_type=content["ZMESSAGETYPE"], received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None, - read_timestamp=None # TODO: Add timestamp + read_timestamp=None # TODO: Add timestamp ) - invalid = False - if is_group_message and content["ZISFROMME"] == 0: - name = None - if content["ZMEMBERJID"] is not None: - if content["ZMEMBERJID"] in data: - name = data.get_chat(content["ZMEMBERJID"]).name - if "@" in content["ZMEMBERJID"]: - fallback = content["ZMEMBERJID"].split('@')[0] - else: - fallback = None - else: - fallback = None - message.sender = name or fallback - else: - message.sender = None - if content["ZMESSAGETYPE"] == 6: - # Metadata - if is_group_message: - # Group - if content["ZTEXT"] is not None: - # Chnaged name - try: - int(content["ZTEXT"]) - except ValueError: - msg = f"The group name changed to {content['ZTEXT']}" - message.data = msg - message.meta = True - else: - invalid = True - else: - message.data = None - else: - message.data = None - else: - # real message - if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14"): - quoted = content["ZMETADATA"][2:19] - message.reply = quoted.decode() - cursor2.execute(f"""SELECT ZTEXT - FROM ZWAMESSAGE - WHERE ZSTANZAID LIKE '{message.reply}%'""") - quoted_content = cursor2.fetchone() - if quoted_content and "ZTEXT" in quoted_content: - message.quoted_data = quoted_content["ZTEXT"] - else: - message.quoted_data = None - if content["ZMESSAGETYPE"] == 15: # Sticker - message.sticker = True - - if content["ZISFROMME"] == 1: - if content["ZMESSAGETYPE"] == 14: - msg = "Message deleted" - message.meta = True - else: - msg = content["ZTEXT"] - if msg is not None: - if "\r\n" in msg: - msg = msg.replace("\r\n", "
") - if "\n" in msg: - msg = msg.replace("\n", "
") - else: - if content["ZMESSAGETYPE"] == 14: - msg = "Message deleted" - message.meta = True - else: - msg = content["ZTEXT"] - if msg is not None: - if "\r\n" in msg: - msg = msg.replace("\r\n", "
") - if "\n" in msg: - msg = msg.replace("\n", "
") - message.data = msg + + # Process message data + invalid = process_message_data(message, content, is_group_message, data, cursor2) + + # Add valid messages to chat if not invalid: - current_chat.add_message(Z_PK, message) + current_chat.add_message(message_pk, message) + + # Update progress i += 1 if i % 1000 == 0: print(f"Processing messages...({i}/{total_row_number})", end="\r") content = c.fetchone() + print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r") +def process_message_data(message, content, is_group_message, data, cursor2): + """Process and set message data from content row.""" + # Handle group sender info + if is_group_message and content["ZISFROMME"] == 0: + name = None + if content["ZMEMBERJID"] is not None: + if content["ZMEMBERJID"] in data: + name = data.get_chat(content["ZMEMBERJID"]).name + if "@" in content["ZMEMBERJID"]: + fallback = content["ZMEMBERJID"].split('@')[0] + else: + fallback = None + else: + fallback = None + message.sender = name or fallback + else: + message.sender = None + + # Handle metadata messages + if content["ZMESSAGETYPE"] == 6: + return process_metadata_message(message, content, is_group_message) + + # Handle quoted replies + if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and False: + quoted = content["ZMETADATA"][2:19] + message.reply = quoted.decode() + cursor2.execute(f"""SELECT ZTEXT + FROM ZWAMESSAGE + WHERE ZSTANZAID LIKE '{message.reply}%'""") + quoted_content = cursor2.fetchone() + if quoted_content and "ZTEXT" in quoted_content: + message.quoted_data = quoted_content["ZTEXT"] + else: + message.quoted_data = None + + # Handle stickers + if content["ZMESSAGETYPE"] == 15: + message.sticker = True + + # Process message text + process_message_text(message, content) + + return False # Message is valid + + +def process_metadata_message(message, content, is_group_message): + """Process metadata messages (action_type 6).""" + if is_group_message: + # Group + if content["ZTEXT"] is not None: + # Changed name + try: + int(content["ZTEXT"]) + except ValueError: + msg = f"The group name changed to {content['ZTEXT']}" + message.data = msg + message.meta = True + return False # Valid message + else: + return True # Invalid message + else: + message.data = None + return False + else: + message.data = None + return False + + +def process_message_text(message, content): + """Process and format message text content.""" + if content["ZISFROMME"] == 1: + if content["ZMESSAGETYPE"] == 14: + msg = "Message deleted" + message.meta = True + else: + msg = content["ZTEXT"] + if msg is not None: + msg = msg.replace("\r\n", "
").replace("\n", "
") + else: + if content["ZMESSAGETYPE"] == 14: + msg = "Message deleted" + message.meta = True + else: + msg = content["ZTEXT"] + if msg is not None: + msg = msg.replace("\r\n", "
").replace("\n", "
") + + message.data = msg + + def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False): + """Process media files from WhatsApp messages.""" c = db.cursor() - # Get media - c.execute(f"""SELECT count() - FROM ZWAMEDIAITEM - INNER JOIN ZWAMESSAGE - ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK - INNER JOIN ZWACHATSESSION - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE 1=1 - {f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} - {get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - """) + + # Build filter conditions + chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else '' + + # Get media count + media_count_query = f""" + SELECT count() + FROM ZWAMEDIAITEM + INNER JOIN ZWAMESSAGE + ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE 1=1 + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(media_count_query) total_row_number = c.fetchone()[0] print(f"\nProcessing media...(0/{total_row_number})", end="\r") - i = 0 - c.execute(f"""SELECT ZCONTACTJID, - ZMESSAGE, - ZMEDIALOCALPATH, - ZMEDIAURL, - ZVCARDSTRING, - ZMEDIAKEY, - ZTITLE - FROM ZWAMEDIAITEM - INNER JOIN ZWAMESSAGE - ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK - INNER JOIN ZWACHATSESSION - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE ZMEDIALOCALPATH IS NOT NULL - {f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} - {get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - ORDER BY ZCONTACTJID ASC""") - content = c.fetchone() + + # Fetch media items + media_query = f""" + SELECT ZCONTACTJID, + ZMESSAGE, + ZMEDIALOCALPATH, + ZMEDIAURL, + ZVCARDSTRING, + ZMEDIAKEY, + ZTITLE + FROM ZWAMEDIAITEM + INNER JOIN ZWAMESSAGE + ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE ZMEDIALOCALPATH IS NOT NULL + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + ORDER BY ZCONTACTJID ASC + """ + c.execute(media_query) + + # Process each media item mime = MimeTypes() + i = 0 + content = c.fetchone() while content is not None: - file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}" - current_chat = data.get_chat(content["ZCONTACTJID"]) - message = current_chat.get_message(content["ZMESSAGE"]) - message.media = True - if current_chat.media_base == "": - current_chat.media_base = media_folder + "/" - if os.path.isfile(file_path): - message.data = '/'.join(file_path.split("/")[1:]) - if content["ZVCARDSTRING"] is None: - guess = mime.guess_type(file_path)[0] - if guess is not None: - message.mime = guess - else: - message.mime = "application/octet-stream" - else: - message.mime = content["ZVCARDSTRING"] - if separate_media: - chat_display_name = slugify(current_chat.name or message.sender \ - or content["ZCONTACTJID"].split('@')[0], True) - current_filename = file_path.split("/")[-1] - new_folder = os.path.join(media_folder, "separated", chat_display_name) - Path(new_folder).mkdir(parents=True, exist_ok=True) - new_path = os.path.join(new_folder, current_filename) - shutil.copy2(file_path, new_path) - message.data = '/'.join(new_path.split("\\")[1:]) - else: - message.data = "The media is missing" - message.mime = "media" - message.meta = True - if content["ZTITLE"] is not None: - message.caption = content["ZTITLE"] + process_media_item(content, data, media_folder, mime, separate_media) + + # Update progress i += 1 if i % 100 == 0: print(f"Processing media...({i}/{total_row_number})", end="\r") content = c.fetchone() - print( - f"Processing media...({total_row_number}/{total_row_number})", end="\r") + + print(f"Processing media...({total_row_number}/{total_row_number})", end="\r") + + +def process_media_item(content, data, media_folder, mime, separate_media): + """Process a single media item.""" + file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}" + current_chat = data.get_chat(content["ZCONTACTJID"]) + message = current_chat.get_message(content["ZMESSAGE"]) + message.media = True + + if current_chat.media_base == "": + current_chat.media_base = media_folder + "/" + + if os.path.isfile(file_path): + message.data = '/'.join(file_path.split("/")[1:]) + + # Set MIME type + if content["ZVCARDSTRING"] is None: + guess = mime.guess_type(file_path)[0] + message.mime = guess if guess is not None else "application/octet-stream" + else: + message.mime = content["ZVCARDSTRING"] + + # Handle separate media option + if separate_media: + chat_display_name = slugify(current_chat.name or message.sender or content["ZCONTACTJID"].split('@')[0], True) + current_filename = file_path.split("/")[-1] + new_folder = os.path.join(media_folder, "separated", chat_display_name) + Path(new_folder).mkdir(parents=True, exist_ok=True) + new_path = os.path.join(new_folder, current_filename) + shutil.copy2(file_path, new_path) + message.data = '/'.join(new_path.split("\\")[1:]) + else: + # Handle missing media + message.data = "The media is missing" + message.mime = "media" + message.meta = True + + # Add caption if available + if content["ZTITLE"] is not None: + message.caption = content["ZTITLE"] def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): + """Process vCard contacts from WhatsApp messages.""" c = db.cursor() - c.execute(f"""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM, - ZWAMEDIAITEM.ZMESSAGE, - ZCONTACTJID, - ZVCARDNAME, - ZVCARDSTRING - FROM ZWAVCARDMENTION - INNER JOIN ZWAMEDIAITEM - ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK - INNER JOIN ZWAMESSAGE - ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK - INNER JOIN ZWACHATSESSION - ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK - LEFT JOIN ZWAGROUPMEMBER - ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK - WHERE 1=1 - {f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} - {get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} - {get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")};""") + + # Build filter conditions + chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else '' + + # Fetch vCard mentions + vcard_query = f""" + SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM, + ZWAMEDIAITEM.ZMESSAGE, + ZCONTACTJID, + ZVCARDNAME, + ZVCARDSTRING + FROM ZWAVCARDMENTION + INNER JOIN ZWAMEDIAITEM + ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK + INNER JOIN ZWAMESSAGE + ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE 1=1 + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(vcard_query) contents = c.fetchall() total_row_number = len(contents) print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") + + # Create vCards directory path = f'{media_folder}/Message/vCards' Path(path).mkdir(parents=True, exist_ok=True) + # Process each vCard for index, content in enumerate(contents): - file_paths = [] - vcard_names = content["ZVCARDNAME"].split("_$!!$_") - vcard_strings = content["ZVCARDSTRING"].split("_$!!$_") - - # If this is a list of contacts - if len(vcard_names) > len(vcard_strings): - vcard_names.pop(0) # Dismiss the first element, which is the group name - - for name, vcard_string in zip(vcard_names, vcard_strings): - file_name = "".join(x for x in name if x.isalnum()) - file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore') - file_path = os.path.join(path, f"{file_name}.vcf") - file_paths.append(file_path) - - if not os.path.isfile(file_path): - with open(file_path, "w", encoding="utf-8") as f: - f.write(vcard_string) - - vcard_summary = "This media include the following vCard file(s):
" - vcard_summary += " | ".join([f'{htmle(name)}' for name, fp in zip(vcard_names, file_paths)]) - message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"]) - message.data = vcard_summary - message.mime = "text/x-vcard" - message.media = True - message.meta = True - message.safe = True + process_vcard_item(content, path, data) print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r") +def process_vcard_item(content, path, data): + """Process a single vCard item.""" + file_paths = [] + vcard_names = content["ZVCARDNAME"].split("_$!!$_") + vcard_strings = content["ZVCARDSTRING"].split("_$!!$_") + + # If this is a list of contacts + if len(vcard_names) > len(vcard_strings): + vcard_names.pop(0) # Dismiss the first element, which is the group name + + # Save each vCard file + for name, vcard_string in zip(vcard_names, vcard_strings): + file_name = "".join(x for x in name if x.isalnum()) + file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore') + file_path = os.path.join(path, f"{file_name}.vcf") + file_paths.append(file_path) + + if not os.path.isfile(file_path): + with open(file_path, "w", encoding="utf-8") as f: + f.write(vcard_string) + + # Create vCard summary and update message + vcard_summary = "This media include the following vCard file(s):
" + vcard_summary += " | ".join([f'{htmle(name)}' for name, fp in zip(vcard_names, file_paths)]) + + message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"]) + message.data = vcard_summary + message.mime = "text/x-vcard" + message.media = True + message.meta = True + message.safe = True + + def calls(db, data, timezone_offset, filter_chat): + """Process WhatsApp call records.""" c = db.cursor() - c.execute(f"""SELECT count() - FROM ZWACDCALLEVENT - WHERE 1=1 - {get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} - {get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""") + + # Build filter conditions + chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") + chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") + + # Get call count + call_count_query = f""" + SELECT count() + FROM ZWACDCALLEVENT + WHERE 1=1 + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(call_count_query) total_row_number = c.fetchone()[0] if total_row_number == 0: return + print(f"\nProcessing calls...({total_row_number})", end="\r") - c.execute(f"""SELECT ZCALLIDSTRING, - ZGROUPCALLCREATORUSERJIDSTRING, - ZGROUPJIDSTRING, - ZDATE, - ZOUTCOME, - ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred, - ZDURATION, - ZVIDEO, - ZMISSED, - ZINCOMING - FROM ZWACDCALLEVENT - INNER JOIN ZWAAGGREGATECALLEVENT - ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK - WHERE 1=1 - {get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} - {get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""") + + # Fetch call records + calls_query = f""" + SELECT ZCALLIDSTRING, + ZGROUPCALLCREATORUSERJIDSTRING, + ZGROUPJIDSTRING, + ZDATE, + ZOUTCOME, + ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred, + ZDURATION, + ZVIDEO, + ZMISSED, + ZINCOMING + FROM ZWACDCALLEVENT + INNER JOIN ZWAAGGREGATECALLEVENT + ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK + WHERE 1=1 + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(calls_query) + + # Create calls chat chat = ChatStore(Device.ANDROID, "WhatsApp Calls") + + # Process each call content = c.fetchone() while content is not None: - ts = APPLE_TIME + int(content["ZDATE"]) - call = Message( - from_me=content["ZINCOMING"] == 0, - timestamp=ts, - time=ts, - key_id=content["ZCALLIDSTRING"], - timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET - ) - _jid = content["ZGROUPCALLCREATORUSERJIDSTRING"] - name = data.get_chat(_jid).name if _jid in data else None - if _jid is not None and "@" in _jid: - fallback = _jid.split('@')[0] - else: - fallback = None - call.sender = name or fallback - call.meta = True - call.data = ( - f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}" - f"{'video' if content['ZVIDEO'] == 1 else 'voice'} " - f"call {'to' if call.from_me else 'from'} " - f"{call.sender} was " - ) - if content['ZOUTCOME'] in (1, 4): - call.data += "not answered." if call.from_me else "missed." - elif content['ZOUTCOME'] == 2: - call.data += "failed." - elif content['ZOUTCOME'] == 0: - call_time = convert_time_unit(int(content['ZDURATION'])) - call_bytes = bytes_to_readable(content['bytes_transferred']) - call.data += ( - f"initiated and lasted for {call_time} " - f"with {call_bytes} data transferred." - ) - else: - call.data += "in an unknown state." - chat.add_message(call.key_id, call) + process_call_record(content, chat, data, timezone_offset) content = c.fetchone() + + # Add calls chat to data data.add_chat("000000000000000", chat) + + +def process_call_record(content, chat, data, timezone_offset): + """Process a single call record.""" + ts = APPLE_TIME + int(content["ZDATE"]) + call = Message( + from_me=content["ZINCOMING"] == 0, + timestamp=ts, + time=ts, + key_id=content["ZCALLIDSTRING"], + timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET + ) + + # Set sender info + _jid = content["ZGROUPCALLCREATORUSERJIDSTRING"] + name = data.get_chat(_jid).name if _jid in data else None + if _jid is not None and "@" in _jid: + fallback = _jid.split('@')[0] + else: + fallback = None + call.sender = name or fallback + + # Set call metadata + call.meta = True + call.data = format_call_data(call, content) + + # Add call to chat + chat.add_message(call.key_id, call) + + +def format_call_data(call, content): + """Format call data message based on call attributes.""" + # Basic call info + call_data = ( + f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}" + f"{'video' if content['ZVIDEO'] == 1 else 'voice'} " + f"call {'to' if call.from_me else 'from'} " + f"{call.sender} was " + ) + + # Call outcome + if content['ZOUTCOME'] in (1, 4): + call_data += "not answered." if call.from_me else "missed." + elif content['ZOUTCOME'] == 2: + call_data += "failed." + elif content['ZOUTCOME'] == 0: + call_time = convert_time_unit(int(content['ZDURATION'])) + call_bytes = bytes_to_readable(content['bytes_transferred']) + call_data += ( + f"initiated and lasted for {call_time} " + f"with {call_bytes} data transferred." + ) + else: + call_data += "in an unknown state." + + return call_data \ No newline at end of file