Refactor CLEAR_LINE in a more pythonic way

So it is easier for contributors to write a logging line for this project.
This commit is contained in:
KnugiHK
2026-01-24 16:48:07 +08:00
parent 746e4e1ac5
commit 4eed3ca321
8 changed files with 81 additions and 69 deletions

View File

@@ -12,7 +12,7 @@ import importlib.metadata
from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler
from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, Crypt from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, Crypt
from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, check_update from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, check_update
from Whatsapp_Chat_Exporter.utility import telegram_json_format, convert_time_unit, DbType from Whatsapp_Chat_Exporter.utility import telegram_json_format, convert_time_unit, DbType
@@ -440,10 +440,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']:
def decrypt_android_backup(args) -> int: def decrypt_android_backup(args) -> int:
"""Decrypt Android backup files and return error code.""" """Decrypt Android backup files and return error code."""
if args.key is None or args.backup is None: if args.key is None or args.backup is None:
logger.error(f"You must specify the backup file with -b and a key with -k{CLEAR_LINE}") logger.error(f"You must specify the backup file with -b and a key with -k")
return 1 return 1
logger.info(f"Decryption key specified, decrypting WhatsApp backup...{CLEAR_LINE}") logger.info(f"Decryption key specified, decrypting WhatsApp backup...")
# Determine crypt type # Determine crypt type
if "crypt12" in args.backup: if "crypt12" in args.backup:
@@ -454,7 +454,7 @@ def decrypt_android_backup(args) -> int:
crypt = Crypt.CRYPT15 crypt = Crypt.CRYPT15
else: else:
logger.error( logger.error(
f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}") f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.")
return 1 return 1
# Get key # Get key
@@ -597,20 +597,20 @@ def handle_media_directory(args) -> None:
if os.path.isdir(media_path): if os.path.isdir(media_path):
logger.info( logger.info(
f"WhatsApp directory already exists in output directory. Skipping...{CLEAR_LINE}") f"WhatsApp directory already exists in output directory. Skipping...")
else: else:
if args.move_media: if args.move_media:
try: try:
logger.info(f"Moving media directory...\r") logger.info(f"Moving media directory...", extra={"clear": True})
shutil.move(args.media, f"{args.output}/") shutil.move(args.media, f"{args.output}/")
logger.info(f"Media directory has been moved to the output directory{CLEAR_LINE}") logger.info(f"Media directory has been moved to the output directory")
except PermissionError: except PermissionError:
logger.warning("Cannot remove original WhatsApp directory. " logger.warning("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?\n") "Perhaps the directory is opened?\n")
else: else:
logger.info(f"Copying media directory...\r") logger.info(f"Copying media directory...", extra={"clear": True})
shutil.copytree(args.media, media_path) shutil.copytree(args.media, media_path)
logger.info(f"Media directory has been copied to the output directory{CLEAR_LINE}") logger.info(f"Media directory has been copied to the output directory")
def create_output_files(args, data: ChatCollection) -> None: def create_output_files(args, data: ChatCollection) -> None:
@@ -631,7 +631,7 @@ def create_output_files(args, data: ChatCollection) -> None:
# Create text files if requested # Create text files if requested
if args.text_format: if args.text_format:
logger.info(f"Writing text file...{CLEAR_LINE}") logger.info(f"Writing text file...")
android_handler.create_txt(data, args.text_format) android_handler.create_txt(data, args.text_format)
# Create JSON files if requested # Create JSON files if requested
@@ -661,9 +661,9 @@ def export_single_json(args, data: Dict) -> None:
ensure_ascii=not args.avoid_encoding_json, ensure_ascii=not args.avoid_encoding_json,
indent=args.pretty_print_json indent=args.pretty_print_json
) )
logger.info(f"Writing JSON file...\r") logger.info(f"Writing JSON file...", extra={"clear": True})
f.write(json_data) f.write(json_data)
logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))}){CLEAR_LINE}") logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))})")
def export_multiple_json(args, data: Dict) -> None: def export_multiple_json(args, data: Dict) -> None:
@@ -697,7 +697,7 @@ def export_multiple_json(args, data: Dict) -> None:
f.write(file_content) f.write(file_content)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}")
def process_exported_chat(args, data: ChatCollection) -> None: def process_exported_chat(args, data: ChatCollection) -> None:
@@ -722,16 +722,29 @@ def process_exported_chat(args, data: ChatCollection) -> None:
shutil.copy(file, args.output) shutil.copy(file, args.output)
class ClearLineFilter(logging.Filter):
    """Logging filter implementing in-place (single line) progress output.

    A log call made with ``extra={"clear": True}`` is terminated with a
    carriage return instead of a newline, so the next record overwrites it,
    and is prefixed with the ANSI "erase to end of line" escape so leftover
    characters from a longer previous progress line are wiped.
    """

    def filter(self, record):
        # Annotate the record; the format string configured in
        # setup_logging() consumes %(prefix)s and %(line_end)s.
        if getattr(record, 'clear', False):
            record.line_end = "\r"    # stay on the same terminal line
            record.prefix = "\x1b[K"  # ANSI CSI K: erase cursor -> end of line
        else:
            record.line_end = "\n"
            record.prefix = ""
        return True  # never drop a record; this filter only annotates


