diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 95ccfc3..0f78b40 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -15,6 +15,7 @@ from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, Crypt, check_update from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, DbType +from Whatsapp_Chat_Exporter.utility import telegram_json_format from argparse import ArgumentParser, SUPPRESS from datetime import datetime from getpass import getpass @@ -152,6 +153,10 @@ def setup_argument_parser() -> ArgumentParser: '--pretty-print-json', dest='pretty_print_json', default=None, nargs='?', const=2, type=int, help="Pretty print the output JSON." ) + json_group.add_argument( + "--telegram", dest="telegram", default=False, action='store_true', + help="Output the JSON in a format compatible with Telegram export (implies json-per-chat)" + ) json_group.add_argument( "--per-chat", dest="json_per_chat", default=False, action='store_true', help="Output the JSON file per chat" @@ -652,7 +657,7 @@ def export_json(args, data: ChatCollection, contact_store=None) -> None: data = {jik: chat.to_json() for jik, chat in data.items()} # Export as a single file or per chat - if not args.json_per_chat: + if not args.json_per_chat and not args.telegram: export_single_json(args, data) else: export_multiple_json(args, data) @@ -688,9 +693,13 @@ def export_multiple_json(args, data: Dict) -> None: else: contact = jik.replace('+', '') + if args.telegram: + messages = telegram_json_format(jik, data[jik], args.timezone_offset) + else: + messages = {jik: data[jik]} with open(f"{json_path}/{safe_name(contact)}.json", "w") as f: file_content = json.dumps( - {jik: data[jik]}, + messages, ensure_ascii=not args.avoid_encoding_json, 
def get_from_string(msg: Dict, chat_id: Union[str, int]) -> str:
    """Return the number or name shown as the sender of a message.

    Args:
        msg: Message dict; the "from_me" and "sender" keys are read.
        chat_id: Identifier of the chat, used when the message has no sender.

    Returns:
        "Me" for own messages, otherwise the sender, otherwise the chat id,
        always converted to a string.
    """
    if msg["from_me"]:
        return "Me"
    if msg["sender"]:
        return str(msg["sender"])
    return str(chat_id)


def get_chat_type(chat_id: str) -> str:
    """Return the chat type based on the suffix of the WhatsApp id.

    Args:
        chat_id: WhatsApp id of the chat.

    Returns:
        "personal_chat" for ids ending in "@s.whatsapp.net", otherwise
        "private_group" (a warning is logged when the suffix is not "@g.us").
    """
    if chat_id.endswith("@s.whatsapp.net"):
        return "personal_chat"
    if chat_id.endswith("@g.us"):
        return "private_group"
    logger.warning("Unknown chat type for %s, defaulting to private_group", chat_id)
    return "private_group"


def get_from_id(msg: Dict, chat_id: Union[str, int]) -> str:
    """Return the user id string for the sender of a message.

    Args:
        msg: Message dict; the "from_me" and "sender" keys are read.
        chat_id: Identifier of the chat, used when the message has no sender.

    Returns:
        "user00000" for own messages, otherwise "user" followed by the sender,
        otherwise "user" followed by the chat id.
    """
    if msg["from_me"]:
        return "user00000"
    if msg["sender"]:
        # f-string instead of "+" so a non-string sender cannot raise TypeError
        # (get_from_string already converts the sender with str()).
        return f"user{msg['sender']}"
    return f"user{chat_id}"


def get_reply_id(data: Dict, reply_key: int) -> Optional[int]:
    """Get the id of the message corresponding to the reply.

    Args:
        data: Chat dict whose "messages" entry maps message ids to message dicts.
        reply_key: Value compared against each message's "key_id"; a falsy
            value means the message is not a reply.

    Returns:
        The id of the first message whose "key_id" equals ``reply_key``,
        converted to ``int`` so it has the same type as the exported "id"
        field, or ``None`` when there is no reply key or no match.
    """
    if not reply_key:
        return None
    for msg_id, msg in data["messages"].items():
        if msg["key_id"] == reply_key:
            return int(msg_id)
    return None


def _build_reply_index(messages: Dict) -> Dict:
    """Map each message's "key_id" to its message id; the first occurrence wins."""
    index = {}
    for msg_id, msg in messages.items():
        index.setdefault(msg["key_id"], msg_id)
    return index


def telegram_json_format(jik: str, data: Dict, timezone_offset) -> Dict:
    """Convert the data of one chat to the Telegram export format.

    Messages with empty text are left out, and "reply_to_message_id" is only
    present on messages whose reply target was found in the same chat.

    Args:
        jik: WhatsApp id of the chat.
        data: Chat dict; the "name" and "messages" entries are read.
        timezone_offset: Offset handed to ``Timing``; ``CURRENT_TZ_OFFSET`` is
            used when it is falsy.

    Returns:
        A dict with the keys "name", "type", "id" and "messages".
    """
    timing = Timing(timezone_offset or CURRENT_TZ_OFFSET)
    try:
        chat_id = int(''.join(c for c in jik if c.isdigit()))
    except ValueError:
        # not a real chat: e.g. statusbroadcast
        chat_id = 0

    messages = data["messages"]
    # Built once so resolving replies is a dict lookup instead of a scan over
    # every message for every message.
    reply_index = _build_reply_index(messages)

    exported = []
    for msg_id, msg in messages.items():
        text = msg["data"]
        if not text:
            # Empty messages are not exported, so skip them before converting.
            continue
        entry = {
            "id": int(msg_id),
            "type": "message",
            "date": timing.format_timestamp(msg["timestamp"], "%Y-%m-%dT%H:%M:%S"),
            "date_unixtime": int(msg["timestamp"]),
            "from": get_from_string(msg, chat_id),
            "from_id": get_from_id(msg, chat_id),
        }
        reply_key = msg["reply"]
        reply_id = reply_index.get(reply_key) if reply_key else None
        if reply_id is not None:
            # Same int conversion as "id" so the reference matches its target.
            entry["reply_to_message_id"] = int(reply_id)
        entry["text"] = text
        entry["text_entities"] = [
            {
                # TODO this will lose formatting and different types
                "type": "plain",
                "text": text,
            }
        ]
        exported.append(entry)

    return {
        "name": data["name"] or jik,
        "type": get_chat_type(jik),
        "id": chat_id,
        "messages": exported,
    }