diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py new file mode 100644 index 0000000..2e03c06 --- /dev/null +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -0,0 +1,35 @@ +from datetime import datetime +from typing import Union + + +class ChatStore(): + def __init__(self, name=None): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string or None") + self.name = name + self.messages = {} + + def add_message(self, id, message): + if not isinstance(message, Message): + raise TypeError("Chat must be a Chat object") + self.messages[id] = message + + def delete_message(self, id): + if id in self.messages: + del self.messages[id] + +class Message(): + def __init__(self, from_me: Union[bool,int], timestamp: int, time: str, key_id: int): + self.from_me = bool(from_me) + self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp + self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M") + self.media = False + self.key_id = key_id + self.meta = False + self.data = None + self.sender = None + # Extra + self.reply = None + self.quoted_data = None + self.caption = None + diff --git a/Whatsapp_Chat_Exporter/extract_new.py b/Whatsapp_Chat_Exporter/extract_new.py index 4c119a1..25498eb 100644 --- a/Whatsapp_Chat_Exporter/extract_new.py +++ b/Whatsapp_Chat_Exporter/extract_new.py @@ -16,6 +16,8 @@ from enum import Enum from mimetypes import MimeTypes from hashlib import sha256 +from Whatsapp_Chat_Exporter.data_model import ChatStore, Message + try: import zlib from Crypto.Cipher import AES @@ -30,6 +32,7 @@ except ModuleNotFoundError: else: support_crypt15 = True + def sanitize_except(html): return Markup(sanitize(html, tags=["br"])) @@ -42,6 +45,7 @@ def determine_day(last, current): else: return current + CRYPT14_OFFSETS = [ {"iv": 67, "db": 191}, {"iv": 67, "db": 190}, @@ -80,7 +84,7 @@ def _extract_encrypted_key(keyfile): key_stream += byte.to_bytes(1, "big", signed=True) return _generate_hmac_of_hmac(key_stream) - + def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False): if not support_backup: @@ -187,92 +191,104 @@ def contacts(db, data): c.execute("""SELECT jid, display_name FROM wa_contacts; """) row = c.fetchone() while row is not None: - data[row[0]] = {"name": row[1], "messages": {}} + data[row["jid"]] = ChatStore(row["display_name"]) row = c.fetchone() def messages(db, data): # Get message history c = db.cursor() - c.execute("""SELECT count() FROM messages""") + c.execute("""SELECT count() FROM message""") total_row_number = c.fetchone()[0] print(f"Gathering messages...(0/{total_row_number})", end="\r") phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net") - c.execute("""SELECT messages.key_remote_jid, - messages._id, - messages.key_from_me, - messages.timestamp, - messages.data, - messages.status, - messages.edit_version, - messages.thumb_image, - messages.remote_resource, - messages.media_wa_type, - messages.latitude, - messages.longitude, - messages_quotes.key_id as quoted, - messages.key_id, - messages_quotes.data, - messages.media_caption - FROM messages - LEFT JOIN messages_quotes - ON messages.quoted_row_id = messages_quotes._id - WHERE messages.key_remote_jid <> '-1';""") + c.execute("""SELECT jid.raw_string as key_remote_jid, + message._id, + message.from_me as key_from_me, + message.timestamp, + message.text_data as data, + message.status, + message_future.version as edit_version, + message_thumbnail.thumbnail as thumb_image, + message_media.file_path as remote_resource, + message_media.mime_type as media_wa_type, + message_location.latitude, + message_location.longitude, + message_quoted.key_id as quoted, + message.key_id, + message_quoted.text_data as quoted_data, + message_media.media_caption, + chat.subject as chat_subject + FROM message + LEFT JOIN message_quoted + ON message_quoted.message_row_id = message._id + LEFT JOIN message_location + ON message_location.message_row_id = message._id + LEFT JOIN message_media + ON message_media.message_row_id = message._id + LEFT JOIN message_thumbnail + ON message_thumbnail.message_row_id = message._id + LEFT JOIN message_future + ON message_future.message_row_id = message._id + LEFT JOIN chat + ON chat._id = message.chat_row_id + INNER JOIN jid + ON jid._id = chat.jid_row_id + WHERE key_remote_jid <> '-1';""") i = 0 content = c.fetchone() while content is not None: - if content[0] not in data: - data[content[0]] = {"name": None, "messages": {}} - data[content[0]]["messages"][content[1]] = { - "from_me": bool(content[2]), - "timestamp": content[3]/1000, - "time": datetime.fromtimestamp(content[3]/1000).strftime("%H:%M"), - "media": False, - "key_id": content[13], - "meta": False, - "data": None - } - if "-" in content[0] and content[2] == 0: + if content["key_remote_jid"] not in data: + data[content["key_remote_jid"]] = ChatStore() + if content["key_remote_jid"] is None: + continue + data[content["key_remote_jid"]].add_message(content["_id"], Message( + from_me=content["key_from_me"], + timestamp=content["timestamp"], + time=content["timestamp"], + key_id=content["key_id"], + )) + if "-" in content["key_remote_jid"] and content["key_from_me"] == 0: name = None - if content[8] in data: - name = data[content[8]]["name"] - if "@" in content[8]: - fallback = content[8].split('@')[0] + if content["remote_resource"] in data: + name = data[content["remote_resource"]]["name"] + if "@" in content["remote_resource"]: + fallback = content["remote_resource"].split('@')["key_remote_jid"] else: fallback = None else: fallback = None - data[content[0]]["messages"][content[1]]["sender"] = name or fallback + data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback else: - data[content[0]]["messages"][content[1]]["sender"] = None + data[content["key_remote_jid"]].messages[content["_id"]].sender = None - if content[12] is not None: - data[content[0]]["messages"][content[1]]["reply"] = content[12] - data[content[0]]["messages"][content[1]]["quoted_data"] = content[14] + if content["quoted"] is not None: + data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"] + data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"] else: - data[content[0]]["messages"][content[1]]["reply"] = None + data[content["key_remote_jid"]].messages[content["_id"]].reply = None - if content[15] is not None: - data[content[0]]["messages"][content[1]]["caption"] = content[15] + if content["key_id"] is not None: + data[content["key_remote_jid"]].messages[content["_id"]].caption = content["key_id"] else: - data[content[0]]["messages"][content[1]]["caption"] = None + data[content["key_remote_jid"]].messages[content["_id"]].caption = None - if content[5] == 6: - if "-" in content[0]: + if content["status"] == 6: + if content["chat_subject"] is not None: # Is Group - if content[4] is not None: + if content["data"] is not None: try: - int(content[4]) + int(content["data"]) except ValueError: - msg = f"The group name changed to {content[4]}" - data[content[0]]["messages"][content[1]]["data"] = msg - data[content[0]]["messages"][content[1]]["meta"] = True + msg = f"The group name changed to {content['data']}" + data[content["key_remote_jid"]].messages[content["_id"]].data = msg + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - del data[content[0]]["messages"][content[1]] + data[content["key_remote_jid"]].delete_message(content["_id"]) else: - thumb_image = content[7] + thumb_image = content["thumb_image"] if thumb_image is not None: if b"\x00\x00\x01\x74\x00\x1A" in thumb_image: # Add user @@ -282,62 +298,62 @@ def messages(db, data): name_right = data[added]["name"] else: name_right = added.split('@')[0] - if content[8] is not None: - if content[8] in data: - name_left = data[content[8]]["name"] + if content["remote_resource"] is not None: + if content["remote_resource"] in data: + name_left = data[content["remote_resource"]]["name"] else: - name_left = content[8].split('@')[0] + name_left = content["remote_resource"].split('@')[0] msg = f"{name_left} added {name_right or 'You'}" else: msg = f"Added {name_right or 'You'}" elif b"\xac\xed\x00\x05\x74\x00" in thumb_image: # Changed number - original = content[8].split('@')[0] + original = content["remote_resource"].split('@')[0] changed = thumb_image[7:].decode().split('@')[0] msg = f"{original} changed to {changed}" - data[content[0]]["messages"][content[1]]["data"] = msg - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["key_remote_jid"]].messages[content["_id"]].data = msg + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - if content[4] is None: - del data[content[0]]["messages"][content[1]] + if content["data"] is None: + del data[content["key_remote_jid"]].messages[content["_id"]] else: # Private chat - if content[4] is None and content[7] is None: - del data[content[0]]["messages"][content[1]] + if content["data"] is None and content["thumb_image"] is None: + data[content["key_remote_jid"]].delete_message(content["_id"]) else: - if content[2] == 1: - if content[5] == 5 and content[6] == 7: + if content["key_from_me"] == 1: + if content["status"] == 5 and content["edit_version"] == 7: msg = "Message deleted" - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - if content[9] == "5": + if content["media_wa_type"] == "5": msg = f"Location shared: {content[10], content[11]}" - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - msg = content[4] + msg = content["data"] if msg is not None: if "\r\n" in msg: msg = msg.replace("\r\n", "
") if "\n" in msg: msg = msg.replace("\n", "
") else: - if content[5] == 0 and content[6] == 7: + if content["status"] == 0 and content["edit_version"] == 7: msg = "Message deleted" - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - if content[9] == "5": + if content["media_wa_type"] == "5": msg = f"Location shared: {content[10], content[11]}" - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["key_remote_jid"]].messages[content["_id"]].meta = True else: - msg = content[4] + msg = content["data"] if msg is not None: if "\r\n" in msg: msg = msg.replace("\r\n", "
") if "\n" in msg: msg = msg.replace("\n", "
") - data[content[0]]["messages"][content[1]]["data"] = msg + data[content["key_remote_jid"]].messages[content["_id"]].data = msg i += 1 if i % 1000 == 0: @@ -353,45 +369,50 @@ def media(db, data, media_folder): total_row_number = c.fetchone()[0] print(f"\nGathering media...(0/{total_row_number})", end="\r") i = 0 - c.execute("""SELECT messages.key_remote_jid, + c.execute("""SELECT jid.raw_string, message_row_id, file_path, message_url, mime_type, media_key FROM message_media - INNER JOIN messages - ON message_media.message_row_id = messages._id - ORDER BY messages.key_remote_jid ASC""") + INNER JOIN message + ON message_media.message_row_id = message._id + LEFT JOIN chat + ON chat._id = message.chat_row_id + INNER JOIN jid + ON jid._id = chat.jid_row_id + + ORDER BY jid.raw_string ASC""") content = c.fetchone() mime = MimeTypes() while content is not None: - file_path = f"{media_folder}/{content[2]}" - data[content[0]]["messages"][content[1]]["media"] = True + file_path = f"{media_folder}/{content['file_path']}" + data[content["raw_string"]].messages[content["message_row_id"]].media = True if os.path.isfile(file_path): - data[content[0]]["messages"][content[1]]["data"] = file_path - if content[4] is None: + data[content["raw_string"]].messages[content["message_row_id"]].data = file_path + if content["mime_type"] is None: guess = mime.guess_type(file_path)[0] if guess is not None: - data[content[0]]["messages"][content[1]]["mime"] = guess + data[content["raw_string"]].messages[content["message_row_id"]].mime = guess else: - data[content[0]]["messages"][content[1]]["mime"] = "data/data" + data[content["raw_string"]].messages[content["message_row_id"]].mime = "data/data" else: - data[content[0]]["messages"][content[1]]["mime"] = content[4] + data[content["raw_string"]].messages[content["message_row_id"]].mime = content["mime_type"] else: - # if "https://mmg" in content[4]: + # if "https://mmg" in content["mime_type"]: # try: - # r = requests.get(content[3]) + # r = requests.get(content["message_url"]) # if r.status_code != 200: # raise RuntimeError() # except: - # data[content[0]]["messages"][content[1]]["data"] = "{The media is missing}" - # data[content[0]]["messages"][content[1]]["media"] = True - # data[content[0]]["messages"][content[1]]["mime"] = "media" + # data[content["raw_string"]].messages[content["message_row_id"]].data = "{The media is missing}" + # data[content["raw_string"]].messages[content["message_row_id"]].media = True + # data[content["raw_string"]].messages[content["message_row_id"]].mime = "media" # else: - data[content[0]]["messages"][content[1]]["data"] = "The media is missing" - data[content[0]]["messages"][content[1]]["mime"] = "media" - data[content[0]]["messages"][content[1]]["meta"] = True + data[content["raw_string"]].messages[content["message_row_id"]].data = "The media is missing" + data[content["raw_string"]].messages[content["message_row_id"]].mime = "media" + data[content["raw_string"]].messages[content["message_row_id"]].meta = True i += 1 if i % 100 == 0: print(f"Gathering media...({i}/{total_row_number})", end="\r") @@ -403,13 +424,17 @@ def media(db, data, media_folder): def vcard(db, data): c = db.cursor() c.execute("""SELECT message_row_id, - messages.key_remote_jid, + jid.raw_string, vcard, - messages.media_name - FROM messages_vcards - INNER JOIN messages - ON messages_vcards.message_row_id = messages._id - ORDER BY messages.key_remote_jid ASC;""") + message.text_data + FROM message_vcard + INNER JOIN message + ON message_vcard.message_row_id = message._id + LEFT JOIN chat + ON chat._id = message.chat_row_id + INNER JOIN jid + ON jid._id = chat.jid_row_id + ORDER BY message.chat_row_id ASC;""") rows = c.fetchall() total_row_number = len(rows) print(f"\nGathering vCards...(0/{total_row_number})", end="\r") @@ -417,17 +442,17 @@ def vcard(db, data): if not os.path.isdir(base): Path(base).mkdir(parents=True, exist_ok=True) for index, row in enumerate(rows): - media_name = row[3] if row[3] else "" + media_name = row["text_data"] if row["text_data"] else "" file_name = "".join(x for x in media_name if x.isalnum()) file_path = f"{base}/{file_name}.vcf" if not os.path.isfile(file_path): with open(file_path, "w", encoding="utf-8") as f: - f.write(row[2]) - data[row[1]]["messages"][row[0]]["data"] = media_name + \ + f.write(row["vcard"]) + data[row["raw_string"]].messages[row["message_row_id"]].data = media_name + \ "The vCard file cannot be displayed here, " \ f"however it should be located at {file_path}" - data[row[1]]["messages"][row[0]]["mime"] = "text/x-vcard" - data[row[1]]["messages"][row[0]]["meta"] = True + data[row["raw_string"]].messages[row["message_row_id"]].mime = "text/x-vcard" + data[row["raw_string"]].messages[row["message_row_id"]].meta = True print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r") @@ -451,28 +476,29 @@ def create_html(data, output_folder, template=None, embedded=False): os.mkdir(output_folder) for current, contact in enumerate(data): - if len(data[contact]["messages"]) == 0: + if len(data[contact].messages) == 0: continue phone_number = contact.split('@')[0] + file_name = phone_number if "-" in contact: file_name = "" else: file_name = phone_number - if data[contact]["name"] is not None: + if data[contact].name is not None: if file_name != "": file_name += "-" - file_name += data[contact]["name"].replace("/", "-") - name = data[contact]["name"] + file_name += data[contact].name.replace("/", "-") + name = data[contact].name else: name = phone_number - safe_file_name = '' + safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ") with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f: f.write( template.render( name=name, - msgs=data[contact]["messages"].values(), + msgs=data[contact].messages.values(), my_avatar=None, their_avatar=f"WhatsApp/Avatars/{contact}.j" )