Merge branch 'dev'

This commit is contained in:
KnugiHK
2023-11-12 14:09:04 +08:00
8 changed files with 439 additions and 62 deletions

View File

@@ -154,6 +154,8 @@ See [issues](https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues).
# Copyright
This is a MIT licensed project.
The Telegram Desktop's export is the reference for whatsapp.html in this repo
The Telegram Desktop's export is the reference for whatsapp.html in this repo.
bplist.py was released by Vladimir "Farcaller" Pouzanov under MIT license.
WhatsApp Chat Exporter is not affiliated, associated, authorized, endorsed by, or in any way officially connected with the WhatsApp LLC, or any of its subsidiaries or its affiliates. The official WhatsApp LLC website can be found at https://www.whatsapp.com/.

View File

@@ -21,7 +21,7 @@ except ImportError:
def main():
parser = ArgumentParser(
description = 'A customizable Android and iPhone WhatsApp database parser that '
'will give you the history of your WhatsApp conversations inHTML '
'will give you the history of your WhatsApp conversations in HTML '
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
epilog = f'WhatsApp Chat Exporter: {__version__} Licensed with MIT'
)
@@ -177,6 +177,20 @@ def main():
action='store_true',
help="Import JSON file and convert to HTML output"
)
parser.add_argument(
"--business",
dest="business",
default=False,
action='store_true',
help="Use Whatsapp Business default files (iOS only)"
)
parser.add_argument(
"--preserve-timestamp",
dest="preserve_timestamp",
default=False,
action='store_true',
help="Preserve the modification timestamp of the extracted files (iOS only)"
)
args = parser.parse_args()
# Check for updates
@@ -199,6 +213,9 @@ def main():
elif args.import_json and not os.path.isfile(args.json):
print("JSON file not found.")
exit(1)
if args.android and args.business:
print("WhatsApp Business is only available on iOS for now.")
exit(1)
data = {}
@@ -264,15 +281,19 @@ def main():
media = extract_iphone.media
vcard = extract_iphone.vcard
create_html = extract.create_html
if args.business:
from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers
else:
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers
if args.media is None:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
args.media = identifiers.DOMAIN
if args.backup is not None:
if not os.path.isdir(args.media):
extract_iphone_media.extract_media(args.backup)
extract_iphone_media.extract_media(args.backup, identifiers, args.preserve_timestamp)
else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
if args.db is None:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
msg_db = identifiers.MESSAGE
else:
msg_db = args.db
if args.wa is None:
@@ -290,7 +311,7 @@ def main():
db.row_factory = sqlite3.Row
messages(db, data, args.media)
media(db, data, args.media)
vcard(db, data)
vcard(db, data, args.media)
if args.android:
extract.calls(db, data)
if not args.no_html:

View File