def setup_logging(level):
    """Configure root logging with in-place progress-line support.

    Args:
        level: Root log level. At ``logging.DEBUG`` a timestamped debug
            log file is written in addition to stdout.
    """
    log_handler_stdout = logging.StreamHandler()
    # The line terminator is emitted by the format string (%(line_end)s)
    # instead of the handler, so progress lines can end with "\r".
    log_handler_stdout.terminator = ""
    handlers = [log_handler_stdout]
    if level == logging.DEBUG:
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        # NOTE(review): "wtsexpoter" looks like a typo for "wtsexporter";
        # kept as-is because it is a runtime filename users may rely on.
        handlers.append(logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w"))
    # Attach the filter to every handler so %(prefix)s / %(line_end)s are
    # always defined on the record, independent of handler ordering.
    for handler in handlers:
        handler.addFilter(ClearLineFilter())
    logging.basicConfig(
        level=level,
        # Bug fix: %(prefix)s was set by ClearLineFilter but never emitted
        # by the previous format string, so the clear-line escape that the
        # old CLEAR_LINE constant provided was lost. Emit it before the
        # message so "clear" records wipe the current line before writing.
        format="%(prefix)s[%(levelname)s] %(message)s%(line_end)s",
        handlers=handlers
    )
@@ -822,7 +835,7 @@ def main():
args.backup, identifiers, args.decrypt_chunk_size) args.backup, identifiers, args.decrypt_chunk_size)
else: else:
logger.info( logger.info(
f"WhatsApp directory already exists, skipping WhatsApp file extraction.{CLEAR_LINE}") f"WhatsApp directory already exists, skipping WhatsApp file extraction.")
# Set default DB paths if not provided # Set default DB paths if not provided
if args.db is None: if args.db is None:
@@ -838,7 +851,7 @@ def main():
args.pretty_print_json, args.pretty_print_json,
args.avoid_encoding_json args.avoid_encoding_json
) )
logger.info(f"Incremental merge completed successfully.{CLEAR_LINE}") logger.info(f"Incremental merge completed successfully.")
else: else:
# Process contacts # Process contacts
process_contacts(args, data) process_contacts(args, data)

View File

@@ -7,7 +7,7 @@ from tqdm import tqdm
from typing import Tuple, Union from typing import Tuple, Union
from hashlib import sha256 from hashlib import sha256
from functools import partial from functools import partial
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType
try: try:
import zlib import zlib
@@ -172,11 +172,11 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
continue continue
else: else:
logger.debug( logger.debug(
f"Decryption successful with known offsets: IV {iv}, DB {db}{CLEAR_LINE}" f"Decryption successful with known offsets: IV {iv}, DB {db}"
) )
return decrypted_db # Successful decryption return decrypted_db # Successful decryption
logger.info(f"Common offsets failed. Will attempt to brute-force{CLEAR_LINE}") logger.info(f"Common offsets failed. Will attempt to brute-force")
offset_max = 200 offset_max = 200
workers = max_worker workers = max_worker
check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key) check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key)
@@ -196,19 +196,19 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
break break
if found: if found:
logger.info( logger.info(
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively.{CLEAR_LINE}" f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
) )
logger.info( logger.info(
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:{CLEAR_LINE}" f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:"
) )
logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47{CLEAR_LINE}") logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
return result return result
except KeyboardInterrupt: except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True) executor.shutdown(wait=False, cancel_futures=True)
print("\n") print("\n")
raise KeyboardInterrupt( raise KeyboardInterrupt(
f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}" f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully..."
) )
finally: finally:
@@ -346,7 +346,7 @@ def decrypt_backup(
main_key, hex_key = _derive_main_enc_key(key) main_key, hex_key = _derive_main_enc_key(key)
if show_crypt15: if show_crypt15:
hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]) hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)])
logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}{CLEAR_LINE}") logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}")
else: else:
main_key = key[126:] main_key = key[126:]

View File

