Refactor the logging facility a bit

This commit is contained in:
KnugiHK
2026-01-24 17:05:14 +08:00
parent 4eed3ca321
commit f920ca82b4
8 changed files with 123 additions and 121 deletions

View File

@@ -26,7 +26,6 @@ from typing import Optional, List, Dict
from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards
logger = logging.getLogger(__name__)
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
WTSEXPORTER_BANNER = f"""========================================================================================================
██╗ ██╗██╗ ██╗ █████╗ ████████╗███████╗ █████╗ ██████╗ ██████╗
@@ -440,10 +439,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']:
def decrypt_android_backup(args) -> int:
"""Decrypt Android backup files and return error code."""
if args.key is None or args.backup is None:
logger.error(f"You must specify the backup file with -b and a key with -k")
logging.error(f"You must specify the backup file with -b and a key with -k")
return 1
logger.info(f"Decryption key specified, decrypting WhatsApp backup...")
logging.info(f"Decryption key specified, decrypting WhatsApp backup...")
# Determine crypt type
if "crypt12" in args.backup:
@@ -453,7 +452,7 @@ def decrypt_android_backup(args) -> int:
elif "crypt15" in args.backup:
crypt = Crypt.CRYPT15
else:
logger.error(
logging.error(
f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.")
return 1
@@ -506,15 +505,15 @@ def decrypt_android_backup(args) -> int:
def handle_decrypt_error(error: int) -> None:
"""Handle decryption errors with appropriate messages."""
if error == 1:
logger.error("Dependencies of decrypt_backup and/or extract_encrypted_key"
" are not present. For details, see README.md.\n")
logging.error("Dependencies of decrypt_backup and/or extract_encrypted_key"
" are not present. For details, see README.md.")
exit(3)
elif error == 2:
logger.error("Failed when decompressing the decrypted backup. "
"Possibly incorrect offsets used in decryption.\n")
logging.error("Failed when decompressing the decrypted backup. "
"Possibly incorrect offsets used in decryption.")
exit(4)
else:
logger.error("Unknown error occurred.\n")
logging.error("Unknown error occurred.")
exit(5)
@@ -537,9 +536,9 @@ def process_messages(args, data: ChatCollection) -> None:
msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE
if not os.path.isfile(msg_db):
logger.error(
logging.error(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path.\n"
"to database file with option -d or check your provided path."
)
exit(6)
@@ -596,21 +595,21 @@ def handle_media_directory(args) -> None:
media_path = os.path.join(args.output, args.media)
if os.path.isdir(media_path):
logger.info(
logging.info(
f"WhatsApp directory already exists in output directory. Skipping...")
else:
if args.move_media:
try:
logger.info(f"Moving media directory...", extra={"clear": True})
logging.info(f"Moving media directory...", extra={"clear": True})
shutil.move(args.media, f"{args.output}/")
logger.info(f"Media directory has been moved to the output directory")
logging.info(f"Media directory has been moved to the output directory")
except PermissionError:
logger.warning("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?\n")
logging.warning("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?")
else:
logger.info(f"Copying media directory...", extra={"clear": True})
logging.info(f"Copying media directory...", extra={"clear": True})
shutil.copytree(args.media, media_path)
logger.info(f"Media directory has been copied to the output directory")
logging.info(f"Media directory has been copied to the output directory")
def create_output_files(args, data: ChatCollection) -> None:
@@ -631,7 +630,7 @@ def create_output_files(args, data: ChatCollection) -> None:
# Create text files if requested
if args.text_format:
logger.info(f"Writing text file...")
logging.info(f"Writing text file...")
android_handler.create_txt(data, args.text_format)
# Create JSON files if requested
@@ -661,9 +660,9 @@ def export_single_json(args, data: Dict) -> None:
ensure_ascii=not args.avoid_encoding_json,
indent=args.pretty_print_json
)
logger.info(f"Writing JSON file...", extra={"clear": True})
logging.info(f"Writing JSON file...", extra={"clear": True})
f.write(json_data)
logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))})")
logging.info(f"JSON file saved...({bytes_to_readable(len(json_data))})")
def export_multiple_json(args, data: Dict) -> None:
@@ -697,7 +696,7 @@ def export_multiple_json(args, data: Dict) -> None:
f.write(file_content)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}")
logging.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}")
def process_exported_chat(args, data: ChatCollection) -> None:
@@ -737,11 +736,18 @@ class ClearLineFilter(logging.Filter):
def setup_logging(level):
log_handler_stdout = logging.StreamHandler()
log_handler_stdout.terminator = ""
handlers = [log_handler_stdout]
log_handler_stdout.addFilter(ClearLineFilter())
log_handler_stdout.set_name("console")
handlers = [log_handler_stdout]
if level == logging.DEBUG:
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
handlers.append(logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w"))
log_handler_file = logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w")
log_handler_file.terminator = ""
log_handler_file.addFilter(ClearLineFilter())
handlers.append(log_handler_file)
logging.basicConfig(
level=level,
format="[%(levelname)s] %(message)s%(line_end)s",
@@ -755,23 +761,26 @@ def main():
parser = setup_argument_parser()
args = parser.parse_args()
# Check for updates
if args.check_update:
exit(check_update())
# Validate arguments
validate_args(parser, args)
# Print banner if not suppressed
if not args.no_banner:
print(WTSEXPORTER_BANNER)
if args.debug:
setup_logging(logging.DEBUG)
logger.debug("Debug mode enabled.\n")
logging.debug("Debug mode enabled.")
for handler in logging.getLogger().handlers:
if handler.name == "console":
handler.setLevel(logging.INFO)
else:
setup_logging(logging.INFO)
# Check for updates
if args.check_update:
exit(check_update())
# Validate arguments
validate_args(parser, args)
# Create output directory if it doesn't exist
os.makedirs(args.output, exist_ok=True)
@@ -834,7 +843,7 @@ def main():
ios_media_handler.extract_media(
args.backup, identifiers, args.decrypt_chunk_size)
else:
logger.info(
logging.info(
f"WhatsApp directory already exists, skipping WhatsApp file extraction.")
# Set default DB paths if not provided
@@ -851,7 +860,7 @@ def main():
args.pretty_print_json,
args.avoid_encoding_json
)
logger.info(f"Incremental merge completed successfully.")
logging.info(f"Incremental merge completed successfully.")
else:
# Process contacts
process_contacts(args, data)
@@ -869,7 +878,7 @@ def main():
# Handle media directory
handle_media_directory(args)
logger.info("Everything is done!")
logging.info("Everything is done!")
if __name__ == "__main__":

View File

@@ -25,7 +25,6 @@ else:
support_crypt15 = True
logger = logging.getLogger(__name__)
class DecryptionError(Exception):
@@ -126,7 +125,7 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes
raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.")
if len(db_compressed) < 2 or db_compressed[0] != 0x78:
logger.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}")
logging.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}")
raise ValueError(
"Key is correct, but decrypted data is not a valid compressed stream. "
"Is this even a valid WhatsApp database backup?"
@@ -171,12 +170,12 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
except (zlib.error, ValueError):
continue
else:
logger.debug(
logging.debug(
f"Decryption successful with known offsets: IV {iv}, DB {db}"
)
return decrypted_db # Successful decryption
logger.info(f"Common offsets failed. Will attempt to brute-force")
logging.info(f"Common offsets failed. Will attempt to brute-force")
offset_max = 200
workers = max_worker
check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key)
@@ -195,18 +194,18 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
found = True
break
if found:
logger.info(
logging.info(
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
)
logger.info(
logging.info(
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:"
)
logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
return result
except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True)
print("\n")
logging.info("")
raise KeyboardInterrupt(
f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully..."
)
@@ -346,7 +345,7 @@ def decrypt_backup(
main_key, hex_key = _derive_main_enc_key(key)
if show_crypt15:
hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)])
logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}")
logging.info(f"The HEX key of the crypt15 backup is: {hex_key_str}")
else:
main_key = key[126:]

View File

@@ -17,7 +17,6 @@ from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_uni
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
logger = logging.getLogger(__name__)
def contacts(db, data, enrich_from_vcards):
@@ -38,14 +37,14 @@ def contacts(db, data, enrich_from_vcards):
if total_row_number == 0:
if enrich_from_vcards is not None:
logger.info(
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.\n")
logging.info(
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.")
else:
logger.warning(
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google\n")
logging.warning(
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google")
return False
else:
logger.info(f"Processed {total_row_number} contacts\n")
logging.info(f"Processed {total_row_number} contacts")
c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;")
@@ -56,7 +55,7 @@ def contacts(db, data, enrich_from_vcards):
current_chat.status = row["status"]
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
return True
@@ -81,7 +80,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat)
table_message = False
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n')
try:
content_cursor = _get_messages_cursor_new(
c,
@@ -101,7 +100,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1)
total_time = pbar.format_dict['elapsed']
_get_reactions(db, data)
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
# Helper functions for message processing
@@ -127,7 +126,7 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_e
{include_filter}
{exclude_filter}""")
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n')
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
@@ -315,8 +314,8 @@ def _fetch_row_safely(cursor):
except sqlite3.OperationalError as e:
# Not sure how often this might happen, but this check should reduce the overhead
# if DEBUG flag is not set.
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n')
if logging.isEnabledFor(logging.DEBUG):
logging.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n')
continue
@@ -518,7 +517,7 @@ def _get_reactions(db, data):
if c.fetchone()[0] == 0:
return
logger.info("Processing reactions...", extra={"clear": True})
logging.info("Processing reactions...", extra={"clear": True})
c.execute("""
SELECT
@@ -539,7 +538,7 @@ def _get_reactions(db, data):
ON chat.jid_row_id = chat_jid._id
""")
except sqlite3.OperationalError:
logger.warning(f"Could not fetch reactions (schema might be too old or incompatible)")
logging.warning(f"Could not fetch reactions (schema might be too old or incompatible)")
return
rows = c.fetchall()
@@ -574,7 +573,7 @@ def _get_reactions(db, data):
message.reactions[sender_name] = reaction
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
@@ -595,7 +594,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
try:
content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat)
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n')
content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat)
content = content_cursor.fetchone()
@@ -609,7 +608,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
# Helper functions for media processing
@@ -637,7 +636,7 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
{include_filter}
{exclude_filter}""")
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n')
empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
include_filter = get_chat_condition(
@@ -814,7 +813,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
try:
rows = _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty)
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n')
rows = _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty)
total_row_number = len(rows)
@@ -828,7 +827,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
_process_vcard_row(row, path, data)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty):
"""Execute vCard query for modern WhatsApp database schema."""
@@ -935,7 +934,7 @@ def calls(db, data, timezone_offset, filter_chat):
if total_row_number == 0:
return
logger.info(f"Processing calls...({total_row_number})", extra={"clear": True})
logging.info(f"Processing calls...({total_row_number})", extra={"clear": True})
# Fetch call data
calls_data = _fetch_calls_data(c, filter_chat)
@@ -952,7 +951,7 @@ def calls(db, data, timezone_offset, filter_chat):
# Add the calls chat to the data
data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def _get_calls_count(c, filter_chat):
"""Get the count of call records that match the filter."""
@@ -1128,7 +1127,7 @@ def create_html(
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}")
logging.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}")
def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline):
"""Generate a single HTML file for a chat."""

View File

@@ -9,7 +9,6 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import Device, convert_time_unit
logger = logging.getLogger(__name__)
def messages(path, data, assume_first_as_me=False):
@@ -43,7 +42,7 @@ def messages(path, data, assume_first_as_me=False):
)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}")
return data

View File

@@ -13,7 +13,6 @@ from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Devic
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
logger = logging.getLogger(__name__)
def contacts(db, data):
@@ -21,7 +20,7 @@ def contacts(db, data):
c = db.cursor()
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
total_row_number = c.fetchone()[0]
logger.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
@@ -35,7 +34,7 @@ def contacts(db, data):
data.add_chat(zwhatsapp_id, current_chat)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
def process_contact_avatars(current_chat, media_folder, contact_id):
@@ -132,7 +131,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
process_contact_avatars(current_chat, media_folder, contact_id)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
# Get message count
message_count_query = f"""
@@ -149,7 +148,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
"""
c.execute(message_count_query)
total_row_number = c.fetchone()[0]
logger.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True})
logging.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True})
# Fetch messages
messages_query = f"""
@@ -226,7 +225,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
def process_message_data(message, content, is_group_message, data, message_map, no_reply):
@@ -340,7 +339,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
"""
c.execute(media_count_query)
total_row_number = c.fetchone()[0]
logger.info(f"Processing media...(0/{total_row_number})", extra={"clear": True})
logging.info(f"Processing media...(0/{total_row_number})", extra={"clear": True})
# Fetch media items
media_query = f"""
@@ -373,7 +372,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
@@ -462,7 +461,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
c.execute(vcard_query)
contents = c.fetchall()
total_row_number = len(contents)
logger.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True})
logging.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True})
# Create vCards directory
path = f'{media_folder}/Message/vCards'
@@ -474,7 +473,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
process_vcard_item(content, path, data)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def process_vcard_item(content, path, data):
@@ -566,7 +565,7 @@ def calls(db, data, timezone_offset, filter_chat):
# Add calls chat to data
data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def process_call_record(content, chat, data, timezone_offset):

