Merge branch 'dev'

This commit is contained in:
KnugiHK
2025-04-27 11:34:45 +08:00
15 changed files with 3533 additions and 1801 deletions

116
README.md
View File

@@ -145,55 +145,73 @@ After extracting, you will get these:
Invoke the wtsexporter with --help option will show you all options available.
```sh
> wtsexporter --help
usage: wtsexporter [-h] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-o OUTPUT] [-j [JSON]]
[--avoid-encoding-json] [--pretty-print-json [PRETTY_PRINT_JSON]] [-d DB] [-k KEY] [-t TEMPLATE]
[-s] [-c] [--offline OFFLINE] [--size [SIZE]] [--no-html] [--check-update] [--assume-first-as-me]
[--no-avatar] [--import] [--business] [--wab WAB] [--time-offset {-12 to 14}] [--date DATE]
usage: wtsexporter [-h] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-d DB] [-k [KEY]]
[--call-db [CALL_DB_IOS]] [--wab WAB] [-o OUTPUT] [-j [JSON]] [--txt [TEXT_FORMAT]] [--no-html]
[--size [SIZE]] [--avoid-encoding-json] [--pretty-print-json [PRETTY_PRINT_JSON]] [--per-chat]
[--import] [-t TEMPLATE] [--offline OFFLINE] [--no-avatar] [--experimental-new-theme]
[--headline HEADLINE] [-c] [--create-separated-media] [--time-offset {-12 to 14}] [--date DATE]
[--date-format FORMAT] [--include [phone number ...]] [--exclude [phone number ...]]
[--dont-filter-empty] [--per-chat] [--create-separated-media]
[--decrypt-chunk-size DECRYPT_CHUNK_SIZE] [--enrich-from-vcards ENRICH_FROM_VCARDS]
[--default-country-code DEFAULT_CONTRY_CODE] [--txt [TEXT_FORMAT]] [--experimental-new-theme]
[--call-db [CALL_DB_IOS]] [--headline HEADLINE]
[--dont-filter-empty] [--enrich-from-vcards ENRICH_FROM_VCARDS]
[--default-country-code DEFAULT_COUNTRY_CODE] [-s] [--check-update] [--assume-first-as-me]
[--business] [--decrypt-chunk-size DECRYPT_CHUNK_SIZE]
[--max-bruteforce-worker MAX_BRUTEFORCE_WORKER]
A customizable Android and iOS/iPadOS WhatsApp database parser that will give you the history of your WhatsApp
conversations in HTML and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.
options:
-h, --help show this help message and exit
Device Type:
-a, --android Define the target as Android
-i, --ios, Define the target as iPhone/iPad
-e EXPORTED, --exported EXPORTED
-i, --ios Define the target as iPhone/iPad
-e, --exported EXPORTED
Define the target as exported chat file and specify the path to the file
-w WA, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite)
-m MEDIA, --media MEDIA
Path to WhatsApp media folder (default: WhatsApp)
-b BACKUP, --backup BACKUP
Path to Android (must be used together with -k)/iOS WhatsApp backup
-o OUTPUT, --output OUTPUT
Output to specific directory (default: result)
-j [JSON], --json [JSON]
Save the result to a single JSON file (default if present: result.json)
Input Files:
-w, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite)
-m, --media MEDIA Path to WhatsApp media folder (default: WhatsApp)
-b, --backup BACKUP Path to Android (must be used together with -k)/iOS WhatsApp backup
-d, --db DB Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)
-k, --key [KEY] Path to key file. If this option is set for crypt15 backup but nothing is specified, you will
be prompted to enter the key.
--call-db [CALL_DB_IOS]
Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only
--wab, --wa-backup WAB
Path to contact database in crypt15 format
Output Options:
-o, --output OUTPUT Output to specific directory (default: result)
-j, --json [JSON] Save the result to a single JSON file (default if present: result.json)
--txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default if present:
result/)
--no-html Do not output html files
--size, --output-size, --split [SIZE]
Maximum (rough) size of a single output file in bytes, 0 for auto
JSON Options:
--avoid-encoding-json
Don't encode non-ascii characters in the output JSON files
--pretty-print-json [PRETTY_PRINT_JSON]
Pretty print the output JSON.
-d DB, --db DB Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)
-k KEY, --key KEY Path to key file
-t TEMPLATE, --template TEMPLATE
Path to custom HTML template
-s, --showkey Show the HEX key used to decrypt the database
-c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it
--offline OFFLINE Relative path to offline static files
--size [SIZE], --output-size [SIZE], --split [SIZE]
Maximum (rough) size of a single output file in bytes, 0 for auto
--no-html Do not output html files
--check-update Check for updates (require Internet access)
--assume-first-as-me Assume the first message in a chat as sent by me (must be used together with -e)
--no-avatar Do not render avatar in HTML output
--per-chat Output the JSON file per chat
--import Import JSON file and convert to HTML output
--business Use Whatsapp Business default files (iOS only)
--wab WAB, --wa-backup WAB
Path to contact database in crypt15 format
HTML Options:
-t, --template TEMPLATE
Path to custom HTML template
--offline OFFLINE Relative path to offline static files
--no-avatar Do not render avatar in HTML output
--experimental-new-theme
Use the newly designed WhatsApp-alike theme
--headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat name
Media Handling:
-c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it
--create-separated-media
Create a copy of the media seperated per chat in <MEDIA>/separated/ directory
Filtering Options:
--time-offset {-12 to 14}
Offset in hours (-12 to 14) for time displayed in the output
--date DATE The date filter in specific format (inclusive)
@@ -204,26 +222,26 @@ options:
Exclude chats that match the supplied phone number
--dont-filter-empty By default, the exporter will not render chats with no valid message. Setting this flag will
cause the exporter to render those. This is useful if chat(s) are missing from the output
--per-chat Output the JSON file per chat
--create-separated-media
Create a copy of the media seperated per chat in <MEDIA>/separated/ directory
--decrypt-chunk-size DECRYPT_CHUNK_SIZE
Specify the chunk size for decrypting iOS backup, which may affect the decryption speed.
Contact Enrichment:
--enrich-from-vcards ENRICH_FROM_VCARDS
Path to an exported vcf file from Google contacts export. Add names missing from WhatsApp's
default database
--default-country-code DEFAULT_CONTRY_CODE
--default-country-code DEFAULT_COUNTRY_CODE
Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this
will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country
--txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default if present:
result/)
--experimental-new-theme
Use the newly designed WhatsApp-alike theme
--call-db [CALL_DB_IOS]
Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only
--headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat name
WhatsApp Chat Exporter: 0.11.2 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open source
Miscellaneous:
-s, --showkey Show the HEX key used to decrypt the database
--check-update Check for updates (require Internet access)
--assume-first-as-me Assume the first message in a chat as sent by me (must be used together with -e)
--business Use Whatsapp Business default files (iOS only)
--decrypt-chunk-size DECRYPT_CHUNK_SIZE
Specify the chunk size for decrypting iOS backup, which may affect the decryption speed.
--max-bruteforce-worker MAX_BRUTEFORCE_WORKER
Specify the maximum number of worker for bruteforce decryption.
WhatsApp Chat Exporter: 0.12.0 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open source
licenses.
```

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,328 @@
import hmac
import io
import zlib
import concurrent.futures
from typing import Tuple, Union
from hashlib import sha256
from sys import exit
from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType
try:
import zlib
from Crypto.Cipher import AES
except ModuleNotFoundError:
support_backup = False
else:
support_backup = True
try:
import javaobj
except ModuleNotFoundError:
support_crypt15 = False
else:
support_crypt15 = True
class DecryptionError(Exception):
"""Base class for decryption-related exceptions."""
pass
class InvalidKeyError(DecryptionError):
"""Raised when the provided key is invalid."""
pass
class InvalidFileFormatError(DecryptionError):
"""Raised when the input file format is invalid."""
pass
class OffsetNotFoundError(DecryptionError):
"""Raised when the correct offsets for decryption cannot be found."""
pass
def _derive_main_enc_key(key_stream: bytes) -> Tuple[bytes, bytes]:
"""
Derive the main encryption key for the given key stream.
Args:
key_stream (bytes): The key stream to generate HMAC of HMAC.
Returns:
Tuple[bytes, bytes]: A tuple containing the main encryption key and the original key stream.
"""
intermediate_hmac = hmac.new(b'\x00' * 32, key_stream, sha256).digest()
key = hmac.new(intermediate_hmac, b"backup encryption\x01", sha256).digest()
return key, key_stream
def _extract_enc_key(keyfile: bytes) -> Tuple[bytes, bytes]:
"""
Extract the encryption key from the keyfile.
Args:
keyfile (bytes): The keyfile containing the encrypted key.
Returns:
Tuple[bytes, bytes]: values from _derive_main_enc_key()
"""
key_stream = b''.join([byte.to_bytes(1, "big", signed=True) for byte in javaobj.loads(keyfile)])
return _derive_main_enc_key(key_stream)
def brute_force_offset(max_iv: int = 200, max_db: int = 200):
"""
Brute force the offsets for IV and database start position in WhatsApp backup files.
Args:
max_iv (int, optional): Maximum value to try for IV offset. Defaults to 200.
max_db (int, optional): Maximum value to try for database start offset. Defaults to 200.
Yields:
tuple: A tuple containing:
- int: Start position of IV
- int: End position of IV (start + 16)
- int: Start position of database
"""
for iv in range(0, max_iv):
for db in range(0, max_db):
yield iv, iv + 16, db
def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes:
"""Decrypt and decompress a database chunk.
Args:
db_ciphertext (bytes): The encrypted chunk of the database.
main_key (bytes): The main decryption key.
iv (bytes): The initialization vector.
Returns:
bytes: The decrypted and decompressed database.
Raises:
zlib.error: If decompression fails.
ValueError: if the plaintext is not a SQLite database.
"""
cipher = AES.new(main_key, AES.MODE_GCM, iv)
db_compressed = cipher.decrypt(db_ciphertext)
db = zlib.decompress(db_compressed)
if db[0:6].upper() != b"SQLITE":
raise ValueError(
"The plaintext is not a SQLite database. Ensure you are using the correct key."
)
return db
def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> bytes:
"""Decrypt a crypt14 database using multithreading for brute-force offset detection.
Args:
database (bytes): The encrypted database.
main_key (bytes): The decryption key.
max_worker (int, optional): The maximum number of threads to use for brute force. Defaults to 10.
Returns:
bytes: The decrypted database.
Raises:
InvalidFileFormatError: If the file is too small.
OffsetNotFoundError: If no valid offsets are found.
"""
if len(database) < 191:
raise InvalidFileFormatError("The crypt14 file must be at least 191 bytes")
# Attempt known offsets first
for offsets in CRYPT14_OFFSETS:
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
try:
return _decrypt_database(db_ciphertext, main_key, iv)
except (zlib.error, ValueError):
pass # Try next offset
print("Common offsets failed. Initiating brute-force with multithreading...")
# Convert brute force generator into a list for parallel processing
offset_combinations = list(brute_force_offset())
def attempt_decrypt(offset_tuple):
"""Attempt decryption with the given offsets."""
start_iv, end_iv, start_db = offset_tuple
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
try:
db = _decrypt_database(db_ciphertext, main_key, iv)
print(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
"\nShutting down other threads..."
)
return db
except (zlib.error, ValueError):
return None # Decryption failed, move to next
with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
future_to_offset = {executor.submit(attempt_decrypt, offset): offset for offset in offset_combinations}
try:
for future in concurrent.futures.as_completed(future_to_offset):
result = future.result()
if result is not None:
# Shutdown remaining threads
executor.shutdown(wait=False, cancel_futures=True)
return result
except KeyboardInterrupt:
print("\nBrute force interrupted by user (Ctrl+C). Exiting gracefully...")
executor.shutdown(wait=False, cancel_futures=True)
exit(1)
raise OffsetNotFoundError("Could not find the correct offsets for decryption.")
def _decrypt_crypt12(database: bytes, main_key: bytes) -> bytes:
"""Decrypt a crypt12 database.
Args:
database (bytes): The encrypted database.
main_key (bytes): The decryption key.
Returns:
bytes: The decrypted database.
Raises:
ValueError: If the file format is invalid or the signature mismatches.
"""
if len(database) < 67:
raise InvalidFileFormatError("The crypt12 file must be at least 67 bytes")
t2 = database[3:35]
iv = database[51:67]
db_ciphertext = database[67:-20]
return _decrypt_database(db_ciphertext, main_key, iv)
def _decrypt_crypt15(database: bytes, main_key: bytes, db_type: DbType) -> bytes:
"""Decrypt a crypt15 database.
Args:
database (bytes): The encrypted database.
main_key (bytes): The decryption key.
db_type (DbType): The type of database.
Returns:
bytes: The decrypted database.
Raises:
ValueError: If the file format is invalid or the signature mismatches.
"""
if not support_crypt15:
raise RuntimeError("Crypt15 is not supported")
if len(database) < 131:
raise InvalidFileFormatError("The crypt15 file must be at least 131 bytes")
if db_type == DbType.MESSAGE:
iv = database[8:24]
db_offset = database[0] + 2
elif db_type == DbType.CONTACT:
iv = database[7:23]
db_offset = database[0] + 1
else:
raise ValueError(f"Invalid db_type: {db_type}")
db_ciphertext = database[db_offset:]
return _decrypt_database(db_ciphertext, main_key, iv)
def decrypt_backup(
database: bytes,
key: Union[str, io.IOBase],
output: str = None,
crypt: Crypt = Crypt.CRYPT14,
show_crypt15: bool = False,
db_type: DbType = DbType.MESSAGE,
*,
dry_run: bool = False,
keyfile_stream: bool = False,
max_worker: int = 10
) -> int:
"""
Decrypt the WhatsApp backup database.
Args:
database (bytes): The encrypted database file.
key (str or io.IOBase): The key to decrypt the database.
output (str, optional): The path to save the decrypted database. Defaults to None.
crypt (Crypt, optional): The encryption version of the database. Defaults to Crypt.CRYPT14.
show_crypt15 (bool, optional): Whether to show the HEX key of the crypt15 backup. Defaults to False.
db_type (DbType, optional): The type of database (MESSAGE or CONTACT). Defaults to DbType.MESSAGE.
dry_run (bool, optional): Whether to perform a dry run. Defaults to False.
keyfile_stream (bool, optional): Whether the key is a key stream. Defaults to False.
Returns:
int: The status code of the decryption process (0 for success).
Raises:
ValueError: If the key is invalid or output file not provided when dry_run is False.
DecryptionError: for errors during decryption
RuntimeError: for dependency errors
"""
if not support_backup:
raise RuntimeError("Dependencies for backup decryption are not available.")
if not dry_run and output is None:
raise ValueError(
"The path to the decrypted database must be specified unless dry_run is true."
)
if isinstance(key, io.IOBase):
key = key.read()
if crypt is not Crypt.CRYPT15 and len(key) != 158:
raise InvalidKeyError("The key file must be 158 bytes")
#signature check, this is check is used in crypt 12 and 14
if crypt != Crypt.CRYPT15:
t1 = key[30:62]
if t1 != database[15:47] and crypt == Crypt.CRYPT14:
raise ValueError("The signature of key file and backup file mismatch")
if t1 != database[3:35] and crypt == Crypt.CRYPT12:
raise ValueError("The signature of key file and backup file mismatch")
if crypt == Crypt.CRYPT15:
if keyfile_stream:
main_key, hex_key = _extract_enc_key(key)
else:
main_key, hex_key = _derive_main_enc_key(key)
if show_crypt15:
hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)])
print(f"The HEX key of the crypt15 backup is: {hex_key_str}")
else:
main_key = key[126:]
try:
if crypt == Crypt.CRYPT14:
db = _decrypt_crypt14(database, main_key, max_worker)
elif crypt == Crypt.CRYPT12:
db = _decrypt_crypt12(database, main_key)
elif crypt == Crypt.CRYPT15:
db = _decrypt_crypt15(database, main_key, db_type)
else:
raise ValueError(f"Unsupported crypt type: {crypt}")
except (InvalidFileFormatError, OffsetNotFoundError, ValueError) as e:
raise DecryptionError(f"Decryption failed: {e}") from e
if not dry_run:
with open(output, "wb") as f:
f.write(db)
return 0