@@ -11,7 +11,7 @@ from markupsafe import escape as htmle
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from datetime import datetime from datetime import datetime
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join
from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
@@ -56,7 +56,7 @@ def contacts(db, data, enrich_from_vcards):
current_chat.status = row["status"] current_chat.status = row["status"]
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
return True return True
@@ -101,7 +101,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
_get_reactions(db, data) _get_reactions(db, data)
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
# Helper functions for message processing # Helper functions for message processing
@@ -518,7 +518,7 @@ def _get_reactions(db, data):
if c.fetchone()[0] == 0: if c.fetchone()[0] == 0:
return return
logger.info("Processing reactions...\r") logger.info("Processing reactions...", extra={"clear": True})
c.execute(""" c.execute("""
SELECT SELECT
@@ -539,7 +539,7 @@ def _get_reactions(db, data):
ON chat.jid_row_id = chat_jid._id ON chat.jid_row_id = chat_jid._id
""") """)
except sqlite3.OperationalError: except sqlite3.OperationalError:
logger.warning(f"Could not fetch reactions (schema might be too old or incompatible){CLEAR_LINE}") logger.warning(f"Could not fetch reactions (schema might be too old or incompatible)")
return return
rows = c.fetchall() rows = c.fetchall()
@@ -574,7 +574,7 @@ def _get_reactions(db, data):
message.reactions[sender_name] = reaction message.reactions[sender_name] = reaction
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False): def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
@@ -609,7 +609,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files) _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
# Helper functions for media processing # Helper functions for media processing
@@ -828,7 +828,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
_process_vcard_row(row, path, data) _process_vcard_row(row, path, data)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty): def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty):
"""Execute vCard query for modern WhatsApp database schema.""" """Execute vCard query for modern WhatsApp database schema."""
@@ -935,7 +935,7 @@ def calls(db, data, timezone_offset, filter_chat):
if total_row_number == 0: if total_row_number == 0:
return return
logger.info(f"Processing calls...({total_row_number})\r") logger.info(f"Processing calls...({total_row_number})", extra={"clear": True})
# Fetch call data # Fetch call data
calls_data = _fetch_calls_data(c, filter_chat) calls_data = _fetch_calls_data(c, filter_chat)
@@ -952,7 +952,7 @@ def calls(db, data, timezone_offset, filter_chat):
# Add the calls chat to the data # Add the calls chat to the data
data.add_chat("000000000000000", chat) data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def _get_calls_count(c, filter_chat): def _get_calls_count(c, filter_chat):
"""Get the count of call records that match the filter.""" """Get the count of call records that match the filter."""
@@ -1128,7 +1128,7 @@ def create_html(
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}")
def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline): def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline):
"""Generate a single HTML file for a chat.""" """Generate a single HTML file for a chat."""

View File

@@ -6,7 +6,7 @@ from datetime import datetime
from mimetypes import MimeTypes from mimetypes import MimeTypes
from tqdm import tqdm from tqdm import tqdm
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device, convert_time_unit from Whatsapp_Chat_Exporter.utility import Device, convert_time_unit
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -43,7 +43,7 @@ def messages(path, data, assume_first_as_me=False):
) )
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}")
return data return data

View File

@@ -9,7 +9,7 @@ from pathlib import Path
from mimetypes import MimeTypes from mimetypes import MimeTypes
from markupsafe import escape as htmle from markupsafe import escape as htmle
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, get_chat_condition, Device from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
@@ -21,7 +21,7 @@ def contacts(db, data):
c = db.cursor() c = db.cursor()
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
logger.info(f"Pre-processing contacts...({total_row_number})\r") logger.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar: with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
@@ -35,7 +35,7 @@ def contacts(db, data):
data.add_chat(zwhatsapp_id, current_chat) data.add_chat(zwhatsapp_id, current_chat)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
def process_contact_avatars(current_chat, media_folder, contact_id): def process_contact_avatars(current_chat, media_folder, contact_id):
@@ -132,7 +132,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
process_contact_avatars(current_chat, media_folder, contact_id) process_contact_avatars(current_chat, media_folder, contact_id)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
# Get message count # Get message count
message_count_query = f""" message_count_query = f"""
@@ -149,7 +149,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
""" """
c.execute(message_count_query) c.execute(message_count_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
logger.info(f"Processing messages...(0/{total_row_number})\r") logger.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True})
# Fetch messages # Fetch messages
messages_query = f""" messages_query = f"""
@@ -226,7 +226,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
def process_message_data(message, content, is_group_message, data, message_map, no_reply): def process_message_data(message, content, is_group_message, data, message_map, no_reply):
@@ -340,7 +340,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
""" """
c.execute(media_count_query) c.execute(media_count_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
logger.info(f"Processing media...(0/{total_row_number})\r") logger.info(f"Processing media...(0/{total_row_number})", extra={"clear": True})
# Fetch media items # Fetch media items
media_query = f""" media_query = f"""
@@ -373,7 +373,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files) process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False): def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
@@ -462,7 +462,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
c.execute(vcard_query) c.execute(vcard_query)
contents = c.fetchall() contents = c.fetchall()
total_row_number = len(contents) total_row_number = len(contents)
logger.info(f"Processing vCards...(0/{total_row_number})\r") logger.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True})
# Create vCards directory # Create vCards directory
path = f'{media_folder}/Message/vCards' path = f'{media_folder}/Message/vCards'
@@ -474,7 +474,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
process_vcard_item(content, path, data) process_vcard_item(content, path, data)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def process_vcard_item(content, path, data): def process_vcard_item(content, path, data):
@@ -566,7 +566,7 @@ def calls(db, data, timezone_offset, filter_chat):
# Add calls chat to data # Add calls chat to data
data.add_chat("000000000000000", chat) data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def process_call_record(content, chat, data, timezone_offset): def process_call_record(content, chat, data, timezone_offset):

