46 Commits

Author SHA1 Message Date
KnugiHK
2ebb389ad1 Fix the support for grouped vCard properties (#207)
Parse and match vCard properties that use grouping prefixes (e.g. item1.TEL) by extracting the property name correctly.

Regression caused by the removal of the vobject dependency.
2026-04-01 01:04:51 +08:00
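A minimal illustration of the grouping-prefix handling described above; the helper name and regex here are hypothetical and not the exporter's actual parser:

```python
import re

# vCard allows grouped properties, e.g. "item1.TEL;TYPE=CELL:+15551234567".
# Matching on the raw line start would miss such properties, so any group prefix
# (everything up to the first dot) is stripped before the property name is compared.
_GROUPED_PROPERTY = re.compile(r"^(?:[A-Za-z0-9\-]+\.)?(?P<name>[A-Za-z\-]+)")

def property_name(vcard_line: str) -> str:
    """Return the bare property name of a vCard content line, ignoring any group prefix."""
    return _GROUPED_PROPERTY.match(vcard_line).group("name").upper()

assert property_name("item1.TEL;TYPE=CELL:+15551234567") == "TEL"
assert property_name("TEL;TYPE=HOME:+15557654321") == "TEL"
```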
Knugi
0056204d87 Merge pull request #206 from tang-vu/contribai/improve/quality/crash-in-timestamp-formatting-when-timez
 Quality: Crash in timestamp formatting when timezone_offset is None
2026-03-26 21:56:43 +08:00
KnugiHK
abf4f3c814 Implement tests for classes TimeZone and Timing 2026-03-26 21:53:44 +08:00
KnugiHK
9e138d3a1f Cache TimeZone object in Timing class 2026-03-26 21:31:52 +08:00
KnugiHK
18a0d822b3 Timezone offset should also accept float 2026-03-26 21:30:10 +08:00
KnugiHK
bb860533d5 Add a default value for timezone_offset in Timing.__init__ 2026-03-26 21:23:59 +08:00
Tang Vu
a2bcc39e63 refactor: crash in timestamp formatting when timezone_offset is none
In `Timing.format_timestamp`, if `self.timezone_offset` is `None` (which is explicitly allowed by the `Optional[int]` type hint), it instantiates `TimeZone(None)`. 
When `datetime.fromtimestamp()` calls the `utcoffset()` method on this timezone object, it executes `timedelta(hours=self.offset)`, which evaluates to `timedelta(hours=None)`. This raises a `TypeError: unsupported type for timedelta hours component: NoneType`, causing the application to crash during export.


Affected files: data_model.py

Signed-off-by: Tang Vu <vuminhtang2212@gmail.com>
2026-03-26 03:25:44 +07:00
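A condensed sketch of the failure mode and of the fix applied by the commits above, mirroring the `data_model.py` diff further down: the `TimeZone` object is built once, and only when an offset was actually given.

```python
from datetime import datetime, timedelta, tzinfo
from typing import Optional, Union

class TimeZone(tzinfo):
    """Fixed-offset timezone; the offset in hours may be an int or a float."""
    def __init__(self, offset: Union[int, float]) -> None:
        self.offset = offset
    def utcoffset(self, dt):
        return timedelta(hours=self.offset)   # TypeError here if offset is None
    def dst(self, dt):
        return timedelta(0)

class Timing:
    def __init__(self, timezone_offset: Optional[Union[int, float]] = None) -> None:
        self.timezone_offset = timezone_offset
        # Cache the tzinfo; None now means "use the local timezone" instead of TimeZone(None)
        self.tz = TimeZone(timezone_offset) if timezone_offset is not None else None

    def format_timestamp(self, timestamp: Optional[Union[int, float]], format: str) -> Optional[str]:
        if timestamp is None:
            return None
        timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
        return datetime.fromtimestamp(timestamp, self.tz).strftime(format)

# Before the fix, datetime.fromtimestamp(ts, TimeZone(None)) raised
# "TypeError: unsupported type for timedelta hours component: NoneType".
print(Timing().format_timestamp(1742968800, "%Y-%m-%d %H:%M"))
```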
KnugiHK
cf1f37db38 Update compile-binary.yml 2026-03-21 18:04:16 +08:00
KnugiHK
8781459547 Update actions 2026-03-21 18:00:50 +08:00
Knugi
2b54225689 Merge pull request #194 from SoLoHK525/features/exif-support
Add support for exif timestamp export
2026-03-21 17:32:16 +08:00
KnugiHK
a161993ab9 Normalize media path handling in handlers 2026-03-21 17:22:47 +08:00
KnugiHK
8dfced9d94 Avoid self-copy when no timestamp 2026-03-21 17:16:32 +08:00
KnugiHK
517fd2d3ac Add back missing comma 2026-03-21 16:42:25 +08:00
SoLoHK525
37a52c0d7d Add support for exif timestamp export 2026-03-21 16:25:19 +08:00
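The exporter's actual helper, `process_media_with_timestamp`, lives in `media_timestamp.py` and is not part of this diff, so the sketch below only illustrates the general idea of writing a message timestamp into a JPEG's EXIF data with `piexif` (the optional dependency named in the `--embed-exif` help text); names and details are illustrative:

```python
from datetime import datetime, timedelta, timezone
import piexif

def embed_exif_timestamp(jpeg_path: str, timestamp: float, timezone_offset: float = 0) -> None:
    """Write the message timestamp into the JPEG's DateTimeOriginal tag, in place."""
    dt = datetime.fromtimestamp(timestamp, tz=timezone(timedelta(hours=timezone_offset)))
    exif_dict = piexif.load(jpeg_path)
    exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = dt.strftime("%Y:%m:%d %H:%M:%S").encode()
    piexif.insert(piexif.dump(exif_dict), jpeg_path)
```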
Knugi
bad813eb73 Merge pull request #199 from watercrossing/fix/ios-polls-contacts-typo
iOS: Add poll support, fix contact name resolution, fix typos
2026-03-21 16:02:40 +08:00
KnugiHK
014f830fa0 Update test_incremental_merge.py 2026-03-21 15:53:20 +08:00
KnugiHK
a4d79c3892 Add backward compatibility to contact database processing
Because ZLID does not exist in the old schema
2026-03-21 15:46:00 +08:00
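The backward-compatible query is built by checking which columns the table actually has, as the `ios_handler.py` diff below shows with `PRAGMA table_info`; a small standalone sketch of the same pattern:

```python
import sqlite3

def existing_columns(cursor: sqlite3.Cursor, table: str, wanted: list) -> list:
    """Return only those of `wanted` that exist in `table` (older schemas lack ZLID)."""
    cursor.execute(f"PRAGMA table_info({table})")
    present = {row[1] for row in cursor.fetchall()}   # index 1 of each row is the column name
    return [col for col in wanted if col in present]

# columns = existing_columns(c, "ZWAADDRESSBOOKCONTACT",
#                            ["ZWHATSAPPID", "ZLID", "ZFULLNAME", "ZABOUTTEXT"])
# c.execute(f"SELECT {', '.join(columns)} FROM ZWAADDRESSBOOKCONTACT")
```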
KnugiHK
daf6375966 Implement aliases for chat. For LID mapping 2026-03-21 15:36:32 +08:00
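The alias mechanism is implemented in `ChatCollection` (see the `data_model.py` diff further down): `add_chat` can record an alias in a `master_lookup` table and `get_chat` falls back to it, so a LID resolves to the same chat as the phone-number JID. A usage sketch based on that diff:

```python
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore
from Whatsapp_Chat_Exporter.utility import Device

collection = ChatCollection()
chat = collection.add_chat(
    "15551234567@s.whatsapp.net",     # primary chat ID (illustrative JID)
    ChatStore(Device.IOS),
    alias="123456789012345@lid",      # LID recorded as an alias for the same chat
)
# get_chat() consults the master_lookup table, so both identifiers resolve to one chat
assert collection.get_chat("123456789012345@lid") is chat
```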
KnugiHK
b1886fd737 Better parse metadata message in iOS 2026-02-22 18:35:04 +08:00
KnugiHK
09a26dbe04 Add placeholder for Android on polls 2026-02-22 18:00:20 +08:00
Ingolf Becker
99474e65cc iOS: Add poll support, fix contact name resolution, fix typos
- Add poll message decoding for iOS (ZMESSAGETYPE 46) using raw protobuf
  parsing of ZWAMESSAGEINFO.ZRECEIPTINFO blobs (no external dependency).
  Polls render with vote tallies and voter names in the HTML export.
- Fix iOS contact name resolution: pull ZFULLNAME from address book,
  resolve LID-based group members, fall back to ZWAPROFILEPUSHNAME,
  and avoid overwriting real names with phone numbers.
- Fix typo: 'expoter' -> 'exporter' in android_crypt.py and __main__.py.
- Add poll field to Message data model and update test fixtures.
2026-02-22 17:44:22 +08:00
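The "raw protobuf parsing" mentioned above amounts to walking the varint-encoded field keys of the `ZWAMESSAGEINFO.ZRECEIPTINFO` blob without any protobuf library. A generic sketch of that wire-format walk follows; the specific field numbers WhatsApp uses for poll options and vote counts are not shown in this log:

```python
def read_varint(buf: bytes, pos: int):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        result |= (byte & 0x7F) << shift
        pos += 1
        if not byte & 0x80:
            return result, pos
        shift += 7

def iter_fields(buf: bytes):
    """Yield (field_number, wire_type, value) for each top-level protobuf field."""
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field, wire = key >> 3, key & 0x07
        if wire == 0:                             # varint
            value, pos = read_varint(buf, pos)
        elif wire == 2:                           # length-delimited: string or nested message
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:                                     # fixed32/fixed64 not needed for this sketch
            raise ValueError(f"unhandled wire type {wire}")
        yield field, wire, value
```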
KnugiHK
ea396f0885 Update README.md 2026-02-22 17:44:22 +08:00
KnugiHK
5fa0a613b0 Fix legacy call query
This adds back backward compatibility for the legacy schema, which was broken in 0.13.0
2026-02-22 16:52:04 +08:00
KnugiHK
a0719bc2bf Update README.md 2026-01-24 23:05:00 +08:00
KnugiHK
bac2efe15a Revert "Update README.md"
This reverts commit 1c7d6f7912.
2026-01-24 18:33:10 +08:00
KnugiHK
9a6ee3ce5f Revert "Add iphone_backup_decrypt as an optional dependency (#123)"
This reverts commit 94960e4a23.
2026-01-24 18:31:59 +08:00
KnugiHK
823a89e677 Merge branch 'dev' 2026-01-24 18:21:36 +08:00
KnugiHK
945b422f71 Update ci.yml 2026-01-24 18:21:25 +08:00
KnugiHK
19008a80bc Merge branch 'dev' 2026-01-24 18:09:15 +08:00
KnugiHK
4e877987fb Bump version & update readme 2026-01-24 18:08:43 +08:00
KnugiHK
322b12a5a4 Fix a crash in message counting if chat filter is in use 2026-01-24 18:02:30 +08:00
KnugiHK
1560c49644 Update ci.yml 2026-01-24 17:42:02 +08:00
KnugiHK
28ba97d72f Fix CI on Windows 2026-01-24 17:38:22 +08:00
KnugiHK
eab98ba0d6 Fix crash on pre-release versions and enable update checks for pre-releases 2026-01-24 17:20:07 +08:00
KnugiHK
f920ca82b4 Refactor the logging facility a bit 2026-01-24 17:05:14 +08:00
KnugiHK
4eed3ca321 Refactor CLEAR_LINE in a more pythonic way
So it is easier for contributors to write a logging line for this project.
2026-01-24 16:48:07 +08:00
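The refactor replaces the old pattern of appending a `CLEAR_LINE` escape sequence to every message with a `logging.Filter` that reacts to `extra={"clear": True}`, as the `__main__.py` diff below shows; a condensed, runnable sketch of that pattern:

```python
import logging

class ClearLineFilter(logging.Filter):
    """Pick the line ending per record: '\r' for transient progress lines, '\n' otherwise."""
    def filter(self, record):
        record.line_end = "\r" if getattr(record, "clear", False) else "\n"
        return True

handler = logging.StreamHandler()
handler.terminator = ""                 # the line ending now comes from the record itself
handler.addFilter(ClearLineFilter())
logging.basicConfig(level=logging.INFO,
                    format="[%(levelname)s] %(message)s%(line_end)s",
                    handlers=[handler])

logging.info("Processing media...", extra={"clear": True})   # transient progress line
logging.info("Processed 42 media files")                      # overwrites the line above
```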
KnugiHK
746e4e1ac5 Fix and improve the logging facility for incremental merge 2026-01-24 16:24:10 +08:00
KnugiHK
1694ae7dd9 Update utility.py 2026-01-24 01:47:45 +08:00
KnugiHK
f05e0d3451 Refactor incremental_merge 2026-01-24 01:33:18 +08:00
KnugiHK
0c5f2b7f13 Add a comment on SQLi in get_chat_condition 2026-01-24 01:19:55 +08:00
KnugiHK
db01d05263 Refactor get_chat_condition to increase maintainability 2026-01-24 00:50:06 +08:00
KnugiHK
2e7953f4ca Add unit test for get_chat_condition 2026-01-24 00:03:21 +08:00
KnugiHK
95a52231be Fix the returning string for empty filter list 2026-01-24 00:03:08 +08:00
Knugi
e0aab06192 Update LICENSE 2026-01-21 16:06:12 +00:00
Knugi
43b00d8b48 Update README.md 2026-01-21 14:28:41 +00:00
Knugi
32c93159ac Update ci.yml 2026-01-12 15:50:19 +00:00
23 changed files with 1632 additions and 343 deletions

View File

@@ -4,7 +4,7 @@ on:
push:
branches:
- dev
pull_request:
jobs:
ci:
runs-on: ${{ matrix.os }}
@@ -18,21 +18,24 @@ jobs:
include:
- os: windows-latest
python-version: "3.13"
python_utf8: "1"
- os: macos-latest
python-version: "3.13"
- os: windows-11-arm
python-version: "3.13"
python_utf8: "1"
- os: macos-15-intel
python-version: "3.13"
- os: windows-latest
python-version: "3.14"
python_utf8: "1"
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }}
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
@@ -42,4 +45,6 @@ jobs:
pip install .[all] pytest nuitka
- name: Run pytest
env:
PYTHONUTF8: ${{ matrix.python_utf8 || '0' }}
run: pytest

View File

@@ -34,7 +34,7 @@ jobs:
uses: actions/attest-build-provenance@v3
with:
subject-path: ./wtsexporter_linux_x64
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: binary-linux-x64
path: ./wtsexporter_linux_x64
@@ -58,10 +58,10 @@ jobs:
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_x64.exe"
Get-FileHash wtsexporter_win_x64.exe
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v3
uses: actions/attest-build-provenance@v4
with:
subject-path: .\wtsexporter_win_x64.exe
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: binary-windows-x64
path: .\wtsexporter_win_x64.exe
@@ -85,10 +85,10 @@ jobs:
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_arm64.exe"
Get-FileHash wtsexporter_win_arm64.exe
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v3
uses: actions/attest-build-provenance@v4
with:
subject-path: .\wtsexporter_win_arm64.exe
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: binary-windows-arm64
path: .\wtsexporter_win_arm64.exe
@@ -114,10 +114,10 @@ jobs:
mv wtsexporter wtsexporter_macos_arm64
shasum -a 256 wtsexporter_macos_arm64
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v3
uses: actions/attest-build-provenance@v4
with:
subject-path: ./wtsexporter_macos_arm64
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: binary-macos-arm64
path: ./wtsexporter_macos_arm64
@@ -146,7 +146,7 @@ jobs:
uses: actions/attest-build-provenance@v3
with:
subject-path: ./wtsexporter_macos_x64
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: binary-macos-x64
path: ./wtsexporter_macos_x64

View File

@@ -22,9 +22,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: '3.x'
- name: Install dependencies

View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2021-2025 Knugi
Copyright (c) 2021-2026 Knugi
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -113,9 +113,9 @@ Do an iPhone/iPad Backup with iTunes/Finder first.
> [!NOTE]
> If you are working on unencrypted iOS/iPadOS backup, skip this.
If you want to work on an encrypted iOS/iPadOS Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
If you want to work on an encrypted iOS/iPadOS Backup, you should install `iphone_backup_decrypt` from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
```sh
pip install whatsapp-chat-exporter["ios_backup"]
pip install git+https://github.com/KnugiHK/iphone_backup_decrypt
```
> [!NOTE]
> You will need to disable the built-in end-to-end encryption for WhatsApp backups. See [WhatsApp's FAQ](https://faq.whatsapp.com/490592613091019#turn-off-end-to-end-encrypted-backup) for how to do it.
@@ -141,24 +141,33 @@ After extracting, you will get this:
![Private Message](imgs/pm.png)
## Working with Business
If you are working with WhatsApp Business, add the `--business` flag to the command
```sh
wtsexporter -a --business ...other flags
wtsexporter -i --business ...other flags
```
## More options
Invoke the wtsexporter with --help option will show you all options available.
```sh
> wtsexporter --help
usage: wtsexporter [-h] [--debug] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-d DB] [-k [KEY]]
[--call-db [CALL_DB_IOS]] [--wab WAB] [-o OUTPUT] [-j [JSON]] [--txt [TEXT_FORMAT]] [--no-html]
[--size [SIZE]] [--no-reply] [--avoid-encoding-json] [--pretty-print-json [PRETTY_PRINT_JSON]]
[--tg] [--per-chat] [--import] [-t TEMPLATE] [--offline OFFLINE] [--no-avatar] [--old-theme]
[--headline HEADLINE] [-c] [--create-separated-media] [--time-offset {-12 to 14}] [--date DATE]
usage: wtsexporter [-h] [--debug] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-d DB]
[-k [KEY]] [--call-db [CALL_DB_IOS]] [--wab WAB] [-o OUTPUT] [-j [JSON]]
[--txt [TEXT_FORMAT]] [--no-html] [--size [SIZE]] [--no-reply] [--avoid-encoding-json]
[--pretty-print-json [PRETTY_PRINT_JSON]] [--tg] [--per-chat] [--import] [-t TEMPLATE]
[--offline OFFLINE] [--no-avatar] [--old-theme] [--headline HEADLINE] [-c]
[--create-separated-media] [--time-offset {-12 to 14}] [--date DATE]
[--date-format FORMAT] [--include [phone number ...]] [--exclude [phone number ...]]
[--dont-filter-empty] [--enrich-from-vcards ENRICH_FROM_VCARDS]
[--default-country-code DEFAULT_COUNTRY_CODE] [--incremental-merge] [--source-dir SOURCE_DIR]
[--target-dir TARGET_DIR] [-s] [--check-update] [--assume-first-as-me] [--business]
[--decrypt-chunk-size DECRYPT_CHUNK_SIZE] [--max-bruteforce-worker MAX_BRUTEFORCE_WORKER]
[--no-banner]
[--default-country-code DEFAULT_COUNTRY_CODE] [--incremental-merge]
[--source-dir SOURCE_DIR] [--target-dir TARGET_DIR] [-s] [--check-update]
[--check-update-pre] [--assume-first-as-me] [--business]
[--decrypt-chunk-size DECRYPT_CHUNK_SIZE]
[--max-bruteforce-worker MAX_BRUTEFORCE_WORKER] [--no-banner] [--fix-dot-files]
A customizable Android and iOS/iPadOS WhatsApp database parser that will give you the history of your WhatsApp
conversations in HTML and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.
A customizable Android and iOS/iPadOS WhatsApp database parser that will give you the history of your
WhatsApp conversations in HTML and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.
options:
-h, --help show this help message and exit
@@ -174,9 +183,10 @@ Input Files:
-w, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite)
-m, --media MEDIA Path to WhatsApp media folder (default: WhatsApp)
-b, --backup BACKUP Path to Android (must be used together with -k)/iOS WhatsApp backup
-d, --db DB Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)
-k, --key [KEY] Path to key file. If this option is set for crypt15 backup but nothing is specified, you will
be prompted to enter the key.
-d, --db DB Path to database file (default:
msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)
-k, --key [KEY] Path to key file. If this option is set for crypt15 backup but nothing is
specified, you will be prompted to enter the key.
--call-db [CALL_DB_IOS]
Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only
--wab, --wa-backup WAB
@@ -185,8 +195,8 @@ Input Files:
Output Options:
-o, --output OUTPUT Output to specific directory (default: result)
-j, --json [JSON] Save the result to a single JSON file (default if present: result.json)
--txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default if present:
result/)
--txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default
if present: result/)
--no-html Do not output html files
--size, --output-size, --split [SIZE]
Maximum (rough) size of a single output file in bytes, 0 for auto
@@ -197,7 +207,8 @@ JSON Options:
Don't encode non-ascii characters in the output JSON files
--pretty-print-json [PRETTY_PRINT_JSON]
Pretty print the output JSON.
--tg, --telegram Output the JSON in a format compatible with Telegram export (implies json-per-chat)
--tg, --telegram Output the JSON in a format compatible with Telegram export (implies json-per-
chat)
--per-chat Output the JSON file per chat
--import Import JSON file and convert to HTML output
@@ -207,7 +218,8 @@ HTML Options:
--offline OFFLINE Relative path to offline static files
--no-avatar Do not render avatar in HTML output
--old-theme Use the old Telegram-alike theme
--headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat name
--headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat
name
Media Handling:
-c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it
@@ -223,24 +235,26 @@ Filtering Options:
Include chats that match the supplied phone number
--exclude [phone number ...]
Exclude chats that match the supplied phone number
--dont-filter-empty By default, the exporter will not render chats with no valid message. Setting this flag will
cause the exporter to render those. This is useful if chat(s) are missing from the output
--dont-filter-empty By default, the exporter will not render chats with no valid message. Setting this
flag will cause the exporter to render those. This is useful if chat(s) are
missing from the output
Contact Enrichment:
--enrich-from-vcards ENRICH_FROM_VCARDS
Path to an exported vcf file from Google contacts export. Add names missing from WhatsApp's
default database
Path to an exported vcf file from Google contacts export. Add names missing from
WhatsApp's default database
--default-country-code DEFAULT_COUNTRY_CODE
Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this
will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country
Use with --enrich-from-vcards. When numbers in the vcf file does not have a
country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use
the number of your own country
Incremental Merging:
--incremental-merge Performs an incremental merge of two exports. Requires setting both --source-dir and --target-
dir. The chats (JSON files only) and media from the source directory will be merged into the
target directory. No chat messages or media will be deleted from the target directory; only
new chat messages and media will be added to it. This enables chat messages and media to be
deleted from the device to free up space, while ensuring they are preserved in the exported
backups.
--incremental-merge Performs an incremental merge of two exports. Requires setting both --source-dir
and --target-dir. The chats (JSON files only) and media from the source directory
will be merged into the target directory. No chat messages or media will be
deleted from the target directory; only new chat messages and media will be added
to it. This enables chat messages and media to be deleted from the device to free
up space, while ensuring they are preserved in the exported backups.
--source-dir SOURCE_DIR
Sets the source directory. Used for performing incremental merges.
--target-dir TARGET_DIR
@@ -249,16 +263,20 @@ Incremental Merging:
Miscellaneous:
-s, --showkey Show the HEX key used to decrypt the database
--check-update Check for updates (require Internet access)
--check-update-pre Check for updates including pre-releases (require Internet access)
--assume-first-as-me Assume the first message in a chat as sent by me (must be used together with -e)
--business Use Whatsapp Business default files (iOS only)
--decrypt-chunk-size DECRYPT_CHUNK_SIZE
Specify the chunk size for decrypting iOS backup, which may affect the decryption speed.
Specify the chunk size for decrypting iOS backup, which may affect the decryption
speed.
--max-bruteforce-worker MAX_BRUTEFORCE_WORKER
Specify the maximum number of worker for bruteforce decryption.
--no-banner Do not show the banner
--fix-dot-files Fix files with a dot at the end of their name (allowing the outputs be stored in
FAT filesystems)
WhatsApp Chat Exporter: 0.13.0rc2 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open source
licenses.
WhatsApp Chat Exporter: 0.13.0 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open
source licenses.
```
# Verifying Build Integrity
@@ -266,7 +284,7 @@ licenses.
To ensure that the binaries provided in the releases were built directly from this source code via GitHub Actions and have not been tampered with, GitHub Artifact Attestations is used. You can verify the authenticity of any pre-built binaries using the GitHub CLI.
> [!NOTE]
> Requires version 0.13.0rc1 or newer. Legacy binaries are unsupported.
> Requires version 0.13.0 or newer. Legacy binaries are unsupported.
### Using Bash (Linux/WSL/macOS)

View File

@@ -12,7 +12,7 @@ import importlib.metadata
from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler
from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, Crypt
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, Crypt
from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, check_update
from Whatsapp_Chat_Exporter.utility import telegram_json_format, convert_time_unit, DbType
@@ -26,7 +26,6 @@ from typing import Optional, List, Dict
from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards
logger = logging.getLogger(__name__)
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
WTSEXPORTER_BANNER = f"""========================================================================================================
██╗ ██╗██╗ ██╗ █████╗ ████████╗███████╗ █████╗ ██████╗ ██████╗
@@ -198,6 +197,17 @@ def setup_argument_parser() -> ArgumentParser:
help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory"
)
# Media Timestamp Options
timestamp_group = parser.add_argument_group('Media Timestamp Options')
timestamp_group.add_argument(
"--embed-exif", dest="embed_exif", default=False, action='store_true',
help="Embed message timestamp in EXIF data of media files (requires piexif/Pillow)"
)
timestamp_group.add_argument(
"--rename-media", dest="rename_media", default=False, action='store_true',
help="Rename media files with timestamp prefix (YYYY-MM-DD_HH-MM-SS_filename)"
)
# Filtering options
filter_group = parser.add_argument_group('Filtering Options')
filter_group.add_argument(
@@ -275,6 +285,10 @@ def setup_argument_parser() -> ArgumentParser:
"--check-update", dest="check_update", default=False, action='store_true',
help="Check for updates (require Internet access)"
)
misc_group.add_argument(
"--check-update-pre", dest="check_update_pre", default=False, action='store_true',
help="Check for updates including pre-releases (require Internet access)"
)
misc_group.add_argument(
"--assume-first-as-me", dest="assume_first_as_me", default=False, action='store_true',
help="Assume the first message in a chat as sent by me (must be used together with -e)"
@@ -368,6 +382,17 @@ def validate_args(parser: ArgumentParser, args) -> None:
validate_chat_filters(parser, args.filter_chat_include)
validate_chat_filters(parser, args.filter_chat_exclude)
# EXIF dependency validation
if args.embed_exif:
try:
import piexif
from PIL import Image
except ImportError:
parser.error(
"--embed-exif requires piexif and Pillow. "
"Install with: pip install whatsapp-chat-exporter[media_timestamp]"
)
def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str]]) -> None:
"""Validate chat filters to ensure they contain only phone numbers."""
@@ -440,10 +465,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']:
def decrypt_android_backup(args) -> int:
"""Decrypt Android backup files and return error code."""
if args.key is None or args.backup is None:
logger.error(f"You must specify the backup file with -b and a key with -k{CLEAR_LINE}")
logging.error(f"You must specify the backup file with -b and a key with -k")
return 1
logger.info(f"Decryption key specified, decrypting WhatsApp backup...{CLEAR_LINE}")
logging.info(f"Decryption key specified, decrypting WhatsApp backup...")
# Determine crypt type
if "crypt12" in args.backup:
@@ -453,8 +478,8 @@ def decrypt_android_backup(args) -> int:
elif "crypt15" in args.backup:
crypt = Crypt.CRYPT15
else:
logger.error(
f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}")
logging.error(
f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.")
return 1
# Get key
@@ -506,15 +531,15 @@ def decrypt_android_backup(args) -> int:
def handle_decrypt_error(error: int) -> None:
"""Handle decryption errors with appropriate messages."""
if error == 1:
logger.error("Dependencies of decrypt_backup and/or extract_encrypted_key"
" are not present. For details, see README.md.\n")
logging.error("Dependencies of decrypt_backup and/or extract_encrypted_key"
" are not present. For details, see README.md.")
exit(3)
elif error == 2:
logger.error("Failed when decompressing the decrypted backup. "
"Possibly incorrect offsets used in decryption.\n")
logging.error("Failed when decompressing the decrypted backup. "
"Possibly incorrect offsets used in decryption.")
exit(4)
else:
logger.error("Unknown error occurred.\n")
logging.error("Unknown error occurred.")
exit(5)
@@ -537,9 +562,9 @@ def process_messages(args, data: ChatCollection) -> None:
msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE
if not os.path.isfile(msg_db):
logger.error(
logging.error(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path.\n"
"to database file with option -d or check your provided path."
)
exit(6)
@@ -566,7 +591,8 @@ def process_messages(args, data: ChatCollection) -> None:
# Process media
message_handler.media(
db, data, args.media, args.filter_date,
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files,
args.embed_exif, args.rename_media, args.timezone_offset
)
# Process vcards
@@ -575,6 +601,12 @@ def process_messages(args, data: ChatCollection) -> None:
filter_chat, args.filter_empty
)
# Process polls
message_handler.polls(
db, data, args.filter_date,
filter_chat, args.filter_empty
)
# Process calls
process_calls(args, db, data, filter_chat, timing)
@@ -596,21 +628,21 @@ def handle_media_directory(args) -> None:
media_path = os.path.join(args.output, args.media)
if os.path.isdir(media_path):
logger.info(
f"WhatsApp directory already exists in output directory. Skipping...{CLEAR_LINE}")
logging.info(
f"WhatsApp directory already exists in output directory. Skipping...")
else:
if args.move_media:
try:
logger.info(f"Moving media directory...\r")
logging.info(f"Moving media directory...", extra={"clear": True})
shutil.move(args.media, f"{args.output}/")
logger.info(f"Media directory has been moved to the output directory{CLEAR_LINE}")
logging.info(f"Media directory has been moved to the output directory")
except PermissionError:
logger.warning("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?\n")
logging.warning("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?")
else:
logger.info(f"Copying media directory...\r")
logging.info(f"Copying media directory...", extra={"clear": True})
shutil.copytree(args.media, media_path)
logger.info(f"Media directory has been copied to the output directory{CLEAR_LINE}")
logging.info(f"Media directory has been copied to the output directory")
def create_output_files(args, data: ChatCollection) -> None:
@@ -631,7 +663,7 @@ def create_output_files(args, data: ChatCollection) -> None:
# Create text files if requested
if args.text_format:
logger.info(f"Writing text file...{CLEAR_LINE}")
logging.info(f"Writing text file...")
android_handler.create_txt(data, args.text_format)
# Create JSON files if requested
@@ -661,9 +693,9 @@ def export_single_json(args, data: Dict) -> None:
ensure_ascii=not args.avoid_encoding_json,
indent=args.pretty_print_json
)
logger.info(f"Writing JSON file...\r")
logging.info(f"Writing JSON file...", extra={"clear": True})
f.write(json_data)
logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))}){CLEAR_LINE}")
logging.info(f"JSON file saved...({bytes_to_readable(len(json_data))})")
def export_multiple_json(args, data: Dict) -> None:
@@ -697,7 +729,7 @@ def export_multiple_json(args, data: Dict) -> None:
f.write(file_content)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}")
def process_exported_chat(args, data: ChatCollection) -> None:
@@ -722,16 +754,36 @@ def process_exported_chat(args, data: ChatCollection) -> None:
shutil.copy(file, args.output)
class ClearLineFilter(logging.Filter):
def filter(self, record):
is_clear = getattr(record, 'clear', False)
if is_clear:
record.line_end = "\r"
record.prefix = "\x1b[K"
else:
record.line_end = "\n"
record.prefix = ""
return True
def setup_logging(level):
log_handler_stdout = logging.StreamHandler()
log_handler_stdout.terminator = ""
log_handler_stdout.addFilter(ClearLineFilter())
log_handler_stdout.set_name("console")
handlers = [log_handler_stdout]
if level == logging.DEBUG:
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
handlers.append(logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w"))
log_handler_file = logging.FileHandler(f"wtsexporter-debug-{timestamp}.log", mode="w")
log_handler_file.terminator = ""
log_handler_file.addFilter(ClearLineFilter())
handlers.append(log_handler_file)
logging.basicConfig(
level=level,
format="[%(levelname)s] %(message)s",
format="[%(levelname)s] %(message)s%(line_end)s",
handlers=handlers
)
@@ -742,23 +794,29 @@ def main():
parser = setup_argument_parser()
args = parser.parse_args()
# Check for updates
if args.check_update:
exit(check_update())
# Validate arguments
validate_args(parser, args)
# Print banner if not suppressed
if not args.no_banner:
# Note: This may raise UnicodeEncodeError on Windows if the terminal
# doesn't support UTF-8 (e.g., Legacy CMD). Use a modern terminal
# or set PYTHONUTF8=1 in your environment.
print(WTSEXPORTER_BANNER)
if args.debug:
setup_logging(logging.DEBUG)
logger.debug("Debug mode enabled.\n")
logging.debug("Debug mode enabled.")
for handler in logging.getLogger().handlers:
if handler.name == "console":
handler.setLevel(logging.INFO)
else:
setup_logging(logging.INFO)
# Check for updates
if args.check_update or args.check_update_pre:
exit(check_update(args.check_update_pre))
# Validate arguments
validate_args(parser, args)
# Create output directory if it doesn't exist
os.makedirs(args.output, exist_ok=True)
@@ -821,8 +879,8 @@ def main():
ios_media_handler.extract_media(
args.backup, identifiers, args.decrypt_chunk_size)
else:
logger.info(
f"WhatsApp directory already exists, skipping WhatsApp file extraction.{CLEAR_LINE}")
logging.info(
f"WhatsApp directory already exists, skipping WhatsApp file extraction.")
# Set default DB paths if not provided
if args.db is None:
@@ -838,7 +896,7 @@ def main():
args.pretty_print_json,
args.avoid_encoding_json
)
logger.info(f"Incremental merge completed successfully.{CLEAR_LINE}")
logging.info(f"Incremental merge completed successfully.")
else:
# Process contacts
process_contacts(args, data)
@@ -856,7 +914,7 @@ def main():
# Handle media directory
handle_media_directory(args)
logger.info("Everything is done!")
logging.info("Everything is done!")
if __name__ == "__main__":

View File

@@ -7,7 +7,7 @@ from tqdm import tqdm
from typing import Tuple, Union
from hashlib import sha256
from functools import partial
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType
from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType
try:
import zlib
@@ -25,7 +25,6 @@ else:
support_crypt15 = True
logger = logging.getLogger(__name__)
class DecryptionError(Exception):
@@ -126,7 +125,7 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes
raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.")
if len(db_compressed) < 2 or db_compressed[0] != 0x78:
logger.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}")
logging.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}")
raise ValueError(
"Key is correct, but decrypted data is not a valid compressed stream. "
"Is this even a valid WhatsApp database backup?"
@@ -171,12 +170,12 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
except (zlib.error, ValueError):
continue
else:
logger.debug(
f"Decryption successful with known offsets: IV {iv}, DB {db}{CLEAR_LINE}"
logging.debug(
f"Decryption successful with known offsets: IV {iv}, DB {db}"
)
return decrypted_db # Successful decryption
logger.info(f"Common offsets failed. Will attempt to brute-force{CLEAR_LINE}")
logging.info(f"Common offsets failed. Will attempt to brute-force")
offset_max = 200
workers = max_worker
check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key)
@@ -195,20 +194,20 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
found = True
break
if found:
logger.info(
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively.{CLEAR_LINE}"
logging.info(
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
)
logger.info(
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:{CLEAR_LINE}"
logging.info(
f"To include your offsets in the exporter, please report it in the discussion thread on GitHub:"
)
logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47{CLEAR_LINE}")
logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
return result
except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True)
print("\n")
logging.info("")
raise KeyboardInterrupt(
f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}"
f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully..."
)
finally:
@@ -346,7 +345,7 @@ def decrypt_backup(
main_key, hex_key = _derive_main_enc_key(key)
if show_crypt15:
hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)])
logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}{CLEAR_LINE}")
logging.info(f"The HEX key of the crypt15 backup is: {hex_key_str}")
else:
main_key = key[126:]

View File

@@ -11,13 +11,13 @@ from markupsafe import escape as htmle
from base64 import b64decode, b64encode
from datetime import datetime
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join
from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join
from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
logger = logging.getLogger(__name__)
def contacts(db, data, enrich_from_vcards):
@@ -38,14 +38,14 @@ def contacts(db, data, enrich_from_vcards):
if total_row_number == 0:
if enrich_from_vcards is not None:
logger.info(
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.\n")
logging.info(
"No contacts profiles found in the default database, contacts will be imported from the specified vCard file.")
else:
logger.warning(
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google\n")
logging.warning(
"No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google")
return False
else:
logger.info(f"Processed {total_row_number} contacts\n")
logging.info(f"Processed {total_row_number} contacts")
c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;")
@@ -56,7 +56,7 @@ def contacts(db, data, enrich_from_vcards):
current_chat.status = row["status"]
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
return True
@@ -81,7 +81,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat)
table_message = False
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n')
try:
content_cursor = _get_messages_cursor_new(
c,
@@ -101,7 +101,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1)
total_time = pbar.format_dict['elapsed']
_get_reactions(db, data)
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
# Helper functions for message processing
@@ -127,7 +127,7 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_e
{include_filter}
{exclude_filter}""")
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n')
empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
@@ -143,6 +143,8 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_e
FROM message
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
INNER JOIN jid jid_global
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
@@ -315,8 +317,8 @@ def _fetch_row_safely(cursor):
except sqlite3.OperationalError as e:
# Not sure how often this might happen, but this check should reduce the overhead
# if DEBUG flag is not set.
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n')
if logging.isEnabledFor(logging.DEBUG):
logging.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n')
continue
@@ -518,7 +520,7 @@ def _get_reactions(db, data):
if c.fetchone()[0] == 0:
return
logger.info("Processing reactions...\r")
logging.info("Processing reactions...", extra={"clear": True})
c.execute("""
SELECT
@@ -539,7 +541,7 @@ def _get_reactions(db, data):
ON chat.jid_row_id = chat_jid._id
""")
except sqlite3.OperationalError:
logger.warning(f"Could not fetch reactions (schema might be too old or incompatible){CLEAR_LINE}")
logging.warning(f"Could not fetch reactions (schema might be too old or incompatible)")
return
rows = c.fetchall()
@@ -574,10 +576,11 @@ def _get_reactions(db, data):
message.reactions[sender_name] = reaction
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False,
embed_exif=False, rename_media=False, timezone_offset=0):
"""
Process WhatsApp media files from the database.
@@ -589,13 +592,17 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
filter_chat: Chat filter conditions
filter_empty: Filter for empty chats
separate_media: Whether to separate media files by chat
fix_dot_files: Whether to fix media files with leading dot in the name
embed_exif: Whether to embed EXIF timestamp in media files
rename_media: Whether to rename media files with timestamp prefix
timezone_offset: Hours offset from UTC for timestamp formatting
"""
c = db.cursor()
total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
try:
content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat)
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n')
content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat)
content = content_cursor.fetchone()
@@ -606,10 +613,12 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
while (content := _fetch_row_safely(content_cursor)) is not None:
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files,
embed_exif, rename_media, timezone_offset)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
# Helper functions for media processing
@@ -637,7 +646,7 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
{include_filter}
{exclude_filter}""")
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n')
empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
include_filter = get_chat_condition(
@@ -754,7 +763,8 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
return cursor
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False):
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False,
embed_exif=False, rename_media=False, timezone_offset=0):
"""Process a single media file."""
file_path = f"{media_folder}/{content['file_path']}"
current_chat = data.get_chat(content["key_remote_jid"])
@@ -790,10 +800,25 @@ def _process_single_media(data, content, media_folder, mime, separate_media, fix
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = new_path
# Use timestamp processing if enabled
if embed_exif or rename_media:
final_path = process_media_with_timestamp(
file_path, new_path, message.timestamp,
timezone_offset, embed_exif, rename_media
)
else:
final_path = new_path
shutil.copy2(file_path, final_path)
elif embed_exif or rename_media:
# Handle in-place processing when not separating
# Create a copy with timestamp processing in the same folder
final_path = process_media_with_timestamp(
file_path, file_path, message.timestamp,
timezone_offset, embed_exif, rename_media
)
else:
message.data = file_path
final_path = file_path
message.data = final_path
else:
message.data = "The media is missing"
message.mime = "media"
@@ -814,7 +839,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
try:
rows = _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty)
except sqlite3.OperationalError as e:
logger.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n')
logging.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n')
rows = _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty)
total_row_number = len(rows)
@@ -828,7 +853,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
_process_vcard_row(row, path, data)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty):
"""Execute vCard query for modern WhatsApp database schema."""
@@ -931,14 +956,26 @@ def calls(db, data, timezone_offset, filter_chat):
c = db.cursor()
# Check if there are any calls that match the filter
total_row_number = _get_calls_count(c, filter_chat)
# The order matters here, modern query should be attempted first,
# if it fails, we can be pretty sure that legacy one will work,
# but not the other way around. This is because legacy query is
# more simple and less likely to have issues with missing tables/columns.
try:
total_row_number = _get_calls_count_modern(c, filter_chat)
except sqlite3.OperationalError as e:
total_row_number = _get_calls_count_legacy(c, filter_chat)
if total_row_number == 0:
return
logger.info(f"Processing calls...({total_row_number})\r")
logging.info(f"Processing calls...({total_row_number})", extra={"clear": True})
# Fetch call data
calls_data = _fetch_calls_data(c, filter_chat)
# Again, we try modern query first and fallback to legacy if it fails,
# for the same reasons as above.
try:
calls_data = _fetch_calls_data_modern(c, filter_chat)
except sqlite3.OperationalError as e:
calls_data = _fetch_calls_data_legacy(c, filter_chat)
# Create a chat store for all calls
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
@@ -952,9 +989,31 @@ def calls(db, data, timezone_offset, filter_chat):
# Add the calls chat to the data
data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def _get_calls_count(c, filter_chat):
def _get_calls_count_legacy(c, filter_chat):
"""Get the count of call records that match the filter."""
# Build the filter conditions
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
query = f"""SELECT count(),
jid.raw_string as key_remote_jid
FROM call_log
INNER JOIN jid
ON call_log.jid_row_id = jid._id
LEFT JOIN chat
ON call_log.jid_row_id = chat.jid_row_id
WHERE 1=1
{include_filter}
{exclude_filter}"""
c.execute(query)
return c.fetchone()[0]
def _get_calls_count_modern(c, filter_chat):
"""Get the count of call records that match the filter."""
# Build the filter conditions
@@ -979,7 +1038,36 @@ def _get_calls_count(c, filter_chat):
return c.fetchone()[0]
def _fetch_calls_data(c, filter_chat):
def _fetch_calls_data_legacy(c, filter_chat):
"""Fetch call data from the database."""
# Build the filter conditions
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
query = f"""SELECT call_log._id,
jid.raw_string as key_remote_jid,
from_me,
call_id,
timestamp,
video_call,
duration,
call_result,
bytes_transferred,
chat.subject as chat_subject
FROM call_log
INNER JOIN jid
ON call_log.jid_row_id = jid._id
LEFT JOIN chat
ON call_log.jid_row_id = chat.jid_row_id
WHERE 1=1
{include_filter}
{exclude_filter}"""
c.execute(query)
return c
def _fetch_calls_data_modern(c, filter_chat):
"""Fetch call data from the database."""
# Build the filter conditions
@@ -1070,6 +1158,10 @@ def _construct_call_description(content, call):
return description
def polls(db, data, date_filter, chat_filter, empty_filter):
"""Placeholder for future polls processing implementation."""
return
# TODO: Marked for enhancement on multi-threaded processing
def create_html(
data,
@@ -1128,7 +1220,7 @@ def create_html(
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}")
def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline):
"""Generate a single HTML file for a chat."""

View File

@@ -8,14 +8,14 @@ class Timing:
Handles timestamp formatting with timezone support.
"""
def __init__(self, timezone_offset: Optional[int]) -> None:
def __init__(self, timezone_offset: Optional[Union[int, float]] = None) -> None:
"""
Initialize Timing object.
Args:
timezone_offset (Optional[int]): Hours offset from UTC
timezone_offset (Optional[Union[int, float]]): Hours offset from UTC. Defaults to None (auto-detect).
"""
self.timezone_offset = timezone_offset
self.tz = TimeZone(timezone_offset) if timezone_offset is not None else None
def format_timestamp(self, timestamp: Optional[Union[int, float]], format: str) -> Optional[str]:
"""
@@ -30,7 +30,7 @@ class Timing:
"""
if timestamp is not None:
timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
return datetime.fromtimestamp(timestamp, TimeZone(self.timezone_offset)).strftime(format)
return datetime.fromtimestamp(timestamp, self.tz).strftime(format)
return None
@@ -39,12 +39,12 @@ class TimeZone(tzinfo):
Custom timezone class with fixed offset.
"""
def __init__(self, offset: int) -> None:
def __init__(self, offset: Union[int, float]) -> None:
"""
Initialize TimeZone object.
Args:
offset (int): Hours offset from UTC
offset (Union[int, float]): Hours offset from UTC
"""
self.offset = offset
@@ -67,6 +67,7 @@ class ChatCollection(MutableMapping):
"""Initialize an empty chat collection."""
self._chats: Dict[str, ChatStore] = {}
self._system: Dict[str, Any] = {}
self.set_system("master_lookup", {})
def __getitem__(self, key: str) -> 'ChatStore':
"""Get a chat by its ID. Required for dict-like access."""
@@ -100,21 +101,32 @@ class ChatCollection(MutableMapping):
Returns:
Optional['ChatStore']: The chat if found, None otherwise
"""
return self._chats.get(chat_id)
if chat_id in self._chats:
return self._chats[chat_id]
elif chat_id in self.get_system("master_lookup"):
return self._chats[self.get_system("master_lookup")[chat_id]]
else:
return None
def add_chat(self, chat_id: str, chat: 'ChatStore') -> None:
def add_chat(self, chat_id: str, chat: 'ChatStore', alias: Optional[str] = None) -> 'ChatStore':
"""
Add a new chat to the collection.
Args:
chat_id (str): The ID for the chat
chat (ChatStore): The chat to add
alias (Optional[str]): An optional alias to associate with the chat ID
Raises:
TypeError: If chat is not a ChatStore object
"""
if not isinstance(chat, ChatStore):
raise TypeError("Chat must be a ChatStore object")
if chat_id in self._chats:
raise ValueError("Chat ID already exists. Use get_chat to retrieve existing chat.")
if alias:
self.get_system("master_lookup")[alias] = chat_id
chat.aliases.append(alias)
self._chats[chat_id] = chat
return self._chats[chat_id]
@@ -128,6 +140,34 @@ class ChatCollection(MutableMapping):
if chat_id in self._chats:
del self._chats[chat_id]
def add_alias(self, alias: str, chat_id: str) -> bool:
"""
Add or modify an alias for a chat.
Args:
alias (str): The alias to add
chat_id (str): The ID of the chat to associate the alias with
"""
if chat_id not in self._chats:
raise ValueError("Chat ID does not exist. Add chat first.")
self.get_system("master_lookup")[alias] = chat_id
return True
def remove_alias(self, alias: str) -> bool:
"""
Remove an alias.
Args:
alias (str): The alias to remove
"""
if alias in self.get_system("master_lookup"):
self._chats[self.get_system("master_lookup")[alias]].aliases.remove(alias)
del self.get_system("master_lookup")[alias]
return True
return False
def items(self):
"""Get chat items (id, chat) pairs."""
return self._chats.items()
@@ -208,6 +248,7 @@ class ChatStore:
self.their_avatar_thumb = None
self.status = None
self.media_base = ""
self.aliases = []
def __len__(self) -> int:
"""Get number of chats. Required for dict-like access."""
@@ -361,6 +402,7 @@ class Message:
self.thumb = None # Android specific
self.sticker = False
self.reactions = {}
self.poll = None
def to_json(self) -> Dict[str, Any]:
"""Convert message to JSON-serializable dict."""

View File

@@ -6,10 +6,9 @@ from datetime import datetime
from mimetypes import MimeTypes
from tqdm import tqdm
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device, convert_time_unit
from Whatsapp_Chat_Exporter.utility import Device, convert_time_unit
logger = logging.getLogger(__name__)
def messages(path, data, assume_first_as_me=False):
@@ -43,7 +42,7 @@ def messages(path, data, assume_first_as_me=False):
)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}")
return data

View File

@@ -1,5 +1,6 @@
#!/usr/bin/python3
import json
import os
import logging
import shutil
@@ -9,33 +10,51 @@ from pathlib import Path
from mimetypes import MimeTypes
from markupsafe import escape as htmle
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, get_chat_condition, Device
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
logger = logging.getLogger(__name__)
from Whatsapp_Chat_Exporter.poll import decode_poll_from_receipt_blob
from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
def contacts(db, data):
"""Process WhatsApp contacts with status information."""
"""Process WhatsApp contacts with name and status information."""
c = db.cursor()
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT""")
total_row_number = c.fetchone()[0]
logger.info(f"Pre-processing contacts...({total_row_number})\r")
logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
# Check if expected columns exist before querying,
# to handle different WhatsApp versions (mainly ZLID).
c.execute("PRAGMA table_info(ZWAADDRESSBOOKCONTACT)")
column_names = [info[1] for info in c.fetchall()]
all_cols = ["ZWHATSAPPID", "ZLID", "ZFULLNAME", "ZABOUTTEXT"]
columns = [col for col in all_cols if col in column_names]
c.execute(f"""SELECT {', '.join(columns)} FROM ZWAADDRESSBOOKCONTACT""")
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
while (content := c.fetchone()) is not None:
zwhatsapp_id = content["ZWHATSAPPID"]
if zwhatsapp_id is None:
pbar.update(1)
continue
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
zwhatsapp_id += "@s.whatsapp.net"
current_chat = ChatStore(Device.IOS)
current_chat.status = content["ZABOUTTEXT"]
data.add_chat(zwhatsapp_id, current_chat)
if content["ZFULLNAME"]:
current_chat.name = content["ZFULLNAME"]
if content["ZABOUTTEXT"]:
current_chat.status = content["ZABOUTTEXT"]
# Index by WhatsApp ID, with LID as alias if available
data.add_chat(
zwhatsapp_id,
current_chat,
content["ZLID"] if "ZLID" in columns and content["ZLID"] else None
)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
def process_contact_avatars(current_chat, media_folder, contact_id):
@@ -125,14 +144,30 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
else:
current_chat = data.get_chat(contact_id)
current_chat.name = contact_name
# Only overwrite name if we have a better one (not a phone number)
# or if there's no existing name
if current_chat.name is None or contact_name is not None:
is_phone = contact_name.replace("+", "").replace(" ", "").isdigit() if contact_name else True
if not is_phone or current_chat.name is None:
current_chat.name = contact_name
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
# Process avatar images
process_contact_avatars(current_chat, media_folder, contact_id)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
# Pre-load push names for JIDs not yet in data (especially @lid group members)
c.execute("""SELECT ZJID, ZPUSHNAME FROM ZWAPROFILEPUSHNAME WHERE ZPUSHNAME IS NOT NULL""")
while (row := c.fetchone()) is not None:
jid = row["ZJID"]
if jid not in data:
push_chat = ChatStore(Device.IOS)
push_chat.name = row["ZPUSHNAME"]
data.add_chat(jid, push_chat)
elif data.get_chat(jid).name is None:
data.get_chat(jid).name = row["ZPUSHNAME"]
# Get message count
message_count_query = f"""
@@ -149,7 +184,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
"""
c.execute(message_count_query)
total_row_number = c.fetchone()[0]
logger.info(f"Processing messages...(0/{total_row_number})\r")
logging.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True})
# Fetch messages
messages_query = f"""
@@ -226,7 +261,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}")
def process_message_data(message, content, is_group_message, data, message_map, no_reply):
@@ -249,7 +284,7 @@ def process_message_data(message, content, is_group_message, data, message_map,
# Handle metadata messages
if content["ZMESSAGETYPE"] == 6:
return process_metadata_message(message, content, is_group_message)
return process_metadata_message(message, content, is_group_message, data)
# Handle quoted replies
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and not no_reply:
@@ -257,6 +292,15 @@ def process_message_data(message, content, is_group_message, data, message_map,
message.reply = quoted.decode()
message.quoted_data = message_map.get(message.reply)
# Skip poll vote update messages (type 66)
if content["ZMESSAGETYPE"] == 66:
return True # Invalid, skip
# Handle poll messages (type 46) - will be enriched by polls() later
if content["ZMESSAGETYPE"] == 46:
message.data = "\U0001f4ca Poll"
return False # Valid, populated later by polls()
# Handle stickers
if content["ZMESSAGETYPE"] == 15:
message.sticker = True
@@ -267,21 +311,51 @@ def process_message_data(message, content, is_group_message, data, message_map,
return False # Message is valid
def process_metadata_message(message, content, is_group_message):
def _parse_group_action(ztext, data):
if ztext.endswith("@lid") or ztext.endswith("@s.whatsapp.net"):
# Likely a group member change action (heuristic; the exact semantics are not fully confirmed)
name = None
if ztext in data:
name = data.get_chat(ztext).name
if "@" in ztext:
fallback = ztext.split('@')[0]
else:
fallback = None
entity = name or fallback
return f"{entity} join the group"
elif ztext.startswith("{") and ztext.endswith("}"):
try:
metadata = json.loads(ztext)
except json.JSONDecodeError:
return ztext # Not a JSON string, return as-is
entity = metadata.get('author', 'Someone')
if entity != "Someone":
name = None
if entity in data:
name = data.get_chat(entity).name
if "@" in entity:
fallback = entity.split('@')[0]
else:
fallback = None
entity = name or fallback
return f"{entity} changed the group name to {metadata.get('subject', 'Unknown')}."
elif ztext == "admin_add":
return f"The administrator has restricted participant additions to admins only."
else:
return "Unsupported WhatsApp internal message."
def process_metadata_message(message, content, is_group_message, data):
"""Process metadata messages (action_type 6)."""
if is_group_message:
# Group
if content["ZTEXT"] is not None:
# Changed name
try:
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
return False # Valid message
else:
return True # Invalid message
message.data = _parse_group_action(content["ZTEXT"], data)
message.meta = True
return False
else:
message.data = None
return False
@@ -312,7 +386,8 @@ def process_message_text(message, content):
message.data = msg
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False):
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False,
embed_exif=False, rename_media=False, timezone_offset=0):
"""Process media files from WhatsApp messages."""
c = db.cursor()
@@ -340,7 +415,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
"""
c.execute(media_count_query)
total_row_number = c.fetchone()[0]
logger.info(f"Processing media...(0/{total_row_number})\r")
logging.info(f"Processing media...(0/{total_row_number})", extra={"clear": True})
# Fetch media items
media_query = f"""
@@ -370,13 +445,15 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
mime = MimeTypes()
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
while (content := c.fetchone()) is not None:
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files,
embed_exif, rename_media, timezone_offset)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False,
embed_exif=False, rename_media=False, timezone_offset=0):
"""Process a single media item."""
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
current_chat = data.get_chat(content["ZCONTACTJID"])
@@ -412,10 +489,24 @@ def process_media_item(content, data, media_folder, mime, separate_media, fix_do
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = '/'.join(new_path.split("/")[1:])
# Use timestamp processing if enabled
if embed_exif or rename_media:
final_path = process_media_with_timestamp(
file_path, new_path, message.timestamp,
timezone_offset, embed_exif, rename_media
)
else:
final_path = new_path
shutil.copy2(file_path, final_path)
elif embed_exif or rename_media:
# Handle in-place processing when not separating
final_path = process_media_with_timestamp(
file_path, file_path, message.timestamp,
timezone_offset, embed_exif, rename_media
)
else:
message.data = '/'.join(file_path.split("/")[1:])
final_path = file_path
message.data = os.path.join(*final_path.split(os.sep)[1:])
else:
# Handle missing media
message.data = "The media is missing"
@@ -462,7 +553,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
c.execute(vcard_query)
contents = c.fetchall()
total_row_number = len(contents)
logger.info(f"Processing vCards...(0/{total_row_number})\r")
logging.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True})
# Create vCards directory
path = f'{media_folder}/Message/vCards'
@@ -474,7 +565,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
process_vcard_item(content, path, data)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}")
def process_vcard_item(content, path, data):
@@ -566,7 +657,7 @@ def calls(db, data, timezone_offset, filter_chat):
# Add calls chat to data
data.add_chat("000000000000000", chat)
logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
def process_call_record(content, chat, data, timezone_offset):
@@ -597,6 +688,187 @@ def process_call_record(content, chat, data, timezone_offset):
chat.add_message(call.key_id, call)
def _resolve_voter_name(voter_jid, is_creator, message, data):
"""Resolve a voter JID to a display name.
Args:
voter_jid (str or None): The voter's JID (often LID format like '123@lid').
is_creator (bool): Whether this voter is the poll creator.
message (Message): The poll message object.
data (ChatCollection): The chat data collection for name lookups.
Returns:
str: The resolved display name.
"""
if voter_jid is None:
if is_creator:
# Field 6 in the protobuf is always the device owner's vote,
# not the poll message sender's vote
return "You"
return "Unknown"
# Try direct lookup in data
if voter_jid in data:
chat = data.get_chat(voter_jid)
if chat is not None and chat.name:
return chat.name
# Try with @s.whatsapp.net suffix
if "@" not in voter_jid:
jid_with_suffix = f"{voter_jid}@s.whatsapp.net"
if jid_with_suffix in data:
chat = data.get_chat(jid_with_suffix)
if chat is not None and chat.name:
return chat.name
# Fallback: strip domain part
if "@" in voter_jid:
return voter_jid.split("@")[0]
return voter_jid
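# Illustrative sketch (hypothetical): the fallback chain when no contact data is available.
# An empty dict stands in for ChatCollection, since only membership is checked here.
assert _resolve_voter_name(None, True, None, {}) == "You"  # creator's own vote (field 6)
assert _resolve_voter_name(None, False, None, {}) == "Unknown"
assert _resolve_voter_name("4479@lid", False, None, {}) == "4479"  # strip the LID domain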
def polls(db, data, filter_date, filter_chat, filter_empty):
"""Process WhatsApp poll messages (type 46) from the database.
Queries ZWAMESSAGEINFO.ZRECEIPTINFO for poll messages, decodes the
protobuf blobs, and enriches the corresponding Message objects with
structured poll data.
Args:
db: SQLite database connection.
data (ChatCollection): The chat data collection.
filter_date: Date filter SQL fragment or None.
filter_chat: Tuple of (include_filter, exclude_filter).
filter_empty: Whether to filter empty chats.
"""
c = db.cursor()
# Build filter conditions
chat_filter_include = get_chat_condition(
filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
chat_filter_exclude = get_chat_condition(
filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
# Count poll messages
count_query = f"""
SELECT count()
FROM ZWAMESSAGE
JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE ZWAMESSAGE.ZMESSAGETYPE = 46
AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(count_query)
total_row_number = c.fetchone()[0]
if total_row_number == 0:
return
logging.info(f"Processing polls...(0/{total_row_number})", extra={"clear": True})
# Fetch poll data
poll_query = f"""
SELECT ZWACHATSESSION.ZCONTACTJID,
ZWAMESSAGE.Z_PK AS ZMESSAGE,
ZWAMESSAGEINFO.ZRECEIPTINFO
FROM ZWAMESSAGE
JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE ZWAMESSAGE.ZMESSAGETYPE = 46
AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
ORDER BY ZWAMESSAGE.ZMESSAGEDATE ASC
"""
c.execute(poll_query)
with tqdm(total=total_row_number, desc="Processing polls", unit="poll", leave=False) as pbar:
while (content := c.fetchone()) is not None:
contact_id = content["ZCONTACTJID"]
message_pk = content["ZMESSAGE"]
receipt_blob = content["ZRECEIPTINFO"]
current_chat = data.get_chat(contact_id)
if current_chat is None:
pbar.update(1)
continue
message = current_chat.get_message(message_pk)
if message is None:
pbar.update(1)
continue
try:
poll_data = decode_poll_from_receipt_blob(receipt_blob)
except Exception as e:
logging.warning(f"Failed to decode poll {message_pk}: {e}")
pbar.update(1)
continue
if poll_data is None:
pbar.update(1)
continue
# Build structured poll result with vote tallies
options = poll_data['options']
votes = poll_data['votes']
# Tally votes per option
option_votes = {i: [] for i in range(len(options))}
seen_voters = set()
for vote in votes:
voter_name = _resolve_voter_name(
vote.get('voter_jid'), vote.get('is_creator', False), message, data)
voter_key = vote.get('voter_jid') or ("__creator__" if vote.get('is_creator') else "__unknown__")
if voter_key not in seen_voters:
seen_voters.add(voter_key)
for idx in vote.get('selected_indices', []):
if 0 <= idx < len(options):
option_votes[idx].append(voter_name)
# Find max vote count for percentage calculation
max_votes = max((len(v) for v in option_votes.values()), default=0)
# Build option list with tallies
option_list = []
for i, opt_text in enumerate(options):
voters = option_votes.get(i, [])
vote_count = len(voters)
vote_pct = (vote_count / max_votes * 100) if max_votes > 0 else 0
option_list.append({
'text': opt_text,
'vote_count': vote_count,
'vote_pct': vote_pct,
'voters': voters,
})
total_voters = len(seen_voters)
# Set poll data on message
message.poll = {
'type': 'poll',
'question': poll_data['question'],
'options': option_list,
'total_voters': total_voters,
}
message.data = f"\U0001f4ca {poll_data['question']}"
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logging.info(f"Processed {total_row_number} polls in {convert_time_unit(total_time)}")
def format_call_data(call, content):
"""Format call data message based on call attributes."""
# Basic call info


@@ -8,7 +8,7 @@ import getpass
from sys import exit, platform as osname
import sys
from tqdm import tqdm
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier, convert_time_unit
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier, convert_time_unit
from Whatsapp_Chat_Exporter.bplist import BPListReader
try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath
@@ -18,7 +18,6 @@ else:
support_encrypted = True
logger = logging.getLogger(__name__)
class BackupExtractor:
@@ -60,7 +59,7 @@ class BackupExtractor:
return False
except sqlite3.DatabaseError as e:
if str(e) == "authorization denied" and osname == "darwin":
logger.error(
logging.error(
"You don't have permission to access the backup database. Please"
"check your permissions or try moving the backup to somewhere else."
)
@@ -73,13 +72,13 @@ class BackupExtractor:
Handles the extraction of data from an encrypted iOS backup.
"""
if not support_encrypted:
logger.error("You don't have the dependencies to handle encrypted backup."
logging.error("You don't have the dependencies to handle encrypted backup."
"Read more on how to deal with encrypted backup:"
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage"
)
return
logger.info(f"Encryption detected on the backup!{CLEAR_LINE}")
logging.info(f"Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
sys.stdout.write("\033[F\033[K")
sys.stdout.flush()
@@ -93,7 +92,7 @@ class BackupExtractor:
Args:
password (str): The password for the encrypted backup.
"""
logger.info(f"Trying to open the iOS backup...{CLEAR_LINE}")
logging.info(f"Trying to open the iOS backup...")
self.backup = EncryptedBackup(
backup_directory=self.base_dir,
passphrase=password,
@@ -101,8 +100,8 @@ class BackupExtractor:
check_same_thread=False,
decrypt_chunk_size=self.decrypt_chunk_size,
)
logger.info(f"iOS backup is opened successfully{CLEAR_LINE}")
logger.info("Decrypting WhatsApp database...\r")
logging.info(f"iOS backup is opened successfully")
logging.info("Decrypting WhatsApp database...", extra={"clear": True})
try:
self.backup.extract_file(
relative_path=RelativePath.WHATSAPP_MESSAGES,
@@ -120,17 +119,17 @@ class BackupExtractor:
output_filename=self.identifiers.CALL,
)
except ValueError:
logger.error("Failed to decrypt backup: incorrect password?")
logging.error("Failed to decrypt backup: incorrect password?")
exit(7)
except FileNotFoundError:
logger.error(
logging.error(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
)
exit(6)
else:
logger.info(f"WhatsApp database decrypted successfully{CLEAR_LINE}")
logging.info(f"WhatsApp database decrypted successfully")
def _extract_decrypted_files(self):
"""Extract all WhatsApp files after decryption"""
@@ -150,7 +149,7 @@ class BackupExtractor:
)
total_time = pbar.format_dict['elapsed']
pbar.close()
logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}")
def _extract_unencrypted_backup(self):
"""
@@ -169,10 +168,10 @@ class BackupExtractor:
if not os.path.isfile(wts_db_path):
if self.identifiers is WhatsAppIdentifier:
logger.error("WhatsApp database not found.")
logging.error("WhatsApp database not found.")
else:
logger.error("WhatsApp Business database not found.")
logger.error(
logging.error("WhatsApp Business database not found.")
logging.error(
"Essential WhatsApp files are missing from the iOS backup. "
"Perhapse you enabled end-to-end encryption for the backup? "
"See https://wts.knugi.dev/docs.html?dest=iose2e"
@@ -182,12 +181,12 @@ class BackupExtractor:
shutil.copyfile(wts_db_path, self.identifiers.MESSAGE)
if not os.path.isfile(contact_db_path):
logger.warning(f"Contact database not found. Skipping...{CLEAR_LINE}")
logging.warning(f"Contact database not found. Skipping...")
else:
shutil.copyfile(contact_db_path, self.identifiers.CONTACT)
if not os.path.isfile(call_db_path):
logger.warning(f"Call database not found. Skipping...{CLEAR_LINE}")
logging.warning(f"Call database not found. Skipping...")
else:
shutil.copyfile(call_db_path, self.identifiers.CALL)
@@ -236,7 +235,7 @@ class BackupExtractor:
os.utime(destination, (modification, modification))
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}")
def extract_media(base_dir, identifiers, decrypt_chunk_size):


@@ -0,0 +1,200 @@
"""
Media timestamp utilities for embedding EXIF data and renaming files.
"""
import os
import logging
import shutil
from datetime import datetime
from typing import Optional
from Whatsapp_Chat_Exporter.data_model import TimeZone
logger = logging.getLogger(__name__)
# Optional imports for EXIF support
try:
import piexif
from PIL import Image
HAS_EXIF_SUPPORT = True
except ImportError:
HAS_EXIF_SUPPORT = False
def format_timestamp_for_filename(timestamp: float, timezone_offset: int = 0) -> str:
"""
Format a Unix timestamp for use in filenames.
Args:
timestamp: Unix timestamp (seconds)
timezone_offset: Hours offset from UTC
Returns:
Formatted string: YYYY-MM-DD_HH-MM-SS
"""
dt = datetime.fromtimestamp(timestamp, TimeZone(timezone_offset))
return dt.strftime("%Y-%m-%d_%H-%M-%S")
def format_timestamp_for_exif(timestamp: float, timezone_offset: int = 0) -> str:
"""
Format a Unix timestamp for EXIF DateTime fields.
Args:
timestamp: Unix timestamp (seconds)
timezone_offset: Hours offset from UTC
Returns:
Formatted string: YYYY:MM:DD HH:MM:SS (EXIF format)
"""
dt = datetime.fromtimestamp(timestamp, TimeZone(timezone_offset))
return dt.strftime("%Y:%m:%d %H:%M:%S")
def generate_timestamped_filename(
original_path: str,
timestamp: float,
timezone_offset: int = 0
) -> str:
"""
Generate a new filename with timestamp prefix.
Args:
original_path: Original file path
timestamp: Unix timestamp (seconds)
timezone_offset: Hours offset from UTC
Returns:
New filename with format: YYYY-MM-DD_HH-MM-SS_original-name.ext
"""
directory = os.path.dirname(original_path)
original_name = os.path.basename(original_path)
timestamp_prefix = format_timestamp_for_filename(timestamp, timezone_offset)
new_name = f"{timestamp_prefix}_{original_name}"
return os.path.join(directory, new_name)
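# Quick illustration (POSIX paths, UTC offset 0): 1672531200 is 2023-01-01 00:00:00 UTC.
_ts = 1672531200
assert format_timestamp_for_filename(_ts, 0) == "2023-01-01_00-00-00"
assert generate_timestamped_filename("Media/IMG.jpg", _ts, 0) == "Media/2023-01-01_00-00-00_IMG.jpg"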
def embed_exif_timestamp(
file_path: str,
timestamp: float,
timezone_offset: int = 0
) -> bool:
"""
Embed timestamp in EXIF data for supported image formats.
Args:
file_path: Path to the image file
timestamp: Unix timestamp (seconds)
timezone_offset: Hours offset from UTC
Returns:
True if successful, False otherwise
"""
if not HAS_EXIF_SUPPORT:
logger.warning("EXIF support not available. Install piexif and Pillow.")
return False
# Check file extension
ext = os.path.splitext(file_path)[1].lower()
if ext not in ('.jpg', '.jpeg', '.tiff', '.tif'):
logger.debug(f"EXIF embedding not supported for {ext} files: {file_path}")
return False
try:
exif_datetime = format_timestamp_for_exif(timestamp, timezone_offset)
exif_datetime_bytes = exif_datetime.encode('utf-8')
# Try to load existing EXIF data
try:
exif_dict = piexif.load(file_path)
except Exception:
# No existing EXIF, create empty structure
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
# Set DateTime fields in Exif IFD
exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = exif_datetime_bytes
exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = exif_datetime_bytes
# Set DateTime in 0th IFD (basic TIFF tag)
exif_dict["0th"][piexif.ImageIFD.DateTime] = exif_datetime_bytes
# Dump and insert EXIF data
exif_bytes = piexif.dump(exif_dict)
piexif.insert(exif_bytes, file_path)
return True
except Exception as e:
logger.warning(f"Failed to embed EXIF in {file_path}: {e}")
return False
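# Minimal sketch (hypothetical path): embed the message timestamp into a JPEG's EXIF fields.
# Returns False when the extension is unsupported, piexif/Pillow are missing, or writing fails.
if not embed_exif_timestamp("Media/Message/photo.jpg", 1672531200, timezone_offset=8):
    logger.debug("EXIF timestamp not embedded")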
def _handle_duplicate_filename(file_path: str) -> str:
"""
Generate a unique filename by appending a counter if file exists.
Args:
file_path: Original file path
Returns:
Unique file path with counter appended if necessary
"""
if not os.path.exists(file_path):
return file_path
base, ext = os.path.splitext(file_path)
counter = 1
while os.path.exists(file_path):
file_path = f"{base}_{counter}{ext}"
counter += 1
return file_path
def process_media_with_timestamp(
source_path: str,
dest_path: str,
timestamp: Optional[float],
timezone_offset: int = 0,
embed_exif: bool = False,
rename_media: bool = False
) -> str:
"""
Process a media file with optional timestamp embedding and renaming.
Args:
source_path: Source file path
dest_path: Destination file path (may be modified if renaming)
timestamp: Unix timestamp (seconds), or None if unavailable
timezone_offset: Hours offset from UTC
embed_exif: Whether to embed EXIF timestamp
rename_media: Whether to rename file with timestamp prefix
Returns:
Final destination path (may differ from dest_path if renamed)
"""
# If no timestamp available, just copy
if timestamp is None:
if source_path != dest_path:
logger.warning(f"No timestamp available for {source_path}, skipping timestamp operations")
shutil.copy2(source_path, dest_path)
return dest_path
# Determine final path
final_path = dest_path
if rename_media:
final_path = generate_timestamped_filename(dest_path, timestamp, timezone_offset)
# Handle duplicate filenames
if os.path.exists(final_path) and final_path != source_path:
final_path = _handle_duplicate_filename(final_path)
# Copy file to destination
shutil.copy2(source_path, final_path)
# Embed EXIF if requested
if embed_exif:
embed_exif_timestamp(final_path, timestamp, timezone_offset)
return final_path
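# Illustrative sketch of how the media handler is expected to call this helper when renaming
# is enabled; the temporary file stands in for a real media file.
import os
import tempfile
_tmp = tempfile.mkdtemp()
_src = os.path.join(_tmp, "abc.jpg")
open(_src, "wb").close()
_final = process_media_with_timestamp(
    _src, os.path.join(_tmp, "out.jpg"),
    timestamp=1672531200, timezone_offset=8,
    embed_exif=False, rename_media=True,
)
print(os.path.basename(_final))  # -> "2023-01-01_08-00-00_out.jpg"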


@@ -0,0 +1,190 @@
"""
WhatsApp Poll decoder for iOS/macOS.
Decodes poll messages (ZMESSAGETYPE = 46) stored as protobuf blobs
in ZWAMESSAGEINFO.ZRECEIPTINFO. Uses raw varint/wire-type parsing
with no external protobuf library dependency.
"""
import struct
import logging
def _decode_varint(data, pos):
"""Decode a protobuf varint starting at pos.
Args:
data (bytes): The protobuf data.
pos (int): Starting position.
Returns:
tuple: (value, new_pos)
Raises:
ValueError: If the varint is truncated.
"""
result = 0
shift = 0
while pos < len(data):
b = data[pos]
pos += 1
result |= (b & 0x7F) << shift
if not (b & 0x80):
return result, pos
shift += 7
raise ValueError("Truncated varint")
def decode_protobuf_fields(data):
"""
Decode raw protobuf bytes into list of (field_number, wire_type_name, value).
Handles: varint (0), fixed64 (1), length-delimited/bytes (2), fixed32 (5).
Args:
data (bytes): Raw protobuf data.
Returns:
list: List of (field_number, wire_type_name, value) tuples.
"""
fields = []
pos = 0
while pos < len(data):
try:
tag, pos = _decode_varint(data, pos)
field_num = tag >> 3
wire_type = tag & 0x7
if wire_type == 0: # varint
val, pos = _decode_varint(data, pos)
fields.append((field_num, 'varint', val))
elif wire_type == 2: # length-delimited
length, pos = _decode_varint(data, pos)
val = data[pos:pos + length]
pos += length
fields.append((field_num, 'bytes', val))
elif wire_type == 5: # fixed32
val = struct.unpack('<I', data[pos:pos + 4])[0]
pos += 4
fields.append((field_num, 'fixed32', val))
elif wire_type == 1: # fixed64
val = struct.unpack('<Q', data[pos:pos + 8])[0]
pos += 8
fields.append((field_num, 'fixed64', val))
else:
break # Unknown wire type, stop parsing
except Exception:
break
return fields
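# Illustration with a tiny hand-built message: field 1 = varint 150, field 2 = bytes b"hi".
# 0x08 is the tag for (field 1, varint); 0x12 is the tag for (field 2, length-delimited).
assert decode_protobuf_fields(b"\x08\x96\x01\x12\x02hi") == [(1, "varint", 150), (2, "bytes", b"hi")]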
def _decode_vote_record(data):
"""Decode a single vote record sub-message.
Args:
data (bytes): Raw protobuf data for a vote record.
Returns:
dict or None: Vote record with 'voter_jid' and 'selected_indices',
or None if the record is empty.
"""
fields = decode_protobuf_fields(data)
selected_indices = []
voter_jid = None
for fn, wt, val in fields:
if fn == 1 and wt == 'varint':
selected_indices.append(val)
elif fn == 4 and wt == 'bytes':
try:
voter_jid = val.decode('utf-8')
except Exception:
voter_jid = val.hex()
if not voter_jid and not selected_indices:
return None
return {
'voter_jid': voter_jid,
'selected_indices': selected_indices,
}
def decode_poll_from_receipt_blob(receipt_blob):
"""
Decode a WhatsApp poll from the ZWAMESSAGEINFO.ZRECEIPTINFO protobuf blob.
The blob has a top-level structure where field 8 contains the poll content.
The poll content has: question (field 2), options (field 3 repeated),
other voters (field 5 repeated), and creator vote (field 6).
Args:
receipt_blob (bytes): The ZRECEIPTINFO protobuf blob.
Returns:
dict or None: Decoded poll data with keys:
question (str): The poll question text
options (list[str]): The poll option texts, in order
votes (list[dict]): Each vote has:
voter_jid (str|None): Voter's JID (LID format)
selected_indices (list[int]): 0-based indices into options
is_creator (bool): True if this is the poll creator's vote
Returns None if the blob does not contain a valid poll.
"""
if not receipt_blob:
return None
top_fields = decode_protobuf_fields(receipt_blob)
# Find the poll content in field 8
poll_content = None
for fn, wt, val in top_fields:
if fn == 8 and wt == 'bytes':
poll_content = val
break
if not poll_content:
return None
poll_fields = decode_protobuf_fields(poll_content)
# Extract question (field 2, first string)
question = None
for fn, wt, val in poll_fields:
if fn == 2 and wt == 'bytes':
try:
question = val.decode('utf-8')
except Exception:
question = repr(val)
break
if not question:
return None
# Extract options (field 3, repeated)
options = []
for fn, wt, val in poll_fields:
if fn == 3 and wt == 'bytes':
option_fields = decode_protobuf_fields(val)
for ofn, owt, oval in option_fields:
if ofn == 1 and owt == 'bytes':
try:
options.append(oval.decode('utf-8'))
except Exception:
options.append(repr(oval))
break
# Extract votes: field 5 = other participants, field 6 = creator
votes = []
for fn, wt, val in poll_fields:
if fn in (5, 6) and wt == 'bytes':
vote = _decode_vote_record(val)
if vote:
vote['is_creator'] = (fn == 6)
votes.append(vote)
return {
'question': question,
'options': options,
'votes': votes,
}
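# Illustrative sketch: a hand-rolled synthetic blob exercising the structure described above
# (single-byte tags and lengths only). Real ZRECEIPTINFO blobs carry additional fields.
def _ld(field_num, payload):
    """Encode one length-delimited protobuf field (wire type 2); assumes len(payload) < 128."""
    return bytes([(field_num << 3) | 2, len(payload)]) + payload

_poll_content = (
    _ld(2, b"Lunch?")            # question
    + _ld(3, _ld(1, b"Pizza"))   # option 1
    + _ld(3, _ld(1, b"Sushi"))   # option 2
    + _ld(6, b"\x08\x00")        # creator's vote: selected index 0
)
print(decode_poll_from_receipt_blob(_ld(8, _poll_content)))
# -> {'question': 'Lunch?', 'options': ['Pizza', 'Sushi'],
#     'votes': [{'voter_jid': None, 'selected_indices': [0], 'is_creator': True}]}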


@@ -13,7 +13,7 @@ from datetime import datetime, timedelta
from enum import IntEnum
from tqdm import tqdm
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union, Any
try:
from enum import StrEnum, IntEnum
except ImportError:
@@ -30,9 +30,7 @@ except ImportError:
MAX_SIZE = 4 * 1024 * 1024 # Default 4MB
ROW_SIZE = 0x3D0
CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().total_seconds() / 3600
CLEAR_LINE = "\x1b[K\n"
logger = logging.getLogger(__name__)
def convert_time_unit(time_second: int) -> str:
@@ -159,39 +157,40 @@ def determine_day(last: int, current: int) -> Optional[datetime.date]:
return current
def check_update():
def check_update(include_beta: bool = False) -> int:
import urllib.request
import json
import importlib
from sys import platform
from packaging import version
PACKAGE_JSON = "https://pypi.org/pypi/whatsapp-chat-exporter/json"
try:
raw = urllib.request.urlopen(PACKAGE_JSON)
except Exception:
logger.error("Failed to check for updates.")
logging.error("Failed to check for updates.")
return 1
else:
with raw:
package_info = json.load(raw)
latest_version = tuple(
map(int, package_info["info"]["version"].split(".")))
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
current_version = tuple(map(int, __version__.split(".")))
if include_beta:
all_versions = [version.parse(v) for v in package_info["releases"].keys()]
latest_version = max(all_versions)
else:
latest_version = version.parse(package_info["info"]["version"])
current_version = version.parse(importlib.metadata.version("whatsapp_chat_exporter"))
if current_version < latest_version:
logger.info(
logging.info(
"===============Update===============\n"
"A newer version of WhatsApp Chat Exporter is available.\n"
f"Current version: {__version__}\n"
f"Latest version: {package_info['info']['version']}\n"
f"Current version: {current_version}\n"
f"Latest version: {latest_version}"
)
if platform == "win32":
logger.info("Update with: pip install --upgrade whatsapp-chat-exporter\n")
else:
logger.info("Update with: pip3 install --upgrade whatsapp-chat-exporter\n")
logger.info("====================================\n")
pip_cmd = "pip" if platform == "win32" else "pip3"
logging.info(f"Update with: {pip_cmd} install --upgrade whatsapp-chat-exporter {'--pre' if include_beta else ''}")
logging.info("====================================")
else:
logger.info("You are using the latest version of WhatsApp Chat Exporter.\n")
logging.info("You are using the latest version of WhatsApp Chat Exporter.")
return 0
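# Sanity check of the ordering the beta handling above relies on: PEP 440 pre-releases
# sort before the corresponding final release.
from packaging import version as _v
assert _v.parse("0.13.0rc2") < _v.parse("0.13.0") < _v.parse("0.13.1")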
@@ -254,89 +253,235 @@ def import_from_json(json_file: str, data: ChatCollection):
data.add_chat(jid, chat)
pbar.update(1)
total_time = pbar.format_dict['elapsed']
logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}")
logging.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}")
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool):
"""Merges JSON files from the source directory into the target directory.
class IncrementalMerger:
"""Handles incremental merging of WhatsApp chat exports."""
def __init__(self, pretty_print_json: int, avoid_encoding_json: bool):
"""Initialize the merger with JSON formatting options.
Args:
pretty_print_json: JSON indentation level.
avoid_encoding_json: Whether to avoid ASCII encoding.
"""
self.pretty_print_json = pretty_print_json
self.avoid_encoding_json = avoid_encoding_json
def _get_json_files(self, source_dir: str) -> List[str]:
"""Get list of JSON files from source directory.
Args:
source_dir: Path to the source directory.
Returns:
List of JSON filenames.
Raises:
SystemExit: If no JSON files are found.
"""
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
if not json_files:
logging.error("No JSON files found in the source directory.")
raise SystemExit(1)
logging.debug("JSON files found:", json_files)
return json_files
Args:
source_dir (str): The path to the source directory containing JSON files.
target_dir (str): The path to the target directory to merge into.
media_dir (str): The path to the media directory.
"""
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
if not json_files:
logger.error("No JSON files found in the source directory.")
return
def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None:
"""Copy a new JSON file to target directory.
Args:
source_path: Path to source file.
target_path: Path to target file.
target_dir: Target directory path.
json_file: Name of the JSON file.
"""
logging.info(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True)
shutil.copy2(source_path, target_path)
logger.info("JSON files found:", json_files)
def _load_chat_data(self, file_path: str) -> Dict[str, Any]:
"""Load JSON data from file.
Args:
file_path: Path to JSON file.
Returns:
Loaded JSON data.
"""
with open(file_path, 'r') as file:
return json.load(file)
for json_file in json_files:
source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file)
def _parse_chats_from_json(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse JSON data into ChatStore objects.
Args:
data: Raw JSON data.
Returns:
Dictionary of JID to ChatStore objects.
"""
return {jid: ChatStore.from_json(chat) for jid, chat in data.items()}
if not os.path.exists(target_path):
logger.info(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True)
shutil.copy2(source_path, target_path)
def _merge_chat_stores(self, source_chats: Dict[str, Any], target_chats: Dict[str, Any]) -> Dict[str, Any]:
"""Merge source chats into target chats.
Args:
source_chats: Source ChatStore objects.
target_chats: Target ChatStore objects.
Returns:
Merged ChatStore objects.
"""
for jid, chat in source_chats.items():
if jid in target_chats:
target_chats[jid].merge_with(chat)
else:
target_chats[jid] = chat
return target_chats
def _serialize_chats(self, chats: Dict[str, Any]) -> Dict[str, Any]:
"""Serialize ChatStore objects to JSON format.
Args:
chats: Dictionary of ChatStore objects.
Returns:
Serialized JSON data.
"""
return {jid: chat.to_json() for jid, chat in chats.items()}
def _has_changes(self, merged_data: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
"""Check if merged data differs from original data.
Args:
merged_data: Merged JSON data.
original_data: Original JSON data.
Returns:
True if changes detected, False otherwise.
"""
return json.dumps(merged_data, sort_keys=True) != json.dumps(original_data, sort_keys=True)
def _save_merged_data(self, target_path: str, merged_data: Dict[str, Any]) -> None:
"""Save merged data to target file.
Args:
target_path: Path to target file.
merged_data: Merged JSON data.
"""
with open(target_path, 'w') as merged_file:
json.dump(
merged_data,
merged_file,
indent=self.pretty_print_json,
ensure_ascii=not self.avoid_encoding_json,
)
def _merge_json_file(self, source_path: str, target_path: str, json_file: str) -> None:
"""Merge a single JSON file.
Args:
source_path: Path to source file.
target_path: Path to target file.
json_file: Name of the JSON file.
"""
logging.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True})
source_data = self._load_chat_data(source_path)
target_data = self._load_chat_data(target_path)
source_chats = self._parse_chats_from_json(source_data)
target_chats = self._parse_chats_from_json(target_data)
merged_chats = self._merge_chat_stores(source_chats, target_chats)
merged_data = self._serialize_chats(merged_chats)
if self._has_changes(merged_data, target_data):
logging.info(f"Changes detected in '{json_file}', updating target file...")
self._save_merged_data(target_path, merged_data)
else:
logger.info(
f"Merging '{json_file}' with existing file in target directory...")
with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
source_data = json.load(src_file)
target_data = json.load(tgt_file)
logging.info(f"No changes detected in '{json_file}', skipping update.")
# Parse JSON into ChatStore objects using from_json()
source_chats = {jid: ChatStore.from_json(
chat) for jid, chat in source_data.items()}
target_chats = {jid: ChatStore.from_json(
chat) for jid, chat in target_data.items()}
def _should_copy_media_file(self, source_file: str, target_file: str) -> bool:
"""Check if media file should be copied.
Args:
source_file: Path to source media file.
target_file: Path to target media file.
Returns:
True if file should be copied, False otherwise.
"""
return not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file)
# Merge chats using merge_with()
for jid, chat in source_chats.items():
if jid in target_chats:
target_chats[jid].merge_with(chat)
else:
target_chats[jid] = chat
# Serialize merged data
merged_data = {jid: chat.to_json()
for jid, chat in target_chats.items()}
# Check if the merged data differs from the original target data
if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
logger.info(
f"Changes detected in '{json_file}', updating target file...")
with open(target_path, 'w') as merged_file:
json.dump(
merged_data,
merged_file,
indent=pretty_print_json,
ensure_ascii=not avoid_encoding_json,
)
else:
logger.info(
f"No changes detected in '{json_file}', skipping update.")
# Merge media directories
source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir)
logger.info(
f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if os.path.exists(source_media_path):
def _merge_media_directories(self, source_dir: str, target_dir: str, media_dir: str) -> None:
"""Merge media directories from source to target.
Args:
source_dir: Source directory path.
target_dir: Target directory path.
media_dir: Media directory name.
"""
source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir)
logging.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if not os.path.exists(source_media_path):
return
for root, _, files in os.walk(source_media_path):
relative_path = os.path.relpath(root, source_media_path)
target_root = os.path.join(target_media_path, relative_path)
os.makedirs(target_root, exist_ok=True)
for file in files:
source_file = os.path.join(root, file)
target_file = os.path.join(target_root, file)
# we only copy if the file doesn't exist in the target or if the source is newer
if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file):
logger.info(f"Copying '{source_file}' to '{target_file}'...")
if self._should_copy_media_file(source_file, target_file):
logging.debug(f"Copying '{source_file}' to '{target_file}'...")
shutil.copy2(source_file, target_file)
def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None:
"""Merge JSON files and media from source to target directory.
Args:
source_dir: The path to the source directory containing JSON files.
target_dir: The path to the target directory to merge into.
media_dir: The path to the media directory.
"""
json_files = self._get_json_files(source_dir)
logging.info("Starting incremental merge process...")
for json_file in json_files:
source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file)
if not os.path.exists(target_path):
self._copy_new_file(source_path, target_path, target_dir, json_file)
else:
self._merge_json_file(source_path, target_path, json_file)
self._merge_media_directories(source_dir, target_dir, media_dir)
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool) -> None:
"""Wrapper for merging JSON files from the source directory into the target directory.
Args:
source_dir: The path to the source directory containing JSON files.
target_dir: The path to the target directory to merge into.
media_dir: The path to the media directory.
pretty_print_json: JSON indentation level.
avoid_encoding_json: Whether to avoid ASCII encoding.
"""
merger = IncrementalMerger(pretty_print_json, avoid_encoding_json)
merger.merge(source_dir, target_dir, media_dir)
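# Minimal usage sketch of the wrapper; directory names are hypothetical and must already
# contain a prior JSON export (plus a media subdirectory) for the merge to do anything.
incremental_merge(
    source_dir="export_new",
    target_dir="export_master",
    media_dir="WhatsApp",
    pretty_print_json=2,
    avoid_encoding_json=False,
)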
def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]:
"""Generates a sanitized filename and contact name for a chat.
@@ -384,9 +529,41 @@ def get_cond_for_empty(enable: bool, jid_field: str, broadcast_field: str) -> st
return f"AND (chat.hidden=0 OR {jid_field}='status@broadcast' OR {broadcast_field}>0)" if enable else ""
def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List[str], jid: Optional[str] = None, platform: Optional[str] = None) -> str:
def _get_group_condition(jid: str, platform: str) -> str:
"""Generate platform-specific group identification condition.
Args:
jid: The JID column name.
platform: The platform ("android" or "ios").
Returns:
SQL condition string for group identification.
Raises:
ValueError: If platform is not supported.
"""
if platform == "android":
return f"{jid}.type == 1"
elif platform == "ios":
return f"{jid} IS NOT NULL"
else:
raise ValueError(
"Only android and ios are supported for argument platform if jid is not None")
def get_chat_condition(
filter: Optional[List[str]],
include: bool,
columns: List[str],
jid: Optional[str] = None,
platform: Optional[str] = None
) -> str:
"""Generates a SQL condition for filtering chats based on inclusion or exclusion criteria.
SQL injection risks from chat filters were evaluated during development and deemed negligible
due to the tool's offline, trusted-input model (user running this tool on WhatsApp
backups/databases on their own device).
Args:
filter: A list of phone numbers to include or exclude.
include: True to include chats that match the filter, False to exclude them.
@@ -400,35 +577,39 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List
Raises:
ValueError: If the column count is invalid or an unsupported platform is provided.
"""
if filter is not None:
conditions = []
if len(columns) < 2 and jid is not None:
raise ValueError(
"There must be at least two elements in argument columns if jid is not None")
if jid is not None:
if platform == "android":
is_group = f"{jid}.type == 1"
elif platform == "ios":
is_group = f"{jid} IS NOT NULL"
else:
raise ValueError(
"Only android and ios are supported for argument platform if jid is not None")
for index, chat in enumerate(filter):
if include:
conditions.append(
f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'")
if len(columns) > 1:
conditions.append(
f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})")
else:
conditions.append(
f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'")
if len(columns) > 1:
conditions.append(
f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})")
return f"AND ({' '.join(conditions)})"
else:
if not filter:
return ""
if jid is not None and len(columns) < 2:
raise ValueError(
"There must be at least two elements in argument columns if jid is not None")
# Get group condition if needed
is_group_condition = None
if jid is not None:
is_group_condition = _get_group_condition(jid, platform)
# Build conditions for each chat filter
conditions = []
for index, chat in enumerate(filter):
# Add connector for subsequent conditions (with double space)
connector = " OR" if include else " AND"
prefix = connector if index > 0 else ""
# Primary column condition
operator = "LIKE" if include else "NOT LIKE"
conditions.append(f"{prefix} {columns[0]} {operator} '%{chat}%'")
# Secondary column condition for groups
if len(columns) > 1 and is_group_condition:
if include:
group_condition = f" OR ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})"
else:
group_condition = f" AND ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})"
conditions.append(group_condition)
combined_conditions = "".join(conditions)
return f"AND ({combined_conditions})"
# Android Specific
@@ -584,7 +765,7 @@ def check_jid_map(db: sqlite3.Connection) -> bool:
"""
cursor = db.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='jid_map'")
return cursor.fetchone()is not None
return cursor.fetchone() is not None
def get_jid_map_join(jid_map_exists: bool) -> str:
@@ -634,6 +815,7 @@ def get_transcription_selection(db: sqlite3.Connection) -> str:
else:
return "NULL AS transcription_text"
def setup_template(template: Optional[str], no_avatar: bool, experimental: bool = False) -> jinja2.Template:
"""
Sets up the Jinja2 template environment and loads the template.
@@ -711,7 +893,7 @@ def get_chat_type(chat_id: str) -> str:
return "status_broadcast"
elif chat_id.endswith("@broadcast"):
return "broadcast_channel"
logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group{CLEAR_LINE}")
logging.warning(f"Unknown chat type for {chat_id}, defaulting to private_group")
return "private_group"


@@ -3,10 +3,9 @@ import re
import quopri
from typing import List, TypedDict
from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device
from Whatsapp_Chat_Exporter.utility import Device
logger = logging.getLogger(__name__)
class ExportedContactNumbers(TypedDict):
@@ -45,9 +44,9 @@ def decode_quoted_printable(value: str, charset: str) -> str:
return bytes_val.decode(charset, errors="replace")
except Exception:
# Fallback: return the original value if decoding fails
logger.warning(
logging.warning(
f"Failed to decode quoted-printable value: {value}, "
f"charset: {charset}. Please report this issue.{CLEAR_LINE}"
f"charset: {charset}. Please report this issue."
)
return value
@@ -67,13 +66,18 @@ def _parse_vcard_line(line: str) -> tuple[str, dict[str, str], str] | None:
value = line[colon_index + 1:].strip()
# Split property name from parameters
parts = prop_and_params.split(';')
property_name = parts[0].upper()
property_part, *params = prop_and_params.split(';')
# We only care about property name for now, but the grouping mechanism may be
# useful in the future if we want to associate multiple properties together.
parts = property_part.split('.')
_, property_name = parts if len(parts) == 2 else (None, parts[0])
property_name = property_name.upper()
parameters = {}
for part in parts[1:]:
if '=' in part:
key, val = part.split('=', 1)
for param in params:
if '=' in param:
key, val = param.split('=', 1)
parameters[key.upper()] = val.strip('"') # Remove potential quotes from value
return property_name, parameters, value
@@ -99,8 +103,9 @@ def get_vcard_value(entry: str, field_name: str) -> list[str]:
values.append(decode_quoted_printable(cached_line + line, charset))
cached_line = ""
else:
# Skip empty lines or lines that don't start with the target field (after stripping)
if not line or not line.upper().startswith(target_name):
# Skip empty lines or lines that don't start with the target
# field (after stripping), considering potential grouping prefixes
if not line or (not line.upper().startswith(target_name) and f".{target_name}" not in line.upper().split(':')[0]):
continue
parsed = _parse_vcard_line(line)
@@ -176,7 +181,7 @@ def read_vcards_file(vcf_file_path, default_country_code: str):
if contact := process_vcard_entry(vcard):
contacts.append(contact)
logger.info(f"Imported {len(contacts)} contacts/vcards{CLEAR_LINE}")
logging.info(f"Imported {len(contacts)} contacts/vcards")
return map_number_to_name(contacts, default_country_code)


@@ -381,7 +381,26 @@
</a>
{% endif %}
<p class="text-[#111b21] text-sm message-text">
{% if msg.meta == true or msg.media == false and msg.data is none %}
{% if msg.poll %}
<div class="mb-1">
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
{% for option in msg.poll.options %}
<div class="mb-1.5">
<div class="flex justify-between text-xs mb-0.5">
<span>{{ option.text }}</span>
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
</div>
<div class="w-full bg-gray-200 rounded-full h-1.5">
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
</div>
{% if option.voters %}
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
{% endif %}
</div>
{% endfor %}
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
</div>
{% elif msg.meta == true or msg.media == false and msg.data is none %}
<div class="flex justify-center mb-2">
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
{% if msg.safe %}
@@ -487,7 +506,26 @@
</a>
{% endif %}
<p class="text-[#111b21] text-sm">
{% if msg.meta == true or msg.media == false and msg.data is none %}
{% if msg.poll %}
<div class="mb-1">
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
{% for option in msg.poll.options %}
<div class="mb-1.5">
<div class="flex justify-between text-xs mb-0.5">
<span>{{ option.text }}</span>
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
</div>
<div class="w-full bg-gray-200 rounded-full h-1.5">
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
</div>
{% if option.voters %}
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
{% endif %}
</div>
{% endfor %}
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
</div>
{% elif msg.meta == true or msg.media == false and msg.data is none %}
<div class="flex justify-center mb-2">
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
{% if msg.safe %}


@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "whatsapp-chat-exporter"
version = "0.13.0rc2"
version = "0.13.0"
description = "A Whatsapp database parser that provides history of your Whatsapp conversations in HTML and JSON. Android, iOS, iPadOS, Crypt12, Crypt14, Crypt15 supported."
readme = "README.md"
authors = [
@@ -42,13 +42,12 @@ dependencies = [
[project.optional-dependencies]
android_backup = ["pycryptodome", "javaobj-py3"]
ios_backup = ["iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
crypt12 = ["pycryptodome"]
crypt14 = ["pycryptodome"]
crypt15 = ["pycryptodome", "javaobj-py3"]
all = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
everything = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
backup = ["pycryptodome", "javaobj-py3", "iphone_backup_decrypt @ git+https://github.com/KnugiHK/iphone_backup_decrypt"]
all = ["pycryptodome", "javaobj-py3"]
everything = ["pycryptodome", "javaobj-py3"]
backup = ["pycryptodome", "javaobj-py3"]
[project.scripts]
wtsexporter = "Whatsapp_Chat_Exporter.__main__:main"


@@ -42,3 +42,12 @@ VERSION:2.1
TEL;CELL:8889990001
ORG:AAA Car Service
END:VCARD
BEGIN:VCARD
VERSION:2.1
item1.TEL;CELL:7777777778
item2.TEL;CELL:7777777779
item1.FN:Racing Team
item2.FN:Racing Team
END:VCARD

tests/test_data_model.py (new file, 55 lines)

@@ -0,0 +1,55 @@
import pytest
from Whatsapp_Chat_Exporter.data_model import TimeZone, Timing
from datetime import timedelta
class TestTimeZone:
def test_utcoffset(self):
tz = TimeZone(5.5)
assert tz.utcoffset(None) == timedelta(hours=5.5)
def test_dst(self):
tz = TimeZone(2)
assert tz.dst(None) == timedelta(0)
class TestTiming:
@pytest.mark.parametrize("offset, expected_hour", [
(8, "08:00"), # Integer (e.g., Hong Kong Standard Time)
(-8, "16:00"), # Negative Integer (e.g., PST)
(5.5, "05:30"), # Positive Float (e.g., IST)
(-3.5, "20:30"), # Negative Float (e.g., Newfoundland)
])
def test_format_timestamp_various_offsets(self, offset, expected_hour):
"""Verify that both int and float offsets calculate time correctly."""
t = Timing(offset)
result = t.format_timestamp(1672531200, "%H:%M")
assert result == expected_hour
@pytest.mark.parametrize("ts_input", [
1672531200, # Unix timestamp as int
1672531200.0, # Unix timestamp as float
])
def test_timestamp_input_types(self, ts_input):
"""Verify the method accepts both int and float timestamps."""
t = Timing(0)
result = t.format_timestamp(ts_input, "%Y")
assert result == "2023"
def test_timing_none_offset(self):
"""Verify initialization with None doesn't crash and uses system time."""
t = Timing(None)
assert t.tz is None
# Should still return a valid string based on local machine time without crashing
result = t.format_timestamp(1672531200, "%Y")
assert result == "2023"
def test_millisecond_scaling(self):
"""Verify that timestamps in milliseconds are correctly scaled down."""
t = Timing(0)
# Milliseconds as int
assert t.format_timestamp(1672531200000, "%Y") == "2023"
# Milliseconds as float
assert t.format_timestamp(1672531200000.0, "%Y") == "2023"


@@ -9,6 +9,7 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore
BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
chat_data_1 = {
"12345678@s.whatsapp.net": {
'aliases': [],
"name": "Friend",
"type": "ios",
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
@@ -44,6 +45,7 @@ chat_data_1 = {
chat_data_2 = {
"12345678@s.whatsapp.net": {
'aliases': [],
"name": "Friend",
"type": "ios",
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
@@ -80,6 +82,7 @@ chat_data_2 = {
# Expected merged data - should contain all messages with all fields initialized as they would be by Message class
chat_data_merged = {
"12345678@s.whatsapp.net": {
'aliases': [],
"name": "Friend",
"type": "ios",
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
@@ -107,7 +110,8 @@ chat_data_merged = {
"sticker": False,
"message_type": None,
"received_timestamp": None,
"read_timestamp": None
"read_timestamp": None,
"poll": None
},
"24691": {
"from_me": False,
@@ -128,7 +132,8 @@ chat_data_merged = {
"sticker": False,
"message_type": None,
"received_timestamp": None,
"read_timestamp": None
"read_timestamp": None,
"poll": None
},
"24692": {
"from_me": False,
@@ -149,7 +154,8 @@ chat_data_merged = {
"sticker": False,
"message_type": None,
"received_timestamp": None,
"read_timestamp": None
"read_timestamp": None,
"poll": None
},
}
}


@@ -254,3 +254,99 @@ class TestSafeName:
def test_safe_name(self, input_text, expected_output):
result = safe_name(input_text)
assert result == expected_output
class TestGetChatCondition:
def test_no_filter(self):
"""Test when filter is None"""
result = get_chat_condition(None, True, ["column1", "column2"])
assert result == ""
result = get_chat_condition(None, False, ["column1"])
assert result == ""
def test_include_single_chat_single_column(self):
"""Test including a single chat with single column"""
result = get_chat_condition(["1234567890"], True, ["phone"])
assert result == "AND ( phone LIKE '%1234567890%')"
def test_include_multiple_chats_single_column(self):
"""Test including multiple chats with single column"""
result = get_chat_condition(["1234567890", "0987654321"], True, ["phone"])
assert result == "AND ( phone LIKE '%1234567890%' OR phone LIKE '%0987654321%')"
def test_exclude_single_chat_single_column(self):
"""Test excluding a single chat with single column"""
result = get_chat_condition(["1234567890"], False, ["phone"])
assert result == "AND ( phone NOT LIKE '%1234567890%')"
def test_exclude_multiple_chats_single_column(self):
"""Test excluding multiple chats with single column"""
result = get_chat_condition(["1234567890", "0987654321"], False, ["phone"])
assert result == "AND ( phone NOT LIKE '%1234567890%' AND phone NOT LIKE '%0987654321%')"
def test_include_with_jid_android(self):
"""Test including chats with JID for Android platform"""
result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "android")
assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1))"
def test_include_with_jid_ios(self):
"""Test including chats with JID for iOS platform"""
result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "ios")
assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid IS NOT NULL))"
def test_exclude_with_jid_android(self):
"""Test excluding chats with JID for Android platform"""
result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "android")
assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1))"
def test_exclude_with_jid_ios(self):
"""Test excluding chats with JID for iOS platform"""
result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "ios")
assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid IS NOT NULL))"
def test_multiple_chats_with_jid_android(self):
"""Test multiple chats with JID for Android platform"""
result = get_chat_condition(["1234567890", "0987654321"], True, ["phone", "name"], "jid", "android")
expected = "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1) OR phone LIKE '%0987654321%' OR (name LIKE '%0987654321%' AND jid.type == 1))"
assert result == expected
def test_multiple_chats_exclude_with_jid_android(self):
"""Test excluding multiple chats with JID for Android platform"""
result = get_chat_condition(["1234567890", "0987654321"], False, ["phone", "name"], "jid", "android")
expected = "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1) AND phone NOT LIKE '%0987654321%' AND (name NOT LIKE '%0987654321%' AND jid.type == 1))"
assert result == expected
def test_invalid_column_count_with_jid(self):
"""Test error when column count is less than 2 but jid is provided"""
with pytest.raises(ValueError, match="There must be at least two elements in argument columns if jid is not None"):
get_chat_condition(["1234567890"], True, ["phone"], "jid", "android")
def test_unsupported_platform(self):
"""Test error when unsupported platform is provided"""
with pytest.raises(ValueError, match="Only android and ios are supported for argument platform if jid is not None"):
get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "windows")
def test_empty_filter_list(self):
"""Test with empty filter list"""
result = get_chat_condition([], True, ["phone"])
assert result == ""
result = get_chat_condition([], False, ["phone"])
assert result == ""
def test_filter_with_empty_strings(self):
"""Test with filter containing empty strings"""
result = get_chat_condition(["", "1234567890"], True, ["phone"])
assert result == "AND ( phone LIKE '%%' OR phone LIKE '%1234567890%')"
result = get_chat_condition([""], True, ["phone"])
assert result == "AND ( phone LIKE '%%')"
def test_special_characters_in_filter(self):
"""Test with special characters in filter values"""
result = get_chat_condition(["test@example.com"], True, ["email"])
assert result == "AND ( email LIKE '%test@example.com%')"
result = get_chat_condition(["user-name"], True, ["username"])
assert result == "AND ( username LIKE '%user-name%')"


@@ -1,7 +1,7 @@
# from contacts_names_from_vcards import readVCardsFile
import os
from Whatsapp_Chat_Exporter.vcards_contacts import normalize_number, read_vcards_file
from Whatsapp_Chat_Exporter.vcards_contacts import normalize_number, read_vcards_file, get_vcard_value
def test_readVCardsFile():
@@ -17,7 +17,7 @@ def test_readVCardsFile():
# Print the count and the name
print(f"{count}. {name}")
print(data)
assert len(data) == 6
assert len(data) == 8
# Test simple contact name
assert data[0][1] == "Sample Contact"
# Test complex name
@@ -30,6 +30,31 @@ def test_readVCardsFile():
assert data[4][1] == "James Peacock Elementary"
# Test business entry using ORG but not F/FN
assert data[5][1] == "AAA Car Service"
# Test grouped entry
assert data[6][1] == "Racing Team (1)"
assert data[7][1] == "Racing Team (2)"
def test_grouping_mechanism():
no_group_vcf = """
BEGIN:VCARD
VERSION:2.1
TEL;CELL:7777777778
TEL;CELL:7777777779
TEL;CELL:7777777780
ORG:Racing Team
END:VCARD"""
group_vcf = """
BEGIN:VCARD
VERSION:2.1
item1.TEL;CELL:7777777778
item2.TEL;CELL:7777777779
item3.TEL;CELL:7777777780
ORG:Racing Team
END:VCARD"""
assert get_vcard_value(no_group_vcf, "TEL") == ["7777777778", "7777777779", "7777777780"]
assert get_vcard_value(group_vcf, "TEL") == ["7777777778", "7777777779", "7777777780"]
def test_create_number_to_name_dicts():