From 7a1fa463685cc696a12fb945d88b4bd0d6112734 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Thu, 2 Jan 2025 20:48:11 +0800 Subject: [PATCH] Implement call log for iOS #122 --- Whatsapp_Chat_Exporter/__main__.py | 13 ++++ Whatsapp_Chat_Exporter/ios_handler.py | 72 ++++++++++++++++++++- Whatsapp_Chat_Exporter/ios_media_handler.py | 10 +++ Whatsapp_Chat_Exporter/utility.py | 1 + 4 files changed, 95 insertions(+), 1 deletion(-) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 4a2600b..fcf231c 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -305,6 +305,15 @@ def main(): action='store_true', help="Use the newly designed WhatsApp-alike theme" ) + parser.add_argument( + "--call-db", + dest="call_db_ios", + nargs='?', + default=None, + type=str, + const="1b432994e958845fffe8e2f190f26d1511534088", + help="Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only" + ) args = parser.parse_args() @@ -500,6 +509,10 @@ def main(): vcard(db, data, args.media, args.filter_date, filter_chat) if args.android: android_handler.calls(db, data, args.timezone_offset, filter_chat) + elif args.ios and args.call_db_ios is not None: + with sqlite3.connect(args.call_db_ios) as cdb: + cdb.row_factory = sqlite3.Row + ios_handler.calls(cdb, data, args.timezone_offset, filter_chat) if not args.no_html: if args.enrich_from_vcards is not None and not contact_store.is_empty(): contact_store.enrich_from_vcards(data) diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index 96fdced..a0296a7 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -7,7 +7,8 @@ from pathlib import Path from mimetypes import MimeTypes from markupsafe import escape as htmle from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, Device, get_chat_condition, slugify +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, get_chat_condition +from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, slugify, Device def contacts(db, data): @@ -361,3 +362,72 @@ def vcard(db, data, media_folder, filter_date, filter_chat): message.meta = True message.safe = True print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r") + + +def calls(db, data, timezone_offset, filter_chat): + c = db.cursor() + c.execute(f"""SELECT count() + FROM ZWACDCALLEVENT + WHERE 1=1 + {get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} + {get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""") + total_row_number = c.fetchone()[0] + if total_row_number == 0: + return + print(f"\nProcessing calls...({total_row_number})", end="\r") + c.execute(f"""SELECT ZCALLIDSTRING, + ZGROUPCALLCREATORUSERJIDSTRING, + ZGROUPJIDSTRING, + ZDATE, + ZOUTCOME, + ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred, + ZDURATION, + ZVIDEO, + ZMISSED, + ZINCOMING + FROM ZWACDCALLEVENT + INNER JOIN ZWAAGGREGATECALLEVENT + ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK + WHERE 1=1 + {get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} + {get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""") + chat = ChatStore(Device.ANDROID, "WhatsApp Calls") + content = c.fetchone() + while content is not None: + ts = APPLE_TIME + int(content["ZDATE"]) + call = Message( + from_me=content["ZINCOMING"] == 0, + timestamp=ts, + time=ts, + key_id=content["ZCALLIDSTRING"], + timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET + ) + _jid = content["ZGROUPCALLCREATORUSERJIDSTRING"] + name = data[_jid].name if _jid in data else None + if _jid is not None and "@" in _jid: + fallback = _jid.split('@')[0] + else: + fallback = None + call.sender = name or fallback + call.meta = True + call.data = ( + f"A {'video' if content['ZVIDEO'] == 1 else 'voice'} " + f"call {'to' if call.from_me else 'from'} " + f"{call.sender} was " + ) + if content['ZOUTCOME'] in (1, 4): + call.data += "not answered." if call.from_me else "missed." + elif content['ZOUTCOME'] == 2: + call.data += "failed." + elif content['ZOUTCOME'] == 0: + call_time = convert_time_unit(int(content['ZDURATION'])) + call_bytes = bytes_to_readable(content['bytes_transferred']) + call.data += ( + f"initiated and lasted for {call_time} " + f"with {call_bytes} data transferred." + ) + else: + call.data += "in an unknown state." + chat.add_message(call.key_id, call) + content = c.fetchone() + data["000000000000000"] = chat \ No newline at end of file diff --git a/Whatsapp_Chat_Exporter/ios_media_handler.py b/Whatsapp_Chat_Exporter/ios_media_handler.py index c4360af..dc817b6 100644 --- a/Whatsapp_Chat_Exporter/ios_media_handler.py +++ b/Whatsapp_Chat_Exporter/ios_media_handler.py @@ -35,6 +35,11 @@ def extract_encrypted(base_dir, password, identifiers, decrypt_chunk_size): domain_like=identifiers.DOMAIN, output_filename=identifiers.CONTACT ) + backup.extract_file( + relative_path=RelativePath.WHATSAPP_CALLS, + domain_like=identifiers.DOMAIN, + output_filename=identifiers.CALL + ) except ValueError: print("Failed to decrypt backup: incorrect password?") exit(7) @@ -87,6 +92,7 @@ def extract_media(base_dir, identifiers, decrypt_chunk_size): else: wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE) contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT) + call_db = os.path.join(base_dir, identifiers.CALL[:2], identifiers.CALL) if not os.path.isfile(wts_db): if identifiers is WhatsAppIdentifier: print("WhatsApp database not found.") @@ -99,6 +105,10 @@ def extract_media(base_dir, identifiers, decrypt_chunk_size): print("Contact database not found. Skipping...") else: shutil.copyfile(contact_db, identifiers.CONTACT) + if not os.path.isfile(call_db): + print("Call database not found. Skipping...") + else: + shutil.copyfile(call_db, identifiers.CALL) _wts_id = identifiers.DOMAIN with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest: manifest.row_factory = sqlite3.Row diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 46826b3..0a72c74 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -416,6 +416,7 @@ def slugify(value, allow_unicode=False): class WhatsAppIdentifier(StrEnum): MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" + CALL = "1b432994e958845fffe8e2f190f26d1511534088" DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"