View File

@@ -8,7 +8,7 @@ import getpass
from sys import exit, platform as osname from sys import exit, platform as osname
import sys import sys
from tqdm import tqdm from tqdm import tqdm
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier, convert_time_unit from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier, convert_time_unit
from Whatsapp_Chat_Exporter.bplist import BPListReader from Whatsapp_Chat_Exporter.bplist import BPListReader
try: try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath from iphone_backup_decrypt import EncryptedBackup, RelativePath
@@ -79,7 +79,7 @@ class BackupExtractor:
) )
return return
logger.info(f"Encryption detected on the backup!{CLEAR_LINE}") logger.info(f"Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:") password = getpass.getpass("Enter the password for the backup:")
sys.stdout.write("\033[F\033[K") sys.stdout.write("\033[F\033[K")
sys.stdout.flush() sys.stdout.flush()
@@ -93,7 +93,7 @@ class BackupExtractor:
Args: Args:
password (str): The password for the encrypted backup. password (str): The password for the encrypted backup.
""" """
logger.info(f"Trying to open the iOS backup...{CLEAR_LINE}") logger.info(f"Trying to open the iOS backup...")
self.backup = EncryptedBackup( self.backup = EncryptedBackup(
backup_directory=self.base_dir, backup_directory=self.base_dir,
passphrase=password, passphrase=password,
@@ -101,8 +101,8 @@ class BackupExtractor:
check_same_thread=False, check_same_thread=False,
decrypt_chunk_size=self.decrypt_chunk_size, decrypt_chunk_size=self.decrypt_chunk_size,
) )
logger.info(f"iOS backup is opened successfully{CLEAR_LINE}") logger.info(f"iOS backup is opened successfully")
logger.info("Decrypting WhatsApp database...\r") logger.info("Decrypting WhatsApp database...", extra={"clear": True})
try: try:
self.backup.extract_file( self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES, relative_path=RelativePath.WHATSAPP_MESSAGES,
@@ -130,7 +130,7 @@ class BackupExtractor:
) )
exit(6) exit(6)
else: else:
logger.info(f"WhatsApp database decrypted successfully{CLEAR_LINE}") logger.info(f"WhatsApp database decrypted successfully")
def _extract_decrypted_files(self): def _extract_decrypted_files(self):
"""Extract all WhatsApp files after decryption""" """Extract all WhatsApp files after decryption"""
@@ -150,7 +150,7 @@ class BackupExtractor:
) )
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
pbar.close() pbar.close()
logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}")
def _extract_unencrypted_backup(self): def _extract_unencrypted_backup(self):
""" """
@@ -182,12 +182,12 @@ class BackupExtractor:
shutil.copyfile(wts_db_path, self.identifiers.MESSAGE) shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)
if not os.path.isfile(contact_db_path): if not os.path.isfile(contact_db_path):
logger.warning(f"Contact database not found. Skipping...{CLEAR_LINE}") logger.warning(f"Contact database not found. Skipping...")
else: else:
shutil.copyfile(contact_db_path, self.identifiers.CONTACT) shutil.copyfile(contact_db_path, self.identifiers.CONTACT)
if not os.path.isfile(call_db_path): if not os.path.isfile(call_db_path):
logger.warning(f"Call database not found. Skipping...{CLEAR_LINE}") logger.warning(f"Call database not found. Skipping...")
else: else:
shutil.copyfile(call_db_path, self.identifiers.CALL) shutil.copyfile(call_db_path, self.identifiers.CALL)
@@ -236,7 +236,7 @@ class BackupExtractor:
os.utime(destination, (modification, modification)) os.utime(destination, (modification, modification))
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}")
def extract_media(base_dir, identifiers, decrypt_chunk_size): def extract_media(base_dir, identifiers, decrypt_chunk_size):

