diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 807279a..0a7aae3 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -452,7 +452,8 @@ def decrypt_android_backup(args) -> int: elif "crypt15" in args.backup: crypt = Crypt.CRYPT15 else: - logger.error(f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}") + logger.error( + f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}") return 1 # Get key @@ -505,11 +506,11 @@ def handle_decrypt_error(error: int) -> None: """Handle decryption errors with appropriate messages.""" if error == 1: logger.error("Dependencies of decrypt_backup and/or extract_encrypted_key" - " are not present. For details, see README.md.\n") + " are not present. For details, see README.md.\n") exit(3) elif error == 2: logger.error("Failed when decompressing the decrypted backup. " - "Possibly incorrect offsets used in decryption.\n") + "Possibly incorrect offsets used in decryption.\n") exit(4) else: logger.error("Unknown error occurred.\n") @@ -598,7 +599,7 @@ def handle_media_directory(args) -> None: logger.info(f"Media directory has been moved to the output directory{CLEAR_LINE}") except PermissionError: logger.warning("Cannot remove original WhatsApp directory. " - "Perhaps the directory is opened?\n") + "Perhaps the directory is opened?\n") else: logger.info(f"Copying media directory...\r") shutil.copytree(args.media, media_path) diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index cf7148b..3e921d1 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -121,6 +121,7 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes ) return db + def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> bytes: """Decrypt a crypt14 database using multithreading for brute-force offset detection. @@ -194,7 +195,8 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> return db with concurrent.futures.ThreadPoolExecutor(max_worker) as executor: - future_to_offset = {executor.submit(attempt_decrypt, offset): offset for offset in offset_combinations} + future_to_offset = {executor.submit(attempt_decrypt, offset) + : offset for offset in offset_combinations} try: for future in concurrent.futures.as_completed(future_to_offset): @@ -217,7 +219,6 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> raise OffsetNotFoundError("Could not find the correct offsets for decryption.") - def _decrypt_crypt12(database: bytes, main_key: bytes) -> bytes: """Decrypt a crypt12 database. @@ -319,7 +320,7 @@ def decrypt_backup( if crypt is not Crypt.CRYPT15 and len(key) != 158: raise InvalidKeyError("The key file must be 158 bytes") - #signature check, this is check is used in crypt 12 and 14 + # signature check, this is check is used in crypt 12 and 14 if crypt != Crypt.CRYPT15: t1 = key[30:62] @@ -329,7 +330,6 @@ def decrypt_backup( if t1 != database[3:35] and crypt == Crypt.CRYPT12: raise ValueError("The signature of key file and backup file mismatch") - if crypt == Crypt.CRYPT15: if keyfile_stream: main_key, hex_key = _extract_enc_key(key) @@ -353,7 +353,6 @@ def decrypt_backup( except (InvalidFileFormatError, OffsetNotFoundError, ValueError) as e: raise DecryptionError(f"Decryption failed: {e}") from e - if not dry_run: with open(output, "wb") as f: f.write(db) diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index cd364e7..d371b4f 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -22,24 +22,26 @@ logger = logging.getLogger(__name__) def contacts(db, data, enrich_from_vcards): """ Process WhatsApp contacts from the database. - + Args: db: Database connection data: Data store object enrich_from_vcards: Path to vCard file for contact enrichment - + Returns: bool: False if no contacts found, True otherwise """ c = db.cursor() c.execute("SELECT count() FROM wa_contacts") total_row_number = c.fetchone()[0] - + if total_row_number == 0: if enrich_from_vcards is not None: - logger.info("No contacts profiles found in the default database, contacts will be imported from the specified vCard file.") + logger.info( + "No contacts profiles found in the default database, contacts will be imported from the specified vCard file.") else: - logger.warning("No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google") + logger.warning( + "No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google") return False else: logger.info(f"Processed {total_row_number} contacts\n") @@ -51,14 +53,14 @@ def contacts(db, data, enrich_from_vcards): if row["status"] is not None: current_chat.status = row["status"] row = c.fetchone() - + return True def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty): """ Process WhatsApp messages from the database. - + Args: db: Database connection data: Data store object @@ -85,17 +87,17 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, i = 0 # Fetch the first row safely content = _fetch_row_safely(content_cursor) - + while content is not None: _process_single_message(data, content, table_message, timezone_offset) - + i += 1 if i % 1000 == 0: logger.info(f"Processing messages...({i}/{total_row_number})\r") - + # Fetch the next row safely content = _fetch_row_safely(content_cursor) - + logger.info(f"Processed {total_row_number} messages{CLEAR_LINE}") @@ -106,8 +108,10 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat): try: empty_filter = get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push") date_filter = f'AND timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android") cursor.execute(f"""SELECT count() FROM messages @@ -123,8 +127,10 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat): except sqlite3.OperationalError: empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast") date_filter = f'AND timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") cursor.execute(f"""SELECT count() FROM message @@ -146,8 +152,10 @@ def _get_messages_cursor_legacy(cursor, filter_empty, filter_date, filter_chat): """Get cursor for legacy database schema.""" empty_filter = get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push") date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android") cursor.execute(f"""SELECT messages.key_remote_jid, messages._id, @@ -209,8 +217,10 @@ def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat): """Get cursor for new database schema.""" empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast") date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android") cursor.execute(f"""SELECT jid_global.raw_string as key_remote_jid, message._id, @@ -292,19 +302,20 @@ def _process_single_message(data, content, table_message, timezone_offset): """Process a single message row.""" if content["key_remote_jid"] is None: return - + # Get or create the chat if not data.get_chat(content["key_remote_jid"]): - current_chat = data.add_chat(content["key_remote_jid"], ChatStore(Device.ANDROID, content["chat_subject"])) + current_chat = data.add_chat(content["key_remote_jid"], ChatStore( + Device.ANDROID, content["chat_subject"])) else: current_chat = data.get_chat(content["key_remote_jid"]) - + # Determine sender_jid_row_id if "sender_jid_row_id" in content: sender_jid_row_id = content["sender_jid_row_id"] else: sender_jid_row_id = None - + # Create message object message = Message( from_me=not sender_jid_row_id and content["key_from_me"], @@ -316,19 +327,19 @@ def _process_single_message(data, content, table_message, timezone_offset): received_timestamp=content["received_timestamp"], read_timestamp=content["read_timestamp"] ) - + # Handle binary data if isinstance(content["data"], bytes): _process_binary_message(message, content) current_chat.add_message(content["_id"], message) return - + # Set sender for group chats if content["jid_type"] == JidType.GROUP and content["key_from_me"] == 0: _set_group_sender(message, content, data, table_message) else: message.sender = None - + # Handle quoted messages if content["quoted"] is not None: message.reply = content["quoted"] @@ -338,7 +349,7 @@ def _process_single_message(data, content, table_message, timezone_offset): message.quoted_data = content["quoted_data"] else: message.reply = None - + # Handle message caption if not table_message and content["media_caption"] is not None: # Old schema @@ -348,14 +359,14 @@ def _process_single_message(data, content, table_message, timezone_offset): message.caption = content["data"] else: message.caption = None - + # Handle message content based on status if content["status"] == 6: # 6 = Metadata _process_metadata_message(message, content, data, table_message) else: # Real message _process_regular_message(message, content, table_message) - + current_chat.add_message(content["_id"], message) @@ -385,7 +396,7 @@ def _set_group_sender(message, content, data, table_message): name = data.get_chat(content["remote_resource"]).name if "@" in content["remote_resource"]: fallback = content["remote_resource"].split('@')[0] - + message.sender = name or fallback @@ -393,7 +404,7 @@ def _process_metadata_message(message, content, data, table_message): """Process metadata message.""" message.meta = True name = fallback = None - + if table_message: if content["sender_jid_row_id"] > 0: _jid = content["group_sender_jid"] @@ -412,12 +423,12 @@ def _process_metadata_message(message, content, data, table_message): fallback = _jid.split('@')[0] else: name = "You" - + message.data = determine_metadata(content, name or fallback) - + if isinstance(message.data, str) and "
" in message.data: message.safe = True - + if message.data is None: if content["video_call"] is not None: # Missed call message.meta = True @@ -433,7 +444,7 @@ def _process_metadata_message(message, content, data, table_message): def _process_regular_message(message, content, table_message): """Process regular (non-metadata) message.""" message.sticker = content["media_wa_type"] == 20 # Sticker is a message - + if content["key_from_me"] == 1: if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15: msg = "Message deleted" @@ -458,7 +469,7 @@ def _process_regular_message(message, content, table_message): msg = content["data"] if msg is not None: msg = _format_message_text(msg) - + message.data = msg @@ -474,7 +485,7 @@ def _format_message_text(text): def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True): """ Process WhatsApp media files from the database. - + Args: db: Database connection data: Data store object @@ -487,28 +498,28 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa c = db.cursor() total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat) logger.info(f"Processing media...(0/{total_row_number})\r") - + try: content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat) except sqlite3.OperationalError: content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat) - + content = content_cursor.fetchone() mime = MimeTypes() - + # Ensure thumbnails directory exists Path(f"{media_folder}/thumbnails").mkdir(parents=True, exist_ok=True) - + i = 0 while content is not None: _process_single_media(data, content, media_folder, mime, separate_media) - + i += 1 if i % 100 == 0: logger.info(f"Processing media...({i}/{total_row_number})\r") - + content = content_cursor.fetchone() - + logger.info(f"Processed {total_row_number} media{CLEAR_LINE}") @@ -519,8 +530,10 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat): try: empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push") date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") cursor.execute(f"""SELECT count() FROM message_media @@ -538,8 +551,10 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat): except sqlite3.OperationalError: empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast") date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android") cursor.execute(f"""SELECT count() FROM message_media @@ -563,8 +578,10 @@ def _get_media_cursor_legacy(cursor, filter_empty, filter_date, filter_chat): """Get cursor for legacy media database schema.""" empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast") date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") cursor.execute(f"""SELECT messages.key_remote_jid, message_row_id, @@ -596,8 +613,10 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat): """Get cursor for new media database schema.""" empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast") date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else '' - include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") - exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") + include_filter = get_chat_condition( + filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") + exclude_filter = get_chat_condition( + filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") cursor.execute(f"""SELECT jid.raw_string as key_remote_jid, message_row_id, @@ -633,10 +652,10 @@ def _process_single_media(data, content, media_folder, mime, separate_media): current_chat = data.get_chat(content["key_remote_jid"]) message = current_chat.get_message(content["message_row_id"]) message.media = True - + if os.path.isfile(file_path): message.data = file_path - + # Set mime type if content["mime_type"] is None: guess = mime.guess_type(file_path)[0] @@ -646,11 +665,11 @@ def _process_single_media(data, content, media_folder, mime, separate_media): message.mime = "application/octet-stream" else: message.mime = content["mime_type"] - + # Copy media to separate folder if needed if separate_media: - chat_display_name = slugify(current_chat.name or message.sender - or content["key_remote_jid"].split('@')[0], True) + chat_display_name = slugify(current_chat.name or message.sender + or content["key_remote_jid"].split('@')[0], True) current_filename = file_path.split("/")[-1] new_folder = os.path.join(media_folder, "separated", chat_display_name) Path(new_folder).mkdir(parents=True, exist_ok=True) @@ -661,7 +680,7 @@ def _process_single_media(data, content, media_folder, mime, separate_media): message.data = "The media is missing" message.mime = "media" message.meta = True - + # Handle thumbnail if content["thumbnail"] is not None: thumb_path = f"{media_folder}/thumbnails/{b64decode(content['file_hash']).hex()}.png" @@ -681,11 +700,11 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): total_row_number = len(rows) logger.info(f"Processing vCards...(0/{total_row_number})\r") - + # Create vCards directory if it doesn't exist path = os.path.join(media_folder, "vCards") Path(path).mkdir(parents=True, exist_ok=True) - + for index, row in enumerate(rows): _process_vcard_row(row, path, data) logger.info(f"Processing vCards...({index + 1}/{total_row_number})\r") @@ -696,8 +715,10 @@ def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty): """Execute vCard query for modern WhatsApp database schema.""" # Build the filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android") date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else '' empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push") @@ -726,8 +747,10 @@ def _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty): """Execute vCard query for legacy WhatsApp database schema.""" # Build the filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android") date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else '' empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast") @@ -760,11 +783,11 @@ def _process_vcard_row(row, path, data): file_name = "".join(x for x in media_name if x.isalnum()) file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore') file_path = os.path.join(path, f"{file_name}.vcf") - + if not os.path.isfile(file_path): with open(file_path, "w", encoding="utf-8") as f: f.write(row["vcard"]) - + message = data.get_chat(row["key_remote_jid"]).get_message(row["message_row_id"]) message.data = "This media include the following vCard file(s):
" \ f'{htmle(media_name)}' @@ -776,26 +799,26 @@ def _process_vcard_row(row, path, data): def calls(db, data, timezone_offset, filter_chat): """Process call logs from WhatsApp database.""" c = db.cursor() - + # Check if there are any calls that match the filter total_row_number = _get_calls_count(c, filter_chat) if total_row_number == 0: return - + logger.info(f"Processing calls...({total_row_number})\r") - + # Fetch call data calls_data = _fetch_calls_data(c, filter_chat) - + # Create a chat store for all calls chat = ChatStore(Device.ANDROID, "WhatsApp Calls") - + # Process each call content = calls_data.fetchone() while content is not None: _process_call_record(content, chat, data, timezone_offset) content = calls_data.fetchone() - + # Add the calls chat to the data data.add_chat("000000000000000", chat) logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}") @@ -861,7 +884,7 @@ def _process_call_record(content, chat, data, timezone_offset): received_timestamp=None, # TODO: Add timestamp read_timestamp=None # TODO: Add timestamp ) - + # Get caller/callee name _jid = content["raw_string"] name = data.get_chat(_jid).name if _jid in data else content["chat_subject"] or None @@ -870,13 +893,13 @@ def _process_call_record(content, chat, data, timezone_offset): else: fallback = None call.sender = name or fallback - + # Set metadata call.meta = True - + # Construct call description based on call type and result call.data = _construct_call_description(content, call) - + # Add call to chat chat.add_message(content["_id"], call) @@ -888,7 +911,7 @@ def _construct_call_description(content, call): f"call {'to' if call.from_me else 'from'} " f"{call.sender} was " ) - + if content['call_result'] in (0, 4, 7): description += "cancelled." if call.from_me else "missed." elif content['call_result'] == 2: @@ -904,21 +927,21 @@ def _construct_call_description(content, call): ) else: description += "in an unknown state." - + return description def create_html( - data, - output_folder, - template=None, - embedded=False, - offline_static=False, - maximum_size=None, - no_avatar=False, - experimental=False, - headline=None - ): + data, + output_folder, + template=None, + embedded=False, + offline_static=False, + maximum_size=None, + no_avatar=False, + experimental=False, + headline=None +): """Generate HTML chat files from data.""" template = setup_template(template, no_avatar, experimental) @@ -936,33 +959,33 @@ def create_html( if len(current_chat) == 0: # Skip empty chats continue - + safe_file_name, name = get_file_name(contact, current_chat) if maximum_size is not None: _generate_paginated_chat( - current_chat, - safe_file_name, - name, - contact, - output_folder, - template, - w3css, - maximum_size, + current_chat, + safe_file_name, + name, + contact, + output_folder, + template, + w3css, + maximum_size, headline ) else: _generate_single_chat( - current_chat, - safe_file_name, - name, - contact, - output_folder, - template, - w3css, + current_chat, + safe_file_name, + name, + contact, + output_folder, + template, + w3css, headline ) - + if current % 10 == 0: logger.info(f"Generating chats...({current}/{total_row_number})\r") @@ -990,20 +1013,20 @@ def _generate_paginated_chat(current_chat, safe_file_name, name, contact, output current_size = 0 current_page = 1 render_box = [] - + # Use default maximum size if set to 0 if maximum_size == 0: maximum_size = MAX_SIZE - + last_msg = current_chat.get_last_message().key_id - + for message in current_chat.values(): # Calculate message size if message.data is not None and not message.meta and not message.media: current_size += len(message.data) + ROW_SIZE else: current_size += ROW_SIZE + 100 # Assume media and meta HTML are 100 bytes - + if current_size > maximum_size: # Create a new page output_file_name = f"{output_folder}/{safe_file_name}-{current_page}.html" @@ -1047,25 +1070,25 @@ def _generate_paginated_chat(current_chat, safe_file_name, name, contact, output def create_txt(data, output): """Generate text files from chat data.""" os.makedirs(output, exist_ok=True) - + for jik, chat in data.items(): if len(chat) == 0: continue - + # Determine file name if chat.name is not None: contact = chat.name.replace('/', '') else: contact = jik.replace('+', '') - + output_file = os.path.join(output, f"{contact}.txt") - + with open(output_file, "w", encoding="utf8") as f: for message in chat.values(): # Skip metadata in text format if message.meta and message.mime != "media": continue - + # Format the message formatted_message = _format_message_for_txt(message, contact) f.write(f"{formatted_message}\n") @@ -1074,16 +1097,16 @@ def create_txt(data, output): def _format_message_for_txt(message, contact): """Format a message for text output.""" date = datetime.fromtimestamp(message.timestamp).date() - + # Determine the sender name if message.from_me: name = "You" else: name = message.sender if message.sender else contact - + prefix = f"[{date} {message.time}] {name}: " prefix_length = len(prefix) - + # Handle different message types if message.media and ("/" in message.mime or message.mime == "media"): if message.data == "The media is missing": @@ -1095,9 +1118,9 @@ def _format_message_for_txt(message, contact): message_text = "" else: message_text = message.data.replace('
', f'\n{" " * prefix_length}') - + # Add caption if present if message.caption is not None: message_text += "\n" + ' ' * len(prefix) + message.caption.replace('
', f'\n{" " * prefix_length}') - + return f"{prefix}{message_text}" diff --git a/Whatsapp_Chat_Exporter/bplist.py b/Whatsapp_Chat_Exporter/bplist.py index 390fe6e..4ca572a 100644 --- a/Whatsapp_Chat_Exporter/bplist.py +++ b/Whatsapp_Chat_Exporter/bplist.py @@ -24,31 +24,32 @@ import struct import codecs from datetime import datetime, timedelta + class BPListWriter(object): def __init__(self, objects): self.bplist = "" self.objects = objects - + def binary(self): '''binary -> string - + Generates bplist ''' self.data = 'bplist00' - + # TODO: flatten objects and count max length size - + # TODO: write objects and save offsets - + # TODO: write offsets - + # TODO: write metadata - + return self.data - + def write(self, filename): ''' - + Writes bplist to file ''' if self.bplist != "": @@ -57,18 +58,19 @@ class BPListWriter(object): else: raise Exception('BPlist not yet generated') + class BPListReader(object): def __init__(self, s): self.data = s self.objects = [] self.resolved = {} - + def __unpackIntStruct(self, sz, s): '''__unpackIntStruct(size, string) -> int - + Unpacks the integer of given size (1, 2 or 4 bytes) from string ''' - if sz == 1: + if sz == 1: ot = '!B' elif sz == 2: ot = '!H' @@ -79,17 +81,17 @@ class BPListReader(object): else: raise Exception('int unpack size '+str(sz)+' unsupported') return struct.unpack(ot, s)[0] - + def __unpackInt(self, offset): '''__unpackInt(offset) -> int - + Unpacks int field from plist at given offset ''' return self.__unpackIntMeta(offset)[1] def __unpackIntMeta(self, offset): '''__unpackIntMeta(offset) -> (size, int) - + Unpacks int field from plist at given offset and returns its size and value ''' obj_header = self.data[offset] @@ -99,7 +101,7 @@ class BPListReader(object): def __resolveIntSize(self, obj_info, offset): '''__resolveIntSize(obj_info, offset) -> (count, offset) - + Calculates count of objref* array entries and returns count and offset to first element ''' if obj_info == 0x0F: @@ -112,10 +114,10 @@ class BPListReader(object): def __unpackFloatStruct(self, sz, s): '''__unpackFloatStruct(size, string) -> float - + Unpacks the float of given size (4 or 8 bytes) from string ''' - if sz == 4: + if sz == 4: ot = '!f' elif sz == 8: ot = '!d' @@ -125,7 +127,7 @@ class BPListReader(object): def __unpackFloat(self, offset): '''__unpackFloat(offset) -> float - + Unpacks float field from plist at given offset ''' obj_header = self.data[offset] @@ -135,70 +137,79 @@ class BPListReader(object): def __unpackDate(self, offset): td = int(struct.unpack(">d", self.data[offset+1:offset+9])[0]) - return datetime(year=2001,month=1,day=1) + timedelta(seconds=td) + return datetime(year=2001, month=1, day=1) + timedelta(seconds=td) def __unpackItem(self, offset): '''__unpackItem(offset) - + Unpacks and returns an item from plist ''' obj_header = self.data[offset] obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F) - if obj_type == 0x00: - if obj_info == 0x00: # null 0000 0000 + if obj_type == 0x00: + if obj_info == 0x00: # null 0000 0000 return None - elif obj_info == 0x08: # bool 0000 1000 // false + elif obj_info == 0x08: # bool 0000 1000 // false return False - elif obj_info == 0x09: # bool 0000 1001 // true + elif obj_info == 0x09: # bool 0000 1001 // true return True - elif obj_info == 0x0F: # fill 0000 1111 // fill byte - raise Exception("0x0F Not Implemented") # this is really pad byte, FIXME + elif obj_info == 0x0F: # fill 0000 1111 // fill byte + raise Exception("0x0F Not Implemented") # this is really pad byte, FIXME else: - raise Exception('unpack item type '+str(obj_header)+' at '+str(offset)+ 'failed') - elif obj_type == 0x10: # int 0001 nnnn ... // # of bytes is 2^nnnn, big-endian bytes + raise Exception('unpack item type '+str(obj_header)+' at '+str(offset) + 'failed') + elif obj_type == 0x10: # int 0001 nnnn ... // # of bytes is 2^nnnn, big-endian bytes return self.__unpackInt(offset) - elif obj_type == 0x20: # real 0010 nnnn ... // # of bytes is 2^nnnn, big-endian bytes + elif obj_type == 0x20: # real 0010 nnnn ... // # of bytes is 2^nnnn, big-endian bytes return self.__unpackFloat(offset) - elif obj_type == 0x30: # date 0011 0011 ... // 8 byte float follows, big-endian bytes + elif obj_type == 0x30: # date 0011 0011 ... // 8 byte float follows, big-endian bytes return self.__unpackDate(offset) - elif obj_type == 0x40: # data 0100 nnnn [int] ... // nnnn is number of bytes unless 1111 then int count follows, followed by bytes + # data 0100 nnnn [int] ... // nnnn is number of bytes unless 1111 then int count follows, followed by bytes + elif obj_type == 0x40: obj_count, objref = self.__resolveIntSize(obj_info, offset) - return self.data[objref:objref+obj_count] # XXX: we return data as str - elif obj_type == 0x50: # string 0101 nnnn [int] ... // ASCII string, nnnn is # of chars, else 1111 then int count, then bytes + return self.data[objref:objref+obj_count] # XXX: we return data as str + # string 0101 nnnn [int] ... // ASCII string, nnnn is # of chars, else 1111 then int count, then bytes + elif obj_type == 0x50: obj_count, objref = self.__resolveIntSize(obj_info, offset) return self.data[objref:objref+obj_count] - elif obj_type == 0x60: # string 0110 nnnn [int] ... // Unicode string, nnnn is # of chars, else 1111 then int count, then big-endian 2-byte uint16_t + # string 0110 nnnn [int] ... // Unicode string, nnnn is # of chars, else 1111 then int count, then big-endian 2-byte uint16_t + elif obj_type == 0x60: obj_count, objref = self.__resolveIntSize(obj_info, offset) return self.data[objref:objref+obj_count*2].decode('utf-16be') - elif obj_type == 0x80: # uid 1000 nnnn ... // nnnn+1 is # of bytes + elif obj_type == 0x80: # uid 1000 nnnn ... // nnnn+1 is # of bytes # FIXME: Accept as a string for now obj_count, objref = self.__resolveIntSize(obj_info, offset) return self.data[objref:objref+obj_count] - elif obj_type == 0xA0: # array 1010 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows + # array 1010 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows + elif obj_type == 0xA0: obj_count, objref = self.__resolveIntSize(obj_info, offset) arr = [] for i in range(obj_count): - arr.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) + arr.append(self.__unpackIntStruct( + self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) return arr - elif obj_type == 0xC0: # set 1100 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows + # set 1100 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows + elif obj_type == 0xC0: # XXX: not serializable via apple implementation - raise Exception("0xC0 Not Implemented") # FIXME: implement - elif obj_type == 0xD0: # dict 1101 nnnn [int] keyref* objref* // nnnn is count, unless '1111', then int count follows + raise Exception("0xC0 Not Implemented") # FIXME: implement + # dict 1101 nnnn [int] keyref* objref* // nnnn is count, unless '1111', then int count follows + elif obj_type == 0xD0: obj_count, objref = self.__resolveIntSize(obj_info, offset) keys = [] for i in range(obj_count): - keys.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) + keys.append(self.__unpackIntStruct( + self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) values = [] objref += obj_count*self.object_ref_size for i in range(obj_count): - values.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) + values.append(self.__unpackIntStruct( + self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size])) dic = {} for i in range(obj_count): dic[keys[i]] = values[i] return dic else: raise Exception('don\'t know how to unpack obj type '+hex(obj_type)+' at '+str(offset)) - + def __resolveObject(self, idx): try: return self.resolved[idx] @@ -212,7 +223,7 @@ class BPListReader(object): return newArr if type(obj) == dict: newDic = {} - for k,v in obj.items(): + for k, v in obj.items(): key_resolved = self.__resolveObject(k) if isinstance(key_resolved, str): rk = key_resolved @@ -225,15 +236,16 @@ class BPListReader(object): else: self.resolved[idx] = obj return obj - + def parse(self): # read header if self.data[:8] != b'bplist00': raise Exception('Bad magic') - + # read trailer - self.offset_size, self.object_ref_size, self.number_of_objects, self.top_object, self.table_offset = struct.unpack('!6xBB4xI4xI4xI', self.data[-32:]) - #print "** plist offset_size:",self.offset_size,"objref_size:",self.object_ref_size,"num_objs:",self.number_of_objects,"top:",self.top_object,"table_ofs:",self.table_offset + self.offset_size, self.object_ref_size, self.number_of_objects, self.top_object, self.table_offset = struct.unpack( + '!6xBB4xI4xI4xI', self.data[-32:]) + # print "** plist offset_size:",self.offset_size,"objref_size:",self.object_ref_size,"num_objs:",self.number_of_objects,"top:",self.top_object,"table_ofs:",self.table_offset # read offset table self.offset_table = self.data[self.table_offset:-32] @@ -243,40 +255,45 @@ class BPListReader(object): offset_entry = ot[:self.offset_size] ot = ot[self.offset_size:] self.offsets.append(self.__unpackIntStruct(self.offset_size, offset_entry)) - #print "** plist offsets:",self.offsets - + # print "** plist offsets:",self.offsets + # read object table self.objects = [] k = 0 for i in self.offsets: obj = self.__unpackItem(i) - #print "** plist unpacked",k,type(obj),obj,"at",i + # print "** plist unpacked",k,type(obj),obj,"at",i k += 1 self.objects.append(obj) - + # rebuild object tree - #for i in range(len(self.objects)): + # for i in range(len(self.objects)): # self.__resolveObject(i) - + # return root object return self.__resolveObject(self.top_object) - + @classmethod def plistWithString(cls, s): parser = cls(s) return parser.parse() # helpers for testing + + def plist(obj): from Foundation import NSPropertyListSerialization, NSPropertyListBinaryFormat_v1_0 - b = NSPropertyListSerialization.dataWithPropertyList_format_options_error_(obj, NSPropertyListBinaryFormat_v1_0, 0, None) + b = NSPropertyListSerialization.dataWithPropertyList_format_options_error_( + obj, NSPropertyListBinaryFormat_v1_0, 0, None) return str(b.bytes()) + def unplist(s): from Foundation import NSData, NSPropertyListSerialization d = NSData.dataWithBytes_length_(s, len(s)) return NSPropertyListSerialization.propertyListWithData_options_format_error_(d, 0, None, None) + if __name__ == "__main__": import os import sys diff --git a/Whatsapp_Chat_Exporter/exported_handler.py b/Whatsapp_Chat_Exporter/exported_handler.py index 4a81adb..9e53c23 100644 --- a/Whatsapp_Chat_Exporter/exported_handler.py +++ b/Whatsapp_Chat_Exporter/exported_handler.py @@ -14,12 +14,12 @@ logger = logging.getLogger(__name__) def messages(path, data, assume_first_as_me=False): """ Extracts messages from an exported WhatsApp chat file. - + Args: path: Path to the exported chat file data: Data container object to store the parsed chat assume_first_as_me: If True, assumes the first message is sent from the user without asking - + Returns: Updated data container with extracted messages """ @@ -27,16 +27,16 @@ def messages(path, data, assume_first_as_me=False): chat = data.add_chat("ExportedChat", ChatStore(Device.EXPORTED)) you = "" # Will store the username of the current user user_identification_done = False # Flag to track if user identification has been done - + # First pass: count total lines for progress reporting with open(path, "r", encoding="utf8") as file: total_row_number = sum(1 for _ in file) - + # Second pass: process the messages with open(path, "r", encoding="utf8") as file: for index, line in enumerate(file): you, user_identification_done = process_line( - line, index, chat, path, you, + line, index, chat, path, you, assume_first_as_me, user_identification_done ) @@ -51,31 +51,31 @@ def messages(path, data, assume_first_as_me=False): def process_line(line, index, chat, file_path, you, assume_first_as_me, user_identification_done): """ Process a single line from the chat file - + Returns: Tuple of (updated_you_value, updated_user_identification_done_flag) """ parts = line.split(" - ", 1) - + # Check if this is a new message (has timestamp format) if len(parts) > 1: time = parts[0] you, user_identification_done = process_new_message( - time, parts[1], index, chat, you, file_path, + time, parts[1], index, chat, you, file_path, assume_first_as_me, user_identification_done ) else: # This is a continuation of the previous message process_message_continuation(line, index, chat) - + return you, user_identification_done -def process_new_message(time, content, index, chat, you, file_path, +def process_new_message(time, content, index, chat, you, file_path, assume_first_as_me, user_identification_done): """ Process a line that contains a new message - + Returns: Tuple of (updated_you_value, updated_user_identification_done_flag) """ @@ -88,7 +88,7 @@ def process_new_message(time, content, index, chat, you, file_path, received_timestamp=None, read_timestamp=None ) - + # Check if this is a system message (no name:message format) if ":" not in content: msg.data = content @@ -96,7 +96,7 @@ def process_new_message(time, content, index, chat, you, file_path, else: # Process user message name, message = content.strip().split(":", 1) - + # Handle user identification if you == "": if chat.name is None: @@ -113,17 +113,17 @@ def process_new_message(time, content, index, chat, you, file_path, # If we know the chat name, anyone else must be "you" if name != chat.name: you = name - + # Set the chat name if needed if chat.name is None and name != you: chat.name = name - + # Determine if this message is from the current user msg.from_me = (name == you) - + # Process message content process_message_content(msg, message, file_path) - + chat.add_message(index, msg) return you, user_identification_done @@ -144,11 +144,11 @@ def process_attached_file(msg, message, file_path): """Process an attached file in a message""" mime = MimeTypes() msg.media = True - + # Extract file path and check if it exists file_name = message.split("(file attached)")[0].strip() attached_file_path = os.path.join(os.path.dirname(file_path), file_name) - + if os.path.isfile(attached_file_path): msg.data = attached_file_path guess = mime.guess_type(attached_file_path)[0] @@ -165,9 +165,9 @@ def process_message_continuation(line, index, chat): lookback = index - 1 while lookback not in chat.keys(): lookback -= 1 - + msg = chat.get_message(lookback) - + # Add the continuation line to the message if msg.media: msg.caption = line.strip() @@ -182,4 +182,4 @@ def prompt_for_user_identification(name): if ans == "y": return name elif ans == "n": - return "" \ No newline at end of file + return "" diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index a833c7f..14a43e0 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -21,14 +21,14 @@ def contacts(db, data): c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") total_row_number = c.fetchone()[0] logger.info(f"Pre-processing contacts...({total_row_number})\r") - + c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") content = c.fetchone() while content is not None: zwhatsapp_id = content["ZWHATSAPPID"] if not zwhatsapp_id.endswith("@s.whatsapp.net"): zwhatsapp_id += "@s.whatsapp.net" - + current_chat = ChatStore(Device.IOS) current_chat.status = content["ZABOUTTEXT"] data.add_chat(zwhatsapp_id, current_chat) @@ -40,7 +40,7 @@ def process_contact_avatars(current_chat, media_folder, contact_id): """Process and assign avatar images for a contact.""" path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}' avatars = glob(f"{path}*") - + if 0 < len(avatars) <= 1: current_chat.their_avatar = avatars[0] else: @@ -64,12 +64,14 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """Process WhatsApp messages and contacts from the database.""" c = db.cursor() cursor2 = db.cursor() - + # Build the chat filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else '' - + # Process contacts first contact_query = f""" SELECT count() @@ -110,13 +112,13 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, GROUP BY ZCONTACTJID; """ c.execute(contacts_query) - + # Process each contact content = c.fetchone() while content is not None: contact_name = get_contact_name(content) contact_id = content["ZCONTACTJID"] - + # Add or update chat if contact_id not in data: current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder)) @@ -124,11 +126,11 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, current_chat = data.get_chat(contact_id) current_chat.name = contact_name current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg") - + # Process avatar images process_contact_avatars(current_chat, media_folder, contact_id) content = c.fetchone() - + logger.info(f"Processed {total_row_number} contacts{CLEAR_LINE}") # Get message count @@ -147,7 +149,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, c.execute(message_count_query) total_row_number = c.fetchone()[0] logger.info(f"Processing messages...(0/{total_row_number})\r") - + # Fetch messages messages_query = f""" SELECT ZCONTACTJID, @@ -175,7 +177,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, ORDER BY ZMESSAGEDATE ASC; """ c.execute(messages_query) - + # Process each message i = 0 content = c.fetchone() @@ -183,14 +185,14 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, contact_id = content["ZCONTACTJID"] message_pk = content["Z_PK"] is_group_message = content["ZGROUPINFO"] is not None - + # Ensure chat exists if contact_id not in data: current_chat = data.add_chat(contact_id, ChatStore(Device.IOS)) process_contact_avatars(current_chat, media_folder, contact_id) else: current_chat = data.get_chat(contact_id) - + # Create message object ts = APPLE_TIME + content["ZMESSAGEDATE"] message = Message( @@ -203,14 +205,14 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None, read_timestamp=None # TODO: Add timestamp ) - + # Process message data invalid = process_message_data(message, content, is_group_message, data, cursor2) - + # Add valid messages to chat if not invalid: current_chat.add_message(message_pk, message) - + # Update progress i += 1 if i % 1000 == 0: @@ -236,11 +238,11 @@ def process_message_data(message, content, is_group_message, data, cursor2): message.sender = name or fallback else: message.sender = None - + # Handle metadata messages if content["ZMESSAGETYPE"] == 6: return process_metadata_message(message, content, is_group_message) - + # Handle quoted replies if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and False: quoted = content["ZMETADATA"][2:19] @@ -250,17 +252,17 @@ def process_message_data(message, content, is_group_message, data, cursor2): WHERE ZSTANZAID LIKE '{message.reply}%'""") quoted_content = cursor2.fetchone() if quoted_content and "ZTEXT" in quoted_content: - message.quoted_data = quoted_content["ZTEXT"] + message.quoted_data = quoted_content["ZTEXT"] else: message.quoted_data = None - + # Handle stickers if content["ZMESSAGETYPE"] == 15: message.sticker = True # Process message text process_message_text(message, content) - + return False # Message is valid @@ -305,19 +307,21 @@ def process_message_text(message, content): msg = content["ZTEXT"] if msg is not None: msg = msg.replace("\r\n", "
").replace("\n", "
") - + message.data = msg def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False): """Process media files from WhatsApp messages.""" c = db.cursor() - + # Build filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else '' - + # Get media count media_count_query = f""" SELECT count() @@ -336,7 +340,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa c.execute(media_count_query) total_row_number = c.fetchone()[0] logger.info(f"Processing media...(0/{total_row_number})\r") - + # Fetch media items media_query = f""" SELECT ZCONTACTJID, @@ -360,14 +364,14 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa ORDER BY ZCONTACTJID ASC """ c.execute(media_query) - + # Process each media item mime = MimeTypes() i = 0 content = c.fetchone() while content is not None: process_media_item(content, data, media_folder, mime, separate_media) - + # Update progress i += 1 if i % 100 == 0: @@ -382,23 +386,24 @@ def process_media_item(content, data, media_folder, mime, separate_media): current_chat = data.get_chat(content["ZCONTACTJID"]) message = current_chat.get_message(content["ZMESSAGE"]) message.media = True - + if current_chat.media_base == "": current_chat.media_base = media_folder + "/" - + if os.path.isfile(file_path): message.data = '/'.join(file_path.split("/")[1:]) - + # Set MIME type if content["ZVCARDSTRING"] is None: guess = mime.guess_type(file_path)[0] message.mime = guess if guess is not None else "application/octet-stream" else: message.mime = content["ZVCARDSTRING"] - + # Handle separate media option if separate_media: - chat_display_name = slugify(current_chat.name or message.sender or content["ZCONTACTJID"].split('@')[0], True) + chat_display_name = slugify( + current_chat.name or message.sender or content["ZCONTACTJID"].split('@')[0], True) current_filename = file_path.split("/")[-1] new_folder = os.path.join(media_folder, "separated", chat_display_name) Path(new_folder).mkdir(parents=True, exist_ok=True) @@ -410,7 +415,7 @@ def process_media_item(content, data, media_folder, mime, separate_media): message.data = "The media is missing" message.mime = "media" message.meta = True - + # Add caption if available if content["ZTITLE"] is not None: message.caption = content["ZTITLE"] @@ -419,12 +424,14 @@ def process_media_item(content, data, media_folder, mime, separate_media): def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): """Process vCard contacts from WhatsApp messages.""" c = db.cursor() - + # Build filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else '' - + # Fetch vCard mentions vcard_query = f""" SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM, @@ -450,7 +457,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): contents = c.fetchall() total_row_number = len(contents) logger.info(f"Processing vCards...(0/{total_row_number})\r") - + # Create vCards directory path = f'{media_folder}/Message/vCards' Path(path).mkdir(parents=True, exist_ok=True) @@ -484,9 +491,10 @@ def process_vcard_item(content, path, data): f.write(vcard_string) # Create vCard summary and update message - vcard_summary = "This media include the following vCard file(s):
" - vcard_summary += " | ".join([f'{htmle(name)}' for name, fp in zip(vcard_names, file_paths)]) - + vcard_summary = "This media include the following vCard file(s):
" + vcard_summary += " | ".join([f'{htmle(name)}' for name, + fp in zip(vcard_names, file_paths)]) + message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"]) message.data = vcard_summary message.mime = "text/x-vcard" @@ -498,11 +506,13 @@ def process_vcard_item(content, path, data): def calls(db, data, timezone_offset, filter_chat): """Process WhatsApp call records.""" c = db.cursor() - + # Build filter conditions - chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") - chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") - + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios") + # Get call count call_count_query = f""" SELECT count() @@ -515,9 +525,9 @@ def calls(db, data, timezone_offset, filter_chat): total_row_number = c.fetchone()[0] if total_row_number == 0: return - + logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}\n") - + # Fetch call records calls_query = f""" SELECT ZCALLIDSTRING, @@ -538,16 +548,16 @@ def calls(db, data, timezone_offset, filter_chat): {chat_filter_exclude} """ c.execute(calls_query) - + # Create calls chat chat = ChatStore(Device.ANDROID, "WhatsApp Calls") - + # Process each call content = c.fetchone() while content is not None: process_call_record(content, chat, data, timezone_offset) content = c.fetchone() - + # Add calls chat to data data.add_chat("000000000000000", chat) @@ -562,7 +572,7 @@ def process_call_record(content, chat, data, timezone_offset): key_id=content["ZCALLIDSTRING"], timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET ) - + # Set sender info _jid = content["ZGROUPCALLCREATORUSERJIDSTRING"] name = data.get_chat(_jid).name if _jid in data else None @@ -571,11 +581,11 @@ def process_call_record(content, chat, data, timezone_offset): else: fallback = None call.sender = name or fallback - + # Set call metadata call.meta = True call.data = format_call_data(call, content) - + # Add call to chat chat.add_message(call.key_id, call) @@ -589,7 +599,7 @@ def format_call_data(call, content): f"call {'to' if call.from_me else 'from'} " f"{call.sender} was " ) - + # Call outcome if content['ZOUTCOME'] in (1, 4): call_data += "not answered." if call.from_me else "missed." @@ -604,5 +614,5 @@ def format_call_data(call, content): ) else: call_data += "in an unknown state." - - return call_data \ No newline at end of file + + return call_data diff --git a/Whatsapp_Chat_Exporter/ios_media_handler.py b/Whatsapp_Chat_Exporter/ios_media_handler.py index a3c26e2..bee7d9b 100644 --- a/Whatsapp_Chat_Exporter/ios_media_handler.py +++ b/Whatsapp_Chat_Exporter/ios_media_handler.py @@ -18,6 +18,7 @@ else: logger = logging.getLogger(__name__) + class BackupExtractor: """ A class to handle the extraction of WhatsApp data from iOS backups, @@ -61,9 +62,9 @@ class BackupExtractor: """ if not support_encrypted: logger.error("You don't have the dependencies to handle encrypted backup." - "Read more on how to deal with encrypted backup:" - "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" - ) + "Read more on how to deal with encrypted backup:" + "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" + ) return logger.info(f"Encryption detected on the backup!{CLEAR_LINE}") @@ -116,12 +117,12 @@ class BackupExtractor: exit(6) else: logger.info(f"Done{CLEAR_LINE}") - + def _extract_decrypted_files(self): """Extract all WhatsApp files after decryption""" def extract_progress_handler(file_id, domain, relative_path, n, total_files): if n % 100 == 0: - logger.info(f"Decrypting and extracting files...({n}/{total_files})\r") + logger.info(f"Decrypting and extracting files...({n}/{total_files})\r") return True self.backup.extract_files( @@ -234,4 +235,3 @@ def extract_media(base_dir, identifiers, decrypt_chunk_size): """ extractor = BackupExtractor(base_dir, identifiers, decrypt_chunk_size) extractor.extract() - diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 08ea6b0..e656ddc 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -33,6 +33,7 @@ CLEAR_LINE = "\x1b[K\n" logger = logging.getLogger(__name__) + def convert_time_unit(time_second: int) -> str: """Converts a time duration in seconds to a human-readable string. @@ -168,7 +169,7 @@ def check_update(): "===============Update===============\n" "A newer version of WhatsApp Chat Exporter is available.\n" f"Current version: {__version__}\n" - f"Latest version: {package_info['info']['version']}\n" + f"Latest version: {package_info['info']['version']}\n" ) if platform == "win32": logger.info("Update with: pip install --upgrade whatsapp-chat-exporter\n") diff --git a/scripts/brazilian_number_processing.py b/scripts/brazilian_number_processing.py index bb51213..a42678c 100644 --- a/scripts/brazilian_number_processing.py +++ b/scripts/brazilian_number_processing.py @@ -6,19 +6,20 @@ Contributed by @magpires https://github.com/KnugiHK/WhatsApp-Chat-Exporter/issue import re import argparse + def process_phone_number(raw_phone): """ Process the raw phone string from the VCARD and return two formatted numbers: - The original formatted number, and - A modified formatted number with the extra (ninth) digit removed, if applicable. - + Desired output: For a number with a 9-digit subscriber: Original: "+55 {area} {first 5 of subscriber}-{last 4 of subscriber}" Modified: "+55 {area} {subscriber[1:5]}-{subscriber[5:]}" For example, for an input that should represent "027912345678", the outputs are: "+55 27 91234-5678" and "+55 27 1234-5678" - + This function handles numbers that may already include a "+55" prefix. It expects that after cleaning, a valid number (without the country code) should have either 10 digits (2 for area + 8 for subscriber) or 11 digits (2 for area + 9 for subscriber). @@ -26,18 +27,18 @@ def process_phone_number(raw_phone): """ # Store the original input for processing number_to_process = raw_phone.strip() - + # Remove all non-digit characters digits = re.sub(r'\D', '', number_to_process) - + # If the number starts with '55', remove it for processing if digits.startswith("55") and len(digits) > 11: digits = digits[2:] - + # Remove trunk zero if present if digits.startswith("0"): digits = digits[1:] - + # After cleaning, we expect a valid number to have either 10 or 11 digits # If there are extra digits, use the last 11 (for a 9-digit subscriber) or last 10 (for an 8-digit subscriber) if len(digits) > 11: @@ -46,7 +47,7 @@ def process_phone_number(raw_phone): elif len(digits) > 10 and len(digits) < 11: # In some cases with an 8-digit subscriber, take the last 10 digits digits = digits[-10:] - + # Check if we have a valid number after processing if len(digits) not in (10, 11): return None, None @@ -70,6 +71,7 @@ def process_phone_number(raw_phone): return original_formatted, modified_formatted + def process_vcard(input_vcard, output_vcard): """ Process a VCARD file to standardize telephone entries and add a second TEL line @@ -77,13 +79,13 @@ def process_vcard(input_vcard, output_vcard): """ with open(input_vcard, 'r', encoding='utf-8') as file: lines = file.readlines() - + output_lines = [] - + # Regex to capture any telephone line. # It matches lines starting with "TEL:" or "TEL;TYPE=..." or with prefixes like "item1.TEL:". phone_pattern = re.compile(r'^(?P.*TEL(?:;TYPE=[^:]+)?):(?P.*)$') - + for line in lines: stripped_line = line.rstrip("\n") match = phone_pattern.match(stripped_line) @@ -99,10 +101,11 @@ def process_vcard(input_vcard, output_vcard): output_lines.append(f"TEL;TYPE=CELL:{mod_formatted}\n") else: output_lines.append(line) - + with open(output_vcard, 'w', encoding='utf-8') as file: file.writelines(output_lines) + if __name__ == '__main__': parser = argparse.ArgumentParser( description="Process a VCARD file to standardize telephone entries and add a second TEL line with the modified number (removing the extra ninth digit) for contacts with 9-digit subscribers." @@ -110,6 +113,6 @@ if __name__ == '__main__': parser.add_argument('input_vcard', type=str, help='Input VCARD file') parser.add_argument('output_vcard', type=str, help='Output VCARD file') args = parser.parse_args() - + process_vcard(args.input_vcard, args.output_vcard) - print(f"VCARD processed and saved to {args.output_vcard}") \ No newline at end of file + print(f"VCARD processed and saved to {args.output_vcard}") diff --git a/scripts/bruteforce_crypt15.py b/scripts/bruteforce_crypt15.py index c8646d4..d4497ce 100644 --- a/scripts/bruteforce_crypt15.py +++ b/scripts/bruteforce_crypt15.py @@ -26,6 +26,7 @@ def _extract_encrypted_key(keyfile): return _generate_hmac_of_hmac(key_stream) + if __name__ == "__main__": key = open("encrypted_backup.key", "rb").read() database = open("wa.db.crypt15", "rb").read() diff --git a/tests/test_brazilian_number_processing.py b/tests/test_brazilian_number_processing.py index f7ccd27..3612481 100644 --- a/tests/test_brazilian_number_processing.py +++ b/tests/test_brazilian_number_processing.py @@ -6,11 +6,12 @@ from unittest.mock import patch from scripts.brazilian_number_processing import process_phone_number, process_vcard + class TestVCardProcessor(unittest.TestCase): - + def test_process_phone_number(self): """Test the process_phone_number function with various inputs.""" - + # Test cases for 9-digit subscriber numbers test_cases_9_digit = [ # Standard 11-digit number (2 area + 9 subscriber) @@ -30,7 +31,7 @@ class TestVCardProcessor(unittest.TestCase): # With extra non-digit characters ("+55-27-9.1234_5678", "+55 27 91234-5678", "+55 27 1234-5678"), ] - + # Test cases for 8-digit subscriber numbers test_cases_8_digit = [ # Standard 10-digit number (2 area + 8 subscriber) @@ -46,7 +47,7 @@ class TestVCardProcessor(unittest.TestCase): # With country code and trunk zero ("+55 0 27 1234-5678", "+55 27 1234-5678", None), ] - + # Edge cases edge_cases = [ # Too few digits @@ -60,19 +61,19 @@ class TestVCardProcessor(unittest.TestCase): # Unusual formatting but valid number ("(+55) [27] 9.1234_5678", "+55 27 91234-5678", "+55 27 1234-5678"), ] - + # Run tests for all cases all_cases = test_cases_9_digit + test_cases_8_digit + edge_cases - + for raw_phone, expected_orig, expected_mod in all_cases: with self.subTest(raw_phone=raw_phone): orig, mod = process_phone_number(raw_phone) self.assertEqual(orig, expected_orig) self.assertEqual(mod, expected_mod) - + def test_process_vcard(self): """Test the process_vcard function with various VCARD formats.""" - + # Test case 1: Standard TEL entries vcard1 = """BEGIN:VCARD VERSION:3.0 @@ -202,26 +203,26 @@ END:VCARD (vcard5, expected5), (vcard6, expected6) ] - + for i, (input_vcard, expected_output) in enumerate(test_cases): with self.subTest(case=i+1): # Create temporary files for input and output with tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8') as input_file: input_file.write(input_vcard) input_path = input_file.name - + output_path = input_path + '.out' - + try: # Process the VCARD process_vcard(input_path, output_path) - + # Read and verify the output with open(output_path, 'r', encoding='utf-8') as output_file: actual_output = output_file.read() self.assertEqual(actual_output, expected_output) - + finally: # Clean up temporary files if os.path.exists(input_path): @@ -231,7 +232,7 @@ END:VCARD def test_script_argument_handling(self): """Test the script's command-line argument handling.""" - + test_input = """BEGIN:VCARD VERSION:3.0 N:Test;User;;; @@ -239,16 +240,17 @@ FN:User Test TEL:+5527912345678 END:VCARD """ - + # Create a temporary input file with tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8') as input_file: input_file.write(test_input) input_path = input_file.name - + output_path = input_path + '.out' - + try: - test_args = ['python' if os.name == 'nt' else 'python3', 'scripts/brazilian_number_processing.py', input_path, output_path] + test_args = ['python' if os.name == 'nt' else 'python3', + 'scripts/brazilian_number_processing.py', input_path, output_path] # We're just testing that the argument parsing works subprocess.call( test_args, @@ -257,7 +259,7 @@ END:VCARD ) # Check if the output file was created self.assertTrue(os.path.exists(output_path)) - + finally: # Clean up temporary files if os.path.exists(input_path): @@ -265,5 +267,6 @@ END:VCARD if os.path.exists(output_path): os.unlink(output_path) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_incremental_merge.py b/tests/test_incremental_merge.py index 5d6ef58..527e5ae 100644 --- a/tests/test_incremental_merge.py +++ b/tests/test_incremental_merge.py @@ -178,14 +178,14 @@ def test_incremental_merge_new_file(mock_filesystem): source_dir = "/source" target_dir = "/target" media_dir = "media" - + # Setup mock filesystem mock_filesystem["exists"].side_effect = lambda x: x == "/source" mock_filesystem["listdir"].return_value = ["chat.json"] - + # Run the function incremental_merge(source_dir, target_dir, media_dir, 2, True) - + # Verify the operations mock_filesystem["makedirs"].assert_called_once_with(target_dir, exist_ok=True) mock_filesystem["copy2"].assert_called_once_with( diff --git a/tests/test_nuitka_binary.py b/tests/test_nuitka_binary.py index 0e91930..0cd986e 100644 --- a/tests/test_nuitka_binary.py +++ b/tests/test_nuitka_binary.py @@ -43,9 +43,9 @@ def test_nuitka_binary(): "--assume-yes-for-downloads", "--follow-imports", "Whatsapp_Chat_Exporter/__main__.py", - "--output-filename=wtsexporter.exe" # use .exe on all platforms for compatibility + "--output-filename=wtsexporter.exe" # use .exe on all platforms for compatibility ] - + compile_result = subprocess.run( nuitka_command, capture_output=True, diff --git a/tests/test_vcards_contacts.py b/tests/test_vcards_contacts.py index 5d22b56..62cf603 100644 --- a/tests/test_vcards_contacts.py +++ b/tests/test_vcards_contacts.py @@ -8,12 +8,15 @@ def test_readVCardsFile(): data_dir = os.path.join(os.path.dirname(__file__), "data") assert len(read_vcards_file(os.path.join(data_dir, "contacts.vcf"), "852")) > 0 + def test_create_number_to_name_dicts(): pass + def test_fuzzy_match_numbers(): pass + def test_normalize_number(): assert normalize_number('0531234567', '1') == '1531234567' assert normalize_number('001531234567', '2') == '1531234567'