@@ -0,0 +1,292 @@
#################################################################################
# Copyright (C) 2009-2011 Vladimir "Farcaller" Pouzanov <farcaller@gmail.com> #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in #
# all copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN #
# THE SOFTWARE. #
#################################################################################
import struct
import codecs
from datetime import datetime, timedelta
class BPListWriter(object):
def __init__(self, objects):
self.bplist = ""
self.objects = objects
def binary(self):
'''binary -> string
Generates bplist
'''
self.data = 'bplist00'
# TODO: flatten objects and count max length size
# TODO: write objects and save offsets
# TODO: write offsets
# TODO: write metadata
return self.data
def write(self, filename):
'''
Writes bplist to file
'''
if self.bplist != "":
pass
# TODO: save self.bplist to file
else:
raise Exception('BPlist not yet generated')
class BPListReader(object):
def __init__(self, s):
self.data = s
self.objects = []
self.resolved = {}
def __unpackIntStruct(self, sz, s):
'''__unpackIntStruct(size, string) -> int
Unpacks the integer of given size (1, 2 or 4 bytes) from string
'''
if sz == 1:
ot = '!B'
elif sz == 2:
ot = '!H'
elif sz == 4:
ot = '!I'
elif sz == 8:
ot = '!Q'
else:
raise Exception('int unpack size '+str(sz)+' unsupported')
return struct.unpack(ot, s)[0]
def __unpackInt(self, offset):
'''__unpackInt(offset) -> int
Unpacks int field from plist at given offset
'''
return self.__unpackIntMeta(offset)[1]
def __unpackIntMeta(self, offset):
'''__unpackIntMeta(offset) -> (size, int)
Unpacks int field from plist at given offset and returns its size and value
'''
obj_header = self.data[offset]
obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
int_sz = 2**obj_info
return int_sz, self.__unpackIntStruct(int_sz, self.data[offset+1:offset+1+int_sz])
def __resolveIntSize(self, obj_info, offset):
'''__resolveIntSize(obj_info, offset) -> (count, offset)
Calculates count of objref* array entries and returns count and offset to first element
'''
if obj_info == 0x0F:
ofs, obj_count = self.__unpackIntMeta(offset+1)
objref = offset+2+ofs
else:
obj_count = obj_info
objref = offset+1
return obj_count, objref
def __unpackFloatStruct(self, sz, s):
'''__unpackFloatStruct(size, string) -> float
Unpacks the float of given size (4 or 8 bytes) from string
'''
if sz == 4:
ot = '!f'
elif sz == 8:
ot = '!d'
else:
raise Exception('float unpack size '+str(sz)+' unsupported')
return struct.unpack(ot, s)[0]
def __unpackFloat(self, offset):
'''__unpackFloat(offset) -> float
Unpacks float field from plist at given offset
'''
obj_header = self.data[offset]
obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
int_sz = 2**obj_info
return int_sz, self.__unpackFloatStruct(int_sz, self.data[offset+1:offset+1+int_sz])
def __unpackDate(self, offset):
td = int(struct.unpack(">d", self.data[offset+1:offset+9])[0])
return datetime(year=2001,month=1,day=1) + timedelta(seconds=td)
def __unpackItem(self, offset):
'''__unpackItem(offset)
Unpacks and returns an item from plist
'''
obj_header = self.data[offset]
obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
if obj_type == 0x00:
if obj_info == 0x00: # null 0000 0000
return None
elif obj_info == 0x08: # bool 0000 1000 // false
return False
elif obj_info == 0x09: # bool 0000 1001 // true
return True
elif obj_info == 0x0F: # fill 0000 1111 // fill byte
raise Exception("0x0F Not Implemented") # this is really pad byte, FIXME
else:
raise Exception('unpack item type '+str(obj_header)+' at '+str(offset)+ 'failed')
elif obj_type == 0x10: # int 0001 nnnn ... // # of bytes is 2^nnnn, big-endian bytes
return self.__unpackInt(offset)
elif obj_type == 0x20: # real 0010 nnnn ... // # of bytes is 2^nnnn, big-endian bytes
return self.__unpackFloat(offset)
elif obj_type == 0x30: # date 0011 0011 ... // 8 byte float follows, big-endian bytes
return self.__unpackDate(offset)
elif obj_type == 0x40: # data 0100 nnnn [int] ... // nnnn is number of bytes unless 1111 then int count follows, followed by bytes
obj_count, objref = self.__resolveIntSize(obj_info, offset)
return self.data[objref:objref+obj_count] # XXX: we return data as str
elif obj_type == 0x50: # string 0101 nnnn [int] ... // ASCII string, nnnn is # of chars, else 1111 then int count, then bytes
obj_count, objref = self.__resolveIntSize(obj_info, offset)
return self.data[objref:objref+obj_count]
elif obj_type == 0x60: # string 0110 nnnn [int] ... // Unicode string, nnnn is # of chars, else 1111 then int count, then big-endian 2-byte uint16_t
obj_count, objref = self.__resolveIntSize(obj_info, offset)
return self.data[objref:objref+obj_count*2].decode('utf-16be')
elif obj_type == 0x80: # uid 1000 nnnn ... // nnnn+1 is # of bytes
# FIXME: Accept as a string for now
obj_count, objref = self.__resolveIntSize(obj_info, offset)
return self.data[objref:objref+obj_count]
elif obj_type == 0xA0: # array 1010 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows
obj_count, objref = self.__resolveIntSize(obj_info, offset)
arr = []
for i in range(obj_count):
arr.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
return arr
elif obj_type == 0xC0: # set 1100 nnnn [int] objref* // nnnn is count, unless '1111', then int count follows
# XXX: not serializable via apple implementation
raise Exception("0xC0 Not Implemented") # FIXME: implement
elif obj_type == 0xD0: # dict 1101 nnnn [int] keyref* objref* // nnnn is count, unless '1111', then int count follows
obj_count, objref = self.__resolveIntSize(obj_info, offset)
keys = []
for i in range(obj_count):
keys.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
values = []
objref += obj_count*self.object_ref_size
for i in range(obj_count):
values.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
dic = {}
for i in range(obj_count):
dic[keys[i]] = values[i]
return dic
else:
raise Exception('don\'t know how to unpack obj type '+hex(obj_type)+' at '+str(offset))
def __resolveObject(self, idx):
try:
return self.resolved[idx]
except KeyError:
obj = self.objects[idx]
if type(obj) == list:
newArr = []
for i in obj:
newArr.append(self.__resolveObject(i))
self.resolved[idx] = newArr
return newArr
if type(obj) == dict:
newDic = {}
for k,v in obj.items():
key_resolved = self.__resolveObject(k)
if isinstance(key_resolved, str):
rk = key_resolved
else:
rk = codecs.decode(key_resolved, "utf-8")
rv = self.__resolveObject(v)
newDic[rk] = rv
self.resolved[idx] = newDic
return newDic
else:
self.resolved[idx] = obj
return obj
def parse(self):
# read header
if self.data[:8] != b'bplist00':
raise Exception('Bad magic')
# read trailer
self.offset_size, self.object_ref_size, self.number_of_objects, self.top_object, self.table_offset = struct.unpack('!6xBB4xI4xI4xI', self.data[-32:])
#print "** plist offset_size:",self.offset_size,"objref_size:",self.object_ref_size,"num_objs:",self.number_of_objects,"top:",self.top_object,"table_ofs:",self.table_offset
# read offset table
self.offset_table = self.data[self.table_offset:-32]
self.offsets = []
ot = self.offset_table
for i in range(self.number_of_objects):
offset_entry = ot[:self.offset_size]
ot = ot[self.offset_size:]
self.offsets.append(self.__unpackIntStruct(self.offset_size, offset_entry))
#print "** plist offsets:",self.offsets
# read object table
self.objects = []
k = 0
for i in self.offsets:
obj = self.__unpackItem(i)
#print "** plist unpacked",k,type(obj),obj,"at",i
k += 1
self.objects.append(obj)
# rebuild object tree
#for i in range(len(self.objects)):
# self.__resolveObject(i)
# return root object
return self.__resolveObject(self.top_object)
@classmethod
def plistWithString(cls, s):
parser = cls(s)
return parser.parse()
# helpers for testing
def plist(obj):
from Foundation import NSPropertyListSerialization, NSPropertyListBinaryFormat_v1_0
b = NSPropertyListSerialization.dataWithPropertyList_format_options_error_(obj, NSPropertyListBinaryFormat_v1_0, 0, None)
return str(b.bytes())
def unplist(s):
from Foundation import NSData, NSPropertyListSerialization
d = NSData.dataWithBytes_length_(s, len(s))
return NSPropertyListSerialization.propertyListWithData_options_format_error_(d, 0, None, None)
if __name__ == "__main__":
import os
import sys
import json
file_path = sys.argv[1]
with open(file_path, "rb") as fp:
data = fp.read()
out = BPListReader(data).parse()
with open(file_path + ".json", "w") as fp:
json.dump(out, indent=4)