File diff suppressed because it is too large Load Diff

View File

@@ -234,7 +234,7 @@ class BPListReader(object):
# read trailer
self.offset_size, self.object_ref_size, self.number_of_objects, self.top_object, self.table_offset = struct.unpack('!6xBB4xI4xI4xI', self.data[-32:])
#print "** plist offset_size:",self.offset_size,"objref_size:",self.object_ref_size,"num_objs:",self.number_of_objects,"top:",self.top_object,"table_ofs:",self.table_offset
# read offset table
self.offset_table = self.data[self.table_offset:-32]
self.offsets = []

View File

@@ -1,25 +1,172 @@
#!/usr/bin/python3
import os
from datetime import datetime, tzinfo, timedelta
from typing import Union
from typing import MutableMapping, Union, Optional, Dict, Any
class Timing:
"""
Handles timestamp formatting with timezone support.
"""
def __init__(self, timezone_offset: Optional[int]) -> None:
"""
Initialize Timing object.
Args:
timezone_offset (Optional[int]): Hours offset from UTC
"""
self.timezone_offset = timezone_offset
def format_timestamp(self, timestamp: Optional[Union[int, float]], format: str) -> Optional[str]:
"""
Format a timestamp with the specified format string.
Args:
timestamp (Optional[Union[int, float]]): Unix timestamp to format
format (str): strftime format string
Returns:
Optional[str]: Formatted timestamp string, or None if timestamp is None
"""
if timestamp:
timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
return datetime.fromtimestamp(timestamp, TimeZone(self.timezone_offset)).strftime(format)
return None
class TimeZone(tzinfo):
def __init__(self, offset):
"""
Custom timezone class with fixed offset.
"""
def __init__(self, offset: int) -> None:
"""
Initialize TimeZone object.
Args:
offset (int): Hours offset from UTC
"""
self.offset = offset
def utcoffset(self, dt):
return timedelta(hours=self.offset)
def dst(self, dt):
return timedelta(0)
def utcoffset(self, dt: Optional[datetime]) -> timedelta:
"""Get UTC offset."""
return timedelta(hours=self.offset)
def dst(self, dt: Optional[datetime]) -> timedelta:
"""Get DST offset (always 0)."""
return timedelta(0)
class ChatStore():
def __init__(self, type, name=None, media=None):
class ChatCollection(MutableMapping):
"""
A collection of chats that provides dictionary-like access with additional chat management methods.
Inherits from MutableMapping to implement a custom dictionary-like behavior.
"""
def __init__(self) -> None:
"""Initialize an empty chat collection."""
self._chats: Dict[str, ChatStore] = {}
def __getitem__(self, key: str) -> 'ChatStore':
"""Get a chat by its ID. Required for dict-like access."""
return self._chats[key]
def __setitem__(self, key: str, value: 'ChatStore') -> None:
"""Set a chat by its ID. Required for dict-like access."""
if not isinstance(value, ChatStore):
raise TypeError("Value must be a ChatStore object")
self._chats[key] = value
def __delitem__(self, key: str) -> None:
"""Delete a chat by its ID. Required for dict-like access."""
del self._chats[key]
def __iter__(self):
"""Iterate over chat IDs. Required for dict-like access."""
return iter(self._chats)
def __len__(self) -> int:
"""Get number of chats. Required for dict-like access."""
return len(self._chats)
def get_chat(self, chat_id: str) -> Optional['ChatStore']:
"""
Get a chat by its ID.
Args:
chat_id (str): The ID of the chat to retrieve
Returns:
Optional['ChatStore']: The chat if found, None otherwise
"""
return self._chats.get(chat_id)
def add_chat(self, chat_id: str, chat: 'ChatStore') -> None:
"""
Add a new chat to the collection.
Args:
chat_id (str): The ID for the chat
chat (ChatStore): The chat to add
Raises:
TypeError: If chat is not a ChatStore object
"""
if not isinstance(chat, ChatStore):
raise TypeError("Chat must be a ChatStore object")
self._chats[chat_id] = chat
return self._chats[chat_id]
def remove_chat(self, chat_id: str) -> None:
"""
Remove a chat from the collection.
Args:
chat_id (str): The ID of the chat to remove
"""
if chat_id in self._chats:
del self._chats[chat_id]
def items(self):
"""Get chat items (id, chat) pairs."""
return self._chats.items()
def values(self):
"""Get all chats."""
return self._chats.values()
def keys(self):
"""Get all chat IDs."""
return self._chats.keys()
def to_dict(self) -> Dict[str, Any]:
"""
Convert the collection to a dictionary.
Returns:
Dict[str, Any]: Dictionary representation of all chats
"""
return {chat_id: chat.to_json() for chat_id, chat in self._chats.items()}
class ChatStore:
"""
Stores chat information and messages.
"""
def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None:
"""
Initialize ChatStore object.
Args:
type (str): Device type (IOS or ANDROID)
name (Optional[str]): Chat name
media (Optional[str]): Path to media folder
Raises:
TypeError: If name is not a string or None
"""
if name is not None and not isinstance(name, str):
raise TypeError("Name must be a string or None")
self.name = name
self.messages = {}
self._messages: Dict[str, 'Message'] = {}
self.type = type
if media is not None:
from Whatsapp_Chat_Exporter.utility import Device
@@ -36,17 +183,27 @@ class ChatStore():
self.status = None
self.media_base = ""
def add_message(self, id, message):
def __len__(self) -> int:
"""Get number of chats. Required for dict-like access."""
return len(self._messages)
def add_message(self, id: str, message: 'Message') -> None:
"""Add a message to the chat store."""
if not isinstance(message, Message):
raise TypeError("message must be a Message object")
self.messages[id] = message
self._messages[id] = message
def get_message(self, id: str) -> 'Message':
"""Get a message from the chat store."""
return self._messages.get(id)
def delete_message(self, id):
if id in self.messages:
del self.messages[id]
def delete_message(self, id: str) -> None:
"""Delete a message from the chat store."""
if id in self._messages:
del self._messages[id]
def to_json(self):
serialized_msgs = {id: msg.to_json() for id, msg in self.messages.items()}
def to_json(self) -> Dict[str, Any]:
"""Convert chat store to JSON-serializable dict."""
return {
'name': self.name,
'type': self.type,
@@ -54,26 +211,69 @@ class ChatStore():
'their_avatar': self.their_avatar,
'their_avatar_thumb': self.their_avatar_thumb,
'status': self.status,
'messages': serialized_msgs
'messages': {id: msg.to_json() for id, msg in self._messages.items()}
}
def get_last_message(self):
return tuple(self.messages.values())[-1]
def get_last_message(self) -> 'Message':
"""Get the most recent message in the chat."""
return tuple(self._messages.values())[-1]
def items(self):
"""Get message items pairs."""
return self._messages.items()
def get_messages(self):
return self.messages.values()
def values(self):
"""Get all messages in the chat."""
return self._messages.values()
def keys(self):
"""Get all message keys in the chat."""
return self._messages.keys()
class Message():
def __init__(self, from_me: Union[bool,int], timestamp: int, time: Union[int,float,str], key_id: int, timezone_offset: int = 0, message_type: int = None):
class Message:
"""
Represents a single message in a chat.
"""
def __init__(
self,
*,
from_me: Union[bool, int],
timestamp: int,
time: Union[int, float, str],
key_id: int,
received_timestamp: int,
read_timestamp: int,
timezone_offset: int = 0,
message_type: Optional[int] = None
) -> None:
"""
Initialize Message object.
Args:
from_me (Union[bool, int]): Whether message was sent by the user
timestamp (int): Message timestamp
time (Union[int, float, str]): Message time
key_id (int): Message unique identifier
received_timestamp (int): When message was received
read_timestamp (int): When message was read
timezone_offset (int, optional): Hours offset from UTC. Defaults to 0
message_type (Optional[int], optional): Type of message. Defaults to None
Raises:
TypeError: If time is not a string or number
"""
self.from_me = bool(from_me)
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
if isinstance(time, int) or isinstance(time, float):
self.time = datetime.fromtimestamp(self.timestamp, TimeZone(timezone_offset)).strftime("%H:%M")
timing = Timing(timezone_offset)
if isinstance(time, (int, float)):
self.time = timing.format_timestamp(self.timestamp, "%H:%M")
elif isinstance(time, str):
self.time = time
else:
raise TypeError("Time must be a string or number")
self.media = False
self.key_id = key_id
self.meta = False
@@ -81,29 +281,33 @@ class Message():
self.sender = None
self.safe = False
self.mime = None
self.message_type = message_type
# Extra
self.message_type = message_type,
self.received_timestamp = timing.format_timestamp(received_timestamp, "%Y/%m/%d %H:%M")
self.read_timestamp = timing.format_timestamp(read_timestamp, "%Y/%m/%d %H:%M")
# Extra attributes
self.reply = None
self.quoted_data = None
self.caption = None
self.thumb = None # Android specific
self.thumb = None # Android specific
self.sticker = False
def to_json(self):
def to_json(self) -> Dict[str, Any]:
"""Convert message to JSON-serializable dict."""
return {
'from_me' : self.from_me,
'timestamp' : self.timestamp,
'time' : self.time,
'media' : self.media,
'key_id' : self.key_id,
'meta' : self.meta,
'data' : self.data,
'sender' : self.sender,
'safe' : self.safe,
'mime' : self.mime,
'reply' : self.reply,
'quoted_data' : self.quoted_data,
'caption' : self.caption,
'thumb' : self.thumb,
'sticker' : self.sticker
}
'from_me': self.from_me,
'timestamp': self.timestamp,
'time': self.time,
'media': self.media,
'key_id': self.key_id,
'meta': self.meta,
'data': self.data,
'sender': self.sender,
'safe': self.safe,
'mime': self.mime,
'reply': self.reply,
'quoted_data': self.quoted_data,
'caption': self.caption,
'thumb': self.thumb,
'sticker': self.sticker
}

