mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-03-30 01:35:27 +00:00
Compare commits
46 Commits
0.13.0rc2
...
1694ae7dd9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1694ae7dd9 | ||
|
|
f05e0d3451 | ||
|
|
0c5f2b7f13 | ||
|
|
db01d05263 | ||
|
|
2e7953f4ca | ||
|
|
95a52231be | ||
|
|
bf230db595 | ||
|
|
242e8ee43a | ||
|
|
c32096b26b | ||
|
|
4aa1c26232 | ||
|
|
feca9ae8e0 | ||
|
|
92c325294c | ||
|
|
7dbd0dbe3c | ||
|
|
035e61c4d7 | ||
|
|
96d323e0ed | ||
|
|
35ad2559d7 | ||
|
|
8058ed8219 | ||
|
|
908d8f71ca | ||
|
|
f2b6a39011 | ||
|
|
4f531ec52a | ||
|
|
b69f645ac3 | ||
|
|
f8b959e1e1 | ||
|
|
9be210f34a | ||
|
|
ae7ba3da96 | ||
|
|
00e58ce2c9 | ||
|
|
4245ecc615 | ||
|
|
68dcc6abe0 | ||
|
|
c05e76569b | ||
|
|
a6fe0d93b1 | ||
|
|
2d096eff4d | ||
|
|
ea9675973c | ||
|
|
064b923cfa | ||
|
|
cd35ffc185 | ||
|
|
05bd26b8ed | ||
|
|
d200130335 | ||
|
|
1c7d6f7912 | ||
|
|
94960e4a23 | ||
|
|
79578d867f | ||
|
|
6910cc46a4 | ||
|
|
9e0457e720 | ||
|
|
e0967a3104 | ||
|
|
db50f24dd8 | ||
|
|
75fcf33fda | ||
|
|
0ba81e0863 | ||
|
|
647e406ac0 | ||
|
|
9cedcf1767 |
@@ -115,7 +115,7 @@ Do an iPhone/iPad Backup with iTunes/Finder first.
|
||||
|
||||
If you want to work on an encrypted iOS/iPadOS Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
|
||||
```sh
|
||||
pip install git+https://github.com/KnugiHK/iphone_backup_decrypt
|
||||
pip install whatsapp-chat-exporter["ios_backup"]
|
||||
```
|
||||
> [!NOTE]
|
||||
> You will need to disable the built-in end-to-end encryption for WhatsApp backups. See [WhatsApp's FAQ](https://faq.whatsapp.com/490592613091019#turn-off-end-to-end-encrypted-backup) for how to do it.
|
||||
|
||||
@@ -11,14 +11,16 @@ import logging
|
||||
import importlib.metadata
|
||||
from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler
|
||||
from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, Crypt, check_update
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, Crypt
|
||||
from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable
|
||||
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, DbType
|
||||
from Whatsapp_Chat_Exporter.utility import telegram_json_format
|
||||
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, check_update
|
||||
from Whatsapp_Chat_Exporter.utility import telegram_json_format, convert_time_unit, DbType
|
||||
from Whatsapp_Chat_Exporter.utility import get_transcription_selection, check_jid_map
|
||||
from argparse import ArgumentParser, SUPPRESS
|
||||
from datetime import datetime
|
||||
from getpass import getpass
|
||||
from tqdm import tqdm
|
||||
from sys import exit
|
||||
from typing import Optional, List, Dict
|
||||
from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards
|
||||
@@ -286,13 +288,17 @@ def setup_argument_parser() -> ArgumentParser:
|
||||
help="Specify the chunk size for decrypting iOS backup, which may affect the decryption speed."
|
||||
)
|
||||
misc_group.add_argument(
|
||||
"--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int,
|
||||
"--max-bruteforce-worker", dest="max_bruteforce_worker", default=4, type=int,
|
||||
help="Specify the maximum number of worker for bruteforce decryption."
|
||||
)
|
||||
misc_group.add_argument(
|
||||
"--no-banner", dest="no_banner", default=False, action='store_true',
|
||||
help="Do not show the banner"
|
||||
)
|
||||
misc_group.add_argument(
|
||||
"--fix-dot-files", dest="fix_dot_files", default=False, action='store_true',
|
||||
help="Fix files with a dot at the end of their name (allowing the outputs be stored in FAT filesystems)"
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
@@ -519,6 +525,7 @@ def process_contacts(args, data: ChatCollection) -> None:
|
||||
if os.path.isfile(contact_db):
|
||||
with sqlite3.connect(contact_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
db.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace")
|
||||
if args.android:
|
||||
android_handler.contacts(db, data, args.enrich_from_vcards)
|
||||
else:
|
||||
@@ -537,25 +544,29 @@ def process_messages(args, data: ChatCollection) -> None:
|
||||
exit(6)
|
||||
|
||||
filter_chat = (args.filter_chat_include, args.filter_chat_exclude)
|
||||
timing = Timing(args.timezone_offset if args.timezone_offset else CURRENT_TZ_OFFSET)
|
||||
|
||||
with sqlite3.connect(msg_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
db.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace")
|
||||
|
||||
# Process messages
|
||||
if args.android:
|
||||
message_handler = android_handler
|
||||
data.set_system("jid_map_exists", check_jid_map(db))
|
||||
data.set_system("transcription_selection", get_transcription_selection(db))
|
||||
else:
|
||||
message_handler = ios_handler
|
||||
|
||||
message_handler.messages(
|
||||
db, data, args.media, args.timezone_offset, args.filter_date,
|
||||
db, data, args.media, timing, args.filter_date,
|
||||
filter_chat, args.filter_empty, args.no_reply_ios
|
||||
)
|
||||
|
||||
# Process media
|
||||
message_handler.media(
|
||||
db, data, args.media, args.filter_date,
|
||||
filter_chat, args.filter_empty, args.separate_media
|
||||
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files
|
||||
)
|
||||
|
||||
# Process vcards
|
||||
@@ -565,17 +576,18 @@ def process_messages(args, data: ChatCollection) -> None:
|
||||
)
|
||||
|
||||
# Process calls
|
||||
process_calls(args, db, data, filter_chat)
|
||||
process_calls(args, db, data, filter_chat, timing)
|
||||
|
||||
|
||||
def process_calls(args, db, data: ChatCollection, filter_chat) -> None:
|
||||
def process_calls(args, db, data: ChatCollection, filter_chat, timing) -> None:
|
||||
"""Process call history if available."""
|
||||
if args.android:
|
||||
android_handler.calls(db, data, args.timezone_offset, filter_chat)
|
||||
android_handler.calls(db, data, timing, filter_chat)
|
||||
elif args.ios and args.call_db_ios is not None:
|
||||
with sqlite3.connect(args.call_db_ios) as cdb:
|
||||
cdb.row_factory = sqlite3.Row
|
||||
ios_handler.calls(cdb, data, args.timezone_offset, filter_chat)
|
||||
cdb.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace")
|
||||
ios_handler.calls(cdb, data, timing, filter_chat)
|
||||
|
||||
|
||||
def handle_media_directory(args) -> None:
|
||||
@@ -665,24 +677,27 @@ def export_multiple_json(args, data: Dict) -> None:
|
||||
|
||||
# Export each chat
|
||||
total = len(data.keys())
|
||||
for index, jik in enumerate(data.keys()):
|
||||
if data[jik]["name"] is not None:
|
||||
contact = data[jik]["name"].replace('/', '')
|
||||
else:
|
||||
contact = jik.replace('+', '')
|
||||
with tqdm(total=total, desc="Generating JSON files", unit="file", leave=False) as pbar:
|
||||
for jik in data.keys():
|
||||
if data[jik]["name"] is not None:
|
||||
contact = data[jik]["name"].replace('/', '')
|
||||
else:
|
||||
contact = jik.replace('+', '')
|
||||
|
||||
if args.telegram:
|
||||
messages = telegram_json_format(jik, data[jik], args.timezone_offset)
|
||||
else:
|
||||
messages = {jik: data[jik]}
|
||||
with open(f"{json_path}/{safe_name(contact)}.json", "w") as f:
|
||||
file_content = json.dumps(
|
||||
messages,
|
||||
ensure_ascii=not args.avoid_encoding_json,
|
||||
indent=args.pretty_print_json
|
||||
)
|
||||
f.write(file_content)
|
||||
logger.info(f"Writing JSON file...({index + 1}/{total})\r")
|
||||
if args.telegram:
|
||||
messages = telegram_json_format(jik, data[jik], args.timezone_offset)
|
||||
else:
|
||||
messages = {jik: data[jik]}
|
||||
with open(f"{json_path}/{safe_name(contact)}.json", "w") as f:
|
||||
file_content = json.dumps(
|
||||
messages,
|
||||
ensure_ascii=not args.avoid_encoding_json,
|
||||
indent=args.pretty_print_json
|
||||
)
|
||||
f.write(file_content)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_exported_chat(args, data: ChatCollection) -> None:
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import time
|
||||
import hmac
|
||||
import io
|
||||
import logging
|
||||
import threading
|
||||
import zlib
|
||||
import concurrent.futures
|
||||
from tqdm import tqdm
|
||||
from typing import Tuple, Union
|
||||
from hashlib import sha256
|
||||
from sys import exit
|
||||
from functools import partial
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType
|
||||
|
||||
try:
|
||||
@@ -112,13 +111,36 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes
|
||||
zlib.error: If decompression fails.
|
||||
ValueError: if the plaintext is not a SQLite database.
|
||||
"""
|
||||
FOOTER_SIZE = 32
|
||||
if len(db_ciphertext) <= FOOTER_SIZE:
|
||||
raise ValueError("Input data too short to contain a valid GCM tag.")
|
||||
|
||||
actual_ciphertext = db_ciphertext[:-FOOTER_SIZE]
|
||||
tag = db_ciphertext[-FOOTER_SIZE: -FOOTER_SIZE + 16]
|
||||
|
||||
cipher = AES.new(main_key, AES.MODE_GCM, iv)
|
||||
db_compressed = cipher.decrypt(db_ciphertext)
|
||||
db = zlib.decompress(db_compressed)
|
||||
if db[0:6].upper() != b"SQLITE":
|
||||
try:
|
||||
db_compressed = cipher.decrypt_and_verify(actual_ciphertext, tag)
|
||||
except ValueError:
|
||||
# This could be key, IV, or tag is wrong, but likely the key is wrong.
|
||||
raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.")
|
||||
|
||||
if len(db_compressed) < 2 or db_compressed[0] != 0x78:
|
||||
logger.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}")
|
||||
raise ValueError(
|
||||
"The plaintext is not a SQLite database. Ensure you are using the correct key."
|
||||
"Key is correct, but decrypted data is not a valid compressed stream. "
|
||||
"Is this even a valid WhatsApp database backup?"
|
||||
)
|
||||
|
||||
try:
|
||||
db = zlib.decompress(db_compressed)
|
||||
except zlib.error as e:
|
||||
raise zlib.error(f"Decompression failed (The backup file likely corrupted at source): {e}")
|
||||
|
||||
if not db.startswith(b"SQLite"):
|
||||
raise ValueError(
|
||||
"Data is valid and decompressed, but it is not a SQLite database. "
|
||||
"Is this even a valid WhatsApp database backup?")
|
||||
return db
|
||||
|
||||
|
||||
@@ -142,82 +164,69 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
|
||||
|
||||
# Attempt known offsets first
|
||||
for offsets in CRYPT14_OFFSETS:
|
||||
iv = database[offsets["iv"]:offsets["iv"] + 16]
|
||||
db_ciphertext = database[offsets["db"]:]
|
||||
iv = offsets["iv"]
|
||||
db = offsets["db"]
|
||||
try:
|
||||
decrypted_db = _decrypt_database(db_ciphertext, main_key, iv)
|
||||
decrypted_db = _attempt_decrypt_task((iv, iv + 16, db), database, main_key)
|
||||
except (zlib.error, ValueError):
|
||||
pass # Try next offset
|
||||
continue
|
||||
else:
|
||||
logger.debug(
|
||||
f"Decryption successful with known offsets: IV {offsets['iv']}, DB {offsets['db']}{CLEAR_LINE}"
|
||||
f"Decryption successful with known offsets: IV {iv}, DB {db}{CLEAR_LINE}"
|
||||
)
|
||||
return decrypted_db # Successful decryption
|
||||
|
||||
def animate_message(stop_event):
|
||||
base_msg = "Common offsets failed. Initiating brute-force with multithreading"
|
||||
dots = ["", ".", "..", "..."]
|
||||
i = 0
|
||||
while not stop_event.is_set():
|
||||
logger.info(f"{base_msg}{dots[i % len(dots)]}\x1b[K\r")
|
||||
time.sleep(0.3)
|
||||
i += 1
|
||||
logger.info(f"Common offsets failed but brute-forcing the offset works!{CLEAR_LINE}")
|
||||
|
||||
stop_event = threading.Event()
|
||||
anim_thread = threading.Thread(target=animate_message, args=(stop_event,))
|
||||
anim_thread.start()
|
||||
|
||||
# Convert brute force generator into a list for parallel processing
|
||||
offset_combinations = list(brute_force_offset())
|
||||
|
||||
def attempt_decrypt(offset_tuple):
|
||||
"""Attempt decryption with the given offsets."""
|
||||
start_iv, end_iv, start_db = offset_tuple
|
||||
iv = database[start_iv:end_iv]
|
||||
db_ciphertext = database[start_db:]
|
||||
logger.debug(""f"Trying offsets: IV {start_iv}-{end_iv}, DB {start_db}{CLEAR_LINE}")
|
||||
|
||||
try:
|
||||
db = _decrypt_database(db_ciphertext, main_key, iv)
|
||||
except (zlib.error, ValueError):
|
||||
return None # Decryption failed, move to next
|
||||
else:
|
||||
stop_event.set()
|
||||
anim_thread.join()
|
||||
logger.info(
|
||||
f"The offsets of your IV and database are {start_iv} and "
|
||||
f"{start_db}, respectively. To include your offsets in the "
|
||||
"program, please report it by creating an issue on GitHub: "
|
||||
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
|
||||
f"\nShutting down other threads...{CLEAR_LINE}"
|
||||
)
|
||||
return db
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
|
||||
future_to_offset = {executor.submit(attempt_decrypt, offset)
|
||||
: offset for offset in offset_combinations}
|
||||
|
||||
try:
|
||||
for future in concurrent.futures.as_completed(future_to_offset):
|
||||
result = future.result()
|
||||
if result is not None:
|
||||
# Shutdown remaining threads
|
||||
logger.info(f"Common offsets failed. Will attempt to brute-force{CLEAR_LINE}")
|
||||
offset_max = 200
|
||||
workers = max_worker
|
||||
check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key)
|
||||
all_offsets = list(brute_force_offset(offset_max, offset_max))
|
||||
executor = concurrent.futures.ProcessPoolExecutor(max_workers=workers)
|
||||
try:
|
||||
with tqdm(total=len(all_offsets), desc="Brute-forcing offsets", unit="trial", leave=False) as pbar:
|
||||
results = executor.map(check_offset, all_offsets, chunksize=8)
|
||||
found = False
|
||||
for offset_info, result in zip(all_offsets, results):
|
||||
pbar.update(1)
|
||||
if result:
|
||||
start_iv, _, start_db = offset_info
|
||||
# Clean shutdown on success
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
return result
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
logger.info(
|
||||
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively.{CLEAR_LINE}"
|
||||
)
|
||||
logger.info(
|
||||
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:{CLEAR_LINE}"
|
||||
)
|
||||
logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47{CLEAR_LINE}")
|
||||
return result
|
||||
|
||||
except KeyboardInterrupt:
|
||||
stop_event.set()
|
||||
anim_thread.join()
|
||||
logger.info(f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}")
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
exit(1)
|
||||
finally:
|
||||
stop_event.set()
|
||||
anim_thread.join()
|
||||
except KeyboardInterrupt:
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
print("\n")
|
||||
raise KeyboardInterrupt(
|
||||
f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}"
|
||||
)
|
||||
|
||||
finally:
|
||||
executor.shutdown(wait=False)
|
||||
|
||||
raise OffsetNotFoundError("Could not find the correct offsets for decryption.")
|
||||
|
||||
def _attempt_decrypt_task(offset_tuple, database, main_key):
|
||||
"""Attempt decryption with the given offsets."""
|
||||
start_iv, end_iv, start_db = offset_tuple
|
||||
iv = database[start_iv:end_iv]
|
||||
db_ciphertext = database[start_db:]
|
||||
|
||||
try:
|
||||
return _decrypt_database(db_ciphertext, main_key, iv)
|
||||
except (zlib.error, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _decrypt_crypt12(database: bytes, main_key: bytes) -> bytes:
|
||||
"""Decrypt a crypt12 database.
|
||||
|
||||
@@ -4,16 +4,17 @@ import logging
|
||||
import sqlite3
|
||||
import os
|
||||
import shutil
|
||||
from tqdm import tqdm
|
||||
from pathlib import Path
|
||||
from mimetypes import MimeTypes
|
||||
from markupsafe import escape as htmle
|
||||
from base64 import b64decode, b64encode
|
||||
from datetime import datetime
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CURRENT_TZ_OFFSET, MAX_SIZE, ROW_SIZE, JidType, Device
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join
|
||||
from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
|
||||
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, determine_metadata
|
||||
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable
|
||||
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
|
||||
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -38,21 +39,24 @@ def contacts(db, data, enrich_from_vcards):
|
||||
if total_row_number == 0:
|
||||
if enrich_from_vcards is not None:
|
||||
logger.info(
|
||||
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.")
|
||||
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.\n")
|
||||
else:
|
||||
logger.warning(
|
||||
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google")
|
||||
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google\n")
|
||||
return False
|
||||
else:
|
||||
logger.info(f"Processed {total_row_number} contacts\n")
|
||||
|
||||
c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;")
|
||||
row = c.fetchone()
|
||||
while row is not None:
|
||||
current_chat = data.add_chat(row["jid"], ChatStore(Device.ANDROID, row["display_name"]))
|
||||
if row["status"] is not None:
|
||||
current_chat.status = row["status"]
|
||||
row = c.fetchone()
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
||||
while (row := _fetch_row_safely(c)) is not None:
|
||||
current_chat = data.add_chat(row["jid"], ChatStore(Device.ANDROID, row["display_name"]))
|
||||
if row["status"] is not None:
|
||||
current_chat.status = row["status"]
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
return True
|
||||
|
||||
@@ -71,39 +75,37 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
filter_empty: Filter for empty chats
|
||||
"""
|
||||
c = db.cursor()
|
||||
total_row_number = _get_message_count(c, filter_empty, filter_date, filter_chat)
|
||||
logger.info(f"Processing messages...(0/{total_row_number})\r")
|
||||
total_row_number = _get_message_count(c, filter_empty, filter_date, filter_chat, data.get_system("jid_map_exists"))
|
||||
|
||||
try:
|
||||
content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat)
|
||||
table_message = False
|
||||
except sqlite3.OperationalError:
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n')
|
||||
try:
|
||||
content_cursor = _get_messages_cursor_new(c, filter_empty, filter_date, filter_chat)
|
||||
content_cursor = _get_messages_cursor_new(
|
||||
c,
|
||||
filter_empty,
|
||||
filter_date,
|
||||
filter_chat,
|
||||
data.get_system("transcription_selection"),
|
||||
data.get_system("jid_map_exists")
|
||||
)
|
||||
table_message = True
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
i = 0
|
||||
# Fetch the first row safely
|
||||
content = _fetch_row_safely(content_cursor)
|
||||
|
||||
while content is not None:
|
||||
_process_single_message(data, content, table_message, timezone_offset)
|
||||
|
||||
i += 1
|
||||
if i % 1000 == 0:
|
||||
logger.info(f"Processing messages...({i}/{total_row_number})\r")
|
||||
|
||||
# Fetch the next row safely
|
||||
content = _fetch_row_safely(content_cursor)
|
||||
|
||||
logger.info(f"Processed {total_row_number} messages{CLEAR_LINE}")
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing messages", unit="msg", leave=False) as pbar:
|
||||
while (content := _fetch_row_safely(content_cursor)) is not None:
|
||||
_process_single_message(data, content, table_message, timezone_offset)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
_get_reactions(db, data)
|
||||
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
# Helper functions for message processing
|
||||
|
||||
def _get_message_count(cursor, filter_empty, filter_date, filter_chat):
|
||||
def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_exists):
|
||||
"""Get the total number of messages to process."""
|
||||
try:
|
||||
empty_filter = get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push")
|
||||
@@ -124,22 +126,28 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat):
|
||||
{date_filter}
|
||||
{include_filter}
|
||||
{exclude_filter}""")
|
||||
except sqlite3.OperationalError:
|
||||
empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
|
||||
date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n')
|
||||
|
||||
cursor.execute(f"""SELECT count()
|
||||
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
|
||||
date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
|
||||
remote_jid_selection, group_jid_selection = get_jid_map_selection(jid_map_exists)
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
|
||||
cursor.execute(f"""SELECT count(),
|
||||
{remote_jid_selection} as key_remote_jid,
|
||||
{group_jid_selection} as group_sender_jid
|
||||
FROM message
|
||||
LEFT JOIN chat
|
||||
ON chat._id = message.chat_row_id
|
||||
INNER JOIN jid
|
||||
ON jid._id = chat.jid_row_id
|
||||
INNER JOIN jid jid_global
|
||||
ON jid_global._id = chat.jid_row_id
|
||||
LEFT JOIN jid jid_group
|
||||
ON jid_group._id = message.sender_jid_row_id
|
||||
{get_jid_map_join(jid_map_exists)}
|
||||
WHERE 1=1
|
||||
{empty_filter}
|
||||
{date_filter}
|
||||
@@ -213,16 +221,24 @@ def _get_messages_cursor_legacy(cursor, filter_empty, filter_date, filter_chat):
|
||||
return cursor
|
||||
|
||||
|
||||
def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
def _get_messages_cursor_new(
|
||||
cursor,
|
||||
filter_empty,
|
||||
filter_date,
|
||||
filter_chat,
|
||||
transcription_selection,
|
||||
jid_map_exists
|
||||
):
|
||||
"""Get cursor for new database schema."""
|
||||
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
|
||||
date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
|
||||
remote_jid_selection, group_jid_selection = get_jid_map_selection(jid_map_exists)
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")
|
||||
filter_chat[0], True, ["key_remote_jid", "group_sender_jid"], "jid_global", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")
|
||||
filter_chat[1], False, ["key_remote_jid", "group_sender_jid"], "jid_global", "android")
|
||||
|
||||
cursor.execute(f"""SELECT jid_global.raw_string as key_remote_jid,
|
||||
cursor.execute(f"""SELECT {remote_jid_selection} as key_remote_jid,
|
||||
message._id,
|
||||
message.from_me as key_from_me,
|
||||
message.timestamp,
|
||||
@@ -237,7 +253,7 @@ def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
message.key_id,
|
||||
message_quoted.text_data as quoted_data,
|
||||
message.message_type as media_wa_type,
|
||||
jid_group.raw_string as group_sender_jid,
|
||||
{group_jid_selection} as group_sender_jid,
|
||||
chat.subject as chat_subject,
|
||||
missed_call_logs.video_call,
|
||||
message.sender_jid_row_id,
|
||||
@@ -247,7 +263,8 @@ def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
jid_new.raw_string as new_jid,
|
||||
jid_global.type as jid_type,
|
||||
COALESCE(receipt_user.receipt_timestamp, message.received_timestamp) as received_timestamp,
|
||||
COALESCE(receipt_user.read_timestamp, receipt_user.played_timestamp) as read_timestamp
|
||||
COALESCE(receipt_user.read_timestamp, receipt_user.played_timestamp) as read_timestamp,
|
||||
{transcription_selection}
|
||||
FROM message
|
||||
LEFT JOIN message_quoted
|
||||
ON message_quoted.message_row_id = message._id
|
||||
@@ -279,6 +296,7 @@ def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
ON jid_new._id = message_system_number_change.new_jid_row_id
|
||||
LEFT JOIN receipt_user
|
||||
ON receipt_user.message_row_id = message._id
|
||||
{get_jid_map_join(jid_map_exists)}
|
||||
WHERE key_remote_jid <> '-1'
|
||||
{empty_filter}
|
||||
{date_filter}
|
||||
@@ -294,7 +312,11 @@ def _fetch_row_safely(cursor):
|
||||
try:
|
||||
content = cursor.fetchone()
|
||||
return content
|
||||
except sqlite3.OperationalError:
|
||||
except sqlite3.OperationalError as e:
|
||||
# Not sure how often this might happen, but this check should reduce the overhead
|
||||
# if DEBUG flag is not set.
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n')
|
||||
continue
|
||||
|
||||
|
||||
@@ -320,7 +342,7 @@ def _process_single_message(data, content, table_message, timezone_offset):
|
||||
timestamp=content["timestamp"],
|
||||
time=content["timestamp"],
|
||||
key_id=content["key_id"],
|
||||
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
|
||||
timezone_offset=timezone_offset,
|
||||
message_type=content["media_wa_type"],
|
||||
received_timestamp=content["received_timestamp"],
|
||||
read_timestamp=content["read_timestamp"]
|
||||
@@ -352,9 +374,12 @@ def _process_single_message(data, content, table_message, timezone_offset):
|
||||
if not table_message and content["media_caption"] is not None:
|
||||
# Old schema
|
||||
message.caption = content["media_caption"]
|
||||
elif table_message and content["media_wa_type"] == 1 and content["data"] is not None:
|
||||
elif table_message:
|
||||
# New schema
|
||||
message.caption = content["data"]
|
||||
if content["media_wa_type"] == 1 and content["data"] is not None:
|
||||
message.caption = content["data"]
|
||||
elif content["media_wa_type"] == 2 and content["transcription_text"] is not None:
|
||||
message.caption = f'"{content["transcription_text"]}"'
|
||||
else:
|
||||
message.caption = None
|
||||
|
||||
@@ -480,7 +505,79 @@ def _format_message_text(text):
|
||||
return text
|
||||
|
||||
|
||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True):
|
||||
def _get_reactions(db, data):
|
||||
"""
|
||||
Process message reactions. Only new schema is supported.
|
||||
Chat filter is not applied here at the moment. Maybe in the future.
|
||||
"""
|
||||
c = db.cursor()
|
||||
|
||||
try:
|
||||
# Check if tables exist, old schema might not have reactions or in somewhere else
|
||||
c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='message_add_on'")
|
||||
if c.fetchone()[0] == 0:
|
||||
return
|
||||
|
||||
logger.info("Processing reactions...\r")
|
||||
|
||||
c.execute("""
|
||||
SELECT
|
||||
message_add_on.parent_message_row_id,
|
||||
message_add_on_reaction.reaction,
|
||||
message_add_on.from_me,
|
||||
jid.raw_string as sender_jid_raw,
|
||||
chat_jid.raw_string as chat_jid_raw,
|
||||
message_add_on_reaction.sender_timestamp
|
||||
FROM message_add_on
|
||||
INNER JOIN message_add_on_reaction
|
||||
ON message_add_on._id = message_add_on_reaction.message_add_on_row_id
|
||||
LEFT JOIN jid
|
||||
ON message_add_on.sender_jid_row_id = jid._id
|
||||
LEFT JOIN chat
|
||||
ON message_add_on.chat_row_id = chat._id
|
||||
LEFT JOIN jid chat_jid
|
||||
ON chat.jid_row_id = chat_jid._id
|
||||
""")
|
||||
except sqlite3.OperationalError:
|
||||
logger.warning(f"Could not fetch reactions (schema might be too old or incompatible){CLEAR_LINE}")
|
||||
return
|
||||
|
||||
rows = c.fetchall()
|
||||
total_row_number = len(rows)
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing reactions", unit="reaction", leave=False) as pbar:
|
||||
for row in rows:
|
||||
parent_id = row["parent_message_row_id"]
|
||||
reaction = row["reaction"]
|
||||
chat_id = row["chat_jid_raw"]
|
||||
_react_timestamp = row["sender_timestamp"]
|
||||
|
||||
if chat_id and chat_id in data:
|
||||
chat = data[chat_id]
|
||||
if parent_id in chat._messages:
|
||||
message = chat._messages[parent_id]
|
||||
|
||||
# Determine sender name
|
||||
sender_name = None
|
||||
if row["from_me"]:
|
||||
sender_name = "You"
|
||||
elif row["sender_jid_raw"]:
|
||||
sender_jid = row["sender_jid_raw"]
|
||||
if sender_jid in data:
|
||||
sender_name = data[sender_jid].name
|
||||
if not sender_name:
|
||||
sender_name = sender_jid.split('@')[0] if "@" in sender_jid else sender_jid
|
||||
|
||||
if not sender_name:
|
||||
sender_name = "Unknown"
|
||||
|
||||
message.reactions[sender_name] = reaction
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
|
||||
"""
|
||||
Process WhatsApp media files from the database.
|
||||
|
||||
@@ -495,11 +592,10 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
||||
"""
|
||||
c = db.cursor()
|
||||
total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
|
||||
logger.info(f"Processing media...(0/{total_row_number})\r")
|
||||
|
||||
try:
|
||||
content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat)
|
||||
except sqlite3.OperationalError:
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n')
|
||||
content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat)
|
||||
|
||||
content = content_cursor.fetchone()
|
||||
@@ -508,18 +604,12 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
||||
# Ensure thumbnails directory exists
|
||||
Path(f"{media_folder}/thumbnails").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
i = 0
|
||||
while content is not None:
|
||||
_process_single_media(data, content, media_folder, mime, separate_media)
|
||||
|
||||
i += 1
|
||||
if i % 100 == 0:
|
||||
logger.info(f"Processing media...({i}/{total_row_number})\r")
|
||||
|
||||
content = content_cursor.fetchone()
|
||||
|
||||
logger.info(f"Processed {total_row_number} media{CLEAR_LINE}")
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||
while (content := _fetch_row_safely(content_cursor)) is not None:
|
||||
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
# Helper functions for media processing
|
||||
|
||||
@@ -546,15 +636,18 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
|
||||
{date_filter}
|
||||
{include_filter}
|
||||
{exclude_filter}""")
|
||||
except sqlite3.OperationalError:
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n')
|
||||
empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
|
||||
date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
|
||||
filter_chat[0], True, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
|
||||
filter_chat[1], False, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
|
||||
cursor.execute(f"""SELECT count()
|
||||
cursor.execute(f"""SELECT count(),
|
||||
COALESCE(lid_global.raw_string, jid.raw_string) as key_remote_jid,
|
||||
COALESCE(lid_group.raw_string, jid_group.raw_string) as group_sender_jid
|
||||
FROM message_media
|
||||
INNER JOIN message
|
||||
ON message_media.message_row_id = message._id
|
||||
@@ -564,6 +657,14 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
|
||||
ON jid._id = chat.jid_row_id
|
||||
LEFT JOIN jid jid_group
|
||||
ON jid_group._id = message.sender_jid_row_id
|
||||
LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
LEFT JOIN jid_map as jid_map_group
|
||||
ON message.sender_jid_row_id = jid_map_group.lid_row_id
|
||||
LEFT JOIN jid lid_group
|
||||
ON jid_map_group.jid_row_id = lid_group._id
|
||||
WHERE 1=1
|
||||
{empty_filter}
|
||||
{date_filter}
|
||||
@@ -612,18 +713,19 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
|
||||
date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")
|
||||
filter_chat[0], True, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")
|
||||
filter_chat[1], False, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
|
||||
cursor.execute(f"""SELECT jid.raw_string as key_remote_jid,
|
||||
cursor.execute(f"""SELECT COALESCE(lid_global.raw_string, jid.raw_string) as key_remote_jid,
|
||||
message_row_id,
|
||||
file_path,
|
||||
message_url,
|
||||
mime_type,
|
||||
media_key,
|
||||
file_hash,
|
||||
thumbnail
|
||||
thumbnail,
|
||||
COALESCE(lid_group.raw_string, jid_group.raw_string) as group_sender_jid
|
||||
FROM message_media
|
||||
INNER JOIN message
|
||||
ON message_media.message_row_id = message._id
|
||||
@@ -635,6 +737,14 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
ON message_media.file_hash = media_hash_thumbnail.media_hash
|
||||
LEFT JOIN jid jid_group
|
||||
ON jid_group._id = message.sender_jid_row_id
|
||||
LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
LEFT JOIN jid_map as jid_map_group
|
||||
ON message.sender_jid_row_id = jid_map_group.lid_row_id
|
||||
LEFT JOIN jid lid_group
|
||||
ON jid_map_group.jid_row_id = lid_group._id
|
||||
WHERE jid.type <> 7
|
||||
{empty_filter}
|
||||
{date_filter}
|
||||
@@ -644,7 +754,7 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
||||
return cursor
|
||||
|
||||
|
||||
def _process_single_media(data, content, media_folder, mime, separate_media):
|
||||
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False):
|
||||
"""Process a single media file."""
|
||||
file_path = f"{media_folder}/{content['file_path']}"
|
||||
current_chat = data.get_chat(content["key_remote_jid"])
|
||||
@@ -652,8 +762,6 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
||||
message.media = True
|
||||
|
||||
if os.path.isfile(file_path):
|
||||
message.data = file_path
|
||||
|
||||
# Set mime type
|
||||
if content["mime_type"] is None:
|
||||
guess = mime.guess_type(file_path)[0]
|
||||
@@ -663,6 +771,16 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
||||
message.mime = "application/octet-stream"
|
||||
else:
|
||||
message.mime = content["mime_type"]
|
||||
|
||||
if fix_dot_files and file_path.endswith("."):
|
||||
extension = mime.guess_extension(message.mime)
|
||||
if message.mime == "application/octet-stream" or not extension:
|
||||
new_file_path = file_path[:-1]
|
||||
else:
|
||||
extension = mime.guess_extension(message.mime)
|
||||
new_file_path = file_path[:-1] + extension
|
||||
os.rename(file_path, new_file_path)
|
||||
file_path = new_file_path
|
||||
|
||||
# Copy media to separate folder if needed
|
||||
if separate_media:
|
||||
@@ -674,6 +792,8 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
||||
new_path = os.path.join(new_folder, current_filename)
|
||||
shutil.copy2(file_path, new_path)
|
||||
message.data = new_path
|
||||
else:
|
||||
message.data = file_path
|
||||
else:
|
||||
message.data = "The media is missing"
|
||||
message.mime = "media"
|
||||
@@ -693,49 +813,61 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
|
||||
c = db.cursor()
|
||||
try:
|
||||
rows = _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty)
|
||||
except sqlite3.OperationalError:
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n')
|
||||
rows = _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty)
|
||||
|
||||
total_row_number = len(rows)
|
||||
logger.info(f"Processing vCards...(0/{total_row_number})\r")
|
||||
|
||||
# Create vCards directory if it doesn't exist
|
||||
path = os.path.join(media_folder, "vCards")
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for index, row in enumerate(rows):
|
||||
_process_vcard_row(row, path, data)
|
||||
logger.info(f"Processing vCards...({index + 1}/{total_row_number})\r")
|
||||
logger.info(f"Processed {total_row_number} vCards{CLEAR_LINE}")
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing vCards", unit="vcard", leave=False) as pbar:
|
||||
for row in rows:
|
||||
_process_vcard_row(row, path, data)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty):
|
||||
"""Execute vCard query for modern WhatsApp database schema."""
|
||||
|
||||
# Build the filter conditions
|
||||
chat_filter_include = get_chat_condition(
|
||||
filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
|
||||
chat_filter_exclude = get_chat_condition(
|
||||
filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
|
||||
date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else ''
|
||||
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push")
|
||||
include_filter = get_chat_condition(
|
||||
filter_chat[0], True, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
exclude_filter = get_chat_condition(
|
||||
filter_chat[1], False, ["key_remote_jid", "group_sender_jid"], "jid", "android")
|
||||
|
||||
query = f"""SELECT message_row_id,
|
||||
messages.key_remote_jid,
|
||||
vcard,
|
||||
messages.media_name
|
||||
FROM messages_vcards
|
||||
INNER JOIN messages
|
||||
ON messages_vcards.message_row_id = messages._id
|
||||
INNER JOIN jid
|
||||
ON messages.key_remote_jid = jid.raw_string
|
||||
LEFT JOIN chat
|
||||
ON chat.jid_row_id = jid._id
|
||||
COALESCE(lid_global.raw_string, jid.raw_string) as key_remote_jid,
|
||||
vcard,
|
||||
messages.media_name,
|
||||
COALESCE(lid_group.raw_string, jid_group.raw_string) as group_sender_jid
|
||||
FROM messages_vcards
|
||||
INNER JOIN messages
|
||||
ON messages_vcards.message_row_id = messages._id
|
||||
INNER JOIN jid
|
||||
ON messages.key_remote_jid = jid.raw_string
|
||||
LEFT JOIN chat
|
||||
ON chat.jid_row_id = jid._id
|
||||
LEFT JOIN jid jid_group
|
||||
ON jid_group._id = message.sender_jid_row_id
|
||||
LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
LEFT JOIN jid_map as jid_map_group
|
||||
ON message.sender_jid_row_id = jid_map_group.lid_row_id
|
||||
LEFT JOIN jid lid_group
|
||||
ON jid_map_group.jid_row_id = lid_group._id
|
||||
WHERE 1=1
|
||||
{empty_filter}
|
||||
{date_filter}
|
||||
{chat_filter_include}
|
||||
{chat_filter_exclude}
|
||||
{include_filter}
|
||||
{exclude_filter}
|
||||
ORDER BY messages.key_remote_jid ASC;"""
|
||||
c.execute(query)
|
||||
return c.fetchall()
|
||||
@@ -812,32 +944,37 @@ def calls(db, data, timezone_offset, filter_chat):
|
||||
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
|
||||
|
||||
# Process each call
|
||||
content = calls_data.fetchone()
|
||||
while content is not None:
|
||||
_process_call_record(content, chat, data, timezone_offset)
|
||||
content = calls_data.fetchone()
|
||||
with tqdm(total=total_row_number, desc="Processing calls", unit="call", leave=False) as pbar:
|
||||
while (content := _fetch_row_safely(calls_data)) is not None:
|
||||
_process_call_record(content, chat, data, timezone_offset)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
|
||||
# Add the calls chat to the data
|
||||
data.add_chat("000000000000000", chat)
|
||||
logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}")
|
||||
|
||||
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
def _get_calls_count(c, filter_chat):
|
||||
"""Get the count of call records that match the filter."""
|
||||
|
||||
# Build the filter conditions
|
||||
chat_filter_include = get_chat_condition(filter_chat[0], True, ["jid.raw_string"])
|
||||
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["jid.raw_string"])
|
||||
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
|
||||
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
|
||||
|
||||
query = f"""SELECT count()
|
||||
query = f"""SELECT count(),
|
||||
COALESCE(lid_global.raw_string, jid.raw_string) as key_remote_jid
|
||||
FROM call_log
|
||||
INNER JOIN jid
|
||||
ON call_log.jid_row_id = jid._id
|
||||
LEFT JOIN chat
|
||||
ON call_log.jid_row_id = chat.jid_row_id
|
||||
LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
WHERE 1=1
|
||||
{chat_filter_include}
|
||||
{chat_filter_exclude}"""
|
||||
{include_filter}
|
||||
{exclude_filter}"""
|
||||
c.execute(query)
|
||||
return c.fetchone()[0]
|
||||
|
||||
@@ -846,11 +983,11 @@ def _fetch_calls_data(c, filter_chat):
|
||||
"""Fetch call data from the database."""
|
||||
|
||||
# Build the filter conditions
|
||||
chat_filter_include = get_chat_condition(filter_chat[0], True, ["jid.raw_string"])
|
||||
chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["jid.raw_string"])
|
||||
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
|
||||
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
|
||||
|
||||
query = f"""SELECT call_log._id,
|
||||
jid.raw_string,
|
||||
COALESCE(lid_global.raw_string, jid.raw_string) as key_remote_jid,
|
||||
from_me,
|
||||
call_id,
|
||||
timestamp,
|
||||
@@ -864,9 +1001,13 @@ def _fetch_calls_data(c, filter_chat):
|
||||
ON call_log.jid_row_id = jid._id
|
||||
LEFT JOIN chat
|
||||
ON call_log.jid_row_id = chat.jid_row_id
|
||||
LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
WHERE 1=1
|
||||
{chat_filter_include}
|
||||
{chat_filter_exclude}"""
|
||||
{include_filter}
|
||||
{exclude_filter}"""
|
||||
c.execute(query)
|
||||
return c
|
||||
|
||||
@@ -878,13 +1019,13 @@ def _process_call_record(content, chat, data, timezone_offset):
|
||||
timestamp=content["timestamp"],
|
||||
time=content["timestamp"],
|
||||
key_id=content["call_id"],
|
||||
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
|
||||
timezone_offset=timezone_offset,
|
||||
received_timestamp=None, # TODO: Add timestamp
|
||||
read_timestamp=None # TODO: Add timestamp
|
||||
)
|
||||
|
||||
# Get caller/callee name
|
||||
_jid = content["raw_string"]
|
||||
_jid = content["key_remote_jid"]
|
||||
name = data.get_chat(_jid).name if _jid in data else content["chat_subject"] or None
|
||||
if _jid is not None and "@" in _jid:
|
||||
fallback = _jid.split('@')[0]
|
||||
@@ -929,6 +1070,7 @@ def _construct_call_description(content, call):
|
||||
return description
|
||||
|
||||
|
||||
# TODO: Marked for enhancement on multi-threaded processing
|
||||
def create_html(
|
||||
data,
|
||||
output_folder,
|
||||
@@ -944,7 +1086,6 @@ def create_html(
|
||||
template = setup_template(template, no_avatar, experimental)
|
||||
|
||||
total_row_number = len(data)
|
||||
logger.info(f"Generating chats...(0/{total_row_number})\r")
|
||||
|
||||
# Create output directory if it doesn't exist
|
||||
if not os.path.isdir(output_folder):
|
||||
@@ -952,43 +1093,42 @@ def create_html(
|
||||
|
||||
w3css = get_status_location(output_folder, offline_static)
|
||||
|
||||
for current, contact in enumerate(data):
|
||||
current_chat = data.get_chat(contact)
|
||||
if len(current_chat) == 0:
|
||||
# Skip empty chats
|
||||
continue
|
||||
with tqdm(total=total_row_number, desc="Generating HTML", unit="file", leave=False) as pbar:
|
||||
for contact in data:
|
||||
current_chat = data.get_chat(contact)
|
||||
if len(current_chat) == 0:
|
||||
# Skip empty chats
|
||||
continue
|
||||
|
||||
safe_file_name, name = get_file_name(contact, current_chat)
|
||||
safe_file_name, name = get_file_name(contact, current_chat)
|
||||
|
||||
if maximum_size is not None:
|
||||
_generate_paginated_chat(
|
||||
current_chat,
|
||||
safe_file_name,
|
||||
name,
|
||||
contact,
|
||||
output_folder,
|
||||
template,
|
||||
w3css,
|
||||
maximum_size,
|
||||
headline
|
||||
)
|
||||
else:
|
||||
_generate_single_chat(
|
||||
current_chat,
|
||||
safe_file_name,
|
||||
name,
|
||||
contact,
|
||||
output_folder,
|
||||
template,
|
||||
w3css,
|
||||
headline
|
||||
)
|
||||
|
||||
if current % 10 == 0:
|
||||
logger.info(f"Generating chats...({current}/{total_row_number})\r")
|
||||
|
||||
logger.info(f"Generated {total_row_number} chats{CLEAR_LINE}")
|
||||
if maximum_size is not None:
|
||||
_generate_paginated_chat(
|
||||
current_chat,
|
||||
safe_file_name,
|
||||
name,
|
||||
contact,
|
||||
output_folder,
|
||||
template,
|
||||
w3css,
|
||||
maximum_size,
|
||||
headline
|
||||
)
|
||||
else:
|
||||
_generate_single_chat(
|
||||
current_chat,
|
||||
safe_file_name,
|
||||
name,
|
||||
contact,
|
||||
output_folder,
|
||||
template,
|
||||
w3css,
|
||||
headline
|
||||
)
|
||||
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline):
|
||||
"""Generate a single HTML file for a chat."""
|
||||
|
||||
@@ -66,6 +66,7 @@ class ChatCollection(MutableMapping):
|
||||
def __init__(self) -> None:
|
||||
"""Initialize an empty chat collection."""
|
||||
self._chats: Dict[str, ChatStore] = {}
|
||||
self._system: Dict[str, Any] = {}
|
||||
|
||||
def __getitem__(self, key: str) -> 'ChatStore':
|
||||
"""Get a chat by its ID. Required for dict-like access."""
|
||||
@@ -148,6 +149,28 @@ class ChatCollection(MutableMapping):
|
||||
"""
|
||||
return {chat_id: chat.to_json() for chat_id, chat in self._chats.items()}
|
||||
|
||||
def get_system(self, key: str) -> Any:
|
||||
"""
|
||||
Get a system value by its key.
|
||||
|
||||
Args:
|
||||
key (str): The key of the system value to retrieve
|
||||
|
||||
Returns:
|
||||
Any: The system value if found, None otherwise
|
||||
"""
|
||||
return self._system.get(key)
|
||||
|
||||
def set_system(self, key: str, value: Any) -> None:
|
||||
"""
|
||||
Set a system value by its key.
|
||||
|
||||
Args:
|
||||
key (str): The key of the system value to set
|
||||
value (Any): The value to set
|
||||
"""
|
||||
self._system[key] = value
|
||||
|
||||
|
||||
class ChatStore:
|
||||
"""
|
||||
@@ -279,7 +302,7 @@ class Message:
|
||||
key_id: Union[int, str],
|
||||
received_timestamp: int = None,
|
||||
read_timestamp: int = None,
|
||||
timezone_offset: int = 0,
|
||||
timezone_offset: Optional[Timing] = Timing(0),
|
||||
message_type: Optional[int] = None
|
||||
) -> None:
|
||||
"""
|
||||
@@ -300,10 +323,9 @@ class Message:
|
||||
"""
|
||||
self.from_me = bool(from_me)
|
||||
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
|
||||
timing = Timing(timezone_offset)
|
||||
|
||||
if isinstance(time, (int, float)):
|
||||
self.time = timing.format_timestamp(self.timestamp, "%H:%M")
|
||||
self.time = timezone_offset.format_timestamp(self.timestamp, "%H:%M")
|
||||
elif isinstance(time, str):
|
||||
self.time = time
|
||||
else:
|
||||
@@ -318,14 +340,14 @@ class Message:
|
||||
self.mime = None
|
||||
self.message_type = message_type
|
||||
if isinstance(received_timestamp, (int, float)):
|
||||
self.received_timestamp = timing.format_timestamp(
|
||||
self.received_timestamp = timezone_offset.format_timestamp(
|
||||
received_timestamp, "%Y/%m/%d %H:%M")
|
||||
elif isinstance(received_timestamp, str):
|
||||
self.received_timestamp = received_timestamp
|
||||
else:
|
||||
self.received_timestamp = None
|
||||
if isinstance(read_timestamp, (int, float)):
|
||||
self.read_timestamp = timing.format_timestamp(
|
||||
self.read_timestamp = timezone_offset.format_timestamp(
|
||||
read_timestamp, "%Y/%m/%d %H:%M")
|
||||
elif isinstance(read_timestamp, str):
|
||||
self.read_timestamp = read_timestamp
|
||||
@@ -338,6 +360,7 @@ class Message:
|
||||
self.caption = None
|
||||
self.thumb = None # Android specific
|
||||
self.sticker = False
|
||||
self.reactions = {}
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
"""Convert message to JSON-serializable dict."""
|
||||
|
||||
@@ -4,8 +4,9 @@ import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from mimetypes import MimeTypes
|
||||
from tqdm import tqdm
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device, convert_time_unit
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -34,17 +35,16 @@ def messages(path, data, assume_first_as_me=False):
|
||||
|
||||
# Second pass: process the messages
|
||||
with open(path, "r", encoding="utf8") as file:
|
||||
for index, line in enumerate(file):
|
||||
you, user_identification_done = process_line(
|
||||
line, index, chat, path, you,
|
||||
assume_first_as_me, user_identification_done
|
||||
)
|
||||
with tqdm(total=total_row_number, desc="Processing messages & media", unit="msg&media", leave=False) as pbar:
|
||||
for index, line in enumerate(file):
|
||||
you, user_identification_done = process_line(
|
||||
line, index, chat, path, you,
|
||||
assume_first_as_me, user_identification_done
|
||||
)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
# Show progress
|
||||
if index % 1000 == 0:
|
||||
logger.info(f"Processing messages & media...({index}/{total_row_number})\r")
|
||||
|
||||
logger.info(f"Processed {total_row_number} messages & media{CLEAR_LINE}")
|
||||
return data
|
||||
|
||||
|
||||
|
||||
@@ -4,12 +4,13 @@ import os
|
||||
import logging
|
||||
import shutil
|
||||
from glob import glob
|
||||
from tqdm import tqdm
|
||||
from pathlib import Path
|
||||
from mimetypes import MimeTypes
|
||||
from markupsafe import escape as htmle
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, get_chat_condition
|
||||
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name, Device
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, get_chat_condition, Device
|
||||
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -23,17 +24,18 @@ def contacts(db, data):
|
||||
logger.info(f"Pre-processing contacts...({total_row_number})\r")
|
||||
|
||||
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
zwhatsapp_id = content["ZWHATSAPPID"]
|
||||
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
|
||||
zwhatsapp_id += "@s.whatsapp.net"
|
||||
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
zwhatsapp_id = content["ZWHATSAPPID"]
|
||||
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
|
||||
zwhatsapp_id += "@s.whatsapp.net"
|
||||
|
||||
current_chat = ChatStore(Device.IOS)
|
||||
current_chat.status = content["ZABOUTTEXT"]
|
||||
data.add_chat(zwhatsapp_id, current_chat)
|
||||
content = c.fetchone()
|
||||
logger.info(f"Pre-processed {total_row_number} contacts{CLEAR_LINE}")
|
||||
current_chat = ChatStore(Device.IOS)
|
||||
current_chat.status = content["ZABOUTTEXT"]
|
||||
data.add_chat(zwhatsapp_id, current_chat)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_contact_avatars(current_chat, media_folder, contact_id):
|
||||
@@ -92,7 +94,6 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
"""
|
||||
c.execute(contact_query)
|
||||
total_row_number = c.fetchone()[0]
|
||||
logger.info(f"Processing contacts...({total_row_number})\r")
|
||||
|
||||
# Get distinct contacts
|
||||
contacts_query = f"""
|
||||
@@ -114,24 +115,24 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
c.execute(contacts_query)
|
||||
|
||||
# Process each contact
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
contact_name = get_contact_name(content)
|
||||
contact_id = content["ZCONTACTJID"]
|
||||
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
contact_name = get_contact_name(content)
|
||||
contact_id = content["ZCONTACTJID"]
|
||||
|
||||
# Add or update chat
|
||||
if contact_id not in data:
|
||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
|
||||
else:
|
||||
current_chat = data.get_chat(contact_id)
|
||||
current_chat.name = contact_name
|
||||
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
|
||||
# Add or update chat
|
||||
if contact_id not in data:
|
||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
|
||||
else:
|
||||
current_chat = data.get_chat(contact_id)
|
||||
current_chat.name = contact_name
|
||||
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
|
||||
|
||||
# Process avatar images
|
||||
process_contact_avatars(current_chat, media_folder, contact_id)
|
||||
content = c.fetchone()
|
||||
|
||||
logger.info(f"Processed {total_row_number} contacts{CLEAR_LINE}")
|
||||
# Process avatar images
|
||||
process_contact_avatars(current_chat, media_folder, contact_id)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
# Get message count
|
||||
message_count_query = f"""
|
||||
@@ -190,46 +191,42 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
message_map = {row[0][:17]: row[1] or row[2] for row in cursor2.fetchall() if row[0]}
|
||||
|
||||
# Process each message
|
||||
i = 0
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
contact_id = content["ZCONTACTJID"]
|
||||
message_pk = content["Z_PK"]
|
||||
is_group_message = content["ZGROUPINFO"] is not None
|
||||
with tqdm(total=total_row_number, desc="Processing messages", unit="msg", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
contact_id = content["ZCONTACTJID"]
|
||||
message_pk = content["Z_PK"]
|
||||
is_group_message = content["ZGROUPINFO"] is not None
|
||||
|
||||
# Ensure chat exists
|
||||
if contact_id not in data:
|
||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS))
|
||||
process_contact_avatars(current_chat, media_folder, contact_id)
|
||||
else:
|
||||
current_chat = data.get_chat(contact_id)
|
||||
# Ensure chat exists
|
||||
if contact_id not in data:
|
||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS))
|
||||
process_contact_avatars(current_chat, media_folder, contact_id)
|
||||
else:
|
||||
current_chat = data.get_chat(contact_id)
|
||||
|
||||
# Create message object
|
||||
ts = APPLE_TIME + content["ZMESSAGEDATE"]
|
||||
message = Message(
|
||||
from_me=content["ZISFROMME"],
|
||||
timestamp=ts,
|
||||
time=ts,
|
||||
key_id=content["ZSTANZAID"][:17],
|
||||
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
|
||||
message_type=content["ZMESSAGETYPE"],
|
||||
received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None,
|
||||
read_timestamp=None # TODO: Add timestamp
|
||||
)
|
||||
# Create message object
|
||||
ts = APPLE_TIME + content["ZMESSAGEDATE"]
|
||||
message = Message(
|
||||
from_me=content["ZISFROMME"],
|
||||
timestamp=ts,
|
||||
time=ts,
|
||||
key_id=content["ZSTANZAID"][:17],
|
||||
timezone_offset=timezone_offset,
|
||||
message_type=content["ZMESSAGETYPE"],
|
||||
received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None,
|
||||
read_timestamp=None # TODO: Add timestamp
|
||||
)
|
||||
|
||||
# Process message data
|
||||
invalid = process_message_data(message, content, is_group_message, data, message_map, no_reply)
|
||||
# Process message data
|
||||
invalid = process_message_data(message, content, is_group_message, data, message_map, no_reply)
|
||||
|
||||
# Add valid messages to chat
|
||||
if not invalid:
|
||||
current_chat.add_message(message_pk, message)
|
||||
# Add valid messages to chat
|
||||
if not invalid:
|
||||
current_chat.add_message(message_pk, message)
|
||||
|
||||
# Update progress
|
||||
i += 1
|
||||
if i % 1000 == 0:
|
||||
logger.info(f"Processing messages...({i}/{total_row_number})\r")
|
||||
content = c.fetchone()
|
||||
logger.info(f"Processed {total_row_number} messages{CLEAR_LINE}")
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_message_data(message, content, is_group_message, data, message_map, no_reply):
|
||||
@@ -315,7 +312,7 @@ def process_message_text(message, content):
|
||||
message.data = msg
|
||||
|
||||
|
||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False):
|
||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False):
|
||||
"""Process media files from WhatsApp messages."""
|
||||
c = db.cursor()
|
||||
|
||||
@@ -371,20 +368,15 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
||||
|
||||
# Process each media item
|
||||
mime = MimeTypes()
|
||||
i = 0
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
process_media_item(content, data, media_folder, mime, separate_media)
|
||||
|
||||
# Update progress
|
||||
i += 1
|
||||
if i % 100 == 0:
|
||||
logger.info(f"Processing media...({i}/{total_row_number})\r")
|
||||
content = c.fetchone()
|
||||
logger.info(f"Processed {total_row_number} media{CLEAR_LINE}")
|
||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_media_item(content, data, media_folder, mime, separate_media):
|
||||
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
|
||||
"""Process a single media item."""
|
||||
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
|
||||
current_chat = data.get_chat(content["ZCONTACTJID"])
|
||||
@@ -395,14 +387,22 @@ def process_media_item(content, data, media_folder, mime, separate_media):
|
||||
current_chat.media_base = media_folder + "/"
|
||||
|
||||
if os.path.isfile(file_path):
|
||||
message.data = '/'.join(file_path.split("/")[1:])
|
||||
|
||||
# Set MIME type
|
||||
if content["ZVCARDSTRING"] is None:
|
||||
guess = mime.guess_type(file_path)[0]
|
||||
message.mime = guess if guess is not None else "application/octet-stream"
|
||||
else:
|
||||
message.mime = content["ZVCARDSTRING"]
|
||||
|
||||
if fix_dot_files and file_path.endswith("."):
|
||||
extension = mime.guess_extension(message.mime)
|
||||
if message.mime == "application/octet-stream" or not extension:
|
||||
new_file_path = file_path[:-1]
|
||||
else:
|
||||
extension = mime.guess_extension(message.mime)
|
||||
new_file_path = file_path[:-1] + extension
|
||||
os.rename(file_path, new_file_path)
|
||||
file_path = new_file_path
|
||||
|
||||
# Handle separate media option
|
||||
if separate_media:
|
||||
@@ -413,7 +413,9 @@ def process_media_item(content, data, media_folder, mime, separate_media):
|
||||
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
||||
new_path = os.path.join(new_folder, current_filename)
|
||||
shutil.copy2(file_path, new_path)
|
||||
message.data = '/'.join(new_path.split("\\")[1:])
|
||||
message.data = '/'.join(new_path.split("/")[1:])
|
||||
else:
|
||||
message.data = '/'.join(file_path.split("/")[1:])
|
||||
else:
|
||||
# Handle missing media
|
||||
message.data = "The media is missing"
|
||||
@@ -467,10 +469,12 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Process each vCard
|
||||
for index, content in enumerate(contents):
|
||||
process_vcard_item(content, path, data)
|
||||
logger.info(f"Processing vCards...({index + 1}/{total_row_number})\r")
|
||||
logger.info(f"Processed {total_row_number} vCards{CLEAR_LINE}")
|
||||
with tqdm(total=total_row_number, desc="Processing vCards", unit="vcard", leave=False) as pbar:
|
||||
for content in contents:
|
||||
process_vcard_item(content, path, data)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_vcard_item(content, path, data):
|
||||
@@ -530,8 +534,6 @@ def calls(db, data, timezone_offset, filter_chat):
|
||||
if total_row_number == 0:
|
||||
return
|
||||
|
||||
logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}\n")
|
||||
|
||||
# Fetch call records
|
||||
calls_query = f"""
|
||||
SELECT ZCALLIDSTRING,
|
||||
@@ -556,14 +558,15 @@ def calls(db, data, timezone_offset, filter_chat):
|
||||
# Create calls chat
|
||||
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
|
||||
|
||||
# Process each call
|
||||
content = c.fetchone()
|
||||
while content is not None:
|
||||
process_call_record(content, chat, data, timezone_offset)
|
||||
content = c.fetchone()
|
||||
with tqdm(total=total_row_number, desc="Processing calls", unit="call", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
process_call_record(content, chat, data, timezone_offset)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
|
||||
# Add calls chat to data
|
||||
data.add_chat("000000000000000", chat)
|
||||
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def process_call_record(content, chat, data, timezone_offset):
|
||||
@@ -574,7 +577,7 @@ def process_call_record(content, chat, data, timezone_offset):
|
||||
timestamp=ts,
|
||||
time=ts,
|
||||
key_id=content["ZCALLIDSTRING"],
|
||||
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET
|
||||
timezone_offset=timezone_offset
|
||||
)
|
||||
|
||||
# Set sender info
|
||||
|
||||
@@ -6,7 +6,9 @@ import sqlite3
|
||||
import os
|
||||
import getpass
|
||||
from sys import exit, platform as osname
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier
|
||||
import sys
|
||||
from tqdm import tqdm
|
||||
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier, convert_time_unit
|
||||
from Whatsapp_Chat_Exporter.bplist import BPListReader
|
||||
try:
|
||||
from iphone_backup_decrypt import EncryptedBackup, RelativePath
|
||||
@@ -79,6 +81,8 @@ class BackupExtractor:
|
||||
|
||||
logger.info(f"Encryption detected on the backup!{CLEAR_LINE}")
|
||||
password = getpass.getpass("Enter the password for the backup:")
|
||||
sys.stdout.write("\033[F\033[K")
|
||||
sys.stdout.flush()
|
||||
self._decrypt_backup(password)
|
||||
self._extract_decrypted_files()
|
||||
|
||||
@@ -89,7 +93,7 @@ class BackupExtractor:
|
||||
Args:
|
||||
password (str): The password for the encrypted backup.
|
||||
"""
|
||||
logger.info(f"Trying to decrypt the iOS backup...{CLEAR_LINE}")
|
||||
logger.info(f"Trying to open the iOS backup...{CLEAR_LINE}")
|
||||
self.backup = EncryptedBackup(
|
||||
backup_directory=self.base_dir,
|
||||
passphrase=password,
|
||||
@@ -97,7 +101,7 @@ class BackupExtractor:
|
||||
check_same_thread=False,
|
||||
decrypt_chunk_size=self.decrypt_chunk_size,
|
||||
)
|
||||
logger.info(f"iOS backup decrypted successfully{CLEAR_LINE}")
|
||||
logger.info(f"iOS backup is opened successfully{CLEAR_LINE}")
|
||||
logger.info("Decrypting WhatsApp database...\r")
|
||||
try:
|
||||
self.backup.extract_file(
|
||||
@@ -130,9 +134,12 @@ class BackupExtractor:
|
||||
|
||||
def _extract_decrypted_files(self):
|
||||
"""Extract all WhatsApp files after decryption"""
|
||||
pbar = tqdm(desc="Decrypting and extracting files", unit="file", leave=False)
|
||||
def extract_progress_handler(file_id, domain, relative_path, n, total_files):
|
||||
if n % 100 == 0:
|
||||
logger.info(f"Decrypting and extracting files...({n}/{total_files})\r")
|
||||
if pbar.total is None:
|
||||
pbar.total = total_files
|
||||
pbar.n = n
|
||||
pbar.refresh()
|
||||
return True
|
||||
|
||||
self.backup.extract_files(
|
||||
@@ -141,7 +148,9 @@ class BackupExtractor:
|
||||
preserve_folders=True,
|
||||
filter_callback=extract_progress_handler
|
||||
)
|
||||
logger.info(f"All required files are decrypted and extracted.{CLEAR_LINE}")
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
pbar.close()
|
||||
logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
def _extract_unencrypted_backup(self):
|
||||
"""
|
||||
@@ -192,7 +201,6 @@ class BackupExtractor:
|
||||
c = manifest.cursor()
|
||||
c.execute(f"SELECT count() FROM Files WHERE domain = '{_wts_id}'")
|
||||
total_row_number = c.fetchone()[0]
|
||||
logger.info(f"Extracting WhatsApp files...(0/{total_row_number})\r")
|
||||
c.execute(
|
||||
f"""
|
||||
SELECT fileID, relativePath, flags, file AS metadata,
|
||||
@@ -205,33 +213,30 @@ class BackupExtractor:
|
||||
if not os.path.isdir(_wts_id):
|
||||
os.mkdir(_wts_id)
|
||||
|
||||
row = c.fetchone()
|
||||
while row is not None:
|
||||
if not row["relativePath"]: # Skip empty relative paths
|
||||
row = c.fetchone()
|
||||
continue
|
||||
with tqdm(total=total_row_number, desc="Extracting WhatsApp files", unit="file", leave=False) as pbar:
|
||||
while (row := c.fetchone()) is not None:
|
||||
if not row["relativePath"]: # Skip empty relative paths
|
||||
continue
|
||||
|
||||
destination = os.path.join(_wts_id, row["relativePath"])
|
||||
hashes = row["fileID"]
|
||||
folder = hashes[:2]
|
||||
flags = row["flags"]
|
||||
destination = os.path.join(_wts_id, row["relativePath"])
|
||||
hashes = row["fileID"]
|
||||
folder = hashes[:2]
|
||||
flags = row["flags"]
|
||||
|
||||
if flags == 2: # Directory
|
||||
try:
|
||||
os.mkdir(destination)
|
||||
except FileExistsError:
|
||||
pass
|
||||
elif flags == 1: # File
|
||||
shutil.copyfile(os.path.join(self.base_dir, folder, hashes), destination)
|
||||
metadata = BPListReader(row["metadata"]).parse()
|
||||
creation = metadata["$objects"][1]["Birth"]
|
||||
modification = metadata["$objects"][1]["LastModified"]
|
||||
os.utime(destination, (modification, modification))
|
||||
|
||||
if row["_index"] % 100 == 0:
|
||||
logger.info(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})\r")
|
||||
row = c.fetchone()
|
||||
logger.info(f"Extracted WhatsApp files...({total_row_number}){CLEAR_LINE}")
|
||||
if flags == 2: # Directory
|
||||
try:
|
||||
os.mkdir(destination)
|
||||
except FileExistsError:
|
||||
pass
|
||||
elif flags == 1: # File
|
||||
shutil.copyfile(os.path.join(self.base_dir, folder, hashes), destination)
|
||||
metadata = BPListReader(row["metadata"]).parse()
|
||||
_creation = metadata["$objects"][1]["Birth"]
|
||||
modification = metadata["$objects"][1]["LastModified"]
|
||||
os.utime(destination, (modification, modification))
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def extract_media(base_dir, identifiers, decrypt_chunk_size):
|
||||
|
||||
@@ -5,15 +5,15 @@ import json
|
||||
import os
|
||||
import unicodedata
|
||||
import re
|
||||
import string
|
||||
import math
|
||||
import shutil
|
||||
from bleach import clean as sanitize
|
||||
from markupsafe import Markup
|
||||
from datetime import datetime, timedelta
|
||||
from enum import IntEnum
|
||||
from tqdm import tqdm
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
from typing import Dict, List, Optional, Tuple, Union, Any
|
||||
try:
|
||||
from enum import StrEnum, IntEnum
|
||||
except ImportError:
|
||||
@@ -248,95 +248,240 @@ def import_from_json(json_file: str, data: ChatCollection):
|
||||
with open(json_file, "r") as f:
|
||||
temp_data = json.loads(f.read())
|
||||
total_row_number = len(tuple(temp_data.keys()))
|
||||
logger.info(f"Importing chats from JSON...(0/{total_row_number})\r")
|
||||
for index, (jid, chat_data) in enumerate(temp_data.items()):
|
||||
chat = ChatStore.from_json(chat_data)
|
||||
data.add_chat(jid, chat)
|
||||
logger.info(
|
||||
f"Importing chats from JSON...({index + 1}/{total_row_number})\r")
|
||||
logger.info(f"Imported {total_row_number} chats from JSON{CLEAR_LINE}")
|
||||
with tqdm(total=total_row_number, desc="Importing chats from JSON", unit="chat", leave=False) as pbar:
|
||||
for jid, chat_data in temp_data.items():
|
||||
chat = ChatStore.from_json(chat_data)
|
||||
data.add_chat(jid, chat)
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||
|
||||
|
||||
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool):
|
||||
"""Merges JSON files from the source directory into the target directory.
|
||||
class IncrementalMerger:
|
||||
"""Handles incremental merging of WhatsApp chat exports."""
|
||||
|
||||
def __init__(self, pretty_print_json: int, avoid_encoding_json: bool):
|
||||
"""Initialize the merger with JSON formatting options.
|
||||
|
||||
Args:
|
||||
pretty_print_json: JSON indentation level.
|
||||
avoid_encoding_json: Whether to avoid ASCII encoding.
|
||||
"""
|
||||
self.pretty_print_json = pretty_print_json
|
||||
self.avoid_encoding_json = avoid_encoding_json
|
||||
|
||||
def _get_json_files(self, source_dir: str) -> List[str]:
|
||||
"""Get list of JSON files from source directory.
|
||||
|
||||
Args:
|
||||
source_dir: Path to the source directory.
|
||||
|
||||
Returns:
|
||||
List of JSON filenames.
|
||||
|
||||
Raises:
|
||||
SystemExit: If no JSON files are found.
|
||||
"""
|
||||
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
|
||||
if not json_files:
|
||||
logger.error("No JSON files found in the source directory.")
|
||||
raise SystemExit(1)
|
||||
|
||||
logger.info("JSON files found:", json_files)
|
||||
return json_files
|
||||
|
||||
Args:
|
||||
source_dir (str): The path to the source directory containing JSON files.
|
||||
target_dir (str): The path to the target directory to merge into.
|
||||
media_dir (str): The path to the media directory.
|
||||
"""
|
||||
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
|
||||
if not json_files:
|
||||
logger.error("No JSON files found in the source directory.")
|
||||
return
|
||||
def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None:
|
||||
"""Copy a new JSON file to target directory.
|
||||
|
||||
Args:
|
||||
source_path: Path to source file.
|
||||
target_path: Path to target file.
|
||||
target_dir: Target directory path.
|
||||
json_file: Name of the JSON file.
|
||||
"""
|
||||
logger.info(f"Copying '{json_file}' to target directory...")
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
shutil.copy2(source_path, target_path)
|
||||
|
||||
logger.info("JSON files found:", json_files)
|
||||
def _load_chat_data(self, file_path: str) -> Dict[str, Any]:
|
||||
"""Load JSON data from file.
|
||||
|
||||
Args:
|
||||
file_path: Path to JSON file.
|
||||
|
||||
Returns:
|
||||
Loaded JSON data.
|
||||
"""
|
||||
with open(file_path, 'r') as file:
|
||||
return json.load(file)
|
||||
|
||||
for json_file in json_files:
|
||||
source_path = os.path.join(source_dir, json_file)
|
||||
target_path = os.path.join(target_dir, json_file)
|
||||
def _parse_chats_from_json(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Parse JSON data into ChatStore objects.
|
||||
|
||||
Args:
|
||||
data: Raw JSON data.
|
||||
|
||||
Returns:
|
||||
Dictionary of JID to ChatStore objects.
|
||||
"""
|
||||
return {jid: ChatStore.from_json(chat) for jid, chat in data.items()}
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
logger.info(f"Copying '{json_file}' to target directory...")
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
shutil.copy2(source_path, target_path)
|
||||
def _merge_chat_stores(self, source_chats: Dict[str, Any], target_chats: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Merge source chats into target chats.
|
||||
|
||||
Args:
|
||||
source_chats: Source ChatStore objects.
|
||||
target_chats: Target ChatStore objects.
|
||||
|
||||
Returns:
|
||||
Merged ChatStore objects.
|
||||
"""
|
||||
for jid, chat in source_chats.items():
|
||||
if jid in target_chats:
|
||||
target_chats[jid].merge_with(chat)
|
||||
else:
|
||||
target_chats[jid] = chat
|
||||
return target_chats
|
||||
|
||||
def _serialize_chats(self, chats: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Serialize ChatStore objects to JSON format.
|
||||
|
||||
Args:
|
||||
chats: Dictionary of ChatStore objects.
|
||||
|
||||
Returns:
|
||||
Serialized JSON data.
|
||||
"""
|
||||
return {jid: chat.to_json() for jid, chat in chats.items()}
|
||||
|
||||
def _has_changes(self, merged_data: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
|
||||
"""Check if merged data differs from original data.
|
||||
|
||||
Args:
|
||||
merged_data: Merged JSON data.
|
||||
original_data: Original JSON data.
|
||||
|
||||
Returns:
|
||||
True if changes detected, False otherwise.
|
||||
"""
|
||||
return json.dumps(merged_data, sort_keys=True) != json.dumps(original_data, sort_keys=True)
|
||||
|
||||
def _save_merged_data(self, target_path: str, merged_data: Dict[str, Any]) -> None:
|
||||
"""Save merged data to target file.
|
||||
|
||||
Args:
|
||||
target_path: Path to target file.
|
||||
merged_data: Merged JSON data.
|
||||
"""
|
||||
with open(target_path, 'w') as merged_file:
|
||||
json.dump(
|
||||
merged_data,
|
||||
merged_file,
|
||||
indent=self.pretty_print_json,
|
||||
ensure_ascii=not self.avoid_encoding_json,
|
||||
)
|
||||
|
||||
def _merge_json_file(self, source_path: str, target_path: str, json_file: str) -> None:
|
||||
"""Merge a single JSON file.
|
||||
|
||||
Args:
|
||||
source_path: Path to source file.
|
||||
target_path: Path to target file.
|
||||
json_file: Name of the JSON file.
|
||||
"""
|
||||
logger.info(f"Merging '{json_file}' with existing file in target directory...")
|
||||
|
||||
source_data = self._load_chat_data(source_path)
|
||||
target_data = self._load_chat_data(target_path)
|
||||
|
||||
source_chats = self._parse_chats_from_json(source_data)
|
||||
target_chats = self._parse_chats_from_json(target_data)
|
||||
|
||||
merged_chats = self._merge_chat_stores(source_chats, target_chats)
|
||||
merged_data = self._serialize_chats(merged_chats)
|
||||
|
||||
if self._has_changes(merged_data, target_data):
|
||||
logger.info(f"Changes detected in '{json_file}', updating target file...")
|
||||
self._save_merged_data(target_path, merged_data)
|
||||
else:
|
||||
logger.info(
|
||||
f"Merging '{json_file}' with existing file in target directory...")
|
||||
with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
|
||||
source_data = json.load(src_file)
|
||||
target_data = json.load(tgt_file)
|
||||
logger.info(f"No changes detected in '{json_file}', skipping update.")
|
||||
|
||||
# Parse JSON into ChatStore objects using from_json()
|
||||
source_chats = {jid: ChatStore.from_json(
|
||||
chat) for jid, chat in source_data.items()}
|
||||
target_chats = {jid: ChatStore.from_json(
|
||||
chat) for jid, chat in target_data.items()}
|
||||
def _should_copy_media_file(self, source_file: str, target_file: str) -> bool:
|
||||
"""Check if media file should be copied.
|
||||
|
||||
Args:
|
||||
source_file: Path to source media file.
|
||||
target_file: Path to target media file.
|
||||
|
||||
Returns:
|
||||
True if file should be copied, False otherwise.
|
||||
"""
|
||||
return not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file)
|
||||
|
||||
# Merge chats using merge_with()
|
||||
for jid, chat in source_chats.items():
|
||||
if jid in target_chats:
|
||||
target_chats[jid].merge_with(chat)
|
||||
else:
|
||||
target_chats[jid] = chat
|
||||
|
||||
# Serialize merged data
|
||||
merged_data = {jid: chat.to_json()
|
||||
for jid, chat in target_chats.items()}
|
||||
|
||||
# Check if the merged data differs from the original target data
|
||||
if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
|
||||
logger.info(
|
||||
f"Changes detected in '{json_file}', updating target file...")
|
||||
with open(target_path, 'w') as merged_file:
|
||||
json.dump(
|
||||
merged_data,
|
||||
merged_file,
|
||||
indent=pretty_print_json,
|
||||
ensure_ascii=not avoid_encoding_json,
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"No changes detected in '{json_file}', skipping update.")
|
||||
|
||||
# Merge media directories
|
||||
source_media_path = os.path.join(source_dir, media_dir)
|
||||
target_media_path = os.path.join(target_dir, media_dir)
|
||||
logger.info(
|
||||
f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
|
||||
if os.path.exists(source_media_path):
|
||||
def _merge_media_directories(self, source_dir: str, target_dir: str, media_dir: str) -> None:
|
||||
"""Merge media directories from source to target.
|
||||
|
||||
Args:
|
||||
source_dir: Source directory path.
|
||||
target_dir: Target directory path.
|
||||
media_dir: Media directory name.
|
||||
"""
|
||||
source_media_path = os.path.join(source_dir, media_dir)
|
||||
target_media_path = os.path.join(target_dir, media_dir)
|
||||
|
||||
logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
|
||||
|
||||
if not os.path.exists(source_media_path):
|
||||
return
|
||||
|
||||
for root, _, files in os.walk(source_media_path):
|
||||
relative_path = os.path.relpath(root, source_media_path)
|
||||
target_root = os.path.join(target_media_path, relative_path)
|
||||
os.makedirs(target_root, exist_ok=True)
|
||||
|
||||
for file in files:
|
||||
source_file = os.path.join(root, file)
|
||||
target_file = os.path.join(target_root, file)
|
||||
# we only copy if the file doesn't exist in the target or if the source is newer
|
||||
if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file):
|
||||
|
||||
if self._should_copy_media_file(source_file, target_file):
|
||||
logger.info(f"Copying '{source_file}' to '{target_file}'...")
|
||||
shutil.copy2(source_file, target_file)
|
||||
|
||||
def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None:
|
||||
"""Merge JSON files and media from source to target directory.
|
||||
|
||||
Args:
|
||||
source_dir: The path to the source directory containing JSON files.
|
||||
target_dir: The path to the target directory to merge into.
|
||||
media_dir: The path to the media directory.
|
||||
"""
|
||||
json_files = self._get_json_files(source_dir)
|
||||
|
||||
for json_file in json_files:
|
||||
source_path = os.path.join(source_dir, json_file)
|
||||
target_path = os.path.join(target_dir, json_file)
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
self._copy_new_file(source_path, target_path, target_dir, json_file)
|
||||
else:
|
||||
self._merge_json_file(source_path, target_path, json_file)
|
||||
|
||||
self._merge_media_directories(source_dir, target_dir, media_dir)
|
||||
|
||||
|
||||
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool) -> None:
|
||||
"""Wrapper for merging JSON files from the source directory into the target directory.
|
||||
|
||||
Args:
|
||||
source_dir: The path to the source directory containing JSON files.
|
||||
target_dir: The path to the target directory to merge into.
|
||||
media_dir: The path to the media directory.
|
||||
pretty_print_json: JSON indentation level.
|
||||
avoid_encoding_json: Whether to avoid ASCII encoding.
|
||||
"""
|
||||
merger = IncrementalMerger(pretty_print_json, avoid_encoding_json)
|
||||
merger.merge(source_dir, target_dir, media_dir)
|
||||
|
||||
|
||||
def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]:
|
||||
"""Generates a sanitized filename and contact name for a chat.
|
||||
@@ -384,9 +529,41 @@ def get_cond_for_empty(enable: bool, jid_field: str, broadcast_field: str) -> st
|
||||
return f"AND (chat.hidden=0 OR {jid_field}='status@broadcast' OR {broadcast_field}>0)" if enable else ""
|
||||
|
||||
|
||||
def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List[str], jid: Optional[str] = None, platform: Optional[str] = None) -> str:
|
||||
def _get_group_condition(jid: str, platform: str) -> str:
|
||||
"""Generate platform-specific group identification condition.
|
||||
|
||||
Args:
|
||||
jid: The JID column name.
|
||||
platform: The platform ("android" or "ios").
|
||||
|
||||
Returns:
|
||||
SQL condition string for group identification.
|
||||
|
||||
Raises:
|
||||
ValueError: If platform is not supported.
|
||||
"""
|
||||
if platform == "android":
|
||||
return f"{jid}.type == 1"
|
||||
elif platform == "ios":
|
||||
return f"{jid} IS NOT NULL"
|
||||
else:
|
||||
raise ValueError(
|
||||
"Only android and ios are supported for argument platform if jid is not None")
|
||||
|
||||
|
||||
def get_chat_condition(
|
||||
filter: Optional[List[str]],
|
||||
include: bool,
|
||||
columns: List[str],
|
||||
jid: Optional[str] = None,
|
||||
platform: Optional[str] = None
|
||||
) -> str:
|
||||
"""Generates a SQL condition for filtering chats based on inclusion or exclusion criteria.
|
||||
|
||||
SQL injection risks from chat filters were evaluated during development and deemed negligible
|
||||
due to the tool's offline, trusted-input model (user running this tool on WhatsApp
|
||||
backups/databases on their own device).
|
||||
|
||||
Args:
|
||||
filter: A list of phone numbers to include or exclude.
|
||||
include: True to include chats that match the filter, False to exclude them.
|
||||
@@ -400,35 +577,39 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List
|
||||
Raises:
|
||||
ValueError: If the column count is invalid or an unsupported platform is provided.
|
||||
"""
|
||||
if filter is not None:
|
||||
conditions = []
|
||||
if len(columns) < 2 and jid is not None:
|
||||
raise ValueError(
|
||||
"There must be at least two elements in argument columns if jid is not None")
|
||||
if jid is not None:
|
||||
if platform == "android":
|
||||
is_group = f"{jid}.type == 1"
|
||||
elif platform == "ios":
|
||||
is_group = f"{jid} IS NOT NULL"
|
||||
else:
|
||||
raise ValueError(
|
||||
"Only android and ios are supported for argument platform if jid is not None")
|
||||
for index, chat in enumerate(filter):
|
||||
if include:
|
||||
conditions.append(
|
||||
f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'")
|
||||
if len(columns) > 1:
|
||||
conditions.append(
|
||||
f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})")
|
||||
else:
|
||||
conditions.append(
|
||||
f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'")
|
||||
if len(columns) > 1:
|
||||
conditions.append(
|
||||
f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})")
|
||||
return f"AND ({' '.join(conditions)})"
|
||||
else:
|
||||
if not filter:
|
||||
return ""
|
||||
|
||||
if jid is not None and len(columns) < 2:
|
||||
raise ValueError(
|
||||
"There must be at least two elements in argument columns if jid is not None")
|
||||
|
||||
# Get group condition if needed
|
||||
is_group_condition = None
|
||||
if jid is not None:
|
||||
is_group_condition = _get_group_condition(jid, platform)
|
||||
|
||||
# Build conditions for each chat filter
|
||||
conditions = []
|
||||
for index, chat in enumerate(filter):
|
||||
# Add connector for subsequent conditions (with double space)
|
||||
connector = " OR" if include else " AND"
|
||||
prefix = connector if index > 0 else ""
|
||||
|
||||
# Primary column condition
|
||||
operator = "LIKE" if include else "NOT LIKE"
|
||||
conditions.append(f"{prefix} {columns[0]} {operator} '%{chat}%'")
|
||||
|
||||
# Secondary column condition for groups
|
||||
if len(columns) > 1 and is_group_condition:
|
||||
if include:
|
||||
group_condition = f" OR ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})"
|
||||
else:
|
||||
group_condition = f" AND ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})"
|
||||
conditions.append(group_condition)
|
||||
|
||||
combined_conditions = "".join(conditions)
|
||||
return f"AND ({combined_conditions})"
|
||||
|
||||
|
||||
# Android Specific
|
||||
@@ -439,7 +620,7 @@ CRYPT14_OFFSETS = (
|
||||
{"iv": 67, "db": 193},
|
||||
{"iv": 67, "db": 194},
|
||||
{"iv": 67, "db": 158},
|
||||
{"iv": 67, "db": 196}
|
||||
{"iv": 67, "db": 196},
|
||||
)
|
||||
|
||||
|
||||
@@ -534,7 +715,7 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona
|
||||
else:
|
||||
msg = "The security code in this chat changed"
|
||||
elif content["action_type"] == 58:
|
||||
msg = "You blocked this contact"
|
||||
msg = "You blocked/unblocked this contact"
|
||||
elif content["action_type"] == 67:
|
||||
return # (PM) this contact use secure service from Facebook???
|
||||
elif content["action_type"] == 69:
|
||||
@@ -572,6 +753,69 @@ def get_status_location(output_folder: str, offline_static: str) -> str:
|
||||
return w3css
|
||||
|
||||
|
||||
def check_jid_map(db: sqlite3.Connection) -> bool:
|
||||
"""
|
||||
Checks if the jid_map table exists in the database.
|
||||
|
||||
Args:
|
||||
db (sqlite3.Connection): The SQLite database connection.
|
||||
|
||||
Returns:
|
||||
bool: True if the jid_map table exists, False otherwise.
|
||||
"""
|
||||
cursor = db.cursor()
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='jid_map'")
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
|
||||
def get_jid_map_join(jid_map_exists: bool) -> str:
|
||||
"""
|
||||
Returns the SQL JOIN statements for jid_map table.
|
||||
"""
|
||||
if not jid_map_exists:
|
||||
return ""
|
||||
else:
|
||||
return """LEFT JOIN jid_map as jid_map_global
|
||||
ON chat.jid_row_id = jid_map_global.lid_row_id
|
||||
LEFT JOIN jid lid_global
|
||||
ON jid_map_global.jid_row_id = lid_global._id
|
||||
LEFT JOIN jid_map as jid_map_group
|
||||
ON message.sender_jid_row_id = jid_map_group.lid_row_id
|
||||
LEFT JOIN jid lid_group
|
||||
ON jid_map_group.jid_row_id = lid_group._id"""
|
||||
|
||||
def get_jid_map_selection(jid_map_exists: bool) -> tuple:
|
||||
"""
|
||||
Returns the SQL selection statements for jid_map table.
|
||||
"""
|
||||
if not jid_map_exists:
|
||||
return "jid_global.raw_string", "jid_group.raw_string"
|
||||
else:
|
||||
return (
|
||||
"COALESCE(lid_global.raw_string, jid_global.raw_string)",
|
||||
"COALESCE(lid_group.raw_string, jid_group.raw_string)"
|
||||
)
|
||||
|
||||
|
||||
def get_transcription_selection(db: sqlite3.Connection) -> str:
|
||||
"""
|
||||
Returns the SQL selection statement for transcription text based on the database schema.
|
||||
|
||||
Args:
|
||||
db (sqlite3.Connection): The SQLite database connection.
|
||||
Returns:
|
||||
str: The SQL selection statement for transcription.
|
||||
"""
|
||||
cursor = db.cursor()
|
||||
cursor.execute("PRAGMA table_info(message_media)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
|
||||
if "raw_transcription_text" in columns:
|
||||
return "message_media.raw_transcription_text AS transcription_text"
|
||||
else:
|
||||
return "NULL AS transcription_text"
|
||||
|
||||
|
||||
def setup_template(template: Optional[str], no_avatar: bool, experimental: bool = False) -> jinja2.Template:
|
||||
"""
|
||||
Sets up the Jinja2 template environment and loads the template.
|
||||
@@ -639,11 +883,17 @@ def get_from_string(msg: Dict, chat_id: str) -> str:
|
||||
|
||||
def get_chat_type(chat_id: str) -> str:
|
||||
"""Return the chat type based on the whatsapp id"""
|
||||
if chat_id.endswith("@s.whatsapp.net"):
|
||||
if chat_id == "000000000000000":
|
||||
return "calls"
|
||||
elif chat_id.endswith("@s.whatsapp.net"):
|
||||
return "personal_chat"
|
||||
if chat_id.endswith("@g.us"):
|
||||
elif chat_id.endswith("@g.us"):
|
||||
return "private_group"
|
||||
logger.warning("Unknown chat type for %s, defaulting to private_group", chat_id)
|
||||
elif chat_id == "status@broadcast":
|
||||
return "status_broadcast"
|
||||
elif chat_id.endswith("@broadcast"):
|
||||
return "broadcast_channel"
|
||||
logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group{CLEAR_LINE}")
|
||||
return "private_group"
|
||||
|
||||
|
||||
@@ -674,34 +924,35 @@ def telegram_json_format(jik: str, data: Dict, timezone_offset) -> Dict:
|
||||
except ValueError:
|
||||
# not a real chat: e.g. statusbroadcast
|
||||
chat_id = 0
|
||||
obj = {
|
||||
"name": data["name"] if data["name"] else jik,
|
||||
"type": get_chat_type(jik),
|
||||
"id": chat_id,
|
||||
"messages": [ {
|
||||
"id": int(msgId),
|
||||
"type": "message",
|
||||
"date": timing.format_timestamp(msg["timestamp"], "%Y-%m-%dT%H:%M:%S"),
|
||||
"date_unixtime": int(msg["timestamp"]),
|
||||
"from": get_from_string(msg, chat_id),
|
||||
"from_id": get_from_id(msg, chat_id),
|
||||
"reply_to_message_id": get_reply_id(data, msg["reply"]),
|
||||
"text": msg["data"],
|
||||
"text_entities": [
|
||||
{
|
||||
# TODO this will lose formatting and different types
|
||||
"type": "plain",
|
||||
"text": msg["data"],
|
||||
}
|
||||
],
|
||||
} for msgId, msg in data["messages"].items()]
|
||||
json_obj = {
|
||||
"name": data["name"] if data["name"] else jik,
|
||||
"type": get_chat_type(jik),
|
||||
"id": chat_id,
|
||||
"messages": [ {
|
||||
"id": int(msgId),
|
||||
"type": "message",
|
||||
"date": timing.format_timestamp(msg["timestamp"], "%Y-%m-%dT%H:%M:%S"),
|
||||
"date_unixtime": int(msg["timestamp"]),
|
||||
"from": get_from_string(msg, chat_id),
|
||||
"from_id": get_from_id(msg, chat_id),
|
||||
"reply_to_message_id": get_reply_id(data, msg["reply"]),
|
||||
"text": msg["data"],
|
||||
"text_entities": [
|
||||
{
|
||||
# TODO this will lose formatting and different types
|
||||
"type": "plain",
|
||||
"text": msg["data"],
|
||||
}
|
||||
],
|
||||
}
|
||||
for msgId, msg in data["messages"].items()]
|
||||
}
|
||||
# remove empty messages and replies
|
||||
for msg_id, msg in enumerate(obj["messages"]):
|
||||
for msg_id, msg in enumerate(json_obj["messages"]):
|
||||
if not msg["reply_to_message_id"]:
|
||||
del obj["messages"][msg_id]["reply_to_message_id"]
|
||||
obj["messages"] = [m for m in obj["messages"] if m["text"]]
|
||||
return obj
|
||||
del json_obj["messages"][msg_id]["reply_to_message_id"]
|
||||
json_obj["messages"] = [m for m in json_obj["messages"] if m["text"]]
|
||||
return json_obj
|
||||
|
||||
|
||||
class WhatsAppIdentifier(StrEnum):
|
||||
|
||||
@@ -127,6 +127,125 @@
|
||||
--tw-translate-x: -50%;
|
||||
transform: translate(var(--tw-translate-x), var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y));
|
||||
}
|
||||
|
||||
.status-indicator {
|
||||
display: inline-block;
|
||||
margin-left: 4px;
|
||||
font-size: 0.8em;
|
||||
color: #8c8c8c;
|
||||
}
|
||||
|
||||
.status-indicator.read {
|
||||
color: #34B7F1;
|
||||
}
|
||||
|
||||
.play-icon {
|
||||
width: 0;
|
||||
height: 0;
|
||||
border-left: 8px solid white;
|
||||
border-top: 5px solid transparent;
|
||||
border-bottom: 5px solid transparent;
|
||||
filter: drop-shadow(0 1px 2px rgba(0, 0, 0, 0.3));
|
||||
}
|
||||
|
||||
.speaker-icon {
|
||||
position: relative;
|
||||
width: 8px;
|
||||
height: 6px;
|
||||
background: #666;
|
||||
border-radius: 1px 0 0 1px;
|
||||
}
|
||||
|
||||
.speaker-icon::before {
|
||||
content: '';
|
||||
position: absolute;
|
||||
right: -4px;
|
||||
top: -1px;
|
||||
width: 0;
|
||||
height: 0;
|
||||
border-left: 4px solid #666;
|
||||
border-top: 4px solid transparent;
|
||||
border-bottom: 4px solid transparent;
|
||||
}
|
||||
|
||||
.speaker-icon::after {
|
||||
content: '';
|
||||
position: absolute;
|
||||
right: -8px;
|
||||
top: -3px;
|
||||
width: 8px;
|
||||
height: 12px;
|
||||
border: 2px solid #666;
|
||||
border-left: none;
|
||||
border-radius: 0 8px 8px 0;
|
||||
}
|
||||
|
||||
.search-icon {
|
||||
width: 20px;
|
||||
height: 20px;
|
||||
position: relative;
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.search-icon::before {
|
||||
content: '';
|
||||
position: absolute;
|
||||
width: 12px;
|
||||
height: 12px;
|
||||
border: 2px solid #aebac1;
|
||||
border-radius: 50%;
|
||||
top: 2px;
|
||||
left: 2px;
|
||||
}
|
||||
|
||||
.search-icon::after {
|
||||
content: '';
|
||||
position: absolute;
|
||||
width: 2px;
|
||||
height: 6px;
|
||||
background: #aebac1;
|
||||
transform: rotate(45deg);
|
||||
top: 12px;
|
||||
left: 12px;
|
||||
}
|
||||
|
||||
.arrow-left {
|
||||
width: 0;
|
||||
height: 0;
|
||||
border-top: 6px solid transparent;
|
||||
border-bottom: 6px solid transparent;
|
||||
border-right: 8px solid #aebac1;
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.arrow-right {
|
||||
width: 0;
|
||||
height: 0;
|
||||
border-top: 6px solid transparent;
|
||||
border-bottom: 6px solid transparent;
|
||||
border-left: 8px solid #aebac1;
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.info-icon {
|
||||
width: 20px;
|
||||
height: 20px;
|
||||
border: 2px solid currentColor;
|
||||
border-radius: 50%;
|
||||
position: relative;
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.info-icon::before {
|
||||
content: 'i';
|
||||
position: absolute;
|
||||
top: 50%;
|
||||
left: 50%;
|
||||
transform: translate(-50%, -50%);
|
||||
font-size: 12px;
|
||||
font-weight: bold;
|
||||
font-style: normal;
|
||||
}
|
||||
</style>
|
||||
<script>
|
||||
function search(event) {
|
||||
@@ -163,34 +282,24 @@
|
||||
</div>
|
||||
<div class="flex space-x-4">
|
||||
<!-- <button id="searchButton">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#aebac1]" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M21 21l-6-6m2-5a7 7 0 11-14 0 7 7 0 0114 0z" />
|
||||
</svg>
|
||||
</button> -->
|
||||
<!-- <svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#aebac1]" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7" />
|
||||
</svg> -->
|
||||
<span class="search-icon"></span>
|
||||
</button> -->
|
||||
<!-- <span class="arrow-left"></span> -->
|
||||
{% if previous %}
|
||||
<a href="./{{ previous }}" target="_self">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#aebac1]" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 5l-7 7 7 7" />
|
||||
</svg>
|
||||
<span class="arrow-left"></span>
|
||||
</a>
|
||||
{% endif %}
|
||||
{% if next %}
|
||||
<a href="./{{ next }}" target="_self">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 text-[#aebac1]" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 5l7 7-7 7" />
|
||||
</svg>
|
||||
<span class="arrow-right"></span>
|
||||
</a>
|
||||
{% endif %}
|
||||
</div>
|
||||
<!-- Search Input Overlay -->
|
||||
<div id="mainSearchInput" class="search-input absolute article top-0 bg-whatsapp-dark p-3 flex items-center space-x-3">
|
||||
<button id="closeMainSearch" class="text-[#aebac1]">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-6 w-6" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7" />
|
||||
</svg>
|
||||
<span class="arrow-left"></span>
|
||||
</button>
|
||||
<input type="text" placeholder="Search..." class="flex-1 bg-[#1f2c34] text-white rounded-lg px-3 py-1 focus:outline-none" id="mainHeaderSearchInput" onkeyup="search(event)">
|
||||
</div>
|
||||
@@ -230,18 +339,44 @@
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="bg-whatsapp-light rounded-lg p-2 max-w-[80%] shadow-sm">
|
||||
<div class="bg-whatsapp-light rounded-lg p-2 max-w-[80%] shadow-sm relative {% if msg.reactions %}mb-2{% endif %}">
|
||||
{% if msg.reply is not none %}
|
||||
<a href="#{{msg.reply}}" target="_self" class="no-base">
|
||||
<div class="mb-2 p-1 bg-whatsapp-chat-light rounded border-l-4 border-whatsapp text-sm reply-box">
|
||||
<p class="text-whatsapp font-medium text-xs">Replying to</p>
|
||||
<p class="text-[#111b21] text-xs truncate">
|
||||
{% if msg.quoted_data is not none %}
|
||||
"{{msg.quoted_data}}"
|
||||
{% else %}
|
||||
this message
|
||||
<div
|
||||
class="mb-2 p-1 bg-whatsapp-chat-light rounded border-l-4 border-whatsapp text-sm reply-box">
|
||||
<div class="flex items-center gap-2">
|
||||
<div class="flex-1 overflow-hidden">
|
||||
<p class="text-whatsapp font-medium text-xs">Replying to</p>
|
||||
<p class="text-[#111b21] text-xs truncate">
|
||||
{% if msg.quoted_data is not none %}
|
||||
"{{msg.quoted_data}}"
|
||||
{% else %}
|
||||
this message
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
{% set replied_msg = msgs | selectattr('key_id', 'equalto', msg.reply) | first %}
|
||||
{% if replied_msg and replied_msg.media == true %}
|
||||
<div class="flex-shrink-0">
|
||||
{% if "image/" in replied_msg.mime %}
|
||||
<img src="{{ replied_msg.thumb if replied_msg.thumb is not none else replied_msg.data }}"
|
||||
class="w-8 h-8 rounded object-cover" loading="lazy" />
|
||||
{% elif "video/" in replied_msg.mime %}
|
||||
<div class="relative w-8 h-8 rounded overflow-hidden bg-gray-200">
|
||||
<img src="{{ replied_msg.thumb if replied_msg.thumb is not none else replied_msg.data }}"
|
||||
class="w-full h-full object-cover" loading="lazy" />
|
||||
<div class="absolute inset-0 flex items-center justify-center">
|
||||
<div class="play-icon"></div>
|
||||
</div>
|
||||
</div>
|
||||
{% elif "audio/" in replied_msg.mime %}
|
||||
<div class="w-8 h-8 rounded bg-gray-200 flex items-center justify-center">
|
||||
<div class="speaker-icon"></div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</a>
|
||||
{% endif %}
|
||||
@@ -281,28 +416,73 @@
|
||||
{% filter escape %}{{ msg.data }}{% endfilter %}
|
||||
{% endif %}
|
||||
{% if msg.caption is not none %}
|
||||
{{ msg.caption | urlize(none, true, '_blank') }}
|
||||
<p class='mt-1 {% if "audio/" in msg.mime %}text-[#808080]{% endif %}'>
|
||||
{{ msg.caption | urlize(none, true, '_blank') }}
|
||||
</p>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
</p>
|
||||
<p class="text-[10px] text-[#667781] text-right mt-1">{{ msg.time }}</p>
|
||||
<p class="text-[10px] text-[#667781] text-right mt-1">{{ msg.time }}
|
||||
<span class="status-indicator{% if msg.read_timestamp %} read{% endif %}">
|
||||
{% if msg.received_timestamp %}
|
||||
✓✓
|
||||
{% else %}
|
||||
✓
|
||||
{% endif %}
|
||||
</span>
|
||||
</p>
|
||||
{% if msg.reactions %}
|
||||
<div class="flex flex-wrap gap-1 mt-1 justify-end absolute -bottom-3 -right-2">
|
||||
{% for sender, emoji in msg.reactions.items() %}
|
||||
<div class="bg-white rounded-full px-1.5 py-0.5 text-xs shadow-sm border border-gray-200 cursor-help" title="{{ sender }}">
|
||||
{{ emoji }}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
{% else %}
|
||||
<div class="flex justify-start items-center group" id="{{ msg.key_id }}">
|
||||
<div class="bg-white rounded-lg p-2 max-w-[80%] shadow-sm">
|
||||
<div class="bg-white rounded-lg p-2 max-w-[80%] shadow-sm relative {% if msg.reactions %}mb-2{% endif %}">
|
||||
{% if msg.reply is not none %}
|
||||
<a href="#{{msg.reply}}" target="_self" class="no-base">
|
||||
<div class="mb-2 p-1 bg-whatsapp-chat-light rounded border-l-4 border-whatsapp text-sm reply-box">
|
||||
<p class="text-whatsapp font-medium text-xs">Replying to</p>
|
||||
<p class="text-[#808080] text-xs truncate">
|
||||
{% if msg.quoted_data is not none %}
|
||||
{{msg.quoted_data}}
|
||||
{% else %}
|
||||
this message
|
||||
<div
|
||||
class="mb-2 p-1 bg-whatsapp-chat-light rounded border-l-4 border-whatsapp text-sm reply-box">
|
||||
<div class="flex items-center gap-2">
|
||||
<div class="flex-1 overflow-hidden">
|
||||
<p class="text-whatsapp font-medium text-xs">Replying to</p>
|
||||
<p class="text-[#808080] text-xs truncate">
|
||||
{% if msg.quoted_data is not none %}
|
||||
{{msg.quoted_data}}
|
||||
{% else %}
|
||||
this message
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
{% set replied_msg = msgs | selectattr('key_id', 'equalto', msg.reply) | first %}
|
||||
{% if replied_msg and replied_msg.media == true %}
|
||||
<div class="flex-shrink-0">
|
||||
{% if "image/" in replied_msg.mime %}
|
||||
<img src="{{ replied_msg.thumb if replied_msg.thumb is not none else replied_msg.data }}"
|
||||
class="w-8 h-8 rounded object-cover" loading="lazy" />
|
||||
{% elif "video/" in replied_msg.mime %}
|
||||
<div class="relative w-8 h-8 rounded overflow-hidden bg-gray-200">
|
||||
<img src="{{ replied_msg.thumb if replied_msg.thumb is not none else replied_msg.data }}"
|
||||
class="w-full h-full object-cover" loading="lazy" />
|
||||
<div class="absolute inset-0 flex items-center justify-center">
|
||||
<div class="play-icon"></div>
|
||||
</div>
|
||||
</div>
|
||||
{% elif "audio/" in replied_msg.mime %}
|
||||
<div class="w-8 h-8 rounded bg-gray-200 flex items-center justify-center">
|
||||
<div class="speaker-icon"></div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</a>
|
||||
{% endif %}
|
||||
@@ -342,7 +522,9 @@
|
||||
{% filter escape %}{{ msg.data }}{% endfilter %}
|
||||
{% endif %}
|
||||
{% if msg.caption is not none %}
|
||||
{{ msg.caption | urlize(none, true, '_blank') }}
|
||||
<p class='mt-1 {% if "audio/" in msg.mime %}text-[#808080]{% endif %}'>
|
||||
{{ msg.caption | urlize(none, true, '_blank') }}
|
||||
</p>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
@@ -356,6 +538,15 @@
|
||||
<span class="flex-grow min-w-[4px]"></span>
|
||||
<span class="flex-shrink-0">{{ msg.time }}</span>
|
||||
</div>
|
||||
{% if msg.reactions %}
|
||||
<div class="flex flex-wrap gap-1 mt-1 justify-start absolute -bottom-3 -left-2">
|
||||
{% for sender, emoji in msg.reactions.items() %}
|
||||
<div class="bg-gray-100 rounded-full px-1.5 py-0.5 text-xs shadow-sm border border-gray-200 cursor-help" title="{{ sender }}">
|
||||
{{ emoji }}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
<!-- <div class="opacity-0 group-hover:opacity-100 transition-opacity duration-200 relative ml-2">
|
||||
<div class="relative">
|
||||
@@ -377,20 +568,19 @@
|
||||
{% endfor %}
|
||||
</div>
|
||||
<footer>
|
||||
<h2 class="text-center">
|
||||
{% if not next %}
|
||||
End of History
|
||||
<div class="flex justify-center mb-6">
|
||||
<div class="bg-[#e1f2fb] rounded-lg px-3 py-2 text-sm text-[#54656f]">
|
||||
End of History
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</h2>
|
||||
<br>
|
||||
Portions of this page are reproduced from <a href="https://web.dev/articles/lazy-loading-video">work</a> created and <a href="https://developers.google.com/readme/policies">shared by Google</a> and used according to terms described in the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache 2.0 License</a>.
|
||||
Portions of this page are reproduced from <a href="https://web.dev/articles/lazy-loading-video">work</a>
|
||||
created and <a href="https://developers.google.com/readme/policies">shared by Google</a> and used
|
||||
according to terms described in the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache 2.0
|
||||
License</a>.
|
||||
</footer>
|
||||
<svg style="display: none;">
|
||||
<!-- Tooltip info icon -->
|
||||
<symbol id="info-icon" viewBox="0 0 24 24">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
|
||||
</symbol>
|
||||
</svg>
|
||||
</div>
|
||||
</article>
|
||||
</body>
|
||||
|
||||
@@ -36,17 +36,19 @@ classifiers = [
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"jinja2",
|
||||
"bleach"
|
||||
"bleach",
|
||||
"tqdm"
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
android_backup = ["pycryptodome", "javaobj-py3"]
|
||||
ios_backup = ["iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
|
||||
crypt12 = ["pycryptodome"]
|
||||
crypt14 = ["pycryptodome"]
|
||||
crypt15 = ["pycryptodome", "javaobj-py3"]
|
||||
all = ["pycryptodome", "javaobj-py3"]
|
||||
everything = ["pycryptodome", "javaobj-py3"]
|
||||
backup = ["pycryptodome", "javaobj-py3"]
|
||||
all = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
|
||||
everything = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
|
||||
backup = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
|
||||
|
||||
[project.scripts]
|
||||
wtsexporter = "Whatsapp_Chat_Exporter.__main__:main"
|
||||
|
||||
27
tests/conftest.py
Normal file
27
tests/conftest.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import pytest
|
||||
import os
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
"""
|
||||
Moves test_nuitka_binary.py to the end and fails if the file is missing.
|
||||
"""
|
||||
target_file = "test_nuitka_binary.py"
|
||||
|
||||
# Sanity Check: Ensure the file actually exists in the tests directory
|
||||
test_dir = os.path.join(config.rootdir, "tests")
|
||||
file_path = os.path.join(test_dir, target_file)
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
pytest.exit(f"\n[FATAL] Required test file '{target_file}' not found in {test_dir}. "
|
||||
f"Order enforcement failed!", returncode=1)
|
||||
|
||||
nuitka_tests = []
|
||||
remaining_tests = []
|
||||
|
||||
for item in items:
|
||||
if target_file in item.nodeid:
|
||||
nuitka_tests.append(item)
|
||||
else:
|
||||
remaining_tests.append(item)
|
||||
|
||||
items[:] = remaining_tests + nuitka_tests
|
||||
@@ -101,6 +101,7 @@ chat_data_merged = {
|
||||
"mime": None,
|
||||
"reply": None,
|
||||
"quoted_data": None,
|
||||
'reactions': {},
|
||||
"caption": None,
|
||||
"thumb": None,
|
||||
"sticker": False,
|
||||
@@ -121,6 +122,7 @@ chat_data_merged = {
|
||||
"mime": None,
|
||||
"reply": None,
|
||||
"quoted_data": None,
|
||||
'reactions': {},
|
||||
"caption": None,
|
||||
"thumb": None,
|
||||
"sticker": False,
|
||||
@@ -141,6 +143,7 @@ chat_data_merged = {
|
||||
"mime": None,
|
||||
"reply": None,
|
||||
"quoted_data": None,
|
||||
'reactions': {},
|
||||
"caption": None,
|
||||
"thumb": None,
|
||||
"sticker": False,
|
||||
|
||||
@@ -254,3 +254,99 @@ class TestSafeName:
|
||||
def test_safe_name(self, input_text, expected_output):
|
||||
result = safe_name(input_text)
|
||||
assert result == expected_output
|
||||
|
||||
|
||||
class TestGetChatCondition:
|
||||
def test_no_filter(self):
|
||||
"""Test when filter is None"""
|
||||
result = get_chat_condition(None, True, ["column1", "column2"])
|
||||
assert result == ""
|
||||
|
||||
result = get_chat_condition(None, False, ["column1"])
|
||||
assert result == ""
|
||||
|
||||
def test_include_single_chat_single_column(self):
|
||||
"""Test including a single chat with single column"""
|
||||
result = get_chat_condition(["1234567890"], True, ["phone"])
|
||||
assert result == "AND ( phone LIKE '%1234567890%')"
|
||||
|
||||
def test_include_multiple_chats_single_column(self):
|
||||
"""Test including multiple chats with single column"""
|
||||
result = get_chat_condition(["1234567890", "0987654321"], True, ["phone"])
|
||||
assert result == "AND ( phone LIKE '%1234567890%' OR phone LIKE '%0987654321%')"
|
||||
|
||||
def test_exclude_single_chat_single_column(self):
|
||||
"""Test excluding a single chat with single column"""
|
||||
result = get_chat_condition(["1234567890"], False, ["phone"])
|
||||
assert result == "AND ( phone NOT LIKE '%1234567890%')"
|
||||
|
||||
def test_exclude_multiple_chats_single_column(self):
|
||||
"""Test excluding multiple chats with single column"""
|
||||
result = get_chat_condition(["1234567890", "0987654321"], False, ["phone"])
|
||||
assert result == "AND ( phone NOT LIKE '%1234567890%' AND phone NOT LIKE '%0987654321%')"
|
||||
|
||||
def test_include_with_jid_android(self):
|
||||
"""Test including chats with JID for Android platform"""
|
||||
result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "android")
|
||||
assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1))"
|
||||
|
||||
def test_include_with_jid_ios(self):
|
||||
"""Test including chats with JID for iOS platform"""
|
||||
result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "ios")
|
||||
assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid IS NOT NULL))"
|
||||
|
||||
def test_exclude_with_jid_android(self):
|
||||
"""Test excluding chats with JID for Android platform"""
|
||||
result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "android")
|
||||
assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1))"
|
||||
|
||||
def test_exclude_with_jid_ios(self):
|
||||
"""Test excluding chats with JID for iOS platform"""
|
||||
result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "ios")
|
||||
assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid IS NOT NULL))"
|
||||
|
||||
def test_multiple_chats_with_jid_android(self):
|
||||
"""Test multiple chats with JID for Android platform"""
|
||||
result = get_chat_condition(["1234567890", "0987654321"], True, ["phone", "name"], "jid", "android")
|
||||
expected = "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1) OR phone LIKE '%0987654321%' OR (name LIKE '%0987654321%' AND jid.type == 1))"
|
||||
assert result == expected
|
||||
|
||||
def test_multiple_chats_exclude_with_jid_android(self):
|
||||
"""Test excluding multiple chats with JID for Android platform"""
|
||||
result = get_chat_condition(["1234567890", "0987654321"], False, ["phone", "name"], "jid", "android")
|
||||
expected = "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1) AND phone NOT LIKE '%0987654321%' AND (name NOT LIKE '%0987654321%' AND jid.type == 1))"
|
||||
assert result == expected
|
||||
|
||||
def test_invalid_column_count_with_jid(self):
|
||||
"""Test error when column count is less than 2 but jid is provided"""
|
||||
with pytest.raises(ValueError, match="There must be at least two elements in argument columns if jid is not None"):
|
||||
get_chat_condition(["1234567890"], True, ["phone"], "jid", "android")
|
||||
|
||||
def test_unsupported_platform(self):
|
||||
"""Test error when unsupported platform is provided"""
|
||||
with pytest.raises(ValueError, match="Only android and ios are supported for argument platform if jid is not None"):
|
||||
get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "windows")
|
||||
|
||||
def test_empty_filter_list(self):
|
||||
"""Test with empty filter list"""
|
||||
result = get_chat_condition([], True, ["phone"])
|
||||
assert result == ""
|
||||
|
||||
result = get_chat_condition([], False, ["phone"])
|
||||
assert result == ""
|
||||
|
||||
def test_filter_with_empty_strings(self):
|
||||
"""Test with filter containing empty strings"""
|
||||
result = get_chat_condition(["", "1234567890"], True, ["phone"])
|
||||
assert result == "AND ( phone LIKE '%%' OR phone LIKE '%1234567890%')"
|
||||
|
||||
result = get_chat_condition([""], True, ["phone"])
|
||||
assert result == "AND ( phone LIKE '%%')"
|
||||
|
||||
def test_special_characters_in_filter(self):
|
||||
"""Test with special characters in filter values"""
|
||||
result = get_chat_condition(["test@example.com"], True, ["email"])
|
||||
assert result == "AND ( email LIKE '%test@example.com%')"
|
||||
|
||||
result = get_chat_condition(["user-name"], True, ["username"])
|
||||
assert result == "AND ( username LIKE '%user-name%')"
|
||||
Reference in New Issue
Block a user