diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 7ca07ca..51bbabf 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -34,12 +34,12 @@ def setup_argument_parser() -> ArgumentParser: """Set up and return the argument parser with all options.""" parser = ArgumentParser( description='A customizable Android and iOS/iPadOS WhatsApp database parser that ' - 'will give you the history of your WhatsApp conversations in HTML ' - 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', + 'will give you the history of your WhatsApp conversations in HTML ' + 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' - 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' + 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' ) - + # Device type arguments device_group = parser.add_argument_group('Device Type') device_group.add_argument( @@ -54,7 +54,7 @@ def setup_argument_parser() -> ArgumentParser: "-e", "--exported", dest="exported", default=None, help="Define the target as exported chat file and specify the path to the file" ) - + # Input file paths input_group = parser.add_argument_group('Input Files') input_group.add_argument( @@ -86,7 +86,7 @@ def setup_argument_parser() -> ArgumentParser: "--wab", "--wa-backup", dest="wab", default=None, help="Path to contact database in crypt15 format" ) - + # Output options output_group = parser.add_argument_group('Output Options') output_group.add_argument( @@ -109,7 +109,7 @@ def setup_argument_parser() -> ArgumentParser: "--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None, help="Maximum (rough) size of a single output file in bytes, 0 for auto" ) - + # JSON formatting options json_group = parser.add_argument_group('JSON Options') json_group.add_argument( @@ -128,7 +128,7 @@ def setup_argument_parser() -> ArgumentParser: "--import", dest="import_json", default=False, action='store_true', help="Import JSON file and convert to HTML output" ) - + # HTML options html_group = parser.add_argument_group('HTML Options') html_group.add_argument( @@ -155,7 +155,7 @@ def setup_argument_parser() -> ArgumentParser: "--headline", dest="headline", default="Chat history with ??", help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name" ) - + # Media handling media_group = parser.add_argument_group('Media Handling') media_group.add_argument( @@ -166,7 +166,7 @@ def setup_argument_parser() -> ArgumentParser: "--create-separated-media", dest="separate_media", default=False, action='store_true', help="Create a copy of the media seperated per chat in /separated/ directory" ) - + # Filtering options filter_group = parser.add_argument_group('Filtering Options') filter_group.add_argument( @@ -195,7 +195,7 @@ def setup_argument_parser() -> ArgumentParser: "Setting this flag will cause the exporter to render those. " "This is useful if chat(s) are missing from the output") ) - + # Contact enrichment contact_group = parser.add_argument_group('Contact Enrichment') contact_group.add_argument( @@ -219,7 +219,7 @@ def setup_argument_parser() -> ArgumentParser: "The chats (JSON files only) and media from the source directory will be merged into the target directory. 
" "No chat messages or media will be deleted from the target directory; only new chat messages and media will be added to it. " "This enables chat messages and media to be deleted from the device to free up space, while ensuring they are preserved in the exported backups." - ) + ) ) inc_merging_group.add_argument( "--source-dir", @@ -233,7 +233,7 @@ def setup_argument_parser() -> ArgumentParser: default=None, help="Sets the target directory. Used for performing incremental merges." ) - + # Miscellaneous misc_group = parser.add_argument_group('Miscellaneous') misc_group.add_argument( @@ -260,7 +260,7 @@ def setup_argument_parser() -> ArgumentParser: "--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int, help="Specify the maximum number of worker for bruteforce decryption." ) - + return parser @@ -272,52 +272,59 @@ def validate_args(parser: ArgumentParser, args) -> None: if not args.android and not args.ios and not args.exported and not args.import_json: parser.error("You must define the device type.") if args.no_html and not args.json and not args.text_format: - parser.error("You must either specify a JSON output file, text file output directory or enable HTML output.") + parser.error( + "You must either specify a JSON output file, text file output directory or enable HTML output.") if args.import_json and (args.android or args.ios or args.exported or args.no_html): - parser.error("You can only use --import with -j and without --no-html, -a, -i, -e.") + parser.error( + "You can only use --import with -j and without --no-html, -a, -i, -e.") elif args.import_json and not os.path.isfile(args.json): parser.error("JSON file not found.") if args.incremental_merge and (args.source_dir is None or args.target_dir is None): - parser.error("You must specify both --source-dir and --target-dir for incremental merge.") + parser.error( + "You must specify both --source-dir and --target-dir for incremental merge.") if args.android and args.business: parser.error("WhatsApp Business is only available on iOS for now.") if "??" not in args.headline: parser.error("--headline must contain '??' 
for replacement.") - + # JSON validation if args.json_per_chat and args.json and ( - (args.json.endswith(".json") and os.path.isfile(args.json)) or + (args.json.endswith(".json") and os.path.isfile(args.json)) or (not args.json.endswith(".json") and os.path.isfile(args.json)) ): - parser.error("When --per-chat is enabled, the destination of --json must be a directory.") - + parser.error( + "When --per-chat is enabled, the destination of --json must be a directory.") + # vCards validation if args.enrich_from_vcards is not None and args.default_country_code is None: - parser.error("When --enrich-from-vcards is provided, you must also set --default-country-code") - + parser.error( + "When --enrich-from-vcards is provided, you must also set --default-country-code") + # Size validation if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric(): try: args.size = readable_to_bytes(args.size) except ValueError: - parser.error("The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") - + parser.error( + "The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") + # Date filter validation and processing if args.filter_date is not None: process_date_filter(parser, args) - + # Crypt15 key validation if args.key is None and args.backup is not None and args.backup.endswith("crypt15"): args.key = getpass("Enter your encryption key: ") - + # Theme validation if args.whatsapp_theme: args.template = "whatsapp_new.html" - + # Chat filter validation if args.filter_chat_include is not None and args.filter_chat_exclude is not None: - parser.error("Chat inclusion and exclusion filters cannot be used together.") - + parser.error( + "Chat inclusion and exclusion filters cannot be used together.") + validate_chat_filters(parser, args.filter_chat_include) validate_chat_filters(parser, args.filter_chat_exclude) @@ -327,21 +334,24 @@ def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str if chat_filter is not None: for chat in chat_filter: if not chat.isnumeric(): - parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") + parser.error( + "Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") def process_date_filter(parser: ArgumentParser, args) -> None: """Process and validate date filter arguments.""" if " - " in args.filter_date: start, end = args.filter_date.split(" - ") - start = int(datetime.strptime(start, args.filter_date_format).timestamp()) + start = int(datetime.strptime( + start, args.filter_date_format).timestamp()) end = int(datetime.strptime(end, args.filter_date_format).timestamp()) - + if start < 1009843200 or end < 1009843200: parser.error("WhatsApp was first released in 2009...") if start > end: - parser.error("The start date cannot be a moment after the end date.") - + parser.error( + "The start date cannot be a moment after the end date.") + if args.android: args.filter_date = f"BETWEEN {start}000 AND {end}000" elif args.ios: @@ -353,13 +363,15 @@ def process_date_filter(parser: ArgumentParser, args) -> None: def process_single_date_filter(parser: ArgumentParser, args) -> None: """Process single date comparison filters.""" if len(args.filter_date) < 3: - parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") - - _timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp()) - + parser.error( + "Unsupported date format. 
See https://wts.knugi.dev/docs?dest=date") + + _timestamp = int(datetime.strptime( + args.filter_date[2:], args.filter_date_format).timestamp()) + if _timestamp < 1009843200: parser.error("WhatsApp was first released in 2009...") - + if args.filter_date[:2] == "> ": if args.android: args.filter_date = f">= {_timestamp}000" @@ -371,7 +383,8 @@ def process_single_date_filter(parser: ArgumentParser, args) -> None: elif args.ios: args.filter_date = f"<= {_timestamp - APPLE_TIME}" else: - parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") + parser.error( + "Unsupported date format. See https://wts.knugi.dev/docs?dest=date") def setup_contact_store(args) -> Optional['ContactsFromVCards']: @@ -385,7 +398,8 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: ) exit(1) contact_store = ContactsFromVCards() - contact_store.load_vcf_file(args.enrich_from_vcards, args.default_country_code) + contact_store.load_vcf_file( + args.enrich_from_vcards, args.default_country_code) return contact_store return None @@ -395,9 +409,9 @@ def decrypt_android_backup(args) -> int: if args.key is None or args.backup is None: print("You must specify the backup file with -b and a key with -k") return 1 - + print("Decryption key specified, decrypting WhatsApp backup...") - + # Determine crypt type if "crypt12" in args.backup: crypt = Crypt.CRYPT12 @@ -408,7 +422,7 @@ def decrypt_android_backup(args) -> int: else: print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") return 1 - + # Get key keyfile_stream = False if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")): @@ -416,10 +430,10 @@ def decrypt_android_backup(args) -> int: else: key = open(args.key, "rb") keyfile_stream = True - + # Read backup db = open(args.backup, "rb").read() - + # Process WAB if provided error_wa = 0 if args.wab: @@ -436,7 +450,7 @@ def decrypt_android_backup(args) -> int: ) if isinstance(key, io.IOBase): key.seek(0) - + # Decrypt message database error_message = android_crypt.decrypt_backup( db, @@ -448,7 +462,7 @@ def decrypt_android_backup(args) -> int: keyfile_stream=keyfile_stream, max_worker=args.max_bruteforce_worker ) - + # Handle errors if error_wa != 0: return error_wa @@ -473,7 +487,7 @@ def handle_decrypt_error(error: int) -> None: def process_contacts(args, data: ChatCollection, contact_store=None) -> None: """Process contacts from the database.""" contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite" - + if os.path.isfile(contact_db): with sqlite3.connect(contact_db) as db: db.row_factory = sqlite3.Row @@ -486,42 +500,42 @@ def process_contacts(args, data: ChatCollection, contact_store=None) -> None: def process_messages(args, data: ChatCollection) -> None: """Process messages, media and vcards from the database.""" msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE - + if not os.path.isfile(msg_db): print( "The message database does not exist. You may specify the path " "to database file with option -d or check your provided path." 
) exit(6) - + filter_chat = (args.filter_chat_include, args.filter_chat_exclude) - + with sqlite3.connect(msg_db) as db: db.row_factory = sqlite3.Row - + # Process messages if args.android: message_handler = android_handler else: message_handler = ios_handler - + message_handler.messages( - db, data, args.media, args.timezone_offset, + db, data, args.media, args.timezone_offset, args.filter_date, filter_chat, args.filter_empty ) - + # Process media message_handler.media( - db, data, args.media, args.filter_date, + db, data, args.media, args.filter_date, filter_chat, args.filter_empty, args.separate_media ) - + # Process vcards message_handler.vcard( - db, data, args.media, args.filter_date, + db, data, args.media, args.filter_date, filter_chat, args.filter_empty ) - + # Process calls process_calls(args, db, data, filter_chat) @@ -540,9 +554,10 @@ def handle_media_directory(args) -> None: """Handle media directory copying or moving.""" if os.path.isdir(args.media): media_path = os.path.join(args.output, args.media) - + if os.path.isdir(media_path): - print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") + print( + "\nWhatsApp directory already exists in output directory. Skipping...", end="\n") else: if args.move_media: try: @@ -563,7 +578,7 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None: # Enrich from vcards if available if contact_store and not contact_store.is_empty(): contact_store.enrich_from_vcards(data) - + android_handler.create_html( data, args.output, @@ -575,12 +590,12 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None: args.whatsapp_theme, args.headline ) - + # Create text files if requested if args.text_format: print("Writing text file...") android_handler.create_txt(data, args.text_format) - + # Create JSON files if requested if args.json and not args.import_json: export_json(args, data, contact_store) @@ -591,11 +606,11 @@ def export_json(args, data: ChatCollection, contact_store=None) -> None: # Enrich from vcards if available if contact_store and not contact_store.is_empty(): contact_store.enrich_from_vcards(data) - + # Convert ChatStore objects to JSON if isinstance(data.get(next(iter(data), None)), ChatStore): data = {jik: chat.to_json() for jik, chat in data.items()} - + # Export as a single file or per chat if not args.json_per_chat: export_single_json(args, data) @@ -619,11 +634,11 @@ def export_multiple_json(args, data: Dict) -> None: """Export data to multiple JSON files, one per chat.""" # Adjust output path if needed json_path = args.json[:-5] if args.json.endswith(".json") else args.json - + # Create directory if it doesn't exist if not os.path.isdir(json_path): os.makedirs(json_path, exist_ok=True) - + # Export each chat total = len(data.keys()) for index, jik in enumerate(data.keys()): @@ -631,11 +646,11 @@ def export_multiple_json(args, data: Dict) -> None: contact = data[jik]["name"].replace('/', '') else: contact = jik.replace('+', '') - + with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f: file_content = json.dumps( - {jik: data[jik]}, - ensure_ascii=not args.avoid_encoding_json, + {jik: data[jik]}, + ensure_ascii=not args.avoid_encoding_json, indent=args.pretty_print_json ) f.write(file_content) @@ -646,7 +661,7 @@ def export_multiple_json(args, data: Dict) -> None: def process_exported_chat(args, data: ChatCollection) -> None: """Process an exported chat file.""" exported_handler.messages(args.exported, data, args.assume_first_as_me) - + 
if not args.no_html: android_handler.create_html( data, @@ -659,7 +674,7 @@ def process_exported_chat(args, data: ChatCollection) -> None: args.whatsapp_theme, args.headline ) - + # Copy files to output directory for file in glob.glob(r'*.*'): shutil.copy(file, args.output) @@ -670,23 +685,23 @@ def main(): # Set up and parse arguments parser = setup_argument_parser() args = parser.parse_args() - + # Check for updates if args.check_update: exit(check_update()) - + # Validate arguments validate_args(parser, args) - + # Create output directory if it doesn't exist os.makedirs(args.output, exist_ok=True) - + # Initialize data collection data = ChatCollection() - + # Set up contact store for vCard enrichment if needed contact_store = setup_contact_store(args) - + if args.import_json: # Import from JSON import_from_json(args.json, data) @@ -710,13 +725,13 @@ def main(): # Set default media path if not provided if args.media is None: args.media = "WhatsApp" - + # Set default DB paths if not provided if args.db is None: args.db = "msgstore.db" if args.wa is None: args.wa = "wa.db" - + # Decrypt backup if needed if args.key is not None: error = decrypt_android_backup(args) @@ -729,24 +744,26 @@ def main(): else: from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers args.identifiers = identifiers - + # Set default media path if not provided if args.media is None: args.media = identifiers.DOMAIN - + # Extract media from backup if needed if args.backup is not None: if not os.path.isdir(args.media): - ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size) + ios_media_handler.extract_media( + args.backup, identifiers, args.decrypt_chunk_size) else: - print("WhatsApp directory already exists, skipping WhatsApp file extraction.") - + print( + "WhatsApp directory already exists, skipping WhatsApp file extraction.") + # Set default DB paths if not provided if args.db is None: args.db = identifiers.MESSAGE if args.wa is None: args.wa = "ContactsV2.sqlite" - + if args.incremental_merge: incremental_merge( args.source_dir, @@ -756,16 +773,16 @@ def main(): args.avoid_encoding_json ) print("Incremental merge completed successfully.") - else: + else: # Process contacts process_contacts(args, data, contact_store) - + # Process messages, media, and calls process_messages(args, data) - + # Create output files create_output_files(args, data, contact_store) - + # Handle media directory handle_media_directory(args) diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index 3dc6b24..1ebf75d 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -7,6 +7,7 @@ class Timing: """ Handles timestamp formatting with timezone support. """ + def __init__(self, timezone_offset: Optional[int]) -> None: """ Initialize Timing object. @@ -37,6 +38,7 @@ class TimeZone(tzinfo): """ Custom timezone class with fixed offset. """ + def __init__(self, offset: int) -> None: """ Initialize TimeZone object. @@ -151,6 +153,7 @@ class ChatStore: """ Stores chat information and messages. """ + def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None: """ Initialize ChatStore object. 
@@ -159,7 +162,7 @@ class ChatStore: type (str): Device type (IOS or ANDROID) name (Optional[str]): Chat name media (Optional[str]): Path to media folder - + Raises: TypeError: If name is not a string or None """ @@ -182,7 +185,7 @@ class ChatStore: self.their_avatar_thumb = None self.status = None self.media_base = "" - + def __len__(self) -> int: """Get number of chats. Required for dict-like access.""" return len(self._messages) @@ -192,7 +195,7 @@ class ChatStore: if not isinstance(message, Message): raise TypeError("message must be a Message object") self._messages[id] = message - + def get_message(self, id: str) -> 'Message': """Get a message from the chat store.""" return self._messages.get(id) @@ -214,7 +217,7 @@ class ChatStore: 'media_base': self.media_base, 'messages': {id: msg.to_json() for id, msg in self._messages.items()} } - + @classmethod def from_json(cls, data: Dict) -> 'ChatStore': """Create a chat store from JSON data.""" @@ -232,7 +235,7 @@ class ChatStore: def get_last_message(self) -> 'Message': """Get the most recent message in the chat.""" return tuple(self._messages.values())[-1] - + def items(self): """Get message items pairs.""" return self._messages.items() @@ -250,11 +253,11 @@ class ChatStore: Args: other (ChatStore): The ChatStore to merge with - + """ if not isinstance(other, ChatStore): raise TypeError("Can only merge with another ChatStore object") - + # Update fields if they are not None in the other ChatStore self.name = other.name or self.name self.type = other.type or self.type @@ -262,14 +265,16 @@ class ChatStore: self.their_avatar = other.their_avatar or self.their_avatar self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb self.status = other.status or self.status - + # Merge messages self._messages.update(other._messages) + class Message: """ Represents a single message in a chat. 
""" + def __init__( self, *, @@ -301,7 +306,7 @@ class Message: self.from_me = bool(from_me) self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp timing = Timing(timezone_offset) - + if isinstance(time, (int, float)): self.time = timing.format_timestamp(self.timestamp, "%H:%M") elif isinstance(time, str): @@ -318,13 +323,15 @@ class Message: self.mime = None self.message_type = message_type if isinstance(received_timestamp, (int, float)): - self.received_timestamp = timing.format_timestamp(received_timestamp, "%Y/%m/%d %H:%M") + self.received_timestamp = timing.format_timestamp( + received_timestamp, "%Y/%m/%d %H:%M") elif isinstance(received_timestamp, str): self.received_timestamp = received_timestamp else: self.received_timestamp = None if isinstance(read_timestamp, (int, float)): - self.read_timestamp = timing.format_timestamp(read_timestamp, "%Y/%m/%d %H:%M") + self.read_timestamp = timing.format_timestamp( + read_timestamp, "%Y/%m/%d %H:%M") elif isinstance(read_timestamp, str): self.read_timestamp = read_timestamp else: @@ -363,13 +370,13 @@ class Message: @classmethod def from_json(cls, data: Dict) -> 'Message': message = cls( - from_me = data["from_me"], - timestamp = data["timestamp"], - time = data["time"], - key_id = data["key_id"], - message_type = data.get("message_type"), - received_timestamp = data.get("received_timestamp"), - read_timestamp = data.get("read_timestamp") + from_me=data["from_me"], + timestamp=data["timestamp"], + time=data["time"], + key_id=data["key_id"], + message_type=data.get("message_type"), + received_timestamp=data.get("received_timestamp"), + read_timestamp=data.get("read_timestamp") ) message.media = data.get("media") message.meta = data.get("meta") diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index cbc67ae..49b8335 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -18,6 +18,7 @@ except ImportError: # < Python 3.11 # This should be removed when the support for Python 3.10 ends. (31 Oct 2026) from enum import Enum + class StrEnum(str, Enum): pass @@ -72,7 +73,7 @@ def bytes_to_readable(size_bytes: int) -> str: A human-readable string representing the file size. 
""" if size_bytes == 0: - return "0B" + return "0B" size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) @@ -100,7 +101,7 @@ def readable_to_bytes(size_str: str) -> int: 'TB': 1024**4, 'PB': 1024**5, 'EB': 1024**6, - 'ZB': 1024**7, + 'ZB': 1024**7, 'YB': 1024**8 } size_str = size_str.upper().strip() @@ -155,7 +156,8 @@ def check_update(): else: with raw: package_info = json.load(raw) - latest_version = tuple(map(int, package_info["info"]["version"].split("."))) + latest_version = tuple( + map(int, package_info["info"]["version"].split("."))) __version__ = importlib.metadata.version("whatsapp_chat_exporter") current_version = tuple(map(int, __version__.split("."))) if current_version < latest_version: @@ -174,17 +176,17 @@ def check_update(): def rendering( - output_file_name, - template, - name, - msgs, - contact, - w3css, - chat, - headline, - next=False, - previous=False - ): + output_file_name, + template, + name, + msgs, + contact, + w3css, + chat, + headline, + next=False, + previous=False +): if chat.their_avatar_thumb is None and chat.their_avatar is not None: their_avatar_thumb = chat.their_avatar else: @@ -256,7 +258,8 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]): message.sticker = msg.get("sticker") chat.add_message(id, message) data[jid] = chat - print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") + print( + f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): @@ -273,39 +276,44 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p return print("JSON files found:", json_files) - + for json_file in json_files: source_path = os.path.join(source_dir, json_file) target_path = os.path.join(target_dir, json_file) - + if not os.path.exists(target_path): print(f"Copying '{json_file}' to target directory...") os.makedirs(target_dir, exist_ok=True) with open(source_path, 'rb') as src, open(target_path, 'wb') as dst: dst.write(src.read()) else: - print(f"Merging '{json_file}' with existing file in target directory...") + print( + f"Merging '{json_file}' with existing file in target directory...") with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: source_data = json.load(src_file) target_data = json.load(tgt_file) - + # Parse JSON into ChatStore objects using from_json() - source_chats = {jid: ChatStore.from_json(chat) for jid, chat in source_data.items()} - target_chats = {jid: ChatStore.from_json(chat) for jid, chat in target_data.items()} - + source_chats = {jid: ChatStore.from_json( + chat) for jid, chat in source_data.items()} + target_chats = {jid: ChatStore.from_json( + chat) for jid, chat in target_data.items()} + # Merge chats using merge_with() for jid, chat in source_chats.items(): if jid in target_chats: target_chats[jid].merge_with(chat) else: target_chats[jid] = chat - + # Serialize merged data - merged_data = {jid: chat.to_json() for jid, chat in target_chats.items()} - + merged_data = {jid: chat.to_json() + for jid, chat in target_chats.items()} + # Check if the merged data differs from the original target data if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): - print(f"Changes detected in '{json_file}', updating target file...") + print( + f"Changes detected in '{json_file}', updating target file...") with 
open(target_path, 'w') as merged_file: json.dump( merged_data, @@ -314,12 +322,14 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p ensure_ascii=not avoid_encoding_json, ) else: - print(f"No changes detected in '{json_file}', skipping update.") + print( + f"No changes detected in '{json_file}', skipping update.") # Merge media directories source_media_path = os.path.join(source_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir) - print(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") + print( + f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") if os.path.exists(source_media_path): for root, _, files in os.walk(source_media_path): relative_path = os.path.relpath(root, source_media_path) @@ -411,23 +421,29 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List if filter is not None: conditions = [] if len(columns) < 2 and jid is not None: - raise ValueError("There must be at least two elements in argument columns if jid is not None") + raise ValueError( + "There must be at least two elements in argument columns if jid is not None") if jid is not None: if platform == "android": is_group = f"{jid}.type == 1" elif platform == "ios": is_group = f"{jid} IS NOT NULL" else: - raise ValueError("Only android and ios are supported for argument platform if jid is not None") + raise ValueError( + "Only android and ios are supported for argument platform if jid is not None") for index, chat in enumerate(filter): if include: - conditions.append(f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") + conditions.append( + f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") if len(columns) > 1: - conditions.append(f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") + conditions.append( + f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") else: - conditions.append(f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'") + conditions.append( + f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'") if len(columns) > 1: - conditions.append(f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") + conditions.append( + f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") return f"AND ({' '.join(conditions)})" else: return "" @@ -522,7 +538,7 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona else: msg = f"{old} changed their number to {new}" elif content["action_type"] == 46: - return # Voice message in PM??? Seems no need to handle. + return # Voice message in PM??? Seems no need to handle. elif content["action_type"] == 47: msg = "The contact is an official business account" elif content["action_type"] == 50: @@ -539,7 +555,8 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona elif content["action_type"] == 67: return # (PM) this contact use secure service from Facebook??? elif content["action_type"] == 69: - return # (PM) this contact use secure service from Facebook??? What's the difference with 67???? + # (PM) this contact use secure service from Facebook??? What's the difference with 67???? 
+ return else: return # Unsupported return msg @@ -566,7 +583,8 @@ def get_status_location(output_folder: str, offline_static: str) -> str: w3css_path = os.path.join(static_folder, "w3.css") if not os.path.isfile(w3css_path): with urllib.request.urlopen(w3css) as resp: - with open(w3css_path, "wb") as f: f.write(resp.read()) + with open(w3css_path, "wb") as f: + f.write(resp.read()) w3css = os.path.join(offline_static, "w3.css") @@ -597,6 +615,7 @@ def setup_template(template: Optional[str], no_avatar: bool, experimental: bool template_env.filters['sanitize_except'] = sanitize_except return template_env.get_template(template_file) + # iOS Specific APPLE_TIME = 978307200 @@ -617,23 +636,31 @@ def slugify(value: str, allow_unicode: bool = False) -> str: if allow_unicode: value = unicodedata.normalize('NFKC', value) else: - value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii') + value = unicodedata.normalize('NFKD', value).encode( + 'ascii', 'ignore').decode('ascii') value = re.sub(r'[^\w\s-]', '', value.lower()) return re.sub(r'[-\s]+', '-', value).strip('-_') class WhatsAppIdentifier(StrEnum): - MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite - CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite - CALL = "1b432994e958845fffe8e2f190f26d1511534088" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite + MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite + CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite + CALL = "1b432994e958845fffe8e2f190f26d1511534088" DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" class WhatsAppBusinessIdentifier(StrEnum): - MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite - CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite - CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite - DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite + MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite + CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite + CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" + DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" + class JidType(IntEnum): PM = 0 diff --git a/tests/test_incremental_merge.py b/tests/test_incremental_merge.py index abd1dc2..5f96afd 100644 --- a/tests/test_incremental_merge.py +++ b/tests/test_incremental_merge.py @@ -209,24 +209,24 @@ def test_incremental_merge_existing_file_with_changes(mock_filesystem): source_dir = "/source" target_dir = "/target" media_dir = "media" - + # Setup mock filesystem mock_filesystem["exists"].side_effect = lambda x: True mock_filesystem["listdir"].return_value = ["chat.json"] - + # Mock file operations mock_file_content = { "/source/chat.json": json.dumps(chat_data_2), "/target/chat.json": 
json.dumps(chat_data_1), } - + written_chunks = [] - + def mock_file_write(data): written_chunks.append(data) - + mock_write = MagicMock(side_effect=mock_file_write) - + with patch("builtins.open", mock_open()) as mock_file: def mock_file_read(filename, mode="r"): content = mock_file_content.get(filename) @@ -234,27 +234,27 @@ def test_incremental_merge_existing_file_with_changes(mock_filesystem): if mode == 'w': file_mock.write.side_effect = mock_write return file_mock - + mock_file.side_effect = mock_file_read - + # Run the function incremental_merge(source_dir, target_dir, media_dir, 2, True) - + # Verify file operations - both files opened in text mode when target exists mock_file.assert_any_call("/source/chat.json", "r") mock_file.assert_any_call("/target/chat.json", "r") mock_file.assert_any_call("/target/chat.json", "w") - + # Verify write was called assert mock_write.called, "Write method was never called" - + # Combine chunks and parse JSON written_data = json.loads(''.join(written_chunks)) - + # Verify the merged data is correct assert written_data is not None, "No data was written" assert written_data == chat_data_merged, "Merged data does not match expected result" - + # Verify specific message retention messages = written_data["12345678@s.whatsapp.net"]["messages"] assert "24690" in messages, "Common message should be present" @@ -292,7 +292,8 @@ def test_incremental_merge_existing_file_no_changes(mock_filesystem): incremental_merge(source_dir, target_dir, media_dir, 2, True) # Verify no write operations occurred on target file - write_calls = [call for call in mock_file.mock_calls if call[0] == "().write"] + write_calls = [ + call for call in mock_file.mock_calls if call[0] == "().write"] assert len(write_calls) == 0 @@ -333,4 +334,5 @@ def test_incremental_merge_media_copy(mock_filesystem): assert ( mock_filesystem["makedirs"].call_count >= 2 ) # At least target dir and media dir - assert mock_filesystem["copy2"].call_count == 2 # Two media files copied + # Two media files copied + assert mock_filesystem["copy2"].call_count == 2
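
For context on the `utility.py` hunks above: `incremental_merge` round-trips each per-chat JSON file through the `ChatStore` API (`from_json`, `merge_with`, `to_json`) and only rewrites the target file when the merge changed something. A minimal sketch of that flow, assuming `ChatStore` is importable from `Whatsapp_Chat_Exporter.data_model` and using hypothetical `source/chat.json` / `target/chat.json` paths in place of real exports:

```python
import json

from Whatsapp_Chat_Exporter.data_model import ChatStore

# Hypothetical paths; any two per-chat JSON exports with the same layout work.
source_path = "source/chat.json"
target_path = "target/chat.json"

with open(source_path, "r") as src_file, open(target_path, "r") as tgt_file:
    source_data = json.load(src_file)
    target_data = json.load(tgt_file)

# Rebuild ChatStore objects from the serialized chats.
source_chats = {jid: ChatStore.from_json(chat) for jid, chat in source_data.items()}
target_chats = {jid: ChatStore.from_json(chat) for jid, chat in target_data.items()}

# Merge source chats into the target set; merge_with() refreshes chat metadata
# and unions the message dicts, so nothing already in the target is dropped.
for jid, chat in source_chats.items():
    if jid in target_chats:
        target_chats[jid].merge_with(chat)
    else:
        target_chats[jid] = chat

merged_data = {jid: chat.to_json() for jid, chat in target_chats.items()}

# Rewrite the target file only when the merged result actually differs
# (the real code honours --pretty-print-json and --avoid-encoding-json here).
if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
    with open(target_path, "w") as merged_file:
        json.dump(merged_data, merged_file, indent=2)
```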
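
The `--split`/`--size` validation likewise routes human-readable values through `readable_to_bytes`, so either a raw byte count or a unit suffix is accepted. A short illustration based on the behaviour shown in `validate_args` and the `--split` help text:

```python
from Whatsapp_Chat_Exporter.utility import readable_to_bytes

# "1MB" resolves to 1048576 bytes (powers of 1024), matching the example in the
# --split help text; a purely numeric string never reaches this helper because
# validate_args() keeps it unchanged via the isnumeric() check.
assert readable_to_bytes("1MB") == 1048576
```

Values the helper cannot parse are expected to surface as `ValueError`, which `validate_args` reports through `parser.error` as an invalid `--split` value.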
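
For the date filters, the Android query works in milliseconds since the Unix epoch (the code appends `000` to a seconds timestamp), while the iOS query works in seconds relative to the Apple epoch (`APPLE_TIME = 978307200`, i.e. 2001-01-01 UTC). A sketch of how `process_date_filter` derives the SQL fragments, using a hypothetical range and a hypothetical `%Y-%m-%d` input format (the real format comes from `--filter-date-format`):

```python
from datetime import datetime

APPLE_TIME = 978307200  # seconds between the Unix epoch and 2001-01-01 UTC

# Hypothetical range and format for illustration only.
fmt = "%Y-%m-%d"
start = int(datetime.strptime("2020-01-01", fmt).timestamp())
end = int(datetime.strptime("2021-01-01", fmt).timestamp())

# Android: Unix milliseconds, so three zeros are appended to the seconds value.
android_filter = f"BETWEEN {start}000 AND {end}000"

# iOS: seconds relative to the Apple epoch, so APPLE_TIME is subtracted.
ios_filter = f"BETWEEN {start - APPLE_TIME} AND {end - APPLE_TIME}"

print(android_filter)
print(ios_filter)
```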