View File

@@ -8,85 +8,174 @@ from Whatsapp_Chat_Exporter.utility import Device
def messages(path, data, assume_first_as_me=False):
"""Extracts messages from the exported file"""
"""
Extracts messages from an exported WhatsApp chat file.
Args:
path: Path to the exported chat file
data: Data container object to store the parsed chat
assume_first_as_me: If True, assumes the first message is sent from the user without asking
Returns:
Updated data container with extracted messages
"""
# Create a new chat in the data container
chat = data.add_chat("ExportedChat", ChatStore(Device.EXPORTED))
you = "" # Will store the username of the current user
user_identification_done = False # Flag to track if user identification has been done
# First pass: count total lines for progress reporting
with open(path, "r", encoding="utf8") as file:
total_row_number = sum(1 for _ in file)
# Second pass: process the messages
with open(path, "r", encoding="utf8") as file:
you = ""
data["ExportedChat"] = ChatStore(Device.EXPORTED)
chat = data["ExportedChat"]
total_row_number = len(file.readlines())
file.seek(0)
for index, line in enumerate(file):
if len(line.split(" - ")) > 1:
time = line.split(" - ")[0]
if ":" not in line.split(time)[1]:
msg.data = line.split(time)[1][3:]
msg.meta = True
else:
name = line.split(time)[1].split(":")[0]
message = line.split(time)[1].split(name + ":")[1].strip()
name = name[3:]
if you == "":
if chat.name is None:
if not assume_first_as_me:
while True:
ans = input(f"Is '{name}' you? (Y/N)").lower()
if ans == "y":
you = name
break
elif ans == "n":
chat.name = name
break
else:
you = name
else:
if name != chat.name:
you = name
elif chat.name is None:
if name != you:
chat.name = name
msg = Message(
you == name,
datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
time.split(", ")[1].strip(),
index
)
if "<Media omitted>" in message:
msg.data = "The media is omitted in the chat"
msg.mime = "media"
msg.meta = True
elif "(file attached)" in message:
mime = MimeTypes()
msg.media = True
file_path = os.path.join(os.path.dirname(path), message.split("(file attached)")[0].strip())
if os.path.isfile(file_path):
msg.data = file_path
guess = mime.guess_type(file_path)[0]
if guess is not None:
msg.mime = guess
else:
msg.mime = "application/octet-stream"
else:
msg.data = "The media is missing"
msg.mime = "media"
msg.meta = True
else:
msg.data = message
if "\r\n" in message:
msg.data = message.replace("\r\n", "<br>")
if "\n" in message:
msg.data = message.replace("\n", "<br>")
chat.add_message(index, msg)
else:
lookback = index - 1
while lookback not in chat.messages:
lookback -= 1
msg = chat.messages[lookback]
if msg.media:
msg.caption = line.strip()
else:
msg.data += "<br>" + line.strip()
you, user_identification_done = process_line(
line, index, chat, path, you,
assume_first_as_me, user_identification_done
)
# Show progress
if index % 1000 == 0:
print(f"Processing messages & media...({index}/{total_row_number})", end="\r")
print(f"Processing messages & media...({total_row_number}/{total_row_number})", end="\r")
print(f"Processing messages & media...({total_row_number}/{total_row_number})")
return data
def process_line(line, index, chat, file_path, you, assume_first_as_me, user_identification_done):
"""
Process a single line from the chat file
Returns:
Tuple of (updated_you_value, updated_user_identification_done_flag)
"""
parts = line.split(" - ", 1)
# Check if this is a new message (has timestamp format)
if len(parts) > 1:
time = parts[0]
you, user_identification_done = process_new_message(
time, parts[1], index, chat, you, file_path,
assume_first_as_me, user_identification_done
)
else:
# This is a continuation of the previous message
process_message_continuation(line, index, chat)
return you, user_identification_done
def process_new_message(time, content, index, chat, you, file_path,
assume_first_as_me, user_identification_done):
"""
Process a line that contains a new message
Returns:
Tuple of (updated_you_value, updated_user_identification_done_flag)
"""
# Create a new message
msg = Message(
from_me=False, # Will be updated later if needed
timestamp=datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
time=time.split(", ")[1].strip(),
key_id=index,
received_timestamp=None,
read_timestamp=None
)
# Check if this is a system message (no name:message format)
if ":" not in content:
msg.data = content
msg.meta = True
else:
# Process user message
name, message = content.strip().split(":", 1)
# Handle user identification
if you == "":
if chat.name is None:
# First sender identification
if not user_identification_done:
if not assume_first_as_me:
# Ask only once if this is the user
you = prompt_for_user_identification(name)
user_identification_done = True
else:
you = name
user_identification_done = True
else:
# If we know the chat name, anyone else must be "you"
if name != chat.name:
you = name
# Set the chat name if needed
if chat.name is None and name != you:
chat.name = name
# Determine if this message is from the current user
msg.from_me = (name == you)
# Process message content
process_message_content(msg, message, file_path)
chat.add_message(index, msg)
return you, user_identification_done
def process_message_content(msg, message, file_path):
"""Process and set the content of a message based on its type"""
if "<Media omitted>" in message:
msg.data = "The media is omitted in the chat"
msg.mime = "media"
msg.meta = True
elif "(file attached)" in message:
process_attached_file(msg, message, file_path)
else:
msg.data = message.replace("\r\n", "<br>").replace("\n", "<br>")
def process_attached_file(msg, message, file_path):
"""Process an attached file in a message"""
mime = MimeTypes()
msg.media = True
# Extract file path and check if it exists
file_name = message.split("(file attached)")[0].strip()
attached_file_path = os.path.join(os.path.dirname(file_path), file_name)
if os.path.isfile(attached_file_path):
msg.data = attached_file_path
guess = mime.guess_type(attached_file_path)[0]
msg.mime = guess if guess is not None else "application/octet-stream"
else:
msg.data = "The media is missing"
msg.mime = "media"
msg.meta = True
def process_message_continuation(line, index, chat):
"""Process a line that continues a previous message"""
# Find the previous message
lookback = index - 1
while lookback not in chat.keys():
lookback -= 1
msg = chat.get_message(lookback)
# Add the continuation line to the message
if msg.media:
msg.caption = line.strip()
else:
msg.data += "<br>" + line.strip()
def prompt_for_user_identification(name):
"""Ask the user if the given name is their username"""
while True:
ans = input(f"Is '{name}' you? (Y/N)").lower()
if ans == "y":
return name
elif ans == "n":
return ""

View File