View File

@@ -18,7 +18,6 @@ else:
support_encrypted = True
logger = logging.getLogger(__name__)
class BackupExtractor:
@@ -60,7 +59,7 @@ class BackupExtractor:
return False
except sqlite3.DatabaseError as e:
if str(e) == "authorization denied" and osname == "darwin":
logger.error(
logging.error(
"You don't have permission to access the backup database. Please"
"check your permissions or try moving the backup to somewhere else."
)
@@ -73,13 +72,13 @@ class BackupExtractor:
Handles the extraction of data from an encrypted iOS backup.
"""
if not support_encrypted:
logger.error("You don't have the dependencies to handle encrypted backup."
logging.error("You don't have the dependencies to handle encrypted backup."
"Read more on how to deal with encrypted backup:"
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage"
)
return
logger.info(f"Encryption detected on the backup!")
logging.info(f"Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
sys.stdout.write("\033[F\033[K")
sys.stdout.flush()
@@ -93,7 +92,7 @@ class BackupExtractor:
Args:
password (str): The password for the encrypted backup.
"""
logger.info(f"Trying to open the iOS backup...")
logging.info(f"Trying to open the iOS backup...")
self.backup = EncryptedBackup(
backup_directory=self.base_dir,
passphrase=password,
@@ -101,8 +100,8 @@ class BackupExtractor:
check_same_thread=False,
decrypt_chunk_size=self.decrypt_chunk_size,
)
logger.info(f"iOS backup is opened successfully")
logger.info("Decrypting WhatsApp database...", extra={"clear": True})
logging.info(f"iOS backup is opened successfully")
logging.info("Decrypting WhatsApp database...", extra={"clear": True})
try:
self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES,
@@ -120,17 +119,17 @@ class BackupExtractor:
output_filename=self.identifiers.CALL,
)
except ValueError:
logger.error("Failed to decrypt backup: incorrect password?")
logging.error("Failed to decrypt backup: incorrect password?")
exit(7)
except FileNotFoundError:
logger.error(
logging.error(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
)
exit(6)
else:
logger.info(f"WhatsApp database decrypted successfully")
logging.info(f"WhatsApp database decrypted successfully")
def _extract_decrypted_files(self):
"""Extract all WhatsApp files after decryption"""
@@ -150,7 +149,7 @@ class BackupExtractor:
)
total_time = pbar.format_dict['elapsed']
pbar.close()
logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}")
logging.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}")
def _extract_unencrypted_backup(self):
"""
@@ -169,10 +168,10 @@ class BackupExtractor:
if not os.path.isfile(wts_db_path):
if self.identifiers is WhatsAppIdentifier:
logger.error("WhatsApp database not found.")
logging.error("WhatsApp database not found.")
else:
logger.error("WhatsApp Business database not found.")
logger.error(
logging.error("WhatsApp Business database not found.")
logging.error(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
@@ -182,12 +181,12 @@ class BackupExtractor:
shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)
if not os.path.isfile(contact_db_path):
logger.warning(f"Contact database not found. Skipping...")
logging.warning(f"Contact database not found. Skipping...")
else:
shutil.copyfile(contact_db_path, self.identifiers.CONTACT)
if not os.path.isfile(call_db_path):
logger.warning(f"Call database not found. Skipping...")
logging.warning(f"Call database not found. Skipping...")
else:
shutil.copyfile(call_db_path, self.identifiers.CALL)
@@ -236,7 +235,7 @@ class BackupExtractor:
os.utime(destination, (modification, modification))
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}")
logging.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}")
def extract_media(base_dir, identifiers, decrypt_chunk_size):

View File

@@ -31,7 +31,6 @@ MAX_SIZE = 4 * 1024 * 1024 # Default 4MB
ROW_SIZE = 0x3D0
CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600
logger = logging.getLogger(__name__)
def convert_time_unit(time_second: int) -> str:
@@ -168,7 +167,7 @@ def check_update():
try:
raw = urllib.request.urlopen(PACKAGE_JSON)
except Exception:
logger.error("Failed to check for updates.")
logging.error("Failed to check for updates.")
return 1
else:
with raw:
@@ -178,19 +177,19 @@ def check_update():
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
current_version = tuple(map(int, __version__.split(".")))
if current_version < latest_version:
logger.info(
logging.info(
"===============Update===============\n"
"A newer version of WhatsApp Chat Exporter is available.\n"
f"Current version: {__version__}\n"
f"Latest version: {package_info['info']['version']}\n"
"A newer version of WhatsApp Chat Exporter is available."
f"Current version: {__version__}"
f"Latest version: {package_info['info']['version']}"
)
if platform == "win32":
logger.info("Update with: pip install --upgrade whatsapp-chat-exporter\n")
logging.info("Update with: pip install --upgrade whatsapp-chat-exporter")
else:
logger.info("Update with: pip3 install --upgrade whatsapp-chat-exporter\n")
logger.info("====================================\n")
logging.info("Update with: pip3 install --upgrade whatsapp-chat-exporter")
logging.info("====================================")
else:
logger.info("You are using the latest version of WhatsApp Chat Exporter.\n")
logging.info("You are using the latest version of WhatsApp Chat Exporter.")
return 0
@@ -253,7 +252,7 @@ def import_from_json(json_file: str, data: ChatCollection):
data.add_chat(jid, chat)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}")
logging.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}")
class IncrementalMerger:
@@ -283,10 +282,10 @@ class IncrementalMerger:
"""
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
if not json_files:
logger.error("No JSON files found in the source directory.")
logging.error("No JSON files found in the source directory.")
raise SystemExit(1)
logger.debug("JSON files found:", json_files)
logging.debug("JSON files found:", json_files)
return json_files
def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None:
@@ -298,7 +297,7 @@ class IncrementalMerger:
target_dir: Target directory path.
json_file: Name of the JSON file.
"""
logger.info(f"Copying '{json_file}' to target directory...")
logging.info(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True)
shutil.copy2(source_path, target_path)
@@ -388,7 +387,7 @@ class IncrementalMerger:
target_path: Path to target file.
json_file: Name of the JSON file.
"""
logger.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True})
logging.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True})
source_data = self._load_chat_data(source_path)
target_data = self._load_chat_data(target_path)
@@ -400,10 +399,10 @@ class IncrementalMerger:
merged_data = self._serialize_chats(merged_chats)
if self._has_changes(merged_data, target_data):
logger.info(f"Changes detected in '{json_file}', updating target file...")
logging.info(f"Changes detected in '{json_file}', updating target file...")
self._save_merged_data(target_path, merged_data)
else:
logger.info(f"No changes detected in '{json_file}', skipping update.")
logging.info(f"No changes detected in '{json_file}', skipping update.")
def _should_copy_media_file(self, source_file: str, target_file: str) -> bool:
"""Check if media file should be copied.
@@ -428,7 +427,7 @@ class IncrementalMerger:
source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir)
logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
logging.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if not os.path.exists(source_media_path):
return
@@ -443,7 +442,7 @@ class IncrementalMerger:
target_file = os.path.join(target_root, file)
if self._should_copy_media_file(source_file, target_file):
logger.debug(f"Copying '{source_file}' to '{target_file}'...")
logging.debug(f"Copying '{source_file}' to '{target_file}'...")
shutil.copy2(source_file, target_file)
def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None:
@@ -456,7 +455,7 @@ class IncrementalMerger:
"""
json_files = self._get_json_files(source_dir)
logger.info("Starting incremental merge process...")
logging.info("Starting incremental merge process...")
for json_file in json_files:
source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file)
@@ -893,7 +892,7 @@ def get_chat_type(chat_id: str) -> str:
return "status_broadcast"
elif chat_id.endswith("@broadcast"):
return "broadcast_channel"
logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group")
logging.warning(f"Unknown chat type for {chat_id}, defaulting to private_group")
return "private_group"

View File

@@ -6,7 +6,6 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import Device
logger = logging.getLogger(__name__)
class ExportedContactNumbers(TypedDict):
@@ -45,7 +44,7 @@ def decode_quoted_printable(value: str, charset: str) -> str:
return bytes_val.decode(charset, errors="replace")
except Exception:
# Fallback: return the original value if decoding fails
logger.warning(
logging.warning(
f"Failed to decode quoted-printable value: {value}, "
f"charset: {charset}. Please report this issue."
)
@@ -176,7 +175,7 @@ def read_vcards_file(vcf_file_path, default_country_code: str):
if contact := process_vcard_entry(vcard):
contacts.append(contact)
logger.info(f"Imported {len(contacts)} contacts/vcards")
logging.info(f"Imported {len(contacts)} contacts/vcards")
return map_number_to_name(contacts, default_country_code)