View File

@@ -15,7 +15,7 @@ from base64 import b64decode, b64encode
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, determine_metadata, get_status_location
from Whatsapp_Chat_Exporter.utility import rendering, Crypt, Device, get_file_name, setup_template
from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS
from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS, JidType
try:
import zlib
@@ -175,7 +175,6 @@ def messages(db, data, media_folder):
total_row_number = c.fetchone()[0]
print(f"Processing messages...(0/{total_row_number})", end="\r")
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
try:
c.execute("""SELECT messages.key_remote_jid,
messages._id,
@@ -193,12 +192,18 @@ def messages(db, data, media_folder):
messages.key_id,
messages_quotes.data as quoted_data,
messages.media_caption,
missed_call_logs.video_call,
missed_call_logs.video_call,
chat.subject as chat_subject,
message_system.action_type,
message_system_group.is_me_joined,
jid_old.raw_string as old_jid,
jid_new.raw_string as new_jid
jid_new.raw_string as new_jid,
jid_global.type as jid_type,
group_concat(receipt_user.receipt_timestamp) as receipt_timestamp,
group_concat(message.received_timestamp) as received_timestamp,
group_concat(receipt_user.read_timestamp) as read_timestamp,
group_concat(receipt_user.played_timestamp) as played_timestamp,
group_concat(messages.read_device_timestamp) as read_device_timestamp
FROM messages
LEFT JOIN messages_quotes
ON messages.quoted_row_id = messages_quotes._id
@@ -218,7 +223,10 @@ def messages(db, data, media_folder):
ON jid_old._id = message_system_number_change.old_jid_row_id
LEFT JOIN jid jid_new
ON jid_new._id = message_system_number_change.new_jid_row_id
WHERE messages.key_remote_jid <> '-1';"""
LEFT JOIN receipt_user
ON receipt_user.message_row_id = messages._id
WHERE messages.key_remote_jid <> '-1'
GROUP BY message._id;"""
)
except sqlite3.OperationalError:
try:
@@ -244,7 +252,12 @@ def messages(db, data, media_folder):
message_system.action_type,
message_system_group.is_me_joined,
jid_old.raw_string as old_jid,
jid_new.raw_string as new_jid
jid_new.raw_string as new_jid,
jid_global.type as jid_type,
group_concat(receipt_user.receipt_timestamp) as receipt_timestamp,
group_concat(message.received_timestamp) as received_timestamp,
group_concat(receipt_user.read_timestamp) as read_timestamp,
group_concat(receipt_user.played_timestamp) as played_timestamp
FROM message
LEFT JOIN message_quoted
ON message_quoted.message_row_id = message._id
@@ -274,7 +287,10 @@ def messages(db, data, media_folder):
ON jid_old._id = message_system_number_change.old_jid_row_id
LEFT JOIN jid jid_new
ON jid_new._id = message_system_number_change.new_jid_row_id
WHERE key_remote_jid <> '-1';"""
LEFT JOIN receipt_user
ON receipt_user.message_row_id = message._id
WHERE key_remote_jid <> '-1'
GROUP BY message._id;"""
)
except Exception as e:
raise e
@@ -316,7 +332,7 @@ def messages(db, data, media_folder):
i += 1
content = c.fetchone()
continue
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
if content["jid_type"] == JidType.GROUP and content["key_from_me"] == 0:
name = fallback = None
if table_message:
if content["sender_jid_row_id"] > 0:
@@ -390,8 +406,7 @@ def messages(db, data, media_folder):
message.data = None
else:
# Real message
if content["media_wa_type"] == 20: # Sticker is a message
message.sticker = True
message.sticker = content["media_wa_type"] == 20 # Sticker is a message
if content["key_from_me"] == 1:
if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
msg = "Message deleted"
@@ -459,6 +474,7 @@ def media(db, data, media_folder):
ON message_media.message_row_id = messages._id
LEFT JOIN media_hash_thumbnail
ON message_media.file_hash = media_hash_thumbnail.media_hash
WHERE jid.type <> 7
ORDER BY messages.key_remote_jid ASC"""
)
except sqlite3.OperationalError:
@@ -471,14 +487,15 @@ def media(db, data, media_folder):
file_hash,
thumbnail
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
LEFT JOIN media_hash_thumbnail
INNER JOIN message
ON message_media.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
LEFT JOIN media_hash_thumbnail
ON message_media.file_hash = media_hash_thumbnail.media_hash
WHERE jid.type <> 7
ORDER BY jid.raw_string ASC"""
)
content = c.fetchone()
@@ -528,7 +545,7 @@ def media(db, data, media_folder):
f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
def vcard(db, data, media_folder):
c = db.cursor()
try:
c.execute("""SELECT message_row_id,
@@ -558,14 +575,14 @@ def vcard(db, data):
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "WhatsApp/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
path = f"{media_folder}/vCards"
if not os.path.isdir(path):
Path(path).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
media_name = row["media_name"] if row["media_name"] is not None else ""
file_name = "".join(x for x in media_name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
file_path = os.path.join(path, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row["vcard"])
@@ -593,10 +610,13 @@ def calls(db, data):
video_call,
duration,
call_result,
bytes_transferred
bytes_transferred,
chat.subject as chat_subject
FROM call_log
INNER JOIN jid
ON call_log.jid_row_id = jid._id"""
ON call_log.jid_row_id = jid._id
LEFT JOIN chat
ON call_log.jid_row_id = chat.jid_row_id"""
)
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
content = c.fetchone()
@@ -608,16 +628,17 @@ def calls(db, data):
key_id=content["call_id"],
)
_jid = content["raw_string"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
call.sender = name or fallback
name = data[_jid].name if _jid in data else content["chat_subject"] or None
if _jid is not None and "@" in _jid:
fallback = _jid.split('@')[0]
else:
fallback = None
call.sender = name or fallback
call.meta = True
call.data = (
f"A {'video' if content['video_call'] else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{name or fallback} was "
f"{call.sender} was "
)
if content['call_result'] == 2:
call.data += "not answered." if call.from_me else "missed."

View File

@@ -244,7 +244,7 @@ def media(db, data, media_folder):
f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
def vcard(db, data, media_folder):
c = db.cursor()
c.execute("""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE,
@@ -260,13 +260,13 @@ def vcard(db, data):
contents = c.fetchall()
total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared/Message/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
path = f'{media_folder}/Message/vCards'
if not os.path.isdir(path):
Path(path).mkdir(parents=True, exist_ok=True)
for index, content in enumerate(contents):
file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
file_path = os.path.join(path, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(content["ZVCARDSTRING"])

View File

@@ -6,29 +6,38 @@ import os
import time
import getpass
import threading
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier
try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath
from iphone_backup_decrypt import FailedToDecryptError, Domain
from iphone_backup_decrypt import FailedToDecryptError
except ModuleNotFoundError:
support_encrypted = False
else:
support_encrypted = True
def extract_encrypted(base_dir, password):
def extract_encrypted(base_dir, password, identifiers, bplist_reader=None):
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
print("Decrypting WhatsApp database...")
print("Decrypting WhatsApp database...", end="")
try:
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES,
output_filename="7c7fba66680ef796b916b067077cc246adacf01d")
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS,
output_filename="b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES,
domain=identifiers.DOMAIN,
output_filename=identifiers.MESSAGE
)
backup.extract_file(
relative_path=RelativePath.WHATSAPP_CONTACTS,
domain=identifiers.DOMAIN,
output_filename=identifiers.CONTACT
)
except FailedToDecryptError:
print("Failed to decrypt backup: incorrect password?")
exit()
else:
print("Done")
extract_thread = threading.Thread(
target=backup.extract_files_by_domain,
args=(Domain.WHATSAPP, Domain.WHATSAPP)
args=(identifiers.DOMAIN, identifiers.DOMAIN, bplist_reader)
)
extract_thread.daemon = True
extract_thread.start()
@@ -61,7 +70,10 @@ def is_encrypted(base_dir):
return False
def extract_media(base_dir):
def extract_media(base_dir, identifiers, preserve_timestamp=False):
if preserve_timestamp:
from Whatsapp_Chat_Exporter.bplist import BPListReader
preserve_timestamp = BPListReader
if is_encrypted(base_dir):
if not support_encrypted:
print("You don't have the dependencies to handle encrypted backup.")
@@ -70,21 +82,24 @@ def extract_media(base_dir):
return False
print("Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
extract_encrypted(base_dir, password)
extract_encrypted(base_dir, password, identifiers, preserve_timestamp)
else:
wts_db = os.path.join(base_dir, "7c/7c7fba66680ef796b916b067077cc246adacf01d")
contact_db = os.path.join(base_dir, "b8/b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE)
contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT)
if not os.path.isfile(wts_db):
print("WhatsApp database not found.")
if identifiers is WhatsAppIdentifier:
print("WhatsApp database not found.")
else:
print("WhatsApp Business database not found.")
exit()
else:
shutil.copyfile(wts_db, "7c7fba66680ef796b916b067077cc246adacf01d")
shutil.copyfile(wts_db, identifiers.MESSAGE)
if not os.path.isfile(contact_db):
print("Contact database not found.")
exit()
else:
shutil.copyfile(contact_db, "b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
shutil.copyfile(contact_db, identifiers.CONTACT)
_wts_id = identifiers.DOMAIN
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
manifest.row_factory = sqlite3.Row
c = manifest.cursor()
@@ -98,6 +113,7 @@ def extract_media(base_dir):
c.execute(f"""SELECT fileID,
relativePath,
flags,
file AS metadata,
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
FROM Files
WHERE domain = '{_wts_id}'
@@ -120,6 +136,11 @@ def extract_media(base_dir):
pass
elif flags == 1:
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
if preserve_timestamp:
metadata = BPListReader(row["metadata"]).parse()
creation = metadata["$objects"][1]["Birth"]
modification = metadata["$objects"][1]["LastModified"]
os.utime(destination, (modification, modification))
if row["_index"] % 100 == 0:
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
row = c.fetchone()

View File

@@ -7,13 +7,15 @@ from datetime import datetime
from enum import IntEnum
from Whatsapp_Chat_Exporter.data_model import ChatStore
try:
from enum import StrEnum
from enum import StrEnum, IntEnum
except ImportError:
# < Python 3.11
from enum import Enum
class StrEnum(str, Enum):
pass
class IntEnum(int, Enum):
pass
MAX_SIZE = 4 * 1024 * 1024 # Default 4MB
ROW_SIZE = 0x3D0
@@ -290,3 +292,21 @@ def setup_template(template, no_avatar):
# iOS Specific
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
class WhatsAppIdentifier(StrEnum):
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
class WhatsAppBusinessIdentifier(StrEnum):
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
class JidType(IntEnum):
PM = 0
GROUP = 1
SYSTEM_BROADCAST = 5
STATUS = 11

View File

@@ -139,7 +139,7 @@
</div>
{% else %}
{% if msg.media == false %}
{{ msg.data | sanitize_except() }}
{{ msg.data | sanitize_except() | urlize(none, true, '_blank') }}
{% else %}
{% if "image/" in msg.mime %}
<a href="{{ msg.data }}">
@@ -162,7 +162,7 @@
{% endif %}
{% if msg.caption is not none %}
<br>
{{ msg.caption }}
{{ msg.caption | urlize(none, true, '_blank') }}
{% endif %}
{% endif %}
{% endif %}
@@ -223,7 +223,7 @@
</div>
{% else %}
{% if msg.media == false %}
{{ msg.data | sanitize_except() }}
{{ msg.data | sanitize_except() | urlize(none, true, '_blank') }}
{% else %}
{% if "image/" in msg.mime %}
<a href="{{ msg.data }}">
@@ -246,7 +246,7 @@
{% endif %}
{% if msg.caption is not none %}
<br>
{{ msg.caption }}
{{ msg.caption | urlize(none, true, '_blank') }}
{% endif %}
{% endif %}
{% endif %}