mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-04-25 23:41:33 +00:00
Merge branch 'dev'
This commit is contained in:
@@ -4,8 +4,9 @@ except ImportError:
|
||||
from Whatsapp_Chat_Exporter.__init__ import __version__
|
||||
from Whatsapp_Chat_Exporter import extract, extract_iphone
|
||||
from Whatsapp_Chat_Exporter import extract_iphone_media
|
||||
from Whatsapp_Chat_Exporter.extract import Crypt
|
||||
from optparse import OptionParser
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore
|
||||
from Whatsapp_Chat_Exporter.utility import Crypt
|
||||
from argparse import ArgumentParser
|
||||
import os
|
||||
import sqlite3
|
||||
import shutil
|
||||
@@ -15,116 +16,167 @@ from sys import exit
|
||||
|
||||
|
||||
def main():
|
||||
parser = OptionParser(version=f"Whatsapp Chat Exporter: {__version__}")
|
||||
parser.add_option(
|
||||
parser = ArgumentParser(
|
||||
description = 'A customizable Android and iPhone WhatsApp database parser that '
|
||||
'will give you the history of your WhatsApp conversations inHTML '
|
||||
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
|
||||
epilog = f'WhatsApp Chat Exporter: {__version__} Licensed with MIT'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-a',
|
||||
'--android',
|
||||
dest='android',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Define the target as Android")
|
||||
parser.add_option(
|
||||
parser.add_argument(
|
||||
'-i',
|
||||
'--iphone',
|
||||
'--ios',
|
||||
dest='iphone',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Define the target as iPhone")
|
||||
parser.add_option(
|
||||
parser.add_argument(
|
||||
"-w",
|
||||
"--wa",
|
||||
dest="wa",
|
||||
default=None,
|
||||
help="Path to contact database")
|
||||
parser.add_option(
|
||||
help="Path to contact database (default: wa.db/ContactsV2.sqlite)")
|
||||
parser.add_argument(
|
||||
"-m",
|
||||
"--media",
|
||||
dest="media",
|
||||
default=None,
|
||||
help="Path to WhatsApp media folder")
|
||||
parser.add_option(
|
||||
help="Path to WhatsApp media folder (default: WhatsApp)")
|
||||
parser.add_argument(
|
||||
"-b",
|
||||
"--backup",
|
||||
dest="backup",
|
||||
default=None,
|
||||
help="Path to Android (must be used together "
|
||||
"with -k)/iPhone WhatsApp backup")
|
||||
parser.add_option(
|
||||
parser.add_argument(
|
||||
"-o",
|
||||
"--output",
|
||||
dest="output",
|
||||
default="result",
|
||||
help="Output to specific directory")
|
||||
parser.add_option(
|
||||
help="Output to specific directory (default: result)")
|
||||
parser.add_argument(
|
||||
'-j',
|
||||
'--json',
|
||||
dest='json',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Save the result to a single JSON file")
|
||||
parser.add_option(
|
||||
nargs='?',
|
||||
default=None,
|
||||
type=str,
|
||||
const="result.json",
|
||||
help="Save the result to a single JSON file (default if present: result.json)")
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--db',
|
||||
dest='db',
|
||||
default=None,
|
||||
help="Path to database file")
|
||||
parser.add_option(
|
||||
help="Path to database file (default: msgstore.db/"
|
||||
"7c7fba66680ef796b916b067077cc246adacf01d)")
|
||||
parser.add_argument(
|
||||
'-k',
|
||||
'--key',
|
||||
dest='key',
|
||||
default=None,
|
||||
help="Path to key file"
|
||||
)
|
||||
parser.add_option(
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--template",
|
||||
dest="template",
|
||||
default=None,
|
||||
help="Path to custom HTML template")
|
||||
parser.add_option(
|
||||
help="Path to custom HTML template"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-e",
|
||||
"--embedded",
|
||||
dest="embedded",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Embed media into HTML file")
|
||||
(options, args) = parser.parse_args()
|
||||
help="Embed media into HTML file (not yet implemented)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--showkey",
|
||||
dest="showkey",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Show the HEX key used to decrypt the database"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--move-media",
|
||||
dest="move_media",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Move the media directory to output directory if the flag is set, otherwise copy it"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--offline",
|
||||
dest="offline",
|
||||
default=None,
|
||||
help="Relative path to offline static files"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--size",
|
||||
"--output-size",
|
||||
dest="size",
|
||||
default=None,
|
||||
help="Maximum size of a single output file in bytes, 0 for auto (not yet implemented)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-html",
|
||||
dest="no_html",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Do not output html files"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if options.android and options.iphone:
|
||||
if args.android and args.iphone:
|
||||
print("You must define only one device type.")
|
||||
exit(1)
|
||||
if not options.android and not options.iphone:
|
||||
if not args.android and not args.iphone:
|
||||
print("You must define the device type.")
|
||||
exit(1)
|
||||
if args.no_html and not args.json:
|
||||
print("You must either specify a JSON output file or enable HTML output.")
|
||||
exit(1)
|
||||
|
||||
data = {}
|
||||
|
||||
if options.android:
|
||||
if args.android:
|
||||
contacts = extract.contacts
|
||||
messages = extract.messages
|
||||
media = extract.media
|
||||
vcard = extract.vcard
|
||||
create_html = extract.create_html
|
||||
if options.db is None:
|
||||
if args.db is None:
|
||||
msg_db = "msgstore.db"
|
||||
else:
|
||||
msg_db = options.db
|
||||
if options.key is not None:
|
||||
if options.backup is None:
|
||||
msg_db = args.db
|
||||
if args.key is not None:
|
||||
if args.backup is None:
|
||||
print("You must specify the backup file with -b")
|
||||
exit(1)
|
||||
print("Decryption key specified, decrypting WhatsApp backup...")
|
||||
if "crypt12" in options.backup:
|
||||
if "crypt12" in args.backup:
|
||||
crypt = Crypt.CRYPT12
|
||||
elif "crypt14" in options.backup:
|
||||
elif "crypt14" in args.backup:
|
||||
crypt = Crypt.CRYPT14
|
||||
elif "crypt15" in options.backup:
|
||||
elif "crypt15" in args.backup:
|
||||
crypt = Crypt.CRYPT15
|
||||
if os.path.isfile(options.key):
|
||||
key = open(options.key, "rb")
|
||||
elif all(char in string.hexdigits for char in options.key):
|
||||
key = bytes.fromhex(options.key)
|
||||
db = open(options.backup, "rb").read()
|
||||
error = extract.decrypt_backup(db, key, msg_db, crypt)
|
||||
if os.path.isfile(args.key):
|
||||
key = open(args.key, "rb")
|
||||
elif all(char in string.hexdigits for char in args.key):
|
||||
key = bytes.fromhex(args.key)
|
||||
db = open(args.backup, "rb").read()
|
||||
error = extract.decrypt_backup(db, key, msg_db, crypt, args.showkey)
|
||||
if error != 0:
|
||||
if error == 1:
|
||||
print("Dependencies of decrypt_backup and/or extract_encrypted_key"
|
||||
@@ -135,67 +187,81 @@ def main():
|
||||
"Possibly incorrect offsets used in decryption.")
|
||||
exit(4)
|
||||
else:
|
||||
print("Unknown error occurred.")
|
||||
print("Unknown error occurred.", error)
|
||||
exit(5)
|
||||
if options.wa is None:
|
||||
if args.wa is None:
|
||||
contact_db = "wa.db"
|
||||
else:
|
||||
contact_db = options.wa
|
||||
if options.media is None:
|
||||
options.media = "WhatsApp"
|
||||
|
||||
if len(args) == 1:
|
||||
msg_db = args[0]
|
||||
contact_db = args.wa
|
||||
if args.media is None:
|
||||
args.media = "WhatsApp"
|
||||
|
||||
if os.path.isfile(contact_db):
|
||||
with sqlite3.connect(contact_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
contacts(db, data)
|
||||
|
||||
elif options.iphone:
|
||||
elif args.iphone:
|
||||
import sys
|
||||
if "--iphone" in sys.argv:
|
||||
print("WARNING: The --iphone flag is deprecated and will be removed in the future. Use --ios instead.")
|
||||
messages = extract_iphone.messages
|
||||
media = extract_iphone.media
|
||||
vcard = extract_iphone.vcard
|
||||
create_html = extract_iphone.create_html
|
||||
if options.backup is not None:
|
||||
extract_iphone_media.extract_media(options.backup)
|
||||
if options.db is None:
|
||||
if args.backup is not None:
|
||||
extract_iphone_media.extract_media(args.backup)
|
||||
if args.db is None:
|
||||
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
|
||||
else:
|
||||
msg_db = options.db
|
||||
if options.wa is None:
|
||||
msg_db = args.db
|
||||
if args.wa is None:
|
||||
contact_db = "ContactsV2.sqlite"
|
||||
else:
|
||||
contact_db = options.wa
|
||||
if options.media is None:
|
||||
options.media = "Message"
|
||||
|
||||
if len(args) == 1:
|
||||
msg_db = args[0]
|
||||
contact_db = args.wa
|
||||
if args.media is None:
|
||||
args.media = "Message"
|
||||
|
||||
if os.path.isfile(msg_db):
|
||||
with sqlite3.connect(msg_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
messages(db, data)
|
||||
media(db, data, options.media)
|
||||
media(db, data, args.media)
|
||||
vcard(db, data)
|
||||
create_html(data, options.output, options.template, options.embedded)
|
||||
if not args.no_html:
|
||||
create_html(
|
||||
data,
|
||||
args.output,
|
||||
args.template,
|
||||
args.embedded,
|
||||
args.offline,
|
||||
args.size
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"The message database does not exist. You may specify the path "
|
||||
"to database file with option -d or check your provided path.",
|
||||
end="\r"
|
||||
"to database file with option -d or check your provided path."
|
||||
)
|
||||
exit(2)
|
||||
|
||||
if os.path.isdir(options.media) and \
|
||||
not os.path.isdir(f"{options.output}/{options.media}"):
|
||||
try:
|
||||
shutil.move(options.media, f"{options.output}/")
|
||||
except PermissionError:
|
||||
print("Cannot remove original WhatsApp directory. "
|
||||
"Perhaps the directory is opened?")
|
||||
if os.path.isdir(args.media):
|
||||
if os.path.isdir(f"{args.output}/{args.media}"):
|
||||
print("Media directory already exists in output directory. Skipping...")
|
||||
else:
|
||||
if not args.move_media:
|
||||
print("Copying media directory...")
|
||||
shutil.copytree(args.media, f"{args.output}/WhatsApp")
|
||||
else:
|
||||
try:
|
||||
shutil.move(args.media, f"{args.output}/")
|
||||
except PermissionError:
|
||||
print("Cannot remove original WhatsApp directory. "
|
||||
"Perhaps the directory is opened?")
|
||||
|
||||
if options.json:
|
||||
with open("result.json", "w") as f:
|
||||
if args.json:
|
||||
if isinstance(data[next(iter(data))], ChatStore):
|
||||
data = {jik: chat.to_json() for jik, chat in data.items()}
|
||||
with open(args.json, "w") as f:
|
||||
data = json.dumps(data)
|
||||
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
|
||||
f.write(data)
|
||||
|
||||
53
Whatsapp_Chat_Exporter/data_model.py
Normal file
53
Whatsapp_Chat_Exporter/data_model.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from datetime import datetime
|
||||
from typing import Union
|
||||
|
||||
|
||||
class ChatStore():
    """Hold one chat: its display name and its messages keyed by message id.

    Attributes are plain and public:
        name: The chat's display name, or ``None`` when it is unknown.
        messages: Dict mapping a message id to its ``Message`` object.
    """

    def __init__(self, name=None):
        """Create an empty chat.

        Args:
            name: Display name of the chat, or ``None``.

        Raises:
            TypeError: If ``name`` is neither a string nor ``None``.
        """
        if name is not None and not isinstance(name, str):
            raise TypeError("Name must be a string or None")
        self.name = name
        self.messages = {}

    def add_message(self, id, message):
        """Store ``message`` under ``id``, replacing any previous entry.

        Raises:
            TypeError: If ``message`` is not a ``Message`` instance.
        """
        if not isinstance(message, Message):
            # The check is on the message, so the error names the message
            # (the previous text wrongly talked about a "Chat" object).
            raise TypeError("message must be a Message object")
        self.messages[id] = message

    def delete_message(self, id):
        """Remove the message stored under ``id``; unknown ids are ignored."""
        self.messages.pop(id, None)

    def to_json(self):
        """Return a JSON-serialisable dict with the name and all messages."""
        serialized_msgs = {
            msg_id: msg.to_json() for msg_id, msg in self.messages.items()
        }
        return {'name': self.name, 'messages': serialized_msgs}
|
||||
|
||||
class Message():
    """One message of a chat, normalised for the HTML and JSON exporters.

    Args:
        from_me: Truthy when the message was sent by the account owner.
        timestamp: Epoch time of the message, in seconds or milliseconds.
        time: Either an epoch value (seconds or milliseconds), which is
            rendered as a local ``HH:MM`` string, or an already formatted
            string, which is stored verbatim.
        key_id: Key id of the message as stored in the database.
    """

    # Epoch values above this are treated as milliseconds; this is the same
    # threshold the constructor has always applied to ``timestamp``.
    _MAX_EPOCH_SECONDS = 9999999999

    def __init__(
        self,
        from_me: Union[bool, int],
        timestamp: Union[int, float],
        time: Union[int, float, str],
        key_id: int
    ):
        self.from_me = bool(from_me)
        self.timestamp = self._to_seconds(timestamp)
        if isinstance(time, str):
            # Caller already formatted the time of day.
            self.time = time
        else:
            # Apply the same unit detection as for ``timestamp``. Dividing
            # by 1000 unconditionally turned a seconds-based value into a
            # 1970-era time while ``timestamp`` itself stayed correct.
            self.time = datetime.fromtimestamp(
                self._to_seconds(time)
            ).strftime("%H:%M")
        self.media = False
        self.key_id = key_id
        self.meta = False
        self.data = None
        self.sender = None
        # Extra
        self.reply = None
        self.quoted_data = None
        self.caption = None

    @classmethod
    def _to_seconds(cls, value):
        """Return ``value`` in seconds, converting from milliseconds if needed."""
        return value / 1000 if value > cls._MAX_EPOCH_SECONDS else value

    def to_json(self):
        """Return the message as a JSON-serialisable dict."""
        return {
            'from_me': self.from_me,
            'timestamp': self.timestamp,
            'time': self.time,
            'media': self.media,
            'key_id': self.key_id,
            'meta': self.meta,
            'data': self.data,
            'sender': self.sender,
            'reply': self.reply,
            'quoted_data': self.quoted_data,
            'caption': self.caption,
            # ``mime`` is attached to the instance by the media/vCard
            # extractors after construction; export it so the JSON output
            # keeps the field, and use None when it was never set.
            'mime': getattr(self, 'mime', None)
        }
|
||||
@@ -9,12 +9,11 @@ import re
|
||||
import io
|
||||
import hmac
|
||||
from pathlib import Path
|
||||
from bleach import clean as sanitize
|
||||
from markupsafe import Markup
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from mimetypes import MimeTypes
|
||||
from hashlib import sha256
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, Crypt
|
||||
from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS
|
||||
|
||||
try:
|
||||
import zlib
|
||||
@@ -30,36 +29,6 @@ except ModuleNotFoundError:
|
||||
else:
|
||||
support_crypt15 = True
|
||||
|
||||
def sanitize_except(html):
    """Clean ``html`` with bleach, allowing only ``<br>`` tags, and wrap it in Markup."""
    cleaned = sanitize(html, tags=["br"])
    return Markup(cleaned)
|
||||
|
||||
|
||||
def determine_day(last, current):
    """Return the local date of ``current`` if it differs from that of ``last``.

    Both arguments are epoch timestamps in seconds. ``None`` is returned
    when the two timestamps fall on the same local calendar day.
    """
    previous_date = datetime.fromtimestamp(last).date()
    current_date = datetime.fromtimestamp(current).date()
    return None if previous_date == current_date else current_date
|
||||
|
||||
# Known (IV offset, database offset) pairs for crypt14 backups.
# NOTE(review): presumably tried in this order by decrypt_backup — confirm.
CRYPT14_OFFSETS = [
    {"iv": iv_offset, "db": db_offset}
    for iv_offset, db_offset in ((67, 191), (67, 190), (66, 99))
]
|
||||
|
||||
|
||||
class Crypt(Enum):
    """Supported WhatsApp backup encryption formats, valued by version number."""

    CRYPT15 = 15  # .crypt15 backups
    CRYPT14 = 14  # .crypt14 backups
    CRYPT12 = 12  # .crypt12 backups
|
||||
|
||||
|
||||
def brute_force_offset(max_iv=200, max_db=200):
    """Yield every candidate ``(iv_start, iv_end, db_start)`` offset triple.

    The IV is always 16 bytes long, so ``iv_end`` is ``iv_start + 16``.
    Triples are produced with the IV offset in the outer loop and the
    database offset in the inner loop.

    Args:
        max_iv: Exclusive upper bound for the IV start offset (default 200).
        max_db: Exclusive upper bound for the database start offset
            (default 200).

    Yields:
        Tuples ``(iv_start, iv_end, db_start)``.
    """
    for iv in range(max_iv):
        for db in range(max_db):
            yield iv, iv + 16, db
|
||||
|
||||
|
||||
def _generate_hmac_of_hmac(key_stream):
|
||||
key = hmac.new(
|
||||
@@ -71,7 +40,7 @@ def _generate_hmac_of_hmac(key_stream):
|
||||
b"backup encryption\x01",
|
||||
sha256
|
||||
)
|
||||
return key.digest()
|
||||
return key.digest(), key_stream
|
||||
|
||||
|
||||
def _extract_encrypted_key(keyfile):
|
||||
@@ -82,7 +51,7 @@ def _extract_encrypted_key(keyfile):
|
||||
return _generate_hmac_of_hmac(key_stream)
|
||||
|
||||
|
||||
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
|
||||
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
|
||||
if not support_backup:
|
||||
return 1
|
||||
if isinstance(key, io.IOBase):
|
||||
@@ -91,6 +60,7 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
|
||||
t1 = key[30:62]
|
||||
if crypt is not Crypt.CRYPT15 and len(key) != 158:
|
||||
raise ValueError("The key file must be 158 bytes")
|
||||
# Determine the IV and database offsets
|
||||
if crypt == Crypt.CRYPT14:
|
||||
if len(database) < 191:
|
||||
raise ValueError("The crypt14 file must be at least 191 bytes")
|
||||
@@ -120,9 +90,12 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
|
||||
|
||||
if crypt == Crypt.CRYPT15:
|
||||
if len(key) == 32:
|
||||
main_key = _generate_hmac_of_hmac(key)
|
||||
main_key, hex_key = _generate_hmac_of_hmac(key)
|
||||
else:
|
||||
main_key = _extract_encrypted_key(key)
|
||||
main_key, hex_key = _extract_encrypted_key(key)
|
||||
if show_crypt15:
|
||||
hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]
|
||||
print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key))
|
||||
else:
|
||||
main_key = key[126:]
|
||||
decompressed = False
|
||||
@@ -184,157 +157,222 @@ def contacts(db, data):
|
||||
c.execute("""SELECT jid, display_name FROM wa_contacts; """)
|
||||
row = c.fetchone()
|
||||
while row is not None:
|
||||
data[row[0]] = {"name": row[1], "messages": {}}
|
||||
data[row["jid"]] = ChatStore(row["display_name"])
|
||||
row = c.fetchone()
|
||||
|
||||
|
||||
def messages(db, data):
|
||||
# Get message history
|
||||
c = db.cursor()
|
||||
c.execute("""SELECT count() FROM messages""")
|
||||
try:
|
||||
c.execute("""SELECT count() FROM messages""")
|
||||
except sqlite3.OperationalError:
|
||||
c.execute("""SELECT count() FROM message""")
|
||||
total_row_number = c.fetchone()[0]
|
||||
print(f"Gathering messages...(0/{total_row_number})", end="\r")
|
||||
|
||||
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
|
||||
c.execute("""SELECT messages.key_remote_jid,
|
||||
messages._id,
|
||||
messages.key_from_me,
|
||||
messages.timestamp,
|
||||
messages.data,
|
||||
messages.status,
|
||||
messages.edit_version,
|
||||
messages.thumb_image,
|
||||
messages.remote_resource,
|
||||
messages.media_wa_type,
|
||||
messages.latitude,
|
||||
messages.longitude,
|
||||
messages_quotes.key_id as quoted,
|
||||
messages.key_id,
|
||||
messages_quotes.data,
|
||||
messages.media_caption
|
||||
FROM messages
|
||||
LEFT JOIN messages_quotes
|
||||
ON messages.quoted_row_id = messages_quotes._id
|
||||
WHERE messages.key_remote_jid <> '-1';""")
|
||||
try:
|
||||
c.execute("""SELECT messages.key_remote_jid,
|
||||
messages._id,
|
||||
messages.key_from_me,
|
||||
messages.timestamp,
|
||||
messages.data,
|
||||
messages.status,
|
||||
messages.edit_version,
|
||||
messages.thumb_image,
|
||||
messages.remote_resource,
|
||||
messages.media_wa_type,
|
||||
messages.latitude,
|
||||
messages.longitude,
|
||||
messages_quotes.key_id as quoted,
|
||||
messages.key_id,
|
||||
messages_quotes.data as quoted_data,
|
||||
messages.media_caption
|
||||
FROM messages
|
||||
LEFT JOIN messages_quotes
|
||||
ON messages.quoted_row_id = messages_quotes._id
|
||||
WHERE messages.key_remote_jid <> '-1';"""
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
try:
|
||||
c.execute("""SELECT jid_global.raw_string as key_remote_jid,
|
||||
message._id,
|
||||
message.from_me as key_from_me,
|
||||
message.timestamp,
|
||||
message.text_data as data,
|
||||
message.status,
|
||||
message_future.version as edit_version,
|
||||
message_thumbnail.thumbnail as thumb_image,
|
||||
message_media.file_path as remote_resource,
|
||||
message_media.mime_type as media_wa_type,
|
||||
message_location.latitude,
|
||||
message_location.longitude,
|
||||
message_quoted.key_id as quoted,
|
||||
message.key_id,
|
||||
message_quoted.text_data as quoted_data,
|
||||
message.message_type,
|
||||
jid_group.raw_string as group_sender_jid,
|
||||
chat.subject as chat_subject
|
||||
FROM message
|
||||
LEFT JOIN message_quoted
|
||||
ON message_quoted.message_row_id = message._id
|
||||
LEFT JOIN message_location
|
||||
ON message_location.message_row_id = message._id
|
||||
LEFT JOIN message_media
|
||||
ON message_media.message_row_id = message._id
|
||||
LEFT JOIN message_thumbnail
|
||||
ON message_thumbnail.message_row_id = message._id
|
||||
LEFT JOIN message_future
|
||||
ON message_future.message_row_id = message._id
|
||||
LEFT JOIN chat
|
||||
ON chat._id = message.chat_row_id
|
||||
INNER JOIN jid jid_global
|
||||
ON jid_global._id = chat.jid_row_id
|
||||
LEFT JOIN jid jid_group
|
||||
ON jid_group._id = message.sender_jid_row_id
|
||||
WHERE key_remote_jid <> '-1';"""
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
else:
|
||||
table_message = True
|
||||
else:
|
||||
table_message = False
|
||||
i = 0
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
if content[0] not in data:
|
||||
data[content[0]] = {"name": None, "messages": {}}
|
||||
data[content[0]]["messages"][content[1]] = {
|
||||
"from_me": bool(content[2]),
|
||||
"timestamp": content[3]/1000,
|
||||
"time": datetime.fromtimestamp(content[3]/1000).strftime("%H:%M"),
|
||||
"media": False,
|
||||
"key_id": content[13],
|
||||
"meta": False,
|
||||
"data": None
|
||||
}
|
||||
if "-" in content[0] and content[2] == 0:
|
||||
if content["key_remote_jid"] not in data:
|
||||
data[content["key_remote_jid"]] = ChatStore()
|
||||
if content["key_remote_jid"] is None:
|
||||
continue # Not sure
|
||||
data[content["key_remote_jid"]].add_message(content["_id"], Message(
|
||||
from_me=content["key_from_me"],
|
||||
timestamp=content["timestamp"],
|
||||
time=content["timestamp"],
|
||||
key_id=content["key_id"],
|
||||
))
|
||||
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
|
||||
name = None
|
||||
if content[8] in data:
|
||||
name = data[content[8]]["name"]
|
||||
if "@" in content[8]:
|
||||
fallback = content[8].split('@')[0]
|
||||
if table_message:
|
||||
if content["chat_subject"] is not None:
|
||||
_jid = content["group_sender_jid"]
|
||||
else:
|
||||
_jid = content["key_remote_jid"]
|
||||
if _jid in data:
|
||||
name = data[_jid].name
|
||||
fallback = _jid.split('@')[0] if "@" in _jid else None
|
||||
else:
|
||||
fallback = None
|
||||
else:
|
||||
fallback = None
|
||||
|
||||
data[content[0]]["messages"][content[1]]["sender"] = name or fallback
|
||||
else:
|
||||
data[content[0]]["messages"][content[1]]["sender"] = None
|
||||
|
||||
if content[12] is not None:
|
||||
data[content[0]]["messages"][content[1]]["reply"] = content[12]
|
||||
data[content[0]]["messages"][content[1]]["quoted_data"] = content[14]
|
||||
else:
|
||||
data[content[0]]["messages"][content[1]]["reply"] = None
|
||||
|
||||
if content[15] is not None:
|
||||
data[content[0]]["messages"][content[1]]["caption"] = content[15]
|
||||
else:
|
||||
data[content[0]]["messages"][content[1]]["caption"] = None
|
||||
|
||||
if content[5] == 6:
|
||||
if "-" in content[0]:
|
||||
# Is Group
|
||||
if content[4] is not None:
|
||||
try:
|
||||
int(content[4])
|
||||
except ValueError:
|
||||
msg = f"The group name changed to {content[4]}"
|
||||
data[content[0]]["messages"][content[1]]["data"] = msg
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
if content["remote_resource"] in data:
|
||||
name = data[content["remote_resource"]].name
|
||||
if "@" in content["remote_resource"]:
|
||||
fallback = content["remote_resource"].split('@')[0]
|
||||
else:
|
||||
del data[content[0]]["messages"][content[1]]
|
||||
fallback = None
|
||||
else:
|
||||
thumb_image = content[7]
|
||||
fallback = None
|
||||
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback
|
||||
else:
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].sender = None
|
||||
|
||||
if content["quoted"] is not None:
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"]
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"]
|
||||
else:
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].reply = None
|
||||
|
||||
if not table_message and content["media_caption"] is not None:
|
||||
# Old schema
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["media_caption"]
|
||||
elif table_message and content["message_type"] == 1 and content["data"] is not None:
|
||||
# New schema
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"]
|
||||
else:
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].caption = None
|
||||
|
||||
if content["status"] == 6: # 6 = Metadata, otherwise it's a message
|
||||
if (not table_message and "-" in content["key_remote_jid"]) or \
|
||||
(table_message and content["chat_subject"] is not None):
|
||||
# Is Group
|
||||
if content["data"] is not None:
|
||||
try:
|
||||
int(content["data"])
|
||||
except ValueError:
|
||||
msg = f"The group name changed to {content['data']}"
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
data[content["key_remote_jid"]].delete_message(content["_id"])
|
||||
else:
|
||||
thumb_image = content["thumb_image"]
|
||||
if thumb_image is not None:
|
||||
if b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
|
||||
# Add user
|
||||
added = phone_number_re.search(
|
||||
thumb_image.decode("unicode_escape"))[0]
|
||||
if added in data:
|
||||
name_right = data[added]["name"]
|
||||
name_right = data[added].name
|
||||
else:
|
||||
name_right = added.split('@')[0]
|
||||
if content[8] is not None:
|
||||
if content[8] in data:
|
||||
name_left = data[content[8]]["name"]
|
||||
if content["remote_resource"] is not None:
|
||||
if content["remote_resource"] in data:
|
||||
name_left = data[content["remote_resource"]].name
|
||||
else:
|
||||
name_left = content[8].split('@')[0]
|
||||
name_left = content["remote_resource"].split('@')[0]
|
||||
msg = f"{name_left} added {name_right or 'You'}"
|
||||
else:
|
||||
msg = f"Added {name_right or 'You'}"
|
||||
elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
|
||||
# Changed number
|
||||
original = content[8].split('@')[0]
|
||||
original = content["remote_resource"].split('@')[0]
|
||||
changed = thumb_image[7:].decode().split('@')[0]
|
||||
msg = f"{original} changed to {changed}"
|
||||
data[content[0]]["messages"][content[1]]["data"] = msg
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
if content[4] is None:
|
||||
del data[content[0]]["messages"][content[1]]
|
||||
if content["data"] is None:
|
||||
data[content["key_remote_jid"]].delete_message(content["_id"])
|
||||
else:
|
||||
# Private chat
|
||||
if content[4] is None and content[7] is None:
|
||||
del data[content[0]]["messages"][content[1]]
|
||||
if content["data"] is None and content["thumb_image"] is None:
|
||||
data[content["key_remote_jid"]].delete_message(content["_id"])
|
||||
|
||||
else:
|
||||
if content[2] == 1:
|
||||
if content[5] == 5 and content[6] == 7:
|
||||
if content["key_from_me"] == 1:
|
||||
if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
|
||||
msg = "Message deleted"
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
if content[9] == "5":
|
||||
msg = f"Location shared: {content[10], content[11]}"
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
if content["media_wa_type"] == "5":
|
||||
msg = f"Location shared: {content['latitude'], content['longitude']}"
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
msg = content[4]
|
||||
msg = content["data"]
|
||||
if msg is not None:
|
||||
if "\r\n" in msg:
|
||||
msg = msg.replace("\r\n", "<br>")
|
||||
if "\n" in msg:
|
||||
msg = msg.replace("\n", "<br>")
|
||||
else:
|
||||
if content[5] == 0 and content[6] == 7:
|
||||
if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
|
||||
msg = "Message deleted"
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
if content[9] == "5":
|
||||
msg = f"Location shared: {content[10], content[11]}"
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
if content["media_wa_type"] == "5":
|
||||
msg = f"Location shared: {content['latitude'], content['longitude']}"
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
|
||||
else:
|
||||
msg = content[4]
|
||||
msg = content["data"]
|
||||
if msg is not None:
|
||||
if "\r\n" in msg:
|
||||
msg = msg.replace("\r\n", "<br>")
|
||||
if "\n" in msg:
|
||||
msg = msg.replace("\n", "<br>")
|
||||
|
||||
data[content[0]]["messages"][content[1]]["data"] = msg
|
||||
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
|
||||
|
||||
i += 1
|
||||
if i % 1000 == 0:
|
||||
@@ -350,7 +388,8 @@ def media(db, data, media_folder):
|
||||
total_row_number = c.fetchone()[0]
|
||||
print(f"\nGathering media...(0/{total_row_number})", end="\r")
|
||||
i = 0
|
||||
c.execute("""SELECT messages.key_remote_jid,
|
||||
try:
|
||||
c.execute("""SELECT messages.key_remote_jid,
|
||||
message_row_id,
|
||||
file_path,
|
||||
message_url,
|
||||
@@ -359,22 +398,39 @@ def media(db, data, media_folder):
|
||||
FROM message_media
|
||||
INNER JOIN messages
|
||||
ON message_media.message_row_id = messages._id
|
||||
ORDER BY messages.key_remote_jid ASC""")
|
||||
ORDER BY messages.key_remote_jid ASC"""
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
c.execute("""SELECT jid.raw_string as key_remote_jid,
|
||||
message_row_id,
|
||||
file_path,
|
||||
message_url,
|
||||
mime_type,
|
||||
media_key
|
||||
FROM message_media
|
||||
INNER JOIN message
|
||||
ON message_media.message_row_id = message._id
|
||||
LEFT JOIN chat
|
||||
ON chat._id = message.chat_row_id
|
||||
INNER JOIN jid
|
||||
ON jid._id = chat.jid_row_id
|
||||
ORDER BY jid.raw_string ASC"""
|
||||
)
|
||||
content = c.fetchone()
|
||||
mime = MimeTypes()
|
||||
while content is not None:
|
||||
file_path = f"{media_folder}/{content[2]}"
|
||||
data[content[0]]["messages"][content[1]]["media"] = True
|
||||
file_path = f"{media_folder}/{content['file_path']}"
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].media = True
|
||||
if os.path.isfile(file_path):
|
||||
data[content[0]]["messages"][content[1]]["data"] = file_path
|
||||
if content[4] is None:
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = file_path
|
||||
if content["mime_type"] is None:
|
||||
guess = mime.guess_type(file_path)[0]
|
||||
if guess is not None:
|
||||
data[content[0]]["messages"][content[1]]["mime"] = guess
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = guess
|
||||
else:
|
||||
data[content[0]]["messages"][content[1]]["mime"] = "data/data"
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "data/data"
|
||||
else:
|
||||
data[content[0]]["messages"][content[1]]["mime"] = content[4]
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = content["mime_type"]
|
||||
else:
|
||||
# if "https://mmg" in content[4]:
|
||||
# try:
|
||||
@@ -386,9 +442,9 @@ def media(db, data, media_folder):
|
||||
# data[content[0]]["messages"][content[1]]["media"] = True
|
||||
# data[content[0]]["messages"][content[1]]["mime"] = "media"
|
||||
# else:
|
||||
data[content[0]]["messages"][content[1]]["data"] = "The media is missing"
|
||||
data[content[0]]["messages"][content[1]]["mime"] = "media"
|
||||
data[content[0]]["messages"][content[1]]["meta"] = True
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = "The media is missing"
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "media"
|
||||
data[content["key_remote_jid"]].messages[content["message_row_id"]].meta = True
|
||||
i += 1
|
||||
if i % 100 == 0:
|
||||
print(f"Gathering media...({i}/{total_row_number})", end="\r")
|
||||
@@ -399,14 +455,31 @@ def media(db, data, media_folder):
|
||||
|
||||
def vcard(db, data):
|
||||
c = db.cursor()
|
||||
c.execute("""SELECT message_row_id,
|
||||
try:
|
||||
c.execute("""SELECT message_row_id,
|
||||
messages.key_remote_jid,
|
||||
vcard,
|
||||
messages.media_name
|
||||
FROM messages_vcards
|
||||
INNER JOIN messages
|
||||
ON messages_vcards.message_row_id = messages._id
|
||||
ORDER BY messages.key_remote_jid ASC;""")
|
||||
ORDER BY messages.key_remote_jid ASC;"""
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
c.execute("""SELECT message_row_id,
|
||||
jid.raw_string as key_remote_jid,
|
||||
vcard,
|
||||
message.text_data as media_name
|
||||
FROM message_vcard
|
||||
INNER JOIN message
|
||||
ON message_vcard.message_row_id = message._id
|
||||
LEFT JOIN chat
|
||||
ON chat._id = message.chat_row_id
|
||||
INNER JOIN jid
|
||||
ON jid._id = chat.jid_row_id
|
||||
ORDER BY message.chat_row_id ASC;"""
|
||||
)
|
||||
|
||||
rows = c.fetchall()
|
||||
total_row_number = len(rows)
|
||||
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
|
||||
@@ -414,21 +487,28 @@ def vcard(db, data):
|
||||
if not os.path.isdir(base):
|
||||
Path(base).mkdir(parents=True, exist_ok=True)
|
||||
for index, row in enumerate(rows):
|
||||
media_name = row[3] if row[3] else ""
|
||||
media_name = row["media_name"] if row["media_name"] is not None else ""
|
||||
file_name = "".join(x for x in media_name if x.isalnum())
|
||||
file_path = f"{base}/{file_name}.vcf"
|
||||
if not os.path.isfile(file_path):
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(row[2])
|
||||
data[row[1]]["messages"][row[0]]["data"] = media_name + \
|
||||
f.write(row["vcard"])
|
||||
data[row["key_remote_jid"]].messages[row["message_row_id"]].data = media_name + \
|
||||
"The vCard file cannot be displayed here, " \
|
||||
f"however it should be located at {file_path}"
|
||||
data[row[1]]["messages"][row[0]]["mime"] = "text/x-vcard"
|
||||
data[row[1]]["messages"][row[0]]["meta"] = True
|
||||
data[row["key_remote_jid"]].messages[row["message_row_id"]].mime = "text/x-vcard"
|
||||
data[row["key_remote_jid"]].messages[row["message_row_id"]].meta = True
|
||||
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def create_html(data, output_folder, template=None, embedded=False):
|
||||
def create_html(
|
||||
data,
|
||||
output_folder,
|
||||
template=None,
|
||||
embedded=False,
|
||||
offline_static=False,
|
||||
maximum_size=None
|
||||
):
|
||||
if template is None:
|
||||
template_dir = os.path.dirname(__file__)
|
||||
template_file = "whatsapp.html"
|
||||
@@ -447,8 +527,21 @@ def create_html(data, output_folder, template=None, embedded=False):
|
||||
if not os.path.isdir(output_folder):
|
||||
os.mkdir(output_folder)
|
||||
|
||||
w3css = "https://www.w3schools.com/w3css/4/w3.css"
|
||||
if offline_static:
|
||||
import urllib.request
|
||||
static_folder = os.path.join(output_folder, offline_static)
|
||||
if not os.path.isdir(static_folder):
|
||||
os.mkdir(static_folder)
|
||||
w3css_path = os.path.join(static_folder, "w3.css")
|
||||
if not os.path.isfile(w3css_path):
|
||||
with urllib.request.urlopen(w3css) as resp:
|
||||
with open(w3css_path, "wb") as f:
|
||||
f.write(resp.read())
|
||||
w3css = os.path.join(offline_static, "w3.css")
|
||||
|
||||
for current, contact in enumerate(data):
|
||||
if len(data[contact]["messages"]) == 0:
|
||||
if len(data[contact].messages) == 0:
|
||||
continue
|
||||
phone_number = contact.split('@')[0]
|
||||
if "-" in contact:
|
||||
@@ -456,11 +549,11 @@ def create_html(data, output_folder, template=None, embedded=False):
|
||||
else:
|
||||
file_name = phone_number
|
||||
|
||||
if data[contact]["name"] is not None:
|
||||
if data[contact].name is not None:
|
||||
if file_name != "":
|
||||
file_name += "-"
|
||||
file_name += data[contact]["name"].replace("/", "-")
|
||||
name = data[contact]["name"]
|
||||
file_name += data[contact].name.replace("/", "-")
|
||||
name = data[contact].name
|
||||
else:
|
||||
name = phone_number
|
||||
safe_file_name = ''
|
||||
@@ -469,9 +562,10 @@ def create_html(data, output_folder, template=None, embedded=False):
|
||||
f.write(
|
||||
template.render(
|
||||
name=name,
|
||||
msgs=data[contact]["messages"].values(),
|
||||
msgs=data[contact].messages.values(),
|
||||
my_avatar=None,
|
||||
their_avatar=f"WhatsApp/Avatars/{contact}.j"
|
||||
their_avatar=f"WhatsApp/Avatars/{contact}.j",
|
||||
w3css=w3css
|
||||
)
|
||||
)
|
||||
if current % 10 == 0:
|
||||
|
||||
@@ -6,25 +6,9 @@ import jinja2
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from bleach import clean as sanitize
|
||||
from markupsafe import Markup
|
||||
from datetime import datetime
|
||||
from mimetypes import MimeTypes
|
||||
|
||||
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
|
||||
|
||||
|
||||
def sanitize_except(html):
|
||||
return Markup(sanitize(html, tags=["br"]))
|
||||
|
||||
|
||||
def determine_day(last, current):
|
||||
last = datetime.fromtimestamp(last).date()
|
||||
current = datetime.fromtimestamp(current).date()
|
||||
if last == current:
|
||||
return None
|
||||
else:
|
||||
return current
|
||||
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, APPLE_TIME
|
||||
|
||||
|
||||
def messages(db, data):
|
||||
@@ -228,7 +212,7 @@ def vcard(db, data):
|
||||
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def create_html(data, output_folder, template=None, embedded=False):
|
||||
def create_html(data, output_folder, template=None, embedded=False, offline_static=False):
|
||||
if template is None:
|
||||
template_dir = os.path.dirname(__file__)
|
||||
template_file = "whatsapp.html"
|
||||
@@ -247,6 +231,18 @@ def create_html(data, output_folder, template=None, embedded=False):
|
||||
if not os.path.isdir(output_folder):
|
||||
os.mkdir(output_folder)
|
||||
|
||||
w3css = "https://www.w3schools.com/w3css/4/w3.css"
|
||||
if offline_static:
|
||||
import urllib.request
|
||||
static_folder = os.path.join(output_folder, offline_static)
|
||||
if not os.path.isdir(static_folder):
|
||||
os.mkdir(static_folder)
|
||||
w3css_path = os.path.join(static_folder, "w3.css")
|
||||
if not os.path.isfile(w3css_path):
|
||||
with urllib.request.urlopen(w3css) as resp:
|
||||
with open(w3css_path, "wb") as f: f.write(resp.read())
|
||||
w3css = os.path.join(offline_static, "w3.css")
|
||||
|
||||
for current, contact in enumerate(data):
|
||||
if len(data[contact]["messages"]) == 0:
|
||||
continue
|
||||
@@ -272,7 +268,8 @@ def create_html(data, output_folder, template=None, embedded=False):
|
||||
name=name,
|
||||
msgs=data[contact]["messages"].values(),
|
||||
my_avatar=None,
|
||||
their_avatar=f"WhatsApp/Avatars/{contact}.j"
|
||||
their_avatar=f"WhatsApp/Avatars/{contact}.j",
|
||||
w3css=w3css
|
||||
)
|
||||
)
|
||||
if current % 10 == 0:
|
||||
|
||||
588
Whatsapp_Chat_Exporter/extract_new.py
Normal file
588
Whatsapp_Chat_Exporter/extract_new.py
Normal file
@@ -0,0 +1,588 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import sqlite3
|
||||
import json
|
||||
import jinja2
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
import io
|
||||
import hmac
|
||||
from pathlib import Path
|
||||
from bleach import clean as sanitize
|
||||
from markupsafe import Markup
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from mimetypes import MimeTypes
|
||||
from hashlib import sha256
|
||||
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
|
||||
try:
|
||||
import zlib
|
||||
from Crypto.Cipher import AES
|
||||
except ModuleNotFoundError:
|
||||
support_backup = False
|
||||
else:
|
||||
support_backup = True
|
||||
try:
|
||||
import javaobj
|
||||
except ModuleNotFoundError:
|
||||
support_crypt15 = False
|
||||
else:
|
||||
support_crypt15 = True
|
||||
|
||||
|
||||
def sanitize_except(html):
    """Clean *html* with bleach, keeping only ``<br>`` tags, and mark it safe.

    The cleaned text is wrapped in ``Markup`` so that a template engine
    honouring markupsafe will not escape the surviving ``<br>`` tags again.
    """
    cleaned = sanitize(html, tags=["br"])
    return Markup(cleaned)
|
||||
|
||||
|
||||
def determine_day(last, current):
    """Return the date of *current* if it falls on a different day than *last*.

    Both arguments are POSIX timestamps; each is converted with
    ``datetime.fromtimestamp`` (local time) and reduced to its calendar date.

    Returns:
        The ``date`` of *current* when the two dates differ, otherwise ``None``.
    """
    previous_date, current_date = (
        datetime.fromtimestamp(stamp).date() for stamp in (last, current)
    )
    return None if previous_date == current_date else current_date
|
||||
|
||||
|
||||
# Candidate byte offsets inside a crypt14 backup: "iv" is where the 16-byte IV
# starts and "db" is where the encrypted payload starts. decrypt_backup tries
# the entries in this order before falling back to brute force.
CRYPT14_OFFSETS = (
    dict(iv=67, db=191),
    dict(iv=67, db=190),
    dict(iv=66, db=99),
    dict(iv=67, db=193),
)
|
||||
|
||||
|
||||
class Crypt(Enum):
    """Backup encryption formats that ``decrypt_backup`` can be asked to handle."""

    # Each member's value is the number that appears in the format's name.
    CRYPT15 = 15
    CRYPT14 = 14
    CRYPT12 = 12
|
||||
|
||||
|
||||
def brute_force_offset(max_iv=200, max_db=200):
    """Yield every candidate (IV start, IV end, payload start) offset triple.

    Args:
        max_iv: Exclusive upper bound for the IV start offset.
        max_db: Exclusive upper bound for the payload start offset.

    Yields:
        ``(iv_start, iv_start + 16, db_start)`` tuples; the IV window is
        always 16 bytes wide. IV offsets vary slowest, payload offsets fastest.

    The defaults reproduce the previous fixed 200 x 200 search space, so
    existing zero-argument callers see the same sequence.
    """
    for iv in range(max_iv):
        for db in range(max_db):
            yield iv, iv + 16, db
|
||||
|
||||
|
||||
def _generate_hmac_of_hmac(key_stream):
|
||||
key = hmac.new(
|
||||
hmac.new(
|
||||
b'\x00' * 32,
|
||||
key_stream,
|
||||
sha256
|
||||
).digest(),
|
||||
b"backup encryption\x01",
|
||||
sha256
|
||||
)
|
||||
return key.digest(), key_stream
|
||||
|
||||
|
||||
def _extract_encrypted_key(keyfile):
    """Turn a serialized Java key object into the derived backup key.

    *keyfile* is decoded with ``javaobj.loads``; every element of the result
    is packed as one signed big-endian byte, and the concatenated bytes are
    handed to ``_generate_hmac_of_hmac``.

    Returns:
        Whatever ``_generate_hmac_of_hmac`` returns for the rebuilt key stream.
    """
    key_stream = b"".join(
        value.to_bytes(1, "big", signed=True) for value in javaobj.loads(keyfile)
    )
    return _generate_hmac_of_hmac(key_stream)
|
||||
|
||||
|
||||
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
    """Decrypt an encrypted backup and write the SQLite database to *output*.

    Args:
        database: Raw bytes of the encrypted backup; it is sliced and indexed.
        key: Key material, either as bytes or as a file object derived from
            ``io.IOBase`` (which is read in full). For crypt12/crypt14 it must
            be exactly 158 bytes. For crypt15 a 32-byte value is used directly
            as the key stream; any other length is parsed as a serialized
            Java object.
        output: Path of the file that receives the decrypted database.
        crypt: ``Crypt`` member naming the backup format.
        show_crypt15: When true and *crypt* is ``Crypt.CRYPT15``, print the
            key stream as space-separated groups of four hex digits.

    Returns:
        0 on success.
        1 when the optional crypto modules are unavailable, or when crypt15
        is requested without javaobj support.
        2 when neither the known crypt14 offsets nor the brute-force search
        produce decompressible data.
        3 when decompression fails for a format other than crypt14.

    Raises:
        ValueError: If the key or backup is too short or the wrong length,
            the key and backup signatures differ, or the decrypted data does
            not start with the SQLite header.
    """
    if not support_backup:
        return 1
    if isinstance(key, io.IOBase):
        key = key.read()

    # t1 (from the key) and t2 (from the backup) are what the mismatch error
    # below calls the signatures. t1 is bound for every non-crypt15 key, no
    # matter whether the key arrived as bytes or as a file object, so the
    # comparison further down can never hit an unbound local.
    t1 = t2 = None
    if crypt is not Crypt.CRYPT15:
        if len(key) != 158:
            raise ValueError("The key file must be 158 bytes")
        t1 = key[30:62]

    if crypt == Crypt.CRYPT14:
        if len(database) < 191:
            raise ValueError("The crypt14 file must be at least 191 bytes")
        current_try = 0
        offsets = CRYPT14_OFFSETS[current_try]
        t2 = database[15:47]
        iv = database[offsets["iv"]:offsets["iv"] + 16]
        db_ciphertext = database[offsets["db"]:]
    elif crypt == Crypt.CRYPT12:
        if len(database) < 67:
            raise ValueError("The crypt12 file must be at least 67 bytes")
        t2 = database[3:35]
        iv = database[51:67]
        db_ciphertext = database[67:-20]
    elif crypt == Crypt.CRYPT15:
        if not support_crypt15:
            return 1
        if len(database) < 131:
            raise ValueError("The crypt15 file must be at least 131 bytes")
        # No signature comparison is done for crypt15.
        t1 = t2 = None
        iv = database[8:24]
        db_offset = database[0] + 2  # Skip protobuf + protobuf size and backup type
        db_ciphertext = database[db_offset:]

    if t1 != t2:
        raise ValueError("The signature of key file and backup file mismatch")

    if crypt == Crypt.CRYPT15:
        if len(key) == 32:
            main_key, hex_key = _generate_hmac_of_hmac(key)
        else:
            main_key, hex_key = _extract_encrypted_key(key)
        if show_crypt15:
            hex_string = hex_key.hex()
            hex_key = [hex_string[c:c+4] for c in range(0, len(hex_string), 4)]
            print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key))
    else:
        main_key = key[126:]

    decompressed = False
    while not decompressed:
        cipher = AES.new(main_key, AES.MODE_GCM, iv)
        db_compressed = cipher.decrypt(db_ciphertext)
        try:
            db = zlib.decompress(db_compressed)
        except zlib.error:
            if crypt == Crypt.CRYPT14:
                # Wrong offsets give undecompressible data: move on to the
                # next known offset pair and retry from the top of the loop.
                current_try += 1
                if current_try < len(CRYPT14_OFFSETS):
                    offsets = CRYPT14_OFFSETS[current_try]
                    iv = database[offsets["iv"]:offsets["iv"] + 16]
                    db_ciphertext = database[offsets["db"]:]
                    continue
                else:
                    print("Common offsets are not applicable to "
                          "your backup. Trying to brute force it...")
                    for start_iv, end_iv, start_db in brute_force_offset():
                        iv = database[start_iv:end_iv]
                        db_ciphertext = database[start_db:]
                        cipher = AES.new(main_key, AES.MODE_GCM, iv)
                        db_compressed = cipher.decrypt(db_ciphertext)
                        try:
                            db = zlib.decompress(db_compressed)
                        except zlib.error:
                            continue
                        else:
                            decompressed = True
                            print(
                                f"The offsets of your IV and database are {start_iv} and "
                                f"{start_db}, respectively. To include your offsets in the "
                                "program, please report it by creating an issue on GitHub: "
                                "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues/new"
                            )
                            break
                    if not decompressed:
                        return 2
            else:
                return 3
        else:
            decompressed = True
        # Only reached with successfully decompressed data in `db`.
        if db[0:6].upper() == b"SQLITE":
            with open(output, "wb") as f:
                f.write(db)
            return 0
        else:
            raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...")
|
||||
|
||||
|
||||
def contacts(db, data):
    """Load every row of ``wa_contacts`` into *data*, keyed by JID.

    Prints the table's row count, then stores one ``ChatStore`` per row,
    built from the row's ``display_name``. Rows are read by column name, so
    *db* must produce name-addressable rows (for example ``sqlite3.Row``).
    """
    cursor = db.cursor()
    cursor.execute("""SELECT count() FROM wa_contacts""")
    total_row_number = cursor.fetchone()[0]
    print(f"Gathering contacts...({total_row_number})")

    cursor.execute("""SELECT jid, display_name FROM wa_contacts; """)
    # The two-argument iter() keeps calling fetchone() until it returns None.
    for row in iter(cursor.fetchone, None):
        data[row["jid"]] = ChatStore(row["display_name"])
|
||||
|
||||
|
||||
def messages(db, data):
    """Read the message history from *db* into the ``ChatStore`` objects in *data*.

    For every row, a ``Message`` is added to the chat identified by the row's
    JID (a new ``ChatStore`` is created when the JID is not yet in *data*),
    then its sender, reply, caption and data attributes are filled in.
    Rows with status 6 are turned into event notices (group renamed, member
    added, number changed) or removed from the chat when they carry nothing
    to show.

    Args:
        db: Database connection whose rows can be read by column name.
        data: Mapping of JID to ``ChatStore``; updated in place.
    """
    c = db.cursor()
    c.execute("""SELECT count() FROM message""")
    total_row_number = c.fetchone()[0]
    print(f"Gathering messages...(0/{total_row_number})", end="\r")

    phone_number_re = re.compile(r"[0-9]+@s\.whatsapp\.net")
    c.execute("""SELECT jid_global.raw_string as key_remote_jid,
                        message._id,
                        message.from_me as key_from_me,
                        message.timestamp,
                        message.text_data as data,
                        message.status,
                        message_future.version as edit_version,
                        message_thumbnail.thumbnail as thumb_image,
                        message_media.file_path as remote_resource,
                        message_media.mime_type as media_wa_type,
                        message_location.latitude,
                        message_location.longitude,
                        message_quoted.key_id as quoted,
                        message.key_id,
                        message_quoted.text_data as quoted_data,
                        message.message_type,
                        jid_group.raw_string as group_sender_jid,
                        chat.subject as chat_subject
                 FROM message
                    LEFT JOIN message_quoted
                        ON message_quoted.message_row_id = message._id
                    LEFT JOIN message_location
                        ON message_location.message_row_id = message._id
                    LEFT JOIN message_media
                        ON message_media.message_row_id = message._id
                    LEFT JOIN message_thumbnail
                        ON message_thumbnail.message_row_id = message._id
                    LEFT JOIN message_future
                        ON message_future.message_row_id = message._id
                    LEFT JOIN chat
                        ON chat._id = message.chat_row_id
                    INNER JOIN jid jid_global
                        ON jid_global._id = chat.jid_row_id
                    LEFT JOIN jid jid_group
                        ON jid_group._id = message.sender_jid_row_id
                 WHERE key_remote_jid <> '-1';""")

    # iter(fetchone, None) advances the cursor on every pass, so a `continue`
    # below can never leave the loop stuck on the same row.
    for i, content in enumerate(iter(c.fetchone, None), start=1):
        if i % 1000 == 0:
            print(f"Gathering messages...({i}/{total_row_number})", end="\r")

        jid = content["key_remote_jid"]
        if jid is None:
            # Skip before touching `data` so no chat is created under None.
            continue
        if jid not in data:
            data[jid] = ChatStore()
        chat = data[jid]
        row_id = content["_id"]
        chat.add_message(row_id, Message(
            from_me=content["key_from_me"],
            timestamp=content["timestamp"],
            time=content["timestamp"],
            key_id=content["key_id"],
        ))
        message = chat.messages[row_id]

        # Sender is only resolved for received messages whose JID contains "-".
        sender = None
        if "-" in jid and content["key_from_me"] == 0:
            if content["chat_subject"] is not None:
                sender_jid = content["group_sender_jid"]
            else:
                sender_jid = jid
            if sender_jid in data:
                fallback = sender_jid.split('@')[0] if "@" in sender_jid else None
                sender = data[sender_jid].name or fallback
        message.sender = sender

        if content["quoted"] is not None:
            message.reply = content["quoted"]
            message.quoted_data = content["quoted_data"]
        else:
            message.reply = None

        message.caption = content["data"] if content["message_type"] == 1 else None

        if content["status"] == 6:
            # `msg` stays None unless an event notice is produced, so a row
            # that matches nothing never inherits text from an earlier row.
            msg = None
            if content["chat_subject"] is not None:
                # Is Group
                if content["data"] is not None:
                    try:
                        int(content["data"])
                    except ValueError:
                        msg = f"The group name changed to {content['data']}"
                    else:
                        chat.delete_message(row_id)
                else:
                    thumb_image = content["thumb_image"]
                    if thumb_image is None:
                        # Neither text nor thumbnail: nothing to display.
                        chat.delete_message(row_id)
                    elif b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
                        # Add user
                        match = phone_number_re.search(
                            thumb_image.decode("unicode_escape"))
                        if match is not None:
                            added = match[0]
                            if added in data:
                                name_right = data[added].name
                            else:
                                name_right = added.split('@')[0]
                            actor = content["remote_resource"]
                            if actor is not None:
                                if actor in data:
                                    name_left = data[actor].name
                                else:
                                    name_left = actor.split('@')[0]
                                msg = f"{name_left} added {name_right or 'You'}"
                            else:
                                msg = f"Added {name_right or 'You'}"
                    elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
                        # Changed number
                        original = content["remote_resource"].split('@')[0]
                        changed = thumb_image[7:].decode().split('@')[0]
                        msg = f"{original} changed to {changed}"
            else:
                # Private chat
                if content["data"] is None and content["thumb_image"] is None:
                    chat.delete_message(row_id)
            if msg is not None:
                message.data = msg
                message.meta = True
            continue

        # Sent and received messages differ only in the status value that,
        # together with edit version 7, marks the message as deleted.
        deleted_status = 5 if content["key_from_me"] == 1 else 0
        if content["status"] == deleted_status and content["edit_version"] == 7:
            msg = "Message deleted"
            message.meta = True
        elif content["media_wa_type"] == "5":
            # NOTE(review): media_wa_type is selected from message_media.mime_type;
            # confirm that "5" is a value that column can actually hold.
            msg = f"Location shared: {content['latitude'], content['longitude']}"
            message.meta = True
        else:
            msg = content["data"]
            if msg is not None:
                msg = msg.replace("\r\n", "<br>").replace("\n", "<br>")
        message.data = msg

    print(f"Gathering messages...({total_row_number}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def media(db, data, media_folder):
    """Attach media details to the messages already stored in *data*.

    Every row of ``message_media`` marks its message as media. When the file
    named by the row exists under *media_folder*, the message's data becomes
    that path and its MIME type is taken from the row, or guessed from the
    file name when the row has none ("data/data" if the guess fails).
    Otherwise the message is flagged as meta with a "missing" notice.

    Args:
        db: Database connection whose rows can be read by column name.
        data: Mapping of JID to chat objects exposing a ``messages`` mapping.
        media_folder: Folder that the stored file paths are relative to.
    """
    cursor = db.cursor()
    cursor.execute("""SELECT count() FROM message_media""")
    total_row_number = cursor.fetchone()[0]
    print(f"\nGathering media...(0/{total_row_number})", end="\r")
    cursor.execute("""SELECT jid.raw_string,
                        message_row_id,
                        file_path,
                        message_url,
                        mime_type,
                        media_key
                 FROM message_media
                    INNER JOIN message
                        ON message_media.message_row_id = message._id
                    LEFT JOIN chat
                        ON chat._id = message.chat_row_id
                    INNER JOIN jid
                        ON jid._id = chat.jid_row_id
                ORDER BY jid.raw_string ASC""")
    mime = MimeTypes()
    processed = 0
    for content in iter(cursor.fetchone, None):
        file_path = f"{media_folder}/{content['file_path']}"
        message = data[content["raw_string"]].messages[content["message_row_id"]]
        message.media = True
        if not os.path.isfile(file_path):
            message.data = "The media is missing"
            message.mime = "media"
            message.meta = True
        else:
            message.data = file_path
            declared = content["mime_type"]
            if declared is not None:
                message.mime = declared
            else:
                guess = mime.guess_type(file_path)[0]
                message.mime = guess if guess is not None else "data/data"
        processed += 1
        if processed % 100 == 0:
            print(f"Gathering media...({processed}/{total_row_number})", end="\r")
    print(
        f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def vcard(db, data):
    """Export shared vCards to ``WhatsApp/vCards`` and annotate their messages.

    Each row of ``message_vcard`` is written to a ``.vcf`` file named after
    the alphanumeric characters of the message text, unless a file with that
    name already exists. The matching message in *data* then gets a notice
    pointing at the file, the ``text/x-vcard`` MIME type and the meta flag.

    Args:
        db: Database connection whose rows can be read by column name.
        data: Mapping of JID to chat objects exposing a ``messages`` mapping.
    """
    cursor = db.cursor()
    cursor.execute("""SELECT message_row_id,
                        jid.raw_string,
                        vcard,
                        message.text_data
                 FROM message_vcard
                    INNER JOIN message
                        ON message_vcard.message_row_id = message._id
                    LEFT JOIN chat
                        ON chat._id = message.chat_row_id
                    INNER JOIN jid
                        ON jid._id = chat.jid_row_id
                 ORDER BY message.chat_row_id ASC;""")
    rows = cursor.fetchall()
    total_row_number = len(rows)
    print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
    base = "WhatsApp/vCards"
    if not os.path.isdir(base):
        Path(base).mkdir(parents=True, exist_ok=True)
    for done, row in enumerate(rows, start=1):
        media_name = row["text_data"] or ""
        file_name = "".join(filter(str.isalnum, media_name))
        file_path = f"{base}/{file_name}.vcf"
        if not os.path.isfile(file_path):
            with open(file_path, "w", encoding="utf-8") as vcf:
                vcf.write(row["vcard"])
        message = data[row["raw_string"]].messages[row["message_row_id"]]
        notice = (
            "The vCard file cannot be displayed here, "
            f"however it should be located at {file_path}"
        )
        message.data = media_name + notice
        message.mime = "text/x-vcard"
        message.meta = True
        print(f"Gathering vCards...({done}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
def create_html(
    data,
    output_folder,
    template=None,
    embedded=False,
    offline_static=False,
    maximum_size=None
):
    """Render one HTML file per non-empty chat in *data* into *output_folder*.

    Args:
        data: Mapping of JID to chat objects exposing ``name`` and ``messages``.
        output_folder: Destination folder; created when it does not exist.
        template: Path to a Jinja template; the bundled ``whatsapp.html``
            next to this module is used when omitted.
        embedded: Accepted but not used by this function.
        offline_static: When truthy, the name of a sub-folder of
            *output_folder* where ``w3.css`` is downloaded (if not already
            there) and referenced from, instead of the remote URL.
        maximum_size: Accepted but not used by this function.
    """
    if template is None:
        template_dir, template_file = os.path.dirname(__file__), "whatsapp.html"
    else:
        # os.path.split returns exactly (dirname, basename).
        template_dir, template_file = os.path.split(template)
    environment = jinja2.Environment(
        loader=jinja2.FileSystemLoader(searchpath=template_dir)
    )
    environment.globals.update(determine_day=determine_day)
    environment.filters['sanitize_except'] = sanitize_except
    template = environment.get_template(template_file)

    total_row_number = len(data)
    print(f"\nCreating HTML...(0/{total_row_number})", end="\r")

    if not os.path.isdir(output_folder):
        os.mkdir(output_folder)

    w3css = "https://www.w3schools.com/w3css/4/w3.css"
    if offline_static:
        import urllib.request
        static_folder = os.path.join(output_folder, offline_static)
        if not os.path.isdir(static_folder):
            os.mkdir(static_folder)
        w3css_path = os.path.join(static_folder, "w3.css")
        if not os.path.isfile(w3css_path):
            with urllib.request.urlopen(w3css) as response, \
                    open(w3css_path, "wb") as stylesheet:
                stylesheet.write(response.read())
        # From here on the pages link to the local copy, relative to the output.
        w3css = os.path.join(offline_static, "w3.css")

    for current, contact in enumerate(data):
        chat = data[contact]
        if len(chat.messages) == 0:
            continue
        phone_number = contact.split('@')[0]
        # JIDs containing "-" do not contribute their number to the file name.
        file_name = "" if "-" in contact else phone_number

        if chat.name is None:
            name = phone_number
        else:
            if file_name != "":
                file_name += "-"
            file_name += chat.name.replace("/", "-")
            name = chat.name

        safe_file_name = "".join(
            char for char in file_name if char.isalnum() or char in "- "
        )
        rendered = template.render(
            name=name,
            msgs=chat.messages.values(),
            my_avatar=None,
            their_avatar=f"WhatsApp/Avatars/{contact}.j",
            w3css=w3css
        )
        with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as page:
            page.write(rendered)
        if current % 10 == 0:
            print(f"Creating HTML...({current}/{total_row_number})", end="\r")

    print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r")
|
||||
|
||||
|
||||
# Stand-alone entry point: reads the contact and message databases, renders
# the HTML pages, moves the media folder into the output folder and dumps
# everything to result.json.
if __name__ == "__main__":
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option(
        "-w",
        "--wa",
        dest="wa",
        default="wa.db",
        help="Path to contact database")
    parser.add_option(
        "-m",
        "--media",
        dest="media",
        default="WhatsApp",
        help="Path to WhatsApp media folder"
    )
    # parser.add_option(
    #     "-t",
    #     "--template",
    #     dest="html",
    #     default="wa.db",
    #     help="Path to HTML template")
    (options, args) = parser.parse_args()
    msg_db = "msgstore.db"
    output_folder = "temp"
    contact_db = options.wa
    media_folder = options.media

    # Positional arguments: [message database [output folder]]
    if len(args) == 1:
        msg_db = args[0]
    elif len(args) == 2:
        msg_db = args[0]
        output_folder = args[1]

    data = {}

    if os.path.isfile(contact_db):
        with sqlite3.connect(contact_db) as db:
            # The extractors read rows by column name, which plain tuples
            # (the sqlite3 default) do not support.
            db.row_factory = sqlite3.Row
            contacts(db, data)
    if os.path.isfile(msg_db):
        with sqlite3.connect(msg_db) as db:
            db.row_factory = sqlite3.Row
            messages(db, data)
            media(db, data, media_folder)
            vcard(db, data)
        create_html(data, output_folder)

    if not os.path.isdir(f"{output_folder}/WhatsApp"):
        shutil.move(media_folder, f"{output_folder}/")

    with open("result.json", "w") as f:
        # `data` holds ChatStore/Message objects, which json cannot encode by
        # itself; fall back to each object's attribute dictionary.
        # NOTE(review): assumes both classes keep their state in __dict__ —
        # confirm against data_model, or switch to its own serializer if any.
        serialized = json.dumps(data, default=vars)
        print(f"\nWriting JSON file...({int(len(serialized)/1024/1024)}MB)")
        f.write(serialized)

    print("Everything is done!")
|
||||
44
Whatsapp_Chat_Exporter/utility.py
Normal file
44
Whatsapp_Chat_Exporter/utility.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from bleach import clean as sanitize
|
||||
from markupsafe import Markup
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
def sanitize_except(html):
    """Return *html* cleaned by bleach with only ``<br>`` allowed, as ``Markup``.

    Wrapping the result in ``Markup`` marks it as safe, so the kept ``<br>``
    tags are not escaped again by code that honours markupsafe.
    """
    allowed_tags = ["br"]
    return Markup(sanitize(html, tags=allowed_tags))
|
||||
|
||||
|
||||
def determine_day(last, current):
    """Tell whether *current* starts a new calendar day relative to *last*.

    Both arguments are POSIX timestamps, converted to local dates with
    ``datetime.fromtimestamp``.

    Returns:
        The ``date`` of *current* if it differs from the date of *last*,
        otherwise ``None``.
    """
    last_day = datetime.fromtimestamp(last).date()
    current_day = datetime.fromtimestamp(current).date()
    if last_day != current_day:
        return current_day
    return None
|
||||
|
||||
|
||||
# Android Specific
|
||||
|
||||
# Candidate byte offsets inside a crypt14 backup: "iv" is the start of the
# 16-byte IV and "db" is the start of the encrypted payload.
CRYPT14_OFFSETS = (
    dict(iv=67, db=191),
    dict(iv=67, db=190),
    dict(iv=66, db=99),
    dict(iv=67, db=193),
)
|
||||
|
||||
|
||||
class Crypt(Enum):
    """Identifiers for the Android backup formats crypt15, crypt14 and crypt12."""

    # Each member's value is the number that appears in the format's name.
    CRYPT15 = 15
    CRYPT14 = 14
    CRYPT12 = 12
|
||||
|
||||
|
||||
def brute_force_offset(max_iv=200, max_db=200):
    """Lazily enumerate candidate (IV start, IV end, payload start) offsets.

    Args:
        max_iv: Exclusive upper bound for the IV start offset.
        max_db: Exclusive upper bound for the payload start offset.

    Yields:
        ``(iv_start, iv_start + 16, db_start)`` tuples, with the IV offset
        varying slowest and the payload offset fastest.
    """
    yield from (
        (iv_start, iv_start + 16, db_start)
        for iv_start in range(max_iv)
        for db_start in range(max_db)
    )
|
||||
|
||||
|
||||
# iOS Specific
|
||||
|
||||
# Seconds between the Unix epoch and Apple's reference date,
# 2001-01-01 00:00:00 UTC. Written as a literal because building it from a
# naive datetime(2001, 1, 1) would be read as local time and shift the value
# by the machine's UTC offset.
APPLE_TIME = 978307200.0
|
||||
@@ -2,11 +2,10 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>Whatsapp - {{ name }}</title>
|
||||
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
|
||||
<meta charset="UTF-8">
|
||||
<link rel="stylesheet" href="{{w3css}}">
|
||||
<style>
|
||||
@import url('https://fonts.googleapis.com/css2?family=Noto+Sans+HK:wght@300;400&display=swap');
|
||||
html {
|
||||
font-family: 'Noto Sans HK', sans-serif;
|
||||
html, body {
|
||||
font-size: 12px;
|
||||
scroll-behavior: smooth;
|
||||
}
|
||||
@@ -139,11 +138,11 @@
|
||||
{% if "image/" in msg.mime %}
|
||||
<a href="{{ msg.data }}"><img src="{{ msg.data }}" /></a>
|
||||
{% elif "audio/" in msg.mime %}
|
||||
<audio controls="controls" autobuffer="autobuffer">
|
||||
<audio controls preload="auto">
|
||||
<source src="{{ msg.data }}" />
|
||||
</audio>
|
||||
{% elif "video/" in msg.mime %}
|
||||
<video controls="controls" autobuffer="autobuffer">
|
||||
<video controls preload="auto">
|
||||
<source src="{{ msg.data }}" />
|
||||
</video>
|
||||
{% elif "/" in msg.mime %}
|
||||
|
||||
Reference in New Issue
Block a user