View File

@@ -30,7 +30,6 @@ except ImportError:
MAX_SIZE = 4 * 1024 * 1024 # Default 4MB MAX_SIZE = 4 * 1024 * 1024 # Default 4MB
ROW_SIZE = 0x3D0 ROW_SIZE = 0x3D0
CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600 CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600
CLEAR_LINE = "\x1b[K\n"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -254,7 +253,7 @@ def import_from_json(json_file: str, data: ChatCollection):
data.add_chat(jid, chat) data.add_chat(jid, chat)
pbar.update(1) pbar.update(1)
total_time = pbar.format_dict['elapsed'] total_time = pbar.format_dict['elapsed']
logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}") logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}")
class IncrementalMerger: class IncrementalMerger:
@@ -389,7 +388,7 @@ class IncrementalMerger:
target_path: Path to target file. target_path: Path to target file.
json_file: Name of the JSON file. json_file: Name of the JSON file.
""" """
logger.info(f"Merging '{json_file}' with existing file in target directory...\r") logger.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True})
source_data = self._load_chat_data(source_path) source_data = self._load_chat_data(source_path)
target_data = self._load_chat_data(target_path) target_data = self._load_chat_data(target_path)
@@ -401,10 +400,10 @@ class IncrementalMerger:
merged_data = self._serialize_chats(merged_chats) merged_data = self._serialize_chats(merged_chats)
if self._has_changes(merged_data, target_data): if self._has_changes(merged_data, target_data):
logger.info(f"Changes detected in '{json_file}', updating target file...{CLEAR_LINE}") logger.info(f"Changes detected in '{json_file}', updating target file...")
self._save_merged_data(target_path, merged_data) self._save_merged_data(target_path, merged_data)
else: else:
logger.info(f"No changes detected in '{json_file}', skipping update.{CLEAR_LINE}") logger.info(f"No changes detected in '{json_file}', skipping update.")
def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: def _should_copy_media_file(self, source_file: str, target_file: str) -> bool:
"""Check if media file should be copied. """Check if media file should be copied.
@@ -429,7 +428,7 @@ class IncrementalMerger:
source_media_path = os.path.join(source_dir, media_dir) source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir)
logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}{CLEAR_LINE}") logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if not os.path.exists(source_media_path): if not os.path.exists(source_media_path):
return return
@@ -457,7 +456,7 @@ class IncrementalMerger:
""" """
json_files = self._get_json_files(source_dir) json_files = self._get_json_files(source_dir)
logger.info("Starting incremental merge process...{CLEAR_LINE}") logger.info("Starting incremental merge process...")
for json_file in json_files: for json_file in json_files:
source_path = os.path.join(source_dir, json_file) source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file) target_path = os.path.join(target_dir, json_file)
@@ -894,7 +893,7 @@ def get_chat_type(chat_id: str) -> str:
return "status_broadcast" return "status_broadcast"
elif chat_id.endswith("@broadcast"): elif chat_id.endswith("@broadcast"):
return "broadcast_channel" return "broadcast_channel"
logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group{CLEAR_LINE}") logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group")
return "private_group" return "private_group"

View File

@@ -3,7 +3,7 @@ import re
import quopri import quopri
from typing import List, TypedDict from typing import List, TypedDict
from Whatsapp_Chat_Exporter.data_model import ChatStore from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device from Whatsapp_Chat_Exporter.utility import Device
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -47,7 +47,7 @@ def decode_quoted_printable(value: str, charset: str) -> str:
# Fallback: return the original value if decoding fails # Fallback: return the original value if decoding fails
logger.warning( logger.warning(
f"Failed to decode quoted-printable value: {value}, " f"Failed to decode quoted-printable value: {value}, "
f"charset: {charset}. Please report this issue.{CLEAR_LINE}" f"charset: {charset}. Please report this issue."
) )
return value return value
@@ -176,7 +176,7 @@ def read_vcards_file(vcf_file_path, default_country_code: str):
if contact := process_vcard_entry(vcard): if contact := process_vcard_entry(vcard):
contacts.append(contact) contacts.append(contact)
logger.info(f"Imported {len(contacts)} contacts/vcards{CLEAR_LINE}") logger.info(f"Imported {len(contacts)} contacts/vcards")
return map_number_to_name(contacts, default_country_code) return map_number_to_name(contacts, default_country_code)