mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-01-29 05:40:42 +00:00
Added support for incremental merging
This commit is contained in:
@@ -13,7 +13,7 @@ from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, check_update, DbType
|
||||
from Whatsapp_Chat_Exporter.utility import readable_to_bytes, sanitize_filename
|
||||
from Whatsapp_Chat_Exporter.utility import import_from_json, bytes_to_readable
|
||||
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, bytes_to_readable
|
||||
from argparse import ArgumentParser, SUPPRESS
|
||||
from datetime import datetime
|
||||
from getpass import getpass
|
||||
@@ -206,6 +206,32 @@ def setup_argument_parser() -> ArgumentParser:
|
||||
"--default-country-code", dest="default_country_code", default=None,
|
||||
help="Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country"
|
||||
)
|
||||
|
||||
# Incremental merging
|
||||
inc_merging_group = parser.add_argument_group('Incremental Merging')
|
||||
inc_merging_group.add_argument(
|
||||
"--incremental-merge",
|
||||
dest="incremental_merge",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help=("Performs an incremental merge of two exports."
|
||||
"Requires setting both --source-dir and --target-dir."
|
||||
"The chats and media of the source directory will be merged into the target directory."
|
||||
"No chats or media will be deleted from the target directory, only new chats and media will be added to it."
|
||||
)
|
||||
)
|
||||
inc_merging_group.add_argument(
|
||||
"--source-dir",
|
||||
dest="source_dir",
|
||||
default=None,
|
||||
help="Sets the source directory. Used for performing incremental merges."
|
||||
)
|
||||
inc_merging_group.add_argument(
|
||||
"--target-dir",
|
||||
dest="target_dir",
|
||||
default=None,
|
||||
help="Sets the target directory. Used for performing incremental merges."
|
||||
)
|
||||
|
||||
# Miscellaneous
|
||||
misc_group = parser.add_argument_group('Miscellaneous')
|
||||
@@ -250,6 +276,8 @@ def validate_args(parser: ArgumentParser, args) -> None:
|
||||
parser.error("You can only use --import with -j and without --no-html, -a, -i, -e.")
|
||||
elif args.import_json and not os.path.isfile(args.json):
|
||||
parser.error("JSON file not found.")
|
||||
if args.incremental_merge and args.source_dir is None or args.target_dir is None:
|
||||
parser.error("You must specify both --source-dir and --target-dir for incremental merge.")
|
||||
if args.android and args.business:
|
||||
parser.error("WhatsApp Business is only available on iOS for now.")
|
||||
if "??" not in args.headline:
|
||||
@@ -672,6 +700,8 @@ def main():
|
||||
args.whatsapp_theme,
|
||||
args.headline
|
||||
)
|
||||
elif args.incremental_merge:
|
||||
incremental_merge(args.source_dir, args.target_dir, args.media)
|
||||
elif args.exported:
|
||||
# Process exported chat
|
||||
process_exported_chat(args, data)
|
||||
|
||||
@@ -213,6 +213,18 @@ class ChatStore:
|
||||
'status': self.status,
|
||||
'messages': {id: msg.to_json() for id, msg in self._messages.items()}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data):
|
||||
chat = cls(data.get("type"), data.get("name"))
|
||||
chat.my_avatar = data.get("my_avatar")
|
||||
chat.their_avatar = data.get("their_avatar")
|
||||
chat.their_avatar_thumb = data.get("their_avatar_thumb")
|
||||
chat.status = data.get("status")
|
||||
for id, msg_data in data.get("messages", {}).items():
|
||||
message = Message.from_json(msg_data)
|
||||
chat.add_message(id, message)
|
||||
return chat
|
||||
|
||||
def get_last_message(self) -> 'Message':
|
||||
"""Get the most recent message in the chat."""
|
||||
@@ -230,6 +242,20 @@ class ChatStore:
|
||||
"""Get all message keys in the chat."""
|
||||
return self._messages.keys()
|
||||
|
||||
def merge_with(self, other):
|
||||
if not isinstance(other, ChatStore):
|
||||
raise TypeError("Can only merge with another ChatStore object")
|
||||
|
||||
# Update fields if they are not None in the other ChatStore
|
||||
self.name = other.name or self.name
|
||||
self.type = other.type or self.type
|
||||
self.my_avatar = other.my_avatar or self.my_avatar
|
||||
self.their_avatar = other.their_avatar or self.their_avatar
|
||||
self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb
|
||||
self.status = other.status or self.status
|
||||
|
||||
# Merge messages
|
||||
self.messages.update(other.messages)
|
||||
|
||||
class Message:
|
||||
"""
|
||||
@@ -310,4 +336,25 @@ class Message:
|
||||
'caption': self.caption,
|
||||
'thumb': self.thumb,
|
||||
'sticker': self.sticker
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data):
|
||||
message = cls(
|
||||
data["from_me"],
|
||||
data["timestamp"],
|
||||
data["time"],
|
||||
data["key_id"]
|
||||
)
|
||||
message.media = data.get("media")
|
||||
message.meta = data.get("meta")
|
||||
message.data = data.get("data")
|
||||
message.sender = data.get("sender")
|
||||
message.safe = data.get("safe")
|
||||
message.mime = data.get("mime")
|
||||
message.reply = data.get("reply")
|
||||
message.quoted_data = data.get("quoted_data")
|
||||
message.caption = data.get("caption")
|
||||
message.thumb = data.get("thumb")
|
||||
message.sticker = data.get("sticker")
|
||||
return message
|
||||
|
||||
@@ -10,6 +10,7 @@ from markupsafe import Markup
|
||||
from datetime import datetime, timedelta
|
||||
from enum import IntEnum
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore
|
||||
import shutil
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
try:
|
||||
from enum import StrEnum, IntEnum
|
||||
@@ -258,6 +259,58 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]):
|
||||
print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def incremental_merge(source_dir: str, target_dir: str, media_dir: str):
|
||||
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
|
||||
print("JSON files found:", json_files)
|
||||
|
||||
for json_file in json_files:
|
||||
source_path = os.path.join(source_dir, json_file)
|
||||
target_path = os.path.join(target_dir, json_file)
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
print(f"Copying {json_file} to target directory...")
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
with open(source_path, 'rb') as src, open(target_path, 'wb') as dst:
|
||||
dst.write(src.read())
|
||||
else:
|
||||
print(f"Merging {json_file} with existing file in target directory...")
|
||||
with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
|
||||
source_data = json.load(src_file)
|
||||
target_data = json.load(tgt_file)
|
||||
|
||||
# Parse JSON into ChatStore objects using from_json()
|
||||
source_chats = {jid: ChatStore.from_json(chat) for jid, chat in source_data.items()}
|
||||
target_chats = {jid: ChatStore.from_json(chat) for jid, chat in target_data.items()}
|
||||
|
||||
# Merge chats using merge_with()
|
||||
for jid, chat in source_chats.items():
|
||||
if jid in target_chats:
|
||||
target_chats[jid].merge_with(chat)
|
||||
else:
|
||||
target_chats[jid] = chat
|
||||
|
||||
# Write merged data back to the target file
|
||||
with open(target_path, 'w') as merged_file:
|
||||
merged_data = {jid: chat.to_json() for jid, chat in target_chats.items()}
|
||||
json.dump(merged_data, merged_file, indent=2)
|
||||
|
||||
# Merge media directories
|
||||
source_media_path = os.path.join(source_dir, media_dir)
|
||||
target_media_path = os.path.join(target_dir, media_dir)
|
||||
if os.path.exists(source_media_path):
|
||||
for root, dirs, files in os.walk(source_media_path):
|
||||
relative_path = os.path.relpath(root, source_media_path)
|
||||
target_root = os.path.join(target_media_path, relative_path)
|
||||
os.makedirs(target_root, exist_ok=True)
|
||||
for file in files:
|
||||
source_file = os.path.join(root, file)
|
||||
target_file = os.path.join(target_root, file)
|
||||
# we only copy if the file doesn't exist in the target or if the source is newer
|
||||
if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file):
|
||||
print(f"Copying {source_file} to {target_file}...")
|
||||
shutil.copy2(source_file, target_file)
|
||||
|
||||
|
||||
def sanitize_filename(file_name: str) -> str:
|
||||
"""Sanitizes a filename by removing invalid and unsafe characters.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user