@@ -12,432 +12,591 @@ from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit,
def contacts(db, data):
"""Process WhatsApp contacts with status information."""
c = db.cursor()
# Get status only lol
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
total_row_number = c.fetchone()[0]
print(f"Pre-processing contacts...({total_row_number})")
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
content = c.fetchone()
while content is not None:
if not content["ZWHATSAPPID"].endswith("@s.whatsapp.net"):
ZWHATSAPPID = content["ZWHATSAPPID"] + "@s.whatsapp.net"
data[ZWHATSAPPID] = ChatStore(Device.IOS)
data[ZWHATSAPPID].status = content["ZABOUTTEXT"]
zwhatsapp_id = content["ZWHATSAPPID"]
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
zwhatsapp_id += "@s.whatsapp.net"
current_chat = ChatStore(Device.IOS)
current_chat.status = content["ZABOUTTEXT"]
data.add_chat(zwhatsapp_id, current_chat)
content = c.fetchone()
def process_contact_avatars(current_chat, media_folder, contact_id):
"""Process and assign avatar images for a contact."""
path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
current_chat.their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb") and current_chat.their_avatar_thumb is None:
current_chat.their_avatar_thumb = avatar
elif avatar.endswith(".jpg") and current_chat.their_avatar is None:
current_chat.their_avatar = avatar
def get_contact_name(content):
"""Determine the appropriate contact name based on push name and partner name."""
is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit()
if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone):
return content["ZPARTNERNAME"]
else:
return content["ZPUSHNAME"]
def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty):
"""Process WhatsApp messages and contacts from the database."""
c = db.cursor()
cursor2 = db.cursor()
# Get contacts
c.execute(
f"""SELECT count()
FROM (SELECT DISTINCT ZCONTACTJID,
ZPARTNERNAME,
ZWAPROFILEPUSHNAME.ZPUSHNAME
FROM ZWACHATSESSION
INNER JOIN ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAPROFILEPUSHNAME
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
GROUP BY ZCONTACTJID);"""
)
# Build the chat filter conditions
chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
# Process contacts first
contact_query = f"""
SELECT count()
FROM (SELECT DISTINCT ZCONTACTJID,
ZPARTNERNAME,
ZWAPROFILEPUSHNAME.ZPUSHNAME
FROM ZWACHATSESSION
INNER JOIN ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAPROFILEPUSHNAME
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
GROUP BY ZCONTACTJID);
"""
c.execute(contact_query)
total_row_number = c.fetchone()[0]
print(f"Processing contacts...({total_row_number})")
c.execute(
f"""SELECT DISTINCT ZCONTACTJID,
ZPARTNERNAME,
ZWAPROFILEPUSHNAME.ZPUSHNAME
FROM ZWACHATSESSION
INNER JOIN ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAPROFILEPUSHNAME
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
GROUP BY ZCONTACTJID;"""
)
# Get distinct contacts
contacts_query = f"""
SELECT DISTINCT ZCONTACTJID,
ZPARTNERNAME,
ZWAPROFILEPUSHNAME.ZPUSHNAME
FROM ZWACHATSESSION
INNER JOIN ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAPROFILEPUSHNAME
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
GROUP BY ZCONTACTJID;
"""
c.execute(contacts_query)
# Process each contact
content = c.fetchone()
while content is not None:
is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit()
if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone):
contact_name = content["ZPARTNERNAME"]
else:
contact_name = content["ZPUSHNAME"]
contact_name = get_contact_name(content)
contact_id = content["ZCONTACTJID"]
# Add or update chat
if contact_id not in data:
data[contact_id] = ChatStore(Device.IOS, contact_name, media_folder)
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
else:
data[contact_id].name = contact_name
data[contact_id].my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
data[contact_id].their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb") and data[content["ZCONTACTJID"]].their_avatar_thumb is None:
data[contact_id].their_avatar_thumb = avatar
elif avatar.endswith(".jpg") and data[content["ZCONTACTJID"]].their_avatar is None:
data[contact_id].their_avatar = avatar
current_chat = data.get_chat(contact_id)
current_chat.name = contact_name
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
# Process avatar images
process_contact_avatars(current_chat, media_folder, contact_id)
content = c.fetchone()
# Get message history
c.execute(f"""SELECT count()
FROM ZWAMESSAGE
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''}
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}""")
# Get message count
message_count_query = f"""
SELECT count()
FROM ZWAMESSAGE
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(message_count_query)
total_row_number = c.fetchone()[0]
print(f"Processing messages...(0/{total_row_number})", end="\r")
c.execute(f"""SELECT ZCONTACTJID,
ZWAMESSAGE.Z_PK,
ZISFROMME,
ZMESSAGEDATE,
ZTEXT,
ZMESSAGETYPE,
ZWAGROUPMEMBER.ZMEMBERJID,
ZMETADATA,
ZSTANZAID,
ZGROUPINFO
FROM ZWAMESSAGE
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
LEFT JOIN ZWAMEDIAITEM
ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
WHERE 1=1
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''}
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
ORDER BY ZMESSAGEDATE ASC;""")
# Fetch messages
messages_query = f"""
SELECT ZCONTACTJID,
ZWAMESSAGE.Z_PK,
ZISFROMME,
ZMESSAGEDATE,
ZTEXT,
ZMESSAGETYPE,
ZWAGROUPMEMBER.ZMEMBERJID,
ZMETADATA,
ZSTANZAID,
ZGROUPINFO,
ZSENTDATE
FROM ZWAMESSAGE
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
LEFT JOIN ZWAMEDIAITEM
ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
ORDER BY ZMESSAGEDATE ASC;
"""
c.execute(messages_query)
# Process each message
i = 0
content = c.fetchone()
while content is not None:
ZCONTACTJID = content["ZCONTACTJID"]
Z_PK = content["Z_PK"]
contact_id = content["ZCONTACTJID"]
message_pk = content["Z_PK"]
is_group_message = content["ZGROUPINFO"] is not None
if ZCONTACTJID not in data:
data[ZCONTACTJID] = ChatStore(Device.IOS)
path = f'{media_folder}/Media/Profile/{ZCONTACTJID.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
data[ZCONTACTJID].their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb"):
data[ZCONTACTJID].their_avatar_thumb = avatar
elif avatar.endswith(".jpg"):
data[ZCONTACTJID].their_avatar = avatar
# Ensure chat exists
if contact_id not in data:
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS))
process_contact_avatars(current_chat, media_folder, contact_id)
else:
current_chat = data.get_chat(contact_id)
# Create message object
ts = APPLE_TIME + content["ZMESSAGEDATE"]
message = Message(
from_me=content["ZISFROMME"],
timestamp=ts,
time=ts, # TODO: Could be bug
time=ts,
key_id=content["ZSTANZAID"][:17],
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
message_type=content["ZMESSAGETYPE"]
message_type=content["ZMESSAGETYPE"],
received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None,
read_timestamp=None # TODO: Add timestamp
)
invalid = False
if is_group_message and content["ZISFROMME"] == 0:
name = None
if content["ZMEMBERJID"] is not None:
if content["ZMEMBERJID"] in data:
name = data[content["ZMEMBERJID"]].name
if "@" in content["ZMEMBERJID"]:
fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
message.sender = name or fallback
else:
message.sender = None
if content["ZMESSAGETYPE"] == 6:
# Metadata
if is_group_message:
# Group
if content["ZTEXT"] is not None:
# Chnaged name
try:
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
else:
invalid = True
else:
message.data = None
else:
message.data = None
else:
# real message
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14"):
quoted = content["ZMETADATA"][2:19]
message.reply = quoted.decode()
cursor2.execute(f"""SELECT ZTEXT
FROM ZWAMESSAGE
WHERE ZSTANZAID LIKE '{message.reply}%'""")
quoted_content = cursor2.fetchone()
if quoted_content and "ZTEXT" in quoted_content:
message.quoted_data = quoted_content["ZTEXT"]
else:
message.quoted_data = None
if content["ZMESSAGETYPE"] == 15: # Sticker
message.sticker = True
if content["ZISFROMME"] == 1:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
message.data = msg
# Process message data
invalid = process_message_data(message, content, is_group_message, data, cursor2)
# Add valid messages to chat
if not invalid:
data[ZCONTACTJID].add_message(Z_PK, message)
current_chat.add_message(message_pk, message)
# Update progress
i += 1
if i % 1000 == 0:
print(f"Processing messages...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
def process_message_data(message, content, is_group_message, data, cursor2):
"""Process and set message data from content row."""
# Handle group sender info
if is_group_message and content["ZISFROMME"] == 0:
name = None
if content["ZMEMBERJID"] is not None:
if content["ZMEMBERJID"] in data:
name = data.get_chat(content["ZMEMBERJID"]).name
if "@" in content["ZMEMBERJID"]:
fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
message.sender = name or fallback
else:
message.sender = None
# Handle metadata messages
if content["ZMESSAGETYPE"] == 6:
return process_metadata_message(message, content, is_group_message)
# Handle quoted replies
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and False:
quoted = content["ZMETADATA"][2:19]
message.reply = quoted.decode()
cursor2.execute(f"""SELECT ZTEXT
FROM ZWAMESSAGE
WHERE ZSTANZAID LIKE '{message.reply}%'""")
quoted_content = cursor2.fetchone()
if quoted_content and "ZTEXT" in quoted_content:
message.quoted_data = quoted_content["ZTEXT"]
else:
message.quoted_data = None
# Handle stickers
if content["ZMESSAGETYPE"] == 15:
message.sticker = True
# Process message text
process_message_text(message, content)
return False # Message is valid
def process_metadata_message(message, content, is_group_message):
"""Process metadata messages (action_type 6)."""
if is_group_message:
# Group
if content["ZTEXT"] is not None:
# Changed name
try:
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
return False # Valid message
else:
return True # Invalid message
else:
message.data = None
return False
else:
message.data = None
return False
def process_message_text(message, content):
"""Process and format message text content."""
if content["ZISFROMME"] == 1:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
msg = msg.replace("\r\n", "<br>").replace("\n", "<br>")
else:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
msg = msg.replace("\r\n", "<br>").replace("\n", "<br>")
message.data = msg
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False):
"""Process media files from WhatsApp messages."""
c = db.cursor()
# Get media
c.execute(f"""SELECT count()
FROM ZWAMEDIAITEM
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''}
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
""")
# Build filter conditions
chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios")
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
# Get media count
media_count_query = f"""
SELECT count()
FROM ZWAMEDIAITEM
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(media_count_query)
total_row_number = c.fetchone()[0]
print(f"\nProcessing media...(0/{total_row_number})", end="\r")
i = 0
c.execute(f"""SELECT ZCONTACTJID,
ZMESSAGE,
ZMEDIALOCALPATH,
ZMEDIAURL,
ZVCARDSTRING,
ZMEDIAKEY,
ZTITLE
FROM ZWAMEDIAITEM
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE ZMEDIALOCALPATH IS NOT NULL
{f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''}
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
ORDER BY ZCONTACTJID ASC""")
content = c.fetchone()
# Fetch media items
media_query = f"""
SELECT ZCONTACTJID,
ZMESSAGE,
ZMEDIALOCALPATH,
ZMEDIAURL,
ZVCARDSTRING,
ZMEDIAKEY,
ZTITLE
FROM ZWAMEDIAITEM
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE ZMEDIALOCALPATH IS NOT NULL
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
ORDER BY ZCONTACTJID ASC
"""
c.execute(media_query)
# Process each media item
mime = MimeTypes()
i = 0
content = c.fetchone()
while content is not None:
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
ZMESSAGE = content["ZMESSAGE"]
contact = data[content["ZCONTACTJID"]]
message = contact.messages[ZMESSAGE]
message.media = True
if contact.media_base == "":
contact.media_base = media_folder + "/"
if os.path.isfile(file_path):
message.data = '/'.join(file_path.split("/")[1:])
if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
message.mime = guess
else:
message.mime = "application/octet-stream"
else:
message.mime = content["ZVCARDSTRING"]
if separate_media:
chat_display_name = slugify(contact.name or message.sender \
or content["ZCONTACTJID"].split('@')[0], True)
current_filename = file_path.split("/")[-1]
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = '/'.join(new_path.split("\\")[1:])
else:
message.data = "The media is missing"
message.mime = "media"
message.meta = True
if content["ZTITLE"] is not None:
message.caption = content["ZTITLE"]
process_media_item(content, data, media_folder, mime, separate_media)
# Update progress
i += 1
if i % 100 == 0:
print(f"Processing media...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Processing media...({total_row_number}/{total_row_number})", end="\r")
print(f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def process_media_item(content, data, media_folder, mime, separate_media):
"""Process a single media item."""
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
current_chat = data.get_chat(content["ZCONTACTJID"])
message = current_chat.get_message(content["ZMESSAGE"])
message.media = True
if current_chat.media_base == "":
current_chat.media_base = media_folder + "/"
if os.path.isfile(file_path):
message.data = '/'.join(file_path.split("/")[1:])
# Set MIME type
if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
message.mime = guess if guess is not None else "application/octet-stream"
else:
message.mime = content["ZVCARDSTRING"]
# Handle separate media option
if separate_media:
chat_display_name = slugify(current_chat.name or message.sender or content["ZCONTACTJID"].split('@')[0], True)
current_filename = file_path.split("/")[-1]
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = '/'.join(new_path.split("\\")[1:])
else:
# Handle missing media
message.data = "The media is missing"
message.mime = "media"
message.meta = True
# Add caption if available
if content["ZTITLE"] is not None:
message.caption = content["ZTITLE"]
def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
"""Process vCard contacts from WhatsApp messages."""
c = db.cursor()
c.execute(f"""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE,
ZCONTACTJID,
ZVCARDNAME,
ZVCARDSTRING
FROM ZWAVCARDMENTION
INNER JOIN ZWAMEDIAITEM
ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''}
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")};""")
# Build filter conditions
chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
# Fetch vCard mentions
vcard_query = f"""
SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE,
ZCONTACTJID,
ZVCARDNAME,
ZVCARDSTRING
FROM ZWAVCARDMENTION
INNER JOIN ZWAMEDIAITEM
ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(vcard_query)
contents = c.fetchall()
total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
# Create vCards directory
path = f'{media_folder}/Message/vCards'
Path(path).mkdir(parents=True, exist_ok=True)
# Process each vCard
for index, content in enumerate(contents):
file_paths = []
vcard_names = content["ZVCARDNAME"].split("_$!<Name-Separator>!$_")
vcard_strings = content["ZVCARDSTRING"].split("_$!<VCard-Separator>!$_")
# If this is a list of contacts
if len(vcard_names) > len(vcard_strings):
vcard_names.pop(0) # Dismiss the first element, which is the group name
for name, vcard_string in zip(vcard_names, vcard_strings):
file_name = "".join(x for x in name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(path, f"{file_name}.vcf")
file_paths.append(file_path)
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(vcard_string)
vcard_summary = "This media include the following vCard file(s):<br>"
vcard_summary += " | ".join([f'<a href="{htmle(fp)}">{htmle(name)}</a>' for name, fp in zip(vcard_names, file_paths)])
message = data[content["ZCONTACTJID"]].messages[content["ZMESSAGE"]]
message.data = vcard_summary
message.mime = "text/x-vcard"
message.media = True
message.meta = True
message.safe = True
process_vcard_item(content, path, data)
print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r")
def process_vcard_item(content, path, data):
"""Process a single vCard item."""
file_paths = []
vcard_names = content["ZVCARDNAME"].split("_$!<Name-Separator>!$_")
vcard_strings = content["ZVCARDSTRING"].split("_$!<VCard-Separator>!$_")
# If this is a list of contacts
if len(vcard_names) > len(vcard_strings):
vcard_names.pop(0) # Dismiss the first element, which is the group name
# Save each vCard file
for name, vcard_string in zip(vcard_names, vcard_strings):
file_name = "".join(x for x in name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(path, f"{file_name}.vcf")
file_paths.append(file_path)
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(vcard_string)
# Create vCard summary and update message
vcard_summary = "This media include the following vCard file(s):<br>"
vcard_summary += " | ".join([f'<a href="{htmle(fp)}">{htmle(name)}</a>' for name, fp in zip(vcard_names, file_paths)])
message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"])
message.data = vcard_summary
message.mime = "text/x-vcard"
message.media = True
message.meta = True
message.safe = True
def calls(db, data, timezone_offset, filter_chat):
"""Process WhatsApp call records."""
c = db.cursor()
c.execute(f"""SELECT count()
FROM ZWACDCALLEVENT
WHERE 1=1
{get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}
{get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""")
# Build filter conditions
chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")
# Get call count
call_count_query = f"""
SELECT count()
FROM ZWACDCALLEVENT
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(call_count_query)
total_row_number = c.fetchone()[0]
if total_row_number == 0:
return
print(f"\nProcessing calls...({total_row_number})", end="\r")
c.execute(f"""SELECT ZCALLIDSTRING,
ZGROUPCALLCREATORUSERJIDSTRING,
ZGROUPJIDSTRING,
ZDATE,
ZOUTCOME,
ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred,
ZDURATION,
ZVIDEO,
ZMISSED,
ZINCOMING
FROM ZWACDCALLEVENT
INNER JOIN ZWAAGGREGATECALLEVENT
ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK
WHERE 1=1
{get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}
{get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""")
# Fetch call records
calls_query = f"""
SELECT ZCALLIDSTRING,
ZGROUPCALLCREATORUSERJIDSTRING,
ZGROUPJIDSTRING,
ZDATE,
ZOUTCOME,
ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred,
ZDURATION,
ZVIDEO,
ZMISSED,
ZINCOMING
FROM ZWACDCALLEVENT
INNER JOIN ZWAAGGREGATECALLEVENT
ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(calls_query)
# Create calls chat
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
# Process each call
content = c.fetchone()
while content is not None:
ts = APPLE_TIME + int(content["ZDATE"])
call = Message(
from_me=content["ZINCOMING"] == 0,
timestamp=ts,
time=ts,
key_id=content["ZCALLIDSTRING"],
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET
)
_jid = content["ZGROUPCALLCREATORUSERJIDSTRING"]
name = data[_jid].name if _jid in data else None
if _jid is not None and "@" in _jid:
fallback = _jid.split('@')[0]
else:
fallback = None
call.sender = name or fallback
call.meta = True
call.data = (
f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}"
f"{'video' if content['ZVIDEO'] == 1 else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{call.sender} was "
)
if content['ZOUTCOME'] in (1, 4):
call.data += "not answered." if call.from_me else "missed."
elif content['ZOUTCOME'] == 2:
call.data += "failed."
elif content['ZOUTCOME'] == 0:
call_time = convert_time_unit(int(content['ZDURATION']))
call_bytes = bytes_to_readable(content['bytes_transferred'])
call.data += (
f"initiated and lasted for {call_time} "
f"with {call_bytes} data transferred."
)
else:
call.data += "in an unknown state."
chat.add_message(call.key_id, call)
process_call_record(content, chat, data, timezone_offset)
content = c.fetchone()
data["000000000000000"] = chat
# Add calls chat to data
data.add_chat("000000000000000", chat)
def process_call_record(content, chat, data, timezone_offset):
"""Process a single call record."""
ts = APPLE_TIME + int(content["ZDATE"])
call = Message(
from_me=content["ZINCOMING"] == 0,
timestamp=ts,
time=ts,
key_id=content["ZCALLIDSTRING"],
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET
)
# Set sender info
_jid = content["ZGROUPCALLCREATORUSERJIDSTRING"]
name = data.get_chat(_jid).name if _jid in data else None
if _jid is not None and "@" in _jid:
fallback = _jid.split('@')[0]
else:
fallback = None
call.sender = name or fallback
# Set call metadata
call.meta = True
call.data = format_call_data(call, content)
# Add call to chat
chat.add_message(call.key_id, call)
def format_call_data(call, content):
"""Format call data message based on call attributes."""
# Basic call info
call_data = (
f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}"
f"{'video' if content['ZVIDEO'] == 1 else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{call.sender} was "
)
# Call outcome
if content['ZOUTCOME'] in (1, 4):
call_data += "not answered." if call.from_me else "missed."
elif content['ZOUTCOME'] == 2:
call_data += "failed."
elif content['ZOUTCOME'] == 0:
call_time = convert_time_unit(int(content['ZDURATION']))
call_bytes = bytes_to_readable(content['bytes_transferred'])
call_data += (
f"initiated and lasted for {call_time} "
f"with {call_bytes} data transferred."
)
else:
call_data += "in an unknown state."
return call_data

View File

@@ -4,6 +4,7 @@ import shutil
import sqlite3
import os
import getpass
from sys import exit
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier
from Whatsapp_Chat_Exporter.bplist import BPListReader
try:
@@ -14,143 +15,218 @@ else:
support_encrypted = True
def extract_encrypted(base_dir, password, identifiers, decrypt_chunk_size):
print("Trying to decrypt the iOS backup...", end="")
backup = EncryptedBackup(
backup_directory=base_dir,
passphrase=password,
cleanup=False,
check_same_thread=False,
decrypt_chunk_size=decrypt_chunk_size
)
print("Done\nDecrypting WhatsApp database...", end="")
try:
backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES,
domain_like=identifiers.DOMAIN,
output_filename=identifiers.MESSAGE
)
backup.extract_file(
relative_path=RelativePath.WHATSAPP_CONTACTS,
domain_like=identifiers.DOMAIN,
output_filename=identifiers.CONTACT
)
backup.extract_file(
relative_path=RelativePath.WHATSAPP_CALLS,
domain_like=identifiers.DOMAIN,
output_filename=identifiers.CALL
)
except ValueError:
print("Failed to decrypt backup: incorrect password?")
exit(7)
except FileNotFoundError:
print("Essential WhatsApp files are missing from the iOS backup.")
exit(6)
else:
print("Done")
class BackupExtractor:
"""
A class to handle the extraction of WhatsApp data from iOS backups,
including encrypted and unencrypted backups.
"""
def extract_progress_handler(file_id, domain, relative_path, n, total_files):
if n % 100 == 0:
print(f"Decrypting and extracting files...({n}/{total_files})", end="\r")
return True
def __init__(self, base_dir, identifiers, decrypt_chunk_size):
self.base_dir = base_dir
self.identifiers = identifiers
self.decrypt_chunk_size = decrypt_chunk_size
backup.extract_files(
domain_like=identifiers.DOMAIN,
output_folder=identifiers.DOMAIN,
preserve_folders=True,
filter_callback=extract_progress_handler
)
print(f"All required files are decrypted and extracted. ", end="\n")
return backup
def is_encrypted(base_dir):
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f:
c = f.cursor()
try:
c.execute("""SELECT count()
FROM Files
""")
except sqlite3.OperationalError as e:
raise e # These error cannot be used to determine if the backup is encrypted
except sqlite3.DatabaseError:
return True
def extract(self):
"""
Extracts WhatsApp data from the backup based on whether it's encrypted or not.
"""
if self._is_encrypted():
self._extract_encrypted_backup()
else:
return False
self._extract_unencrypted_backup()
def _is_encrypted(self):
"""
Checks if the iOS backup is encrypted.
def extract_media(base_dir, identifiers, decrypt_chunk_size):
if is_encrypted(base_dir):
Returns:
bool: True if encrypted, False otherwise.
"""
with sqlite3.connect(os.path.join(self.base_dir, "Manifest.db")) as db:
c = db.cursor()
try:
c.execute("SELECT count() FROM Files")
c.fetchone() # Execute and fetch to trigger potential errors
except (sqlite3.OperationalError, sqlite3.DatabaseError):
return True
else:
return False
def _extract_encrypted_backup(self):
"""
Handles the extraction of data from an encrypted iOS backup.
"""
if not support_encrypted:
print("You don't have the dependencies to handle encrypted backup.")
print("Read more on how to deal with encrypted backup:")
print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
return False
return
print("Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
extract_encrypted(base_dir, password, identifiers, decrypt_chunk_size)
else:
wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE)
contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT)
call_db = os.path.join(base_dir, identifiers.CALL[:2], identifiers.CALL)
if not os.path.isfile(wts_db):
if identifiers is WhatsAppIdentifier:
self._decrypt_backup(password)
self._extract_decrypted_files()
def _decrypt_backup(self, password):
"""
Decrypts the iOS backup using the provided password.
Args:
password (str): The password for the encrypted backup.
"""
print("Trying to decrypt the iOS backup...", end="")
self.backup = EncryptedBackup(
backup_directory=self.base_dir,
passphrase=password,
cleanup=False,
check_same_thread=False,
decrypt_chunk_size=self.decrypt_chunk_size,
)
print("Done\nDecrypting WhatsApp database...", end="")
try:
self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES,
domain_like=self.identifiers.DOMAIN,
output_filename=self.identifiers.MESSAGE,
)
self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_CONTACTS,
domain_like=self.identifiers.DOMAIN,
output_filename=self.identifiers.CONTACT,
)
self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_CALLS,
domain_like=self.identifiers.DOMAIN,
output_filename=self.identifiers.CALL,
)
except ValueError:
print("Failed to decrypt backup: incorrect password?")
exit(7)
except FileNotFoundError:
print(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
)
exit(6)
else:
print("Done")
def _extract_decrypted_files(self):
"""Extract all WhatsApp files after decryption"""
def extract_progress_handler(file_id, domain, relative_path, n, total_files):
if n % 100 == 0:
print(f"Decrypting and extracting files...({n}/{total_files})", end="\r")
return True
self.backup.extract_files(
domain_like=self.identifiers.DOMAIN,
output_folder=self.identifiers.DOMAIN,
preserve_folders=True,
filter_callback=extract_progress_handler
)
print(f"All required files are decrypted and extracted. ", end="\n")
def _extract_unencrypted_backup(self):
"""
Handles the extraction of data from an unencrypted iOS backup.
"""
self._copy_whatsapp_databases()
self._extract_media_files()
def _copy_whatsapp_databases(self):
"""
Copies the WhatsApp message, contact, and call databases to the working directory.
"""
wts_db_path = os.path.join(self.base_dir, self.identifiers.MESSAGE[:2], self.identifiers.MESSAGE)
contact_db_path = os.path.join(self.base_dir, self.identifiers.CONTACT[:2], self.identifiers.CONTACT)
call_db_path = os.path.join(self.base_dir, self.identifiers.CALL[:2], self.identifiers.CALL)
if not os.path.isfile(wts_db_path):
if self.identifiers is WhatsAppIdentifier:
print("WhatsApp database not found.")
else:
print("WhatsApp Business database not found.")
exit()
print(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
)
exit(1)
else:
shutil.copyfile(wts_db, identifiers.MESSAGE)
if not os.path.isfile(contact_db):
shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)
if not os.path.isfile(contact_db_path):
print("Contact database not found. Skipping...")
else:
shutil.copyfile(contact_db, identifiers.CONTACT)
if not os.path.isfile(call_db):
shutil.copyfile(contact_db_path, self.identifiers.CONTACT)
if not os.path.isfile(call_db_path):
print("Call database not found. Skipping...")
else:
shutil.copyfile(call_db, identifiers.CALL)
_wts_id = identifiers.DOMAIN
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
shutil.copyfile(call_db_path, self.identifiers.CALL)
def _extract_media_files(self):
"""
Extracts media files from the unencrypted backup.
"""
_wts_id = self.identifiers.DOMAIN
with sqlite3.connect(os.path.join(self.base_dir, "Manifest.db")) as manifest:
manifest.row_factory = sqlite3.Row
c = manifest.cursor()
c.execute(
f"""SELECT count()
FROM Files
WHERE domain = '{_wts_id}'"""
)
c.execute(f"SELECT count() FROM Files WHERE domain = '{_wts_id}'")
total_row_number = c.fetchone()[0]
print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
c.execute(f"""SELECT fileID,
relativePath,
flags,
file AS metadata,
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
FROM Files
WHERE domain = '{_wts_id}'
ORDER BY relativePath""")
c.execute(
f"""
SELECT fileID, relativePath, flags, file AS metadata,
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
FROM Files
WHERE domain = '{_wts_id}'
ORDER BY relativePath
"""
)
if not os.path.isdir(_wts_id):
os.mkdir(_wts_id)
row = c.fetchone()
while row is not None:
if row["relativePath"] == "":
if not row["relativePath"]: # Skip empty relative paths
row = c.fetchone()
continue
destination = os.path.join(_wts_id, row["relativePath"])
hashes = row["fileID"]
folder = hashes[:2]
flags = row["flags"]
if flags == 2:
if flags == 2: # Directory
try:
os.mkdir(destination)
except FileExistsError:
pass
elif flags == 1:
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
elif flags == 1: # File
shutil.copyfile(os.path.join(self.base_dir, folder, hashes), destination)
metadata = BPListReader(row["metadata"]).parse()
creation = metadata["$objects"][1]["Birth"]
modification = metadata["$objects"][1]["LastModified"]
os.utime(destination, (modification, modification))
if row["_index"] % 100 == 0:
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
row = c.fetchone()
print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")
def extract_media(base_dir, identifiers, decrypt_chunk_size):
"""
Extracts WhatsApp data (media, messages, contacts, calls) from an iOS backup.
Args:
base_dir (str): The path to the iOS backup directory.
identifiers (WhatsAppIdentifier): An object containing WhatsApp file identifiers.
decrypt_chunk_size (int): The chunk size for decryption.
"""
extractor = BackupExtractor(base_dir, identifiers, decrypt_chunk_size)
extractor.extract()

View File

@@ -1,3 +1,4 @@
import sqlite3
import jinja2
import json
import os
@@ -9,6 +10,7 @@ from markupsafe import Markup
from datetime import datetime, timedelta
from enum import IntEnum
from Whatsapp_Chat_Exporter.data_model import ChatStore
from typing import Dict, List, Optional, Tuple
try:
from enum import StrEnum, IntEnum
except ImportError:
@@ -26,7 +28,15 @@ ROW_SIZE = 0x3D0
CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600
def convert_time_unit(time_second: int):
def convert_time_unit(time_second: int) -> str:
"""Converts a time duration in seconds to a human-readable string.
Args:
time_second: The time duration in seconds.
Returns:
str: A human-readable string representing the time duration.
"""
time = str(timedelta(seconds=time_second))
if "day" not in time:
if time_second < 1:
@@ -46,11 +56,19 @@ def convert_time_unit(time_second: int):
return time
def bytes_to_readable(size_bytes: int):
"""From https://stackoverflow.com/a/14822210/9478891
def bytes_to_readable(size_bytes: int) -> str:
"""Converts a file size in bytes to a human-readable string with units.
From https://stackoverflow.com/a/14822210/9478891
Authors: james-sapam & other contributors
Licensed under CC BY-SA 3.0
See git commit logs for changes, if any.
Args:
size_bytes: The file size in bytes.
Returns:
A human-readable string representing the file size.
"""
if size_bytes == 0:
return "0B"
@@ -61,7 +79,18 @@ def bytes_to_readable(size_bytes: int):
return "%s %s" % (s, size_name[i])
def readable_to_bytes(size_str: str):
def readable_to_bytes(size_str: str) -> int:
"""Converts a human-readable file size string to bytes.
Args:
size_str: The human-readable file size string (e.g., "1024KB", "1MB", "2GB").
Returns:
The file size in bytes.
Raises:
ValueError: If the input string is invalid.
"""
SIZE_UNITS = {
'B': 1,
'KB': 1024,
@@ -80,11 +109,28 @@ def readable_to_bytes(size_str: str):
return int(number) * SIZE_UNITS[unit]
def sanitize_except(html):
def sanitize_except(html: str) -> Markup:
"""Sanitizes HTML, only allowing <br> tag.
Args:
html: The HTML string to sanitize.
Returns:
A Markup object containing the sanitized HTML.
"""
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
def determine_day(last: int, current: int) -> Optional[datetime.date]:
"""Determines if the day has changed between two timestamps. Exposed to Jinja's environment.
Args:
last: The timestamp of the previous message.
current: The timestamp of the current message.
Returns:
The date of the current message if it's a different day than the last message, otherwise None.
"""
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
@@ -96,12 +142,12 @@ def determine_day(last, current):
def check_update():
import urllib.request
import json
import importlib
from sys import platform
from .__init__ import __version__
package_url_json = "https://pypi.org/pypi/whatsapp-chat-exporter/json"
PACKAGE_JSON = "https://pypi.org/pypi/whatsapp-chat-exporter/json"
try:
raw = urllib.request.urlopen(package_url_json)
raw = urllib.request.urlopen(PACKAGE_JSON)
except Exception:
print("Failed to check for updates.")
return 1
@@ -109,6 +155,7 @@ def check_update():
with raw:
package_info = json.load(raw)
latest_version = tuple(map(int, package_info["info"]["version"].split(".")))
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
current_version = tuple(map(int, __version__.split(".")))
if current_version < latest_version:
print("===============Update===============")
@@ -168,7 +215,13 @@ class Device(StrEnum):
EXPORTED = "exported"
def import_from_json(json_file, data):
def import_from_json(json_file: str, data: Dict[str, ChatStore]):
"""Imports chat data from a JSON file into the data dictionary.
Args:
json_file: The path to the JSON file.
data: The dictionary to store the imported chat data.
"""
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
with open(json_file, "r") as f:
temp_data = json.loads(f.read())
@@ -182,10 +235,12 @@ def import_from_json(json_file, data):
chat.status = chat_data.get("status")
for id, msg in chat_data.get("messages").items():
message = Message(
msg["from_me"],
msg["timestamp"],
msg["time"],
msg["key_id"],
from_me=msg["from_me"],
timestamp=msg["timestamp"],
time=msg["time"],
key_id=msg["key_id"],
received_timestamp=msg.get("received_timestamp"),
read_timestamp=msg.get("read_timestamp")
)
message.media = msg.get("media")
message.meta = msg.get("meta")
@@ -203,11 +258,31 @@ def import_from_json(json_file, data):
print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
def sanitize_filename(file_name: str):
def sanitize_filename(file_name: str) -> str:
"""Sanitizes a filename by removing invalid and unsafe characters.
Args:
file_name: The filename to sanitize.
Returns:
The sanitized filename.
"""
return "".join(x for x in file_name if x.isalnum() or x in "- ")
def get_file_name(contact: str, chat: ChatStore):
def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]:
"""Generates a sanitized filename and contact name for a chat.
Args:
contact: The contact identifier (e.g., a phone number or group ID).
chat: The ChatStore object for the chat.
Returns:
A tuple containing the sanitized filename and the contact name.
Raises:
ValueError: If the contact format is unexpected.
"""
if "@" not in contact and contact not in ("000000000000000", "000000000000001", "ExportedChat"):
raise ValueError("Unexpected contact format: " + contact)
phone_number = contact.split('@')[0]
@@ -227,11 +302,36 @@ def get_file_name(contact: str, chat: ChatStore):
return sanitize_filename(file_name), name
def get_cond_for_empty(enable, jid_field: str, broadcast_field: str):
def get_cond_for_empty(enable: bool, jid_field: str, broadcast_field: str) -> str:
"""Generates a SQL condition for filtering empty chats.
Args:
enable: True to include non-empty chats, False to include empty chats.
jid_field: The name of the JID field in the SQL query.
broadcast_field: The column name of the broadcast field in the SQL query.
Returns:
A SQL condition string.
"""
return f"AND (chat.hidden=0 OR {jid_field}='status@broadcast' OR {broadcast_field}>0)" if enable else ""
def get_chat_condition(filter, include, columns, jid=None, platform=None):
def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List[str], jid: Optional[str] = None, platform: Optional[str] = None) -> str:
"""Generates a SQL condition for filtering chats based on inclusion or exclusion criteria.
Args:
filter: A list of phone numbers to include or exclude.
include: True to include chats that match the filter, False to exclude them.
columns: A list of column names to check against the filter.
jid: The JID column name (used for group identification).
platform: The platform ("android" or "ios") for platform-specific JID queries.
Returns:
A SQL condition string.
Raises:
ValueError: If the column count is invalid or an unsupported platform is provided.
"""
if filter is not None:
conditions = []
if len(columns) < 2 and jid is not None:
@@ -279,13 +379,16 @@ class DbType(StrEnum):
CONTACT = "contact"
def brute_force_offset(max_iv=200, max_db=200):
for iv in range(0, max_iv):
for db in range(0, max_db):
yield iv, iv + 16, db
def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optional[str]:
"""Determines the metadata of a message.
Args:
content (sqlite3.Row): A row from the messages table.
init_msg (Optional[str]): The initial message, if any.
def determine_metadata(content, init_msg):
Returns:
The metadata as a string or None if the type is unsupported.
"""
msg = init_msg if init_msg else ""
if content["is_me_joined"] == 1: # Override
return f"You were added into the group by {msg}"
@@ -333,7 +436,7 @@ def determine_metadata(content, init_msg):
msg = "Someone joined this group by using a invite link" # TODO: Find out who
elif content["action_type"] == 27:
msg += " changed the group description to:<br>"
msg += content['data'].replace("\n", '<br>')
msg += (content['data'] or "Unknown").replace("\n", '<br>')
elif content["action_type"] == 28:
try:
old = content['old_jid'].split('@')[0]
@@ -366,7 +469,17 @@ def determine_metadata(content, init_msg):
return msg
def get_status_location(output_folder, offline_static):
def get_status_location(output_folder: str, offline_static: str) -> str:
"""
Gets the location of the W3.CSS file, either from web or local storage.
Args:
output_folder (str): The folder where offline static files will be stored.
offline_static (str): The subfolder name for static files. If falsy, returns web URL.
Returns:
str: The path or URL to the W3.CSS file.
"""
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if not offline_static:
return w3css
@@ -381,7 +494,18 @@ def get_status_location(output_folder, offline_static):
w3css = os.path.join(offline_static, "w3.css")
def setup_template(template, no_avatar, experimental=False):
def setup_template(template: Optional[str], no_avatar: bool, experimental: bool = False) -> jinja2.Template:
"""
Sets up the Jinja2 template environment and loads the template.
Args:
template (Optional[str]): Path to custom template file. If None, uses default template.
no_avatar (bool): Whether to disable avatar display in the template.
experimental (bool, optional): Whether to use experimental template features. Defaults to False.
Returns:
jinja2.Template: The configured Jinja2 template object.
"""
if template is None or experimental:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html" if not experimental else template
@@ -401,13 +525,17 @@ def setup_template(template, no_avatar, experimental=False):
APPLE_TIME = 978307200
def slugify(value, allow_unicode=False):
def slugify(value: str, allow_unicode: bool = False) -> str:
"""
Convert text to ASCII-only slugs for URL-safe strings.
Taken from https://github.com/django/django/blob/master/django/utils/text.py
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
Args:
value (str): The string to convert to a slug.
allow_unicode (bool, optional): Whether to allow Unicode characters. Defaults to False.
Returns:
str: The slugified string with only alphanumerics, underscores, or hyphens.
"""
value = str(value)
if allow_unicode:
@@ -419,16 +547,17 @@ def slugify(value, allow_unicode=False):
class WhatsAppIdentifier(StrEnum):
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
CALL = "1b432994e958845fffe8e2f190f26d1511534088"
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite
CALL = "1b432994e958845fffe8e2f190f26d1511534088" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
class WhatsAppBusinessIdentifier(StrEnum):
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite
CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
class JidType(IntEnum):
PM = 0

View File

@@ -123,6 +123,10 @@
.reply-box:active {
background-color:rgb(200 202 205 / var(--tw-bg-opacity, 1));
}
.info-box-tooltip {
--tw-translate-x: -50%;
transform: translate(var(--tw-translate-x), var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y));
}
</style>
<script>
function search(event) {
@@ -207,7 +211,25 @@
{% endif %}
<!--Actual messages-->
{% if msg.from_me == true %}
<div class="flex justify-end" id="{{ msg.key_id }}">
<div class="flex justify-end items-center group" id="{{ msg.key_id }}">
<div class="opacity-0 group-hover:opacity-100 transition-opacity duration-200 relative mr-2">
<div class="relative">
<div class="relative group/tooltip">
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#8696a0] hover:text-[#54656f] cursor-pointer" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<use href="#info-icon"></use>
</svg>
<div class="absolute bottom-full info-box-tooltip mb-2 hidden group-hover/tooltip:block z-50">
<div class="bg-black text-white text-xs rounded py-1 px-2 whitespace-nowrap">
Delivered at {{msg.received_timestamp or 'unknown'}}
{% if msg.read_timestamp is not none %}
<br>Read at {{ msg.read_timestamp }}
{% endif %}
</div>
<div class="absolute top-full right-3 -mt-1 border-4 border-transparent border-t-black"></div>
</div>
</div>
</div>
</div>
<div class="bg-whatsapp-light rounded-lg p-2 max-w-[80%] shadow-sm">
{% if msg.reply is not none %}
<a href="#{{msg.reply}}" target="_self" class="no-base">
@@ -268,7 +290,7 @@
</div>
</div>
{% else %}
<div class="flex justify-start" id="{{ msg.key_id }}">
<div class="flex justify-start items-center group" id="{{ msg.key_id }}">
<div class="bg-white rounded-lg p-2 max-w-[80%] shadow-sm">
{% if msg.reply is not none %}
<a href="#{{msg.reply}}" target="_self" class="no-base">
@@ -335,6 +357,21 @@
<span class="flex-shrink-0">{{ msg.time }}</span>
</div>
</div>
<!-- <div class="opacity-0 group-hover:opacity-100 transition-opacity duration-200 relative ml-2">
<div class="relative">
<div class="relative group/tooltip">
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#8696a0] hover:text-[#54656f] cursor-pointer" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<use href="#info-icon"></use>
</svg>
<div class="absolute bottom-full info-box-tooltip mb-2 hidden group-hover/tooltip:block z-50">
<div class="bg-black text-white text-xs rounded py-1 px-2 whitespace-nowrap">
Received at {{msg.received_timestamp or 'unknown'}}
</div>
<div class="absolute top-full right-3 ml-1 border-4 border-transparent border-t-black"></div>
</div>
</div>
</div>
</div> -->
</div>
{% endif %}
{% endfor %}
@@ -348,6 +385,12 @@
<br>
Portions of this page are reproduced from <a href="https://web.dev/articles/lazy-loading-video">work</a> created and <a href="https://developers.google.com/readme/policies">shared by Google</a> and used according to terms described in the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache 2.0 License</a>.
</footer>
<svg style="display: none;">
<!-- Tooltip info icon -->
<symbol id="info-icon" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
</symbol>
</svg>
</div>
</article>
</body>

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "whatsapp-chat-exporter"
version = "0.11.2"
version = "0.12.0"
description = "A Whatsapp database parser that provides history of your Whatsapp conversations in HTML and JSON. Android, iOS, iPadOS, Crypt12, Crypt14, Crypt15 supported."
readme = "README.md"
authors = [

View File

@@ -0,0 +1,115 @@
"""
This script processes a VCARD file to standardize telephone entries and add a second TEL line with the modified number (removing the extra ninth digit) for contacts with 9-digit subscribers.
It handles numbers that may already include a "+55" prefix and ensures that the output format is consistent.
Contributed by @magpires https://github.com/KnugiHK/WhatsApp-Chat-Exporter/issues/127#issuecomment-2646660625
"""
import re
import argparse
def process_phone_number(raw_phone):
"""
Process the raw phone string from the VCARD and return two formatted numbers:
- The original formatted number, and
- A modified formatted number with the extra (ninth) digit removed, if applicable.
Desired output:
For a number with a 9-digit subscriber:
Original: "+55 {area} {first 5 of subscriber}-{last 4 of subscriber}"
Modified: "+55 {area} {subscriber[1:5]}-{subscriber[5:]}"
For example, for an input that should represent "027912345678", the outputs are:
"+55 27 91234-5678" and "+55 27 1234-5678"
This function handles numbers that may already include a "+55" prefix.
It expects that after cleaning, a valid number (without the country code) should have either 10 digits
(2 for area + 8 for subscriber) or 11 digits (2 for area + 9 for subscriber).
If extra digits are present, it takes the last 11 (or 10) digits.
"""
# Store the original input for processing
number_to_process = raw_phone.strip()
# Remove all non-digit characters
digits = re.sub(r'\D', '', number_to_process)
# If the number starts with '55', remove it for processing
if digits.startswith("55") and len(digits) > 11:
digits = digits[2:]
# Remove trunk zero if present
if digits.startswith("0"):
digits = digits[1:]
# After cleaning, we expect a valid number to have either 10 or 11 digits
# If there are extra digits, use the last 11 (for a 9-digit subscriber) or last 10 (for an 8-digit subscriber)
if len(digits) > 11:
# Here, we assume the valid number is the last 11 digits
digits = digits[-11:]
elif len(digits) > 10 and len(digits) < 11:
# In some cases with an 8-digit subscriber, take the last 10 digits
digits = digits[-10:]
# Check if we have a valid number after processing
if len(digits) not in (10, 11):
return None, None
area = digits[:2]
subscriber = digits[2:]
if len(subscriber) == 9:
# Format the original number (5-4 split, e.g., "91234-5678")
orig_subscriber = f"{subscriber[:5]}-{subscriber[5:]}"
# Create a modified version: drop the first digit of the subscriber to form an 8-digit subscriber (4-4 split)
mod_subscriber = f"{subscriber[1:5]}-{subscriber[5:]}"
original_formatted = f"+55 {area} {orig_subscriber}"
modified_formatted = f"+55 {area} {mod_subscriber}"
elif len(subscriber) == 8:
original_formatted = f"+55 {area} {subscriber[:4]}-{subscriber[4:]}"
modified_formatted = None
else:
# This shouldn't happen given the earlier check, but just to be safe
return None, None
return original_formatted, modified_formatted
def process_vcard(input_vcard, output_vcard):
"""
Process a VCARD file to standardize telephone entries and add a second TEL line
with the modified number (removing the extra ninth digit) for contacts with 9-digit subscribers.
"""
with open(input_vcard, 'r', encoding='utf-8') as file:
lines = file.readlines()
output_lines = []
# Regex to capture any telephone line.
# It matches lines starting with "TEL:" or "TEL;TYPE=..." or with prefixes like "item1.TEL:".
phone_pattern = re.compile(r'^(?P<prefix>.*TEL(?:;TYPE=[^:]+)?):(?P<number>.*)$')
for line in lines:
stripped_line = line.rstrip("\n")
match = phone_pattern.match(stripped_line)
if match:
raw_phone = match.group("number").strip()
orig_formatted, mod_formatted = process_phone_number(raw_phone)
if orig_formatted:
# Always output using the standardized prefix.
output_lines.append(f"TEL;TYPE=CELL:{orig_formatted}\n")
else:
output_lines.append(line)
if mod_formatted:
output_lines.append(f"TEL;TYPE=CELL:{mod_formatted}\n")
else:
output_lines.append(line)
with open(output_vcard, 'w', encoding='utf-8') as file:
file.writelines(output_lines)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Process a VCARD file to standardize telephone entries and add a second TEL line with the modified number (removing the extra ninth digit) for contacts with 9-digit subscribers."
)
parser.add_argument('input_vcard', type=str, help='Input VCARD file')
parser.add_argument('output_vcard', type=str, help='Output VCARD file')
args = parser.parse_args()
process_vcard(args.input_vcard, args.output_vcard)
print(f"VCARD processed and saved to {args.output_vcard}")

View File

@@ -0,0 +1,269 @@
import subprocess
import unittest
import tempfile
import os
from unittest.mock import patch
from brazilian_number_processing import process_phone_number, process_vcard
class TestVCardProcessor(unittest.TestCase):
def test_process_phone_number(self):
"""Test the process_phone_number function with various inputs."""
# Test cases for 9-digit subscriber numbers
test_cases_9_digit = [
# Standard 11-digit number (2 area + 9 subscriber)
("27912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With country code prefix
("5527912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With plus in country code
("+5527912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With spaces and formatting
("+55 27 9 1234-5678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With trunk zero
("027912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With country code and trunk zero
("+55027912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With extra digits at the beginning (should use last 11)
("99927912345678", "+55 27 91234-5678", "+55 27 1234-5678"),
# With extra non-digit characters
("+55-27-9.1234_5678", "+55 27 91234-5678", "+55 27 1234-5678"),
]
# Test cases for 8-digit subscriber numbers
test_cases_8_digit = [
# Standard 10-digit number (2 area + 8 subscriber)
("2712345678", "+55 27 1234-5678", None),
# With country code prefix
("552712345678", "+55 27 1234-5678", None),
# With plus in country code
("+552712345678", "+55 27 1234-5678", None),
# With spaces and formatting
("+55 27 1234-5678", "+55 27 1234-5678", None),
# With trunk zero
("02712345678", "+55 27 1234-5678", None),
# With country code and trunk zero
("+55 0 27 1234-5678", "+55 27 1234-5678", None),
]
# Edge cases
edge_cases = [
# Too few digits
("271234567", None, None),
# Empty string
("", None, None),
# Non-numeric characters only
("abc-def+ghi", None, None),
# Single digit
("1", None, None),
# Unusual formatting but valid number
("(+55) [27] 9.1234_5678", "+55 27 91234-5678", "+55 27 1234-5678"),
]
# Run tests for all cases
all_cases = test_cases_9_digit + test_cases_8_digit + edge_cases
for raw_phone, expected_orig, expected_mod in all_cases:
with self.subTest(raw_phone=raw_phone):
orig, mod = process_phone_number(raw_phone)
self.assertEqual(orig, expected_orig)
self.assertEqual(mod, expected_mod)
def test_process_vcard(self):
"""Test the process_vcard function with various VCARD formats."""
# Test case 1: Standard TEL entries
vcard1 = """BEGIN:VCARD
VERSION:3.0
N:Doe;John;;;
FN:John Doe
TEL:+5527912345678
TEL:+552712345678
END:VCARD
"""
expected1 = """BEGIN:VCARD
VERSION:3.0
N:Doe;John;;;
FN:John Doe
TEL;TYPE=CELL:+55 27 91234-5678
TEL;TYPE=CELL:+55 27 1234-5678
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
"""
# Test case 2: TEL entries with TYPE attributes
vcard2 = """BEGIN:VCARD
VERSION:3.0
N:Smith;Jane;;;
FN:Jane Smith
TEL;TYPE=CELL:+5527912345678
TEL;TYPE=HOME:+552712345678
END:VCARD
"""
expected2 = """BEGIN:VCARD
VERSION:3.0
N:Smith;Jane;;;
FN:Jane Smith
TEL;TYPE=CELL:+55 27 91234-5678
TEL;TYPE=CELL:+55 27 1234-5678
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
"""
# Test case 3: Complex TEL entries with prefixes
vcard3 = """BEGIN:VCARD
VERSION:3.0
N:Brown;Robert;;;
FN:Robert Brown
item1.TEL:+5527912345678
item2.TEL;TYPE=CELL:+552712345678
END:VCARD
"""
expected3 = """BEGIN:VCARD
VERSION:3.0
N:Brown;Robert;;;
FN:Robert Brown
TEL;TYPE=CELL:+55 27 91234-5678
TEL;TYPE=CELL:+55 27 1234-5678
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
"""
# Test case 4: Mixed valid and invalid phone numbers
vcard4 = """BEGIN:VCARD
VERSION:3.0
N:White;Alice;;;
FN:Alice White
TEL:123
TEL:+5527912345678
END:VCARD
"""
expected4 = """BEGIN:VCARD
VERSION:3.0
N:White;Alice;;;
FN:Alice White
TEL:123
TEL;TYPE=CELL:+55 27 91234-5678
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
"""
# Test case 5: Multiple contacts with different formats
vcard5 = """BEGIN:VCARD
VERSION:3.0
N:Johnson;Mike;;;
FN:Mike Johnson
TEL:27912345678
END:VCARD
BEGIN:VCARD
VERSION:3.0
N:Williams;Sarah;;;
FN:Sarah Williams
TEL;TYPE=CELL:2712345678
END:VCARD
"""
expected5 = """BEGIN:VCARD
VERSION:3.0
N:Johnson;Mike;;;
FN:Mike Johnson
TEL;TYPE=CELL:+55 27 91234-5678
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
BEGIN:VCARD
VERSION:3.0
N:Williams;Sarah;;;
FN:Sarah Williams
TEL;TYPE=CELL:+55 27 1234-5678
END:VCARD
"""
# Test case 6: VCARD with no phone numbers
vcard6 = """BEGIN:VCARD
VERSION:3.0
N:Davis;Tom;;;
FN:Tom Davis
EMAIL:tom@example.com
END:VCARD
"""
expected6 = """BEGIN:VCARD
VERSION:3.0
N:Davis;Tom;;;
FN:Tom Davis
EMAIL:tom@example.com
END:VCARD
"""
test_cases = [
(vcard1, expected1),
(vcard2, expected2),
(vcard3, expected3),
(vcard4, expected4),
(vcard5, expected5),
(vcard6, expected6)
]
for i, (input_vcard, expected_output) in enumerate(test_cases):
with self.subTest(case=i+1):
# Create temporary files for input and output
with tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8') as input_file:
input_file.write(input_vcard)
input_path = input_file.name
output_path = input_path + '.out'
try:
# Process the VCARD
process_vcard(input_path, output_path)
# Read and verify the output
with open(output_path, 'r', encoding='utf-8') as output_file:
actual_output = output_file.read()
self.assertEqual(actual_output, expected_output)
finally:
# Clean up temporary files
if os.path.exists(input_path):
os.unlink(input_path)
if os.path.exists(output_path):
os.unlink(output_path)
def test_script_argument_handling(self):
"""Test the script's command-line argument handling."""
test_input = """BEGIN:VCARD
VERSION:3.0
N:Test;User;;;
FN:User Test
TEL:+5527912345678
END:VCARD
"""
# Create a temporary input file
with tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8') as input_file:
input_file.write(test_input)
input_path = input_file.name
output_path = input_path + '.out'
try:
test_args = ['python' if os.name == 'nt' else 'python3', 'brazilian_number_processing.py', input_path, output_path]
# We're just testing that the argument parsing works
subprocess.call(
test_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT
)
# Check if the output file was created
self.assertTrue(os.path.exists(output_path))
finally:
# Clean up temporary files
if os.path.exists(input_path):
os.unlink(input_path)
if os.path.exists(output_path):
os.unlink(output_path)
if __name__ == '__main__':
unittest.main()

View File

@@ -3,6 +3,7 @@ import javaobj
import zlib
from Crypto.Cipher import AES
from hashlib import sha256
from sys import exit
def _generate_hmac_of_hmac(key_stream):