mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-06-10 18:03:02 +00:00
Compare commits
1 Commits
dev
...
fix-on2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dc1b7511f |
4
.github/workflows/ci.yml
vendored
4
.github/workflows/ci.yml
vendored
@@ -32,10 +32,10 @@ jobs:
|
|||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v6
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
- name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }}
|
- name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }}
|
||||||
uses: actions/setup-python@v6
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: ${{ matrix.python-version }}
|
python-version: ${{ matrix.python-version }}
|
||||||
|
|
||||||
|
|||||||
16
.github/workflows/compile-binary.yml
vendored
16
.github/workflows/compile-binary.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
|||||||
uses: actions/attest-build-provenance@v3
|
uses: actions/attest-build-provenance@v3
|
||||||
with:
|
with:
|
||||||
subject-path: ./wtsexporter_linux_x64
|
subject-path: ./wtsexporter_linux_x64
|
||||||
- uses: actions/upload-artifact@v7
|
- uses: actions/upload-artifact@v6
|
||||||
with:
|
with:
|
||||||
name: binary-linux-x64
|
name: binary-linux-x64
|
||||||
path: ./wtsexporter_linux_x64
|
path: ./wtsexporter_linux_x64
|
||||||
@@ -58,10 +58,10 @@ jobs:
|
|||||||
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_x64.exe"
|
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_x64.exe"
|
||||||
Get-FileHash wtsexporter_win_x64.exe
|
Get-FileHash wtsexporter_win_x64.exe
|
||||||
- name: Generate artifact attestation
|
- name: Generate artifact attestation
|
||||||
uses: actions/attest-build-provenance@v4
|
uses: actions/attest-build-provenance@v3
|
||||||
with:
|
with:
|
||||||
subject-path: .\wtsexporter_win_x64.exe
|
subject-path: .\wtsexporter_win_x64.exe
|
||||||
- uses: actions/upload-artifact@v7
|
- uses: actions/upload-artifact@v6
|
||||||
with:
|
with:
|
||||||
name: binary-windows-x64
|
name: binary-windows-x64
|
||||||
path: .\wtsexporter_win_x64.exe
|
path: .\wtsexporter_win_x64.exe
|
||||||
@@ -85,10 +85,10 @@ jobs:
|
|||||||
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_arm64.exe"
|
Rename-Item -Path "wtsexporter.exe" -NewName "wtsexporter_win_arm64.exe"
|
||||||
Get-FileHash wtsexporter_win_arm64.exe
|
Get-FileHash wtsexporter_win_arm64.exe
|
||||||
- name: Generate artifact attestation
|
- name: Generate artifact attestation
|
||||||
uses: actions/attest-build-provenance@v4
|
uses: actions/attest-build-provenance@v3
|
||||||
with:
|
with:
|
||||||
subject-path: .\wtsexporter_win_arm64.exe
|
subject-path: .\wtsexporter_win_arm64.exe
|
||||||
- uses: actions/upload-artifact@v7
|
- uses: actions/upload-artifact@v6
|
||||||
with:
|
with:
|
||||||
name: binary-windows-arm64
|
name: binary-windows-arm64
|
||||||
path: .\wtsexporter_win_arm64.exe
|
path: .\wtsexporter_win_arm64.exe
|
||||||
@@ -114,10 +114,10 @@ jobs:
|
|||||||
mv wtsexporter wtsexporter_macos_arm64
|
mv wtsexporter wtsexporter_macos_arm64
|
||||||
shasum -a 256 wtsexporter_macos_arm64
|
shasum -a 256 wtsexporter_macos_arm64
|
||||||
- name: Generate artifact attestation
|
- name: Generate artifact attestation
|
||||||
uses: actions/attest-build-provenance@v4
|
uses: actions/attest-build-provenance@v3
|
||||||
with:
|
with:
|
||||||
subject-path: ./wtsexporter_macos_arm64
|
subject-path: ./wtsexporter_macos_arm64
|
||||||
- uses: actions/upload-artifact@v7
|
- uses: actions/upload-artifact@v6
|
||||||
with:
|
with:
|
||||||
name: binary-macos-arm64
|
name: binary-macos-arm64
|
||||||
path: ./wtsexporter_macos_arm64
|
path: ./wtsexporter_macos_arm64
|
||||||
@@ -146,7 +146,7 @@ jobs:
|
|||||||
uses: actions/attest-build-provenance@v3
|
uses: actions/attest-build-provenance@v3
|
||||||
with:
|
with:
|
||||||
subject-path: ./wtsexporter_macos_x64
|
subject-path: ./wtsexporter_macos_x64
|
||||||
- uses: actions/upload-artifact@v7
|
- uses: actions/upload-artifact@v6
|
||||||
with:
|
with:
|
||||||
name: binary-macos-x64
|
name: binary-macos-x64
|
||||||
path: ./wtsexporter_macos_x64
|
path: ./wtsexporter_macos_x64
|
||||||
4
.github/workflows/python-publish.yml
vendored
4
.github/workflows/python-publish.yml
vendored
@@ -22,9 +22,9 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v6
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v6
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: '3.x'
|
python-version: '3.x'
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
|
|||||||
@@ -197,17 +197,6 @@ def setup_argument_parser() -> ArgumentParser:
|
|||||||
help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory"
|
help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Media Timestamp Options
|
|
||||||
timestamp_group = parser.add_argument_group('Media Timestamp Options')
|
|
||||||
timestamp_group.add_argument(
|
|
||||||
"--embed-exif", dest="embed_exif", default=False, action='store_true',
|
|
||||||
help="Embed message timestamp in EXIF data of media files (requires piexif/Pillow)"
|
|
||||||
)
|
|
||||||
timestamp_group.add_argument(
|
|
||||||
"--rename-media", dest="rename_media", default=False, action='store_true',
|
|
||||||
help="Rename media files with timestamp prefix (YYYY-MM-DD_HH-MM-SS_filename)"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Filtering options
|
# Filtering options
|
||||||
filter_group = parser.add_argument_group('Filtering Options')
|
filter_group = parser.add_argument_group('Filtering Options')
|
||||||
filter_group.add_argument(
|
filter_group.add_argument(
|
||||||
@@ -382,17 +371,6 @@ def validate_args(parser: ArgumentParser, args) -> None:
|
|||||||
validate_chat_filters(parser, args.filter_chat_include)
|
validate_chat_filters(parser, args.filter_chat_include)
|
||||||
validate_chat_filters(parser, args.filter_chat_exclude)
|
validate_chat_filters(parser, args.filter_chat_exclude)
|
||||||
|
|
||||||
# EXIF dependency validation
|
|
||||||
if args.embed_exif:
|
|
||||||
try:
|
|
||||||
import piexif
|
|
||||||
from PIL import Image
|
|
||||||
except ImportError:
|
|
||||||
parser.error(
|
|
||||||
"--embed-exif requires piexif and Pillow. "
|
|
||||||
"Install with: pip install whatsapp-chat-exporter[media_timestamp]"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str]]) -> None:
|
def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str]]) -> None:
|
||||||
"""Validate chat filters to ensure they contain only phone numbers."""
|
"""Validate chat filters to ensure they contain only phone numbers."""
|
||||||
@@ -591,8 +569,7 @@ def process_messages(args, data: ChatCollection) -> None:
|
|||||||
# Process media
|
# Process media
|
||||||
message_handler.media(
|
message_handler.media(
|
||||||
db, data, args.media, args.filter_date,
|
db, data, args.media, args.filter_date,
|
||||||
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files,
|
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files
|
||||||
args.embed_exif, args.rename_media, args.timezone_offset
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process vcards
|
# Process vcards
|
||||||
@@ -601,12 +578,6 @@ def process_messages(args, data: ChatCollection) -> None:
|
|||||||
filter_chat, args.filter_empty
|
filter_chat, args.filter_empty
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process polls
|
|
||||||
message_handler.polls(
|
|
||||||
db, data, args.filter_date,
|
|
||||||
filter_chat, args.filter_empty
|
|
||||||
)
|
|
||||||
|
|
||||||
# Process calls
|
# Process calls
|
||||||
process_calls(args, db, data, filter_chat, timing)
|
process_calls(args, db, data, filter_chat, timing)
|
||||||
|
|
||||||
@@ -776,7 +747,7 @@ def setup_logging(level):
|
|||||||
|
|
||||||
if level == logging.DEBUG:
|
if level == logging.DEBUG:
|
||||||
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||||
log_handler_file = logging.FileHandler(f"wtsexporter-debug-{timestamp}.log", mode="w")
|
log_handler_file = logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w")
|
||||||
log_handler_file.terminator = ""
|
log_handler_file.terminator = ""
|
||||||
log_handler_file.addFilter(ClearLineFilter())
|
log_handler_file.addFilter(ClearLineFilter())
|
||||||
handlers.append(log_handler_file)
|
handlers.append(log_handler_file)
|
||||||
|
|||||||
@@ -198,7 +198,7 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
|
|||||||
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
|
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
|
||||||
)
|
)
|
||||||
logging.info(
|
logging.info(
|
||||||
f"To include your offsets in the exporter, please report it in the discussion thread on GitHub:"
|
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:"
|
||||||
)
|
)
|
||||||
logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
|
logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, JidType, Device,
|
|||||||
from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
|
from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
|
||||||
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
|
from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
|
||||||
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
|
from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
|
||||||
from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -579,8 +578,7 @@ def _get_reactions(db, data):
|
|||||||
logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
|
logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
|
||||||
|
|
||||||
|
|
||||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False,
|
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
|
||||||
embed_exif=False, rename_media=False, timezone_offset=0):
|
|
||||||
"""
|
"""
|
||||||
Process WhatsApp media files from the database.
|
Process WhatsApp media files from the database.
|
||||||
|
|
||||||
@@ -592,10 +590,6 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
|||||||
filter_chat: Chat filter conditions
|
filter_chat: Chat filter conditions
|
||||||
filter_empty: Filter for empty chats
|
filter_empty: Filter for empty chats
|
||||||
separate_media: Whether to separate media files by chat
|
separate_media: Whether to separate media files by chat
|
||||||
fix_dot_files: Whether to fix media files with leading dot in the name
|
|
||||||
embed_exif: Whether to embed EXIF timestamp in media files
|
|
||||||
rename_media: Whether to rename media files with timestamp prefix
|
|
||||||
timezone_offset: Hours offset from UTC for timestamp formatting
|
|
||||||
"""
|
"""
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
|
total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
|
||||||
@@ -613,13 +607,11 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
|||||||
|
|
||||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||||
while (content := _fetch_row_safely(content_cursor)) is not None:
|
while (content := _fetch_row_safely(content_cursor)) is not None:
|
||||||
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files,
|
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
|
||||||
embed_exif, rename_media, timezone_offset)
|
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
|
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
|
||||||
|
|
||||||
|
|
||||||
# Helper functions for media processing
|
# Helper functions for media processing
|
||||||
|
|
||||||
def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
|
def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
|
||||||
@@ -763,8 +755,7 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
|||||||
return cursor
|
return cursor
|
||||||
|
|
||||||
|
|
||||||
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False,
|
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False):
|
||||||
embed_exif=False, rename_media=False, timezone_offset=0):
|
|
||||||
"""Process a single media file."""
|
"""Process a single media file."""
|
||||||
file_path = f"{media_folder}/{content['file_path']}"
|
file_path = f"{media_folder}/{content['file_path']}"
|
||||||
current_chat = data.get_chat(content["key_remote_jid"])
|
current_chat = data.get_chat(content["key_remote_jid"])
|
||||||
@@ -800,25 +791,10 @@ def _process_single_media(data, content, media_folder, mime, separate_media, fix
|
|||||||
new_folder = os.path.join(media_folder, "separated", chat_display_name)
|
new_folder = os.path.join(media_folder, "separated", chat_display_name)
|
||||||
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
||||||
new_path = os.path.join(new_folder, current_filename)
|
new_path = os.path.join(new_folder, current_filename)
|
||||||
# Use timestamp processing if enabled
|
shutil.copy2(file_path, new_path)
|
||||||
if embed_exif or rename_media:
|
message.data = new_path
|
||||||
final_path = process_media_with_timestamp(
|
|
||||||
file_path, new_path, message.timestamp,
|
|
||||||
timezone_offset, embed_exif, rename_media
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
final_path = new_path
|
|
||||||
shutil.copy2(file_path, final_path)
|
|
||||||
elif embed_exif or rename_media:
|
|
||||||
# Handle in-place processing when not separating
|
|
||||||
# Create a copy with timestamp processing in the same folder
|
|
||||||
final_path = process_media_with_timestamp(
|
|
||||||
file_path, file_path, message.timestamp,
|
|
||||||
timezone_offset, embed_exif, rename_media
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
final_path = file_path
|
message.data = file_path
|
||||||
message.data = final_path
|
|
||||||
else:
|
else:
|
||||||
message.data = "The media is missing"
|
message.data = "The media is missing"
|
||||||
message.mime = "media"
|
message.mime = "media"
|
||||||
@@ -956,26 +932,14 @@ def calls(db, data, timezone_offset, filter_chat):
|
|||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
|
|
||||||
# Check if there are any calls that match the filter
|
# Check if there are any calls that match the filter
|
||||||
# The order matters here, modern query should be attempted first,
|
total_row_number = _get_calls_count(c, filter_chat)
|
||||||
# if it fails, we can be pretty sure that legacy one will work,
|
|
||||||
# but not the other way around. This is because legacy query is
|
|
||||||
# more simple and less likely to have issues with missing tables/columns.
|
|
||||||
try:
|
|
||||||
total_row_number = _get_calls_count_modern(c, filter_chat)
|
|
||||||
except sqlite3.OperationalError as e:
|
|
||||||
total_row_number = _get_calls_count_legacy(c, filter_chat)
|
|
||||||
if total_row_number == 0:
|
if total_row_number == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
logging.info(f"Processing calls...({total_row_number})", extra={"clear": True})
|
logging.info(f"Processing calls...({total_row_number})", extra={"clear": True})
|
||||||
|
|
||||||
# Fetch call data
|
# Fetch call data
|
||||||
# Again, we try modern query first and fallback to legacy if it fails,
|
calls_data = _fetch_calls_data(c, filter_chat)
|
||||||
# for the same reasons as above.
|
|
||||||
try:
|
|
||||||
calls_data = _fetch_calls_data_modern(c, filter_chat)
|
|
||||||
except sqlite3.OperationalError as e:
|
|
||||||
calls_data = _fetch_calls_data_legacy(c, filter_chat)
|
|
||||||
|
|
||||||
# Create a chat store for all calls
|
# Create a chat store for all calls
|
||||||
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
|
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
|
||||||
@@ -991,29 +955,7 @@ def calls(db, data, timezone_offset, filter_chat):
|
|||||||
data.add_chat("000000000000000", chat)
|
data.add_chat("000000000000000", chat)
|
||||||
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
|
logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}")
|
||||||
|
|
||||||
|
def _get_calls_count(c, filter_chat):
|
||||||
def _get_calls_count_legacy(c, filter_chat):
|
|
||||||
"""Get the count of call records that match the filter."""
|
|
||||||
|
|
||||||
# Build the filter conditions
|
|
||||||
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
|
|
||||||
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
|
|
||||||
|
|
||||||
query = f"""SELECT count(),
|
|
||||||
jid.raw_string as key_remote_jid
|
|
||||||
FROM call_log
|
|
||||||
INNER JOIN jid
|
|
||||||
ON call_log.jid_row_id = jid._id
|
|
||||||
LEFT JOIN chat
|
|
||||||
ON call_log.jid_row_id = chat.jid_row_id
|
|
||||||
WHERE 1=1
|
|
||||||
{include_filter}
|
|
||||||
{exclude_filter}"""
|
|
||||||
c.execute(query)
|
|
||||||
return c.fetchone()[0]
|
|
||||||
|
|
||||||
|
|
||||||
def _get_calls_count_modern(c, filter_chat):
|
|
||||||
"""Get the count of call records that match the filter."""
|
"""Get the count of call records that match the filter."""
|
||||||
|
|
||||||
# Build the filter conditions
|
# Build the filter conditions
|
||||||
@@ -1038,36 +980,7 @@ def _get_calls_count_modern(c, filter_chat):
|
|||||||
return c.fetchone()[0]
|
return c.fetchone()[0]
|
||||||
|
|
||||||
|
|
||||||
def _fetch_calls_data_legacy(c, filter_chat):
|
def _fetch_calls_data(c, filter_chat):
|
||||||
"""Fetch call data from the database."""
|
|
||||||
|
|
||||||
# Build the filter conditions
|
|
||||||
include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid"])
|
|
||||||
exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid"])
|
|
||||||
|
|
||||||
query = f"""SELECT call_log._id,
|
|
||||||
jid.raw_string as key_remote_jid,
|
|
||||||
from_me,
|
|
||||||
call_id,
|
|
||||||
timestamp,
|
|
||||||
video_call,
|
|
||||||
duration,
|
|
||||||
call_result,
|
|
||||||
bytes_transferred,
|
|
||||||
chat.subject as chat_subject
|
|
||||||
FROM call_log
|
|
||||||
INNER JOIN jid
|
|
||||||
ON call_log.jid_row_id = jid._id
|
|
||||||
LEFT JOIN chat
|
|
||||||
ON call_log.jid_row_id = chat.jid_row_id
|
|
||||||
WHERE 1=1
|
|
||||||
{include_filter}
|
|
||||||
{exclude_filter}"""
|
|
||||||
c.execute(query)
|
|
||||||
return c
|
|
||||||
|
|
||||||
|
|
||||||
def _fetch_calls_data_modern(c, filter_chat):
|
|
||||||
"""Fetch call data from the database."""
|
"""Fetch call data from the database."""
|
||||||
|
|
||||||
# Build the filter conditions
|
# Build the filter conditions
|
||||||
@@ -1158,10 +1071,6 @@ def _construct_call_description(content, call):
|
|||||||
return description
|
return description
|
||||||
|
|
||||||
|
|
||||||
def polls(db, data, date_filter, chat_filter, empty_filter):
|
|
||||||
"""Placeholder for future polls processing implementation."""
|
|
||||||
return
|
|
||||||
|
|
||||||
# TODO: Marked for enhancement on multi-threaded processing
|
# TODO: Marked for enhancement on multi-threaded processing
|
||||||
def create_html(
|
def create_html(
|
||||||
data,
|
data,
|
||||||
|
|||||||
@@ -8,14 +8,14 @@ class Timing:
|
|||||||
Handles timestamp formatting with timezone support.
|
Handles timestamp formatting with timezone support.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, timezone_offset: Optional[Union[int, float]] = None) -> None:
|
def __init__(self, timezone_offset: Optional[int]) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize Timing object.
|
Initialize Timing object.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
timezone_offset (Optional[Union[int, float]]): Hours offset from UTC. Defaults to None (auto-detect).
|
timezone_offset (Optional[int]): Hours offset from UTC
|
||||||
"""
|
"""
|
||||||
self.tz = TimeZone(timezone_offset) if timezone_offset is not None else None
|
self.timezone_offset = timezone_offset
|
||||||
|
|
||||||
def format_timestamp(self, timestamp: Optional[Union[int, float]], format: str) -> Optional[str]:
|
def format_timestamp(self, timestamp: Optional[Union[int, float]], format: str) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
@@ -30,7 +30,7 @@ class Timing:
|
|||||||
"""
|
"""
|
||||||
if timestamp is not None:
|
if timestamp is not None:
|
||||||
timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
|
timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
|
||||||
return datetime.fromtimestamp(timestamp, self.tz).strftime(format)
|
return datetime.fromtimestamp(timestamp, TimeZone(self.timezone_offset)).strftime(format)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -39,12 +39,12 @@ class TimeZone(tzinfo):
|
|||||||
Custom timezone class with fixed offset.
|
Custom timezone class with fixed offset.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, offset: Union[int, float]) -> None:
|
def __init__(self, offset: int) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize TimeZone object.
|
Initialize TimeZone object.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
offset (Union[int, float]): Hours offset from UTC
|
offset (int): Hours offset from UTC
|
||||||
"""
|
"""
|
||||||
self.offset = offset
|
self.offset = offset
|
||||||
|
|
||||||
@@ -67,7 +67,6 @@ class ChatCollection(MutableMapping):
|
|||||||
"""Initialize an empty chat collection."""
|
"""Initialize an empty chat collection."""
|
||||||
self._chats: Dict[str, ChatStore] = {}
|
self._chats: Dict[str, ChatStore] = {}
|
||||||
self._system: Dict[str, Any] = {}
|
self._system: Dict[str, Any] = {}
|
||||||
self.set_system("master_lookup", {})
|
|
||||||
|
|
||||||
def __getitem__(self, key: str) -> 'ChatStore':
|
def __getitem__(self, key: str) -> 'ChatStore':
|
||||||
"""Get a chat by its ID. Required for dict-like access."""
|
"""Get a chat by its ID. Required for dict-like access."""
|
||||||
@@ -101,32 +100,21 @@ class ChatCollection(MutableMapping):
|
|||||||
Returns:
|
Returns:
|
||||||
Optional['ChatStore']: The chat if found, None otherwise
|
Optional['ChatStore']: The chat if found, None otherwise
|
||||||
"""
|
"""
|
||||||
if chat_id in self._chats:
|
return self._chats.get(chat_id)
|
||||||
return self._chats[chat_id]
|
|
||||||
elif chat_id in self.get_system("master_lookup"):
|
|
||||||
return self._chats[self.get_system("master_lookup")[chat_id]]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def add_chat(self, chat_id: str, chat: 'ChatStore', alias: Optional[str] = None) -> 'ChatStore':
|
def add_chat(self, chat_id: str, chat: 'ChatStore') -> None:
|
||||||
"""
|
"""
|
||||||
Add a new chat to the collection.
|
Add a new chat to the collection.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
chat_id (str): The ID for the chat
|
chat_id (str): The ID for the chat
|
||||||
chat (ChatStore): The chat to add
|
chat (ChatStore): The chat to add
|
||||||
alias (Optional[str]): An optional alias to associate with the chat ID
|
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
TypeError: If chat is not a ChatStore object
|
TypeError: If chat is not a ChatStore object
|
||||||
"""
|
"""
|
||||||
if not isinstance(chat, ChatStore):
|
if not isinstance(chat, ChatStore):
|
||||||
raise TypeError("Chat must be a ChatStore object")
|
raise TypeError("Chat must be a ChatStore object")
|
||||||
if chat_id in self._chats:
|
|
||||||
raise ValueError("Chat ID already exists. Use get_chat to retrieve existing chat.")
|
|
||||||
if alias:
|
|
||||||
self.get_system("master_lookup")[alias] = chat_id
|
|
||||||
chat.aliases.append(alias)
|
|
||||||
self._chats[chat_id] = chat
|
self._chats[chat_id] = chat
|
||||||
return self._chats[chat_id]
|
return self._chats[chat_id]
|
||||||
|
|
||||||
@@ -140,34 +128,6 @@ class ChatCollection(MutableMapping):
|
|||||||
if chat_id in self._chats:
|
if chat_id in self._chats:
|
||||||
del self._chats[chat_id]
|
del self._chats[chat_id]
|
||||||
|
|
||||||
def add_alias(self, alias: str, chat_id: str) -> bool:
|
|
||||||
"""
|
|
||||||
Add or modify an alias for a chat.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
alias (str): The alias to add
|
|
||||||
chat_id (str): The ID of the chat to associate the alias with
|
|
||||||
"""
|
|
||||||
if chat_id not in self._chats:
|
|
||||||
raise ValueError("Chat ID does not exist. Add chat first.")
|
|
||||||
self.get_system("master_lookup")[alias] = chat_id
|
|
||||||
return True
|
|
||||||
|
|
||||||
def remove_alias(self, alias: str) -> bool:
|
|
||||||
"""
|
|
||||||
Remove an alias.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
alias (str): The alias to remove
|
|
||||||
"""
|
|
||||||
|
|
||||||
if alias in self.get_system("master_lookup"):
|
|
||||||
self._chats[self.get_system("master_lookup")[alias]].aliases.remove(alias)
|
|
||||||
del self.get_system("master_lookup")[alias]
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def items(self):
|
def items(self):
|
||||||
"""Get chat items (id, chat) pairs."""
|
"""Get chat items (id, chat) pairs."""
|
||||||
return self._chats.items()
|
return self._chats.items()
|
||||||
@@ -248,7 +208,6 @@ class ChatStore:
|
|||||||
self.their_avatar_thumb = None
|
self.their_avatar_thumb = None
|
||||||
self.status = None
|
self.status = None
|
||||||
self.media_base = ""
|
self.media_base = ""
|
||||||
self.aliases = []
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
"""Get number of chats. Required for dict-like access."""
|
"""Get number of chats. Required for dict-like access."""
|
||||||
@@ -402,7 +361,6 @@ class Message:
|
|||||||
self.thumb = None # Android specific
|
self.thumb = None # Android specific
|
||||||
self.sticker = False
|
self.sticker = False
|
||||||
self.reactions = {}
|
self.reactions = {}
|
||||||
self.poll = None
|
|
||||||
|
|
||||||
def to_json(self) -> Dict[str, Any]:
|
def to_json(self) -> Dict[str, Any]:
|
||||||
"""Convert message to JSON-serializable dict."""
|
"""Convert message to JSON-serializable dict."""
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import shutil
|
import shutil
|
||||||
@@ -12,46 +11,27 @@ from markupsafe import escape as htmle
|
|||||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
|
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
|
||||||
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
|
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
|
||||||
from Whatsapp_Chat_Exporter.poll import decode_poll_from_receipt_blob
|
|
||||||
from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
|
|
||||||
|
|
||||||
|
|
||||||
def contacts(db, data):
|
def contacts(db, data):
|
||||||
"""Process WhatsApp contacts with name and status information."""
|
"""Process WhatsApp contacts with status information."""
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT""")
|
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
|
||||||
total_row_number = c.fetchone()[0]
|
total_row_number = c.fetchone()[0]
|
||||||
logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
|
logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
|
||||||
|
|
||||||
# Check if expected columns exist before querying,
|
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
|
||||||
# to handle different WhatsApp versions (mainly ZLID).
|
|
||||||
c.execute("PRAGMA table_info(ZWAADDRESSBOOKCONTACT)")
|
|
||||||
column_names = [info[1] for info in c.fetchall()]
|
|
||||||
all_cols = ["ZWHATSAPPID", "ZLID", "ZFULLNAME", "ZABOUTTEXT"]
|
|
||||||
columns = [col for col in all_cols if col in column_names]
|
|
||||||
|
|
||||||
c.execute(f"""SELECT {', '.join(columns)} FROM ZWAADDRESSBOOKCONTACT""")
|
|
||||||
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
||||||
while (content := c.fetchone()) is not None:
|
while (content := c.fetchone()) is not None:
|
||||||
zwhatsapp_id = content["ZWHATSAPPID"]
|
zwhatsapp_id = content["ZWHATSAPPID"]
|
||||||
if zwhatsapp_id is None:
|
|
||||||
pbar.update(1)
|
|
||||||
continue
|
|
||||||
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
|
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
|
||||||
zwhatsapp_id += "@s.whatsapp.net"
|
zwhatsapp_id += "@s.whatsapp.net"
|
||||||
|
|
||||||
current_chat = ChatStore(Device.IOS)
|
current_chat = ChatStore(Device.IOS)
|
||||||
if content["ZFULLNAME"]:
|
current_chat.status = content["ZABOUTTEXT"]
|
||||||
current_chat.name = content["ZFULLNAME"]
|
data.add_chat(zwhatsapp_id, current_chat)
|
||||||
if content["ZABOUTTEXT"]:
|
|
||||||
current_chat.status = content["ZABOUTTEXT"]
|
|
||||||
# Index by WhatsApp ID, with LID as alias if available
|
|
||||||
data.add_chat(
|
|
||||||
zwhatsapp_id,
|
|
||||||
current_chat,
|
|
||||||
content["ZLID"] if "ZLID" in columns and content["ZLID"] else None
|
|
||||||
)
|
|
||||||
|
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
||||||
@@ -144,12 +124,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
|||||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
|
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
|
||||||
else:
|
else:
|
||||||
current_chat = data.get_chat(contact_id)
|
current_chat = data.get_chat(contact_id)
|
||||||
# Only overwrite name if we have a better one (not a phone number)
|
current_chat.name = contact_name
|
||||||
# or if there's no existing name
|
|
||||||
if current_chat.name is None or contact_name is not None:
|
|
||||||
is_phone = contact_name.replace("+", "").replace(" ", "").isdigit() if contact_name else True
|
|
||||||
if not is_phone or current_chat.name is None:
|
|
||||||
current_chat.name = contact_name
|
|
||||||
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
|
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
|
||||||
|
|
||||||
# Process avatar images
|
# Process avatar images
|
||||||
@@ -158,17 +133,6 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
|||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
||||||
|
|
||||||
# Pre-load push names for JIDs not yet in data (especially @lid group members)
|
|
||||||
c.execute("""SELECT ZJID, ZPUSHNAME FROM ZWAPROFILEPUSHNAME WHERE ZPUSHNAME IS NOT NULL""")
|
|
||||||
while (row := c.fetchone()) is not None:
|
|
||||||
jid = row["ZJID"]
|
|
||||||
if jid not in data:
|
|
||||||
push_chat = ChatStore(Device.IOS)
|
|
||||||
push_chat.name = row["ZPUSHNAME"]
|
|
||||||
data.add_chat(jid, push_chat)
|
|
||||||
elif data.get_chat(jid).name is None:
|
|
||||||
data.get_chat(jid).name = row["ZPUSHNAME"]
|
|
||||||
|
|
||||||
# Get message count
|
# Get message count
|
||||||
message_count_query = f"""
|
message_count_query = f"""
|
||||||
SELECT count()
|
SELECT count()
|
||||||
@@ -284,7 +248,7 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
|||||||
|
|
||||||
# Handle metadata messages
|
# Handle metadata messages
|
||||||
if content["ZMESSAGETYPE"] == 6:
|
if content["ZMESSAGETYPE"] == 6:
|
||||||
return process_metadata_message(message, content, is_group_message, data)
|
return process_metadata_message(message, content, is_group_message)
|
||||||
|
|
||||||
# Handle quoted replies
|
# Handle quoted replies
|
||||||
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and not no_reply:
|
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and not no_reply:
|
||||||
@@ -292,15 +256,6 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
|||||||
message.reply = quoted.decode()
|
message.reply = quoted.decode()
|
||||||
message.quoted_data = message_map.get(message.reply)
|
message.quoted_data = message_map.get(message.reply)
|
||||||
|
|
||||||
# Skip poll vote update messages (type 66)
|
|
||||||
if content["ZMESSAGETYPE"] == 66:
|
|
||||||
return True # Invalid, skip
|
|
||||||
|
|
||||||
# Handle poll messages (type 46) - will be enriched by polls() later
|
|
||||||
if content["ZMESSAGETYPE"] == 46:
|
|
||||||
message.data = "\U0001f4ca Poll"
|
|
||||||
return False # Valid, populated later by polls()
|
|
||||||
|
|
||||||
# Handle stickers
|
# Handle stickers
|
||||||
if content["ZMESSAGETYPE"] == 15:
|
if content["ZMESSAGETYPE"] == 15:
|
||||||
message.sticker = True
|
message.sticker = True
|
||||||
@@ -311,51 +266,21 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
|||||||
return False # Message is valid
|
return False # Message is valid
|
||||||
|
|
||||||
|
|
||||||
def _parse_group_action(ztext, data):
|
def process_metadata_message(message, content, is_group_message):
|
||||||
if ztext.endswith("@lid") or ztext.endswith("@s.whatsapp.net"):
|
|
||||||
# This is likely a group member change action
|
|
||||||
# Not really sure actually
|
|
||||||
name = None
|
|
||||||
if ztext in data:
|
|
||||||
name = data.get_chat(ztext).name
|
|
||||||
if "@" in ztext:
|
|
||||||
fallback = ztext.split('@')[0]
|
|
||||||
else:
|
|
||||||
fallback = None
|
|
||||||
entity = name or fallback
|
|
||||||
|
|
||||||
return f"{entity} join the group"
|
|
||||||
|
|
||||||
elif ztext.startswith("{") and ztext.endswith("}"):
|
|
||||||
try:
|
|
||||||
metadata = json.loads(ztext)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
return ztext # Not a JSON string, return as-is
|
|
||||||
entity = metadata.get('author', 'Someone')
|
|
||||||
if entity is not "Someone":
|
|
||||||
name = None
|
|
||||||
if entity in data:
|
|
||||||
name = data.get_chat(entity).name
|
|
||||||
if "@" in entity:
|
|
||||||
fallback = entity.split('@')[0]
|
|
||||||
else:
|
|
||||||
fallback = None
|
|
||||||
entity = name or fallback
|
|
||||||
return f"{entity} changed the group name to {metadata.get('subject', 'Unknown')}."
|
|
||||||
elif ztext == "admin_add":
|
|
||||||
return f"The administrator has restricted participant additions to admins only."
|
|
||||||
else:
|
|
||||||
return "Unsupported WhatsApp internal message."
|
|
||||||
|
|
||||||
|
|
||||||
def process_metadata_message(message, content, is_group_message, data):
|
|
||||||
"""Process metadata messages (action_type 6)."""
|
"""Process metadata messages (action_type 6)."""
|
||||||
if is_group_message:
|
if is_group_message:
|
||||||
# Group
|
# Group
|
||||||
if content["ZTEXT"] is not None:
|
if content["ZTEXT"] is not None:
|
||||||
message.data = _parse_group_action(content["ZTEXT"], data)
|
# Changed name
|
||||||
message.meta = True
|
try:
|
||||||
return False
|
int(content["ZTEXT"])
|
||||||
|
except ValueError:
|
||||||
|
msg = f"The group name changed to {content['ZTEXT']}"
|
||||||
|
message.data = msg
|
||||||
|
message.meta = True
|
||||||
|
return False # Valid message
|
||||||
|
else:
|
||||||
|
return True # Invalid message
|
||||||
else:
|
else:
|
||||||
message.data = None
|
message.data = None
|
||||||
return False
|
return False
|
||||||
@@ -386,8 +311,7 @@ def process_message_text(message, content):
|
|||||||
message.data = msg
|
message.data = msg
|
||||||
|
|
||||||
|
|
||||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False,
|
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False):
|
||||||
embed_exif=False, rename_media=False, timezone_offset=0):
|
|
||||||
"""Process media files from WhatsApp messages."""
|
"""Process media files from WhatsApp messages."""
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
|
|
||||||
@@ -445,15 +369,13 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
|||||||
mime = MimeTypes()
|
mime = MimeTypes()
|
||||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||||
while (content := c.fetchone()) is not None:
|
while (content := c.fetchone()) is not None:
|
||||||
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files,
|
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
|
||||||
embed_exif, rename_media, timezone_offset)
|
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
|
logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
|
||||||
|
|
||||||
|
|
||||||
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False,
|
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
|
||||||
embed_exif=False, rename_media=False, timezone_offset=0):
|
|
||||||
"""Process a single media item."""
|
"""Process a single media item."""
|
||||||
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
|
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
|
||||||
current_chat = data.get_chat(content["ZCONTACTJID"])
|
current_chat = data.get_chat(content["ZCONTACTJID"])
|
||||||
@@ -489,24 +411,10 @@ def process_media_item(content, data, media_folder, mime, separate_media, fix_do
|
|||||||
new_folder = os.path.join(media_folder, "separated", chat_display_name)
|
new_folder = os.path.join(media_folder, "separated", chat_display_name)
|
||||||
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
||||||
new_path = os.path.join(new_folder, current_filename)
|
new_path = os.path.join(new_folder, current_filename)
|
||||||
# Use timestamp processing if enabled
|
shutil.copy2(file_path, new_path)
|
||||||
if embed_exif or rename_media:
|
message.data = '/'.join(new_path.split("/")[1:])
|
||||||
final_path = process_media_with_timestamp(
|
|
||||||
file_path, new_path, message.timestamp,
|
|
||||||
timezone_offset, embed_exif, rename_media
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
final_path = new_path
|
|
||||||
shutil.copy2(file_path, final_path)
|
|
||||||
elif embed_exif or rename_media:
|
|
||||||
# Handle in-place processing when not separating
|
|
||||||
final_path = process_media_with_timestamp(
|
|
||||||
file_path, file_path, message.timestamp,
|
|
||||||
timezone_offset, embed_exif, rename_media
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
final_path = file_path
|
message.data = '/'.join(file_path.split("/")[1:])
|
||||||
message.data = os.path.join(*final_path.split(os.sep)[1:])
|
|
||||||
else:
|
else:
|
||||||
# Handle missing media
|
# Handle missing media
|
||||||
message.data = "The media is missing"
|
message.data = "The media is missing"
|
||||||
@@ -688,187 +596,6 @@ def process_call_record(content, chat, data, timezone_offset):
|
|||||||
chat.add_message(call.key_id, call)
|
chat.add_message(call.key_id, call)
|
||||||
|
|
||||||
|
|
||||||
def _resolve_voter_name(voter_jid, is_creator, message, data):
|
|
||||||
"""Resolve a voter JID to a display name.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
voter_jid (str or None): The voter's JID (often LID format like '123@lid').
|
|
||||||
is_creator (bool): Whether this voter is the poll creator.
|
|
||||||
message (Message): The poll message object.
|
|
||||||
data (ChatCollection): The chat data collection for name lookups.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The resolved display name.
|
|
||||||
"""
|
|
||||||
if voter_jid is None:
|
|
||||||
if is_creator:
|
|
||||||
# Field 6 in the protobuf is always the device owner's vote,
|
|
||||||
# not the poll message sender's vote
|
|
||||||
return "You"
|
|
||||||
return "Unknown"
|
|
||||||
|
|
||||||
# Try direct lookup in data
|
|
||||||
if voter_jid in data:
|
|
||||||
chat = data.get_chat(voter_jid)
|
|
||||||
if chat is not None and chat.name:
|
|
||||||
return chat.name
|
|
||||||
|
|
||||||
# Try with @s.whatsapp.net suffix
|
|
||||||
if "@" not in voter_jid:
|
|
||||||
jid_with_suffix = f"{voter_jid}@s.whatsapp.net"
|
|
||||||
if jid_with_suffix in data:
|
|
||||||
chat = data.get_chat(jid_with_suffix)
|
|
||||||
if chat is not None and chat.name:
|
|
||||||
return chat.name
|
|
||||||
|
|
||||||
# Fallback: strip domain part
|
|
||||||
if "@" in voter_jid:
|
|
||||||
return voter_jid.split("@")[0]
|
|
||||||
return voter_jid
|
|
||||||
|
|
||||||
|
|
||||||
def _build_poll_summary(poll_data, message, data):
    """Tally the decoded votes of one poll into the structure stored on the message.

    Args:
        poll_data (dict): Decoded poll with 'question', 'options' and 'votes' keys.
        message: The poll message, forwarded to the voter name lookup.
        data (ChatCollection): The chat data collection used to resolve voter names.

    Returns:
        dict: Summary with 'type', 'question', 'options' and 'total_voters'.
    """
    options = poll_data['options']
    option_votes = [[] for _ in options]
    seen_voters = set()

    for vote in poll_data['votes']:
        voter_jid = vote.get('voter_jid')
        is_creator = vote.get('is_creator', False)
        voter_key = voter_jid or ("__creator__" if is_creator else "__unknown__")
        # Only the first record of each voter counts.
        if voter_key in seen_voters:
            continue
        seen_voters.add(voter_key)
        voter_name = _resolve_voter_name(voter_jid, is_creator, message, data)
        for idx in vote.get('selected_indices', []):
            if 0 <= idx < len(options):
                option_votes[idx].append(voter_name)

    # Percentages are relative to the most-voted option, not to the voter count.
    max_votes = max((len(voters) for voters in option_votes), default=0)
    option_list = [
        {
            'text': opt_text,
            'vote_count': len(voters),
            'vote_pct': (len(voters) / max_votes * 100) if max_votes > 0 else 0,
            'voters': voters,
        }
        for opt_text, voters in zip(options, option_votes)
    ]

    return {
        'type': 'poll',
        'question': poll_data['question'],
        'options': option_list,
        'total_voters': len(seen_voters),
    }


def _apply_poll_row(content, data):
    """Decode one poll row and attach the result to its message, if it is loaded."""
    message_pk = content["ZMESSAGE"]

    current_chat = data.get_chat(content["ZCONTACTJID"])
    if current_chat is None:
        return

    message = current_chat.get_message(message_pk)
    if message is None:
        return

    try:
        poll_data = decode_poll_from_receipt_blob(content["ZRECEIPTINFO"])
    except Exception as e:
        # Best effort: one undecodable poll must not abort the export.
        logging.warning(f"Failed to decode poll {message_pk}: {e}")
        return

    if poll_data is None:
        return

    message.poll = _build_poll_summary(poll_data, message, data)
    message.data = f"\U0001f4ca {poll_data['question']}"


def polls(db, data, filter_date, filter_chat, filter_empty):
    """Process WhatsApp poll messages (type 46) from the database.

    Queries ZWAMESSAGEINFO.ZRECEIPTINFO for poll messages, decodes the
    protobuf blobs, and enriches the corresponding Message objects with
    structured poll data.

    Args:
        db: SQLite database connection.
        data (ChatCollection): The chat data collection.
        filter_date: Date filter SQL fragment or None.
        filter_chat: Tuple of (include_filter, exclude_filter).
        filter_empty: Whether to filter empty chats (not used by this function).
    """
    c = db.cursor()

    # Build filter conditions
    chat_filter_include = get_chat_condition(
        filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
    chat_filter_exclude = get_chat_condition(
        filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
    date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''

    # Shared by the count and the fetch query so both always select the same rows.
    poll_source = f"""
        FROM ZWAMESSAGE
            JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK
            INNER JOIN ZWACHATSESSION
                ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
            LEFT JOIN ZWAGROUPMEMBER
                ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
        WHERE ZWAMESSAGE.ZMESSAGETYPE = 46
            AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL
            {date_filter}
            {chat_filter_include}
            {chat_filter_exclude}
    """

    # Count poll messages
    c.execute(f"SELECT count() {poll_source}")
    total_row_number = c.fetchone()[0]
    if total_row_number == 0:
        return

    logging.info(f"Processing polls...(0/{total_row_number})", extra={"clear": True})

    # Fetch poll data
    c.execute(f"""
        SELECT ZWACHATSESSION.ZCONTACTJID,
            ZWAMESSAGE.Z_PK AS ZMESSAGE,
            ZWAMESSAGEINFO.ZRECEIPTINFO
        {poll_source}
        ORDER BY ZWAMESSAGE.ZMESSAGEDATE ASC
    """)

    with tqdm(total=total_row_number, desc="Processing polls", unit="poll", leave=False) as pbar:
        while (content := c.fetchone()) is not None:
            _apply_poll_row(content, data)
            pbar.update(1)
        total_time = pbar.format_dict['elapsed']
    logging.info(f"Processed {total_row_number} polls in {convert_time_unit(total_time)}")
|
|
||||||
|
|
||||||
|
|
||||||
def format_call_data(call, content):
|
def format_call_data(call, content):
|
||||||
"""Format call data message based on call attributes."""
|
"""Format call data message based on call attributes."""
|
||||||
# Basic call info
|
# Basic call info
|
||||||
|
|||||||
@@ -1,200 +0,0 @@
|
|||||||
"""
|
|
||||||
Media timestamp utilities for embedding EXIF data and renaming files.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import logging
|
|
||||||
import shutil
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from Whatsapp_Chat_Exporter.data_model import TimeZone
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Optional imports for EXIF support
|
|
||||||
try:
|
|
||||||
import piexif
|
|
||||||
from PIL import Image
|
|
||||||
HAS_EXIF_SUPPORT = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_EXIF_SUPPORT = False
|
|
||||||
|
|
||||||
|
|
||||||
def format_timestamp_for_filename(timestamp: float, timezone_offset: int = 0) -> str:
    """
    Render a Unix timestamp as a filename-safe date/time string.

    Args:
        timestamp: Unix timestamp (seconds)
        timezone_offset: Hours offset from UTC

    Returns:
        Formatted string: YYYY-MM-DD_HH-MM-SS
    """
    tz = TimeZone(timezone_offset)
    return datetime.fromtimestamp(timestamp, tz).strftime("%Y-%m-%d_%H-%M-%S")
|
|
||||||
|
|
||||||
|
|
||||||
def format_timestamp_for_exif(timestamp: float, timezone_offset: int = 0) -> str:
    """
    Render a Unix timestamp in the layout used by EXIF DateTime fields.

    Args:
        timestamp: Unix timestamp (seconds)
        timezone_offset: Hours offset from UTC

    Returns:
        Formatted string: YYYY:MM:DD HH:MM:SS (EXIF format)
    """
    tz = TimeZone(timezone_offset)
    return datetime.fromtimestamp(timestamp, tz).strftime("%Y:%m:%d %H:%M:%S")
|
|
||||||
|
|
||||||
|
|
||||||
def generate_timestamped_filename(
    original_path: str,
    timestamp: float,
    timezone_offset: int = 0
) -> str:
    """
    Build a path in the same directory whose file name carries a timestamp prefix.

    Args:
        original_path: Original file path
        timestamp: Unix timestamp (seconds)
        timezone_offset: Hours offset from UTC

    Returns:
        New path with the name format: YYYY-MM-DD_HH-MM-SS_original-name.ext
    """
    # os.path.split yields the same (dirname, basename) pair in one call.
    folder, file_name = os.path.split(original_path)
    prefix = format_timestamp_for_filename(timestamp, timezone_offset)
    return os.path.join(folder, f"{prefix}_{file_name}")
|
|
||||||
|
|
||||||
|
|
||||||
def embed_exif_timestamp(
    file_path: str,
    timestamp: float,
    timezone_offset: int = 0
) -> bool:
    """
    Write the given timestamp into the EXIF date fields of an image, in place.

    Only .jpg/.jpeg/.tiff/.tif files are attempted. Any failure is logged and
    reported through the return value instead of being raised.

    Args:
        file_path: Path to the image file
        timestamp: Unix timestamp (seconds)
        timezone_offset: Hours offset from UTC

    Returns:
        True if successful, False otherwise
    """
    if not HAS_EXIF_SUPPORT:
        logger.warning("EXIF support not available. Install piexif and Pillow.")
        return False

    # Check file extension
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in ('.jpg', '.jpeg', '.tiff', '.tif'):
        logger.debug(f"EXIF embedding not supported for {ext} files: {file_path}")
        return False

    try:
        stamp = format_timestamp_for_exif(timestamp, timezone_offset).encode('utf-8')

        # Reuse the EXIF data already in the file when it can be read,
        # otherwise start from an empty structure.
        try:
            exif_dict = piexif.load(file_path)
        except Exception:
            exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}

        # Date fields of the Exif IFD
        exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = stamp
        exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = stamp
        # Basic TIFF DateTime tag in the 0th IFD
        exif_dict["0th"][piexif.ImageIFD.DateTime] = stamp

        piexif.insert(piexif.dump(exif_dict), file_path)
    except Exception as e:
        logger.warning(f"Failed to embed EXIF in {file_path}: {e}")
        return False

    return True
|
|
||||||
|
|
||||||
|
|
||||||
def _handle_duplicate_filename(file_path: str) -> str:
|
|
||||||
"""
|
|
||||||
Generate a unique filename by appending a counter if file exists.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path: Original file path
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Unique file path with counter appended if necessary
|
|
||||||
"""
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
return file_path
|
|
||||||
|
|
||||||
base, ext = os.path.splitext(file_path)
|
|
||||||
counter = 1
|
|
||||||
|
|
||||||
while os.path.exists(file_path):
|
|
||||||
file_path = f"{base}_{counter}{ext}"
|
|
||||||
counter += 1
|
|
||||||
|
|
||||||
return file_path
|
|
||||||
|
|
||||||
|
|
||||||
def process_media_with_timestamp(
    source_path: str,
    dest_path: str,
    timestamp: Optional[float],
    timezone_offset: int = 0,
    embed_exif: bool = False,
    rename_media: bool = False
) -> str:
    """
    Process a media file with optional timestamp embedding and renaming.

    The source file is copied to the destination unless both are the same
    path, in which case the file is processed in place.

    Args:
        source_path: Source file path
        dest_path: Destination file path (may be modified if renaming)
        timestamp: Unix timestamp (seconds), or None if unavailable
        timezone_offset: Hours offset from UTC
        embed_exif: Whether to embed EXIF timestamp
        rename_media: Whether to rename file with timestamp prefix

    Returns:
        Final destination path (may differ from dest_path if renamed)
    """
    # If no timestamp available, just copy
    if timestamp is None:
        if source_path != dest_path:
            logger.warning(f"No timestamp available for {source_path}, skipping timestamp operations")
            shutil.copy2(source_path, dest_path)
        return dest_path

    # Determine final path
    final_path = dest_path
    if rename_media:
        final_path = generate_timestamped_filename(dest_path, timestamp, timezone_offset)

    # Handle duplicate filenames
    if os.path.exists(final_path) and final_path != source_path:
        final_path = _handle_duplicate_filename(final_path)

    # Copy file to destination. Copying a file onto itself raises
    # shutil.SameFileError, so in-place processing skips the copy.
    if final_path != source_path:
        shutil.copy2(source_path, final_path)

    # Embed EXIF if requested
    if embed_exif:
        embed_exif_timestamp(final_path, timestamp, timezone_offset)

    return final_path
|
|
||||||
@@ -1,190 +0,0 @@
|
|||||||
"""
|
|
||||||
WhatsApp Poll decoder for iOS/macOS.
|
|
||||||
|
|
||||||
Decodes poll messages (ZMESSAGETYPE = 46) stored as protobuf blobs
|
|
||||||
in ZWAMESSAGEINFO.ZRECEIPTINFO. Uses raw varint/wire-type parsing
|
|
||||||
with no external protobuf library dependency.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import struct
|
|
||||||
import logging
|
|
||||||
|
|
||||||
|
|
||||||
def _decode_varint(data, pos):
|
|
||||||
"""Decode a protobuf varint starting at pos.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
data (bytes): The protobuf data.
|
|
||||||
pos (int): Starting position.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple: (value, new_pos)
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the varint is truncated.
|
|
||||||
"""
|
|
||||||
result = 0
|
|
||||||
shift = 0
|
|
||||||
while pos < len(data):
|
|
||||||
b = data[pos]
|
|
||||||
pos += 1
|
|
||||||
result |= (b & 0x7F) << shift
|
|
||||||
if not (b & 0x80):
|
|
||||||
return result, pos
|
|
||||||
shift += 7
|
|
||||||
raise ValueError("Truncated varint")
|
|
||||||
|
|
||||||
|
|
||||||
def decode_protobuf_fields(data):
    """
    Decode raw protobuf bytes into list of (field_number, wire_type_name, value).
    Handles: varint (0), fixed64 (1), length-delimited/bytes (2), fixed32 (5).

    Parsing is best effort: it stops at the first unknown wire type or
    truncated field and returns the fields decoded up to that point. A
    length-delimited field whose declared length runs past the end of the
    data counts as truncated and is not returned.

    Args:
        data (bytes): Raw protobuf data.

    Returns:
        list: List of (field_number, wire_type_name, value) tuples.
    """
    fields = []
    pos = 0
    end = len(data)
    while pos < end:
        try:
            tag, pos = _decode_varint(data, pos)
            field_num = tag >> 3
            wire_type = tag & 0x7

            if wire_type == 0:  # varint
                val, pos = _decode_varint(data, pos)
                fields.append((field_num, 'varint', val))
            elif wire_type == 2:  # length-delimited
                length, pos = _decode_varint(data, pos)
                if pos + length > end:
                    # Slicing would silently shorten the payload; treat the
                    # field as truncated instead.
                    break
                fields.append((field_num, 'bytes', data[pos:pos + length]))
                pos += length
            elif wire_type == 5:  # fixed32
                val = struct.unpack('<I', data[pos:pos + 4])[0]
                pos += 4
                fields.append((field_num, 'fixed32', val))
            elif wire_type == 1:  # fixed64
                val = struct.unpack('<Q', data[pos:pos + 8])[0]
                pos += 8
                fields.append((field_num, 'fixed64', val))
            else:
                break  # Unknown wire type, stop parsing
        except (ValueError, struct.error):
            # Truncated varint or fixed-width value: keep what was decoded.
            break
    return fields
|
|
||||||
|
|
||||||
|
|
||||||
def _decode_vote_record(data):
    """Decode the sub-message holding one participant's vote.

    Args:
        data (bytes): Raw protobuf data for a vote record.

    Returns:
        dict or None: Vote record with 'voter_jid' and 'selected_indices',
            or None if the record is empty.
    """
    voter_jid = None
    selected_indices = []

    for field_no, kind, value in decode_protobuf_fields(data):
        if (field_no, kind) == (1, 'varint'):
            # Field 1 repeats once per selected option.
            selected_indices.append(value)
        elif (field_no, kind) == (4, 'bytes'):
            # Field 4 carries the voter JID; fall back to hex when it is
            # not valid UTF-8.
            try:
                voter_jid = value.decode('utf-8')
            except Exception:
                voter_jid = value.hex()

    if voter_jid or selected_indices:
        return {
            'voter_jid': voter_jid,
            'selected_indices': selected_indices,
        }
    return None
|
|
||||||
|
|
||||||
|
|
||||||
def decode_poll_from_receipt_blob(receipt_blob):
    """
    Extract a WhatsApp poll from a ZWAMESSAGEINFO.ZRECEIPTINFO protobuf blob.

    Field 8 of the top-level message holds the poll content. Inside it,
    field 2 is the question, each field 3 is one option, each field 5 is
    another participant's vote and field 6 is the creator vote.

    Args:
        receipt_blob (bytes): The ZRECEIPTINFO protobuf blob.

    Returns:
        dict or None: Decoded poll data with keys:
            question (str): The poll question text
            options (list[str]): The poll option texts, in order
            votes (list[dict]): Each vote has:
                voter_jid (str|None): Voter's JID (LID format)
                selected_indices (list[int]): 0-based indices into options
                is_creator (bool): True if this is the poll creator's vote
        Returns None if the blob does not contain a valid poll.
    """
    def as_text(raw):
        # Text fields fall back to their repr when they are not valid UTF-8.
        try:
            return raw.decode('utf-8')
        except Exception:
            return repr(raw)

    def first_bytes(fields, wanted):
        # Value of the first length-delimited field with the wanted number.
        return next(
            (val for fn, wt, val in fields if fn == wanted and wt == 'bytes'),
            None,
        )

    if not receipt_blob:
        return None

    # Find the poll content in field 8
    poll_content = first_bytes(decode_protobuf_fields(receipt_blob), 8)
    if not poll_content:
        return None

    poll_fields = decode_protobuf_fields(poll_content)

    # Question: first field 2
    raw_question = first_bytes(poll_fields, 2)
    question = None if raw_question is None else as_text(raw_question)
    if not question:
        return None

    options = []
    votes = []
    for fn, wt, val in poll_fields:
        if wt != 'bytes':
            continue
        if fn == 3:
            # Option sub-message: its first field 1 is the option text.
            raw_option = first_bytes(decode_protobuf_fields(val), 1)
            if raw_option is not None:
                options.append(as_text(raw_option))
        elif fn in (5, 6):
            # Field 5 = other participants, field 6 = creator
            vote = _decode_vote_record(val)
            if vote:
                vote['is_creator'] = (fn == 6)
                votes.append(vote)

    return {
        'question': question,
        'options': options,
        'votes': votes,
    }
|
|
||||||
@@ -213,6 +213,9 @@ def rendering(
|
|||||||
if "??" not in headline:
|
if "??" not in headline:
|
||||||
raise ValueError("Headline must contain '??' to replace with name")
|
raise ValueError("Headline must contain '??' to replace with name")
|
||||||
headline = headline.replace("??", name)
|
headline = headline.replace("??", name)
|
||||||
|
# Create a temporary lookup map only at render-time;
|
||||||
|
# media preview in reply is a UI-specific concern and
|
||||||
|
# is ignored by the core database processing
|
||||||
with open(output_file_name, "w", encoding="utf-8") as f:
|
with open(output_file_name, "w", encoding="utf-8") as f:
|
||||||
f.write(
|
f.write(
|
||||||
template.render(
|
template.render(
|
||||||
@@ -226,7 +229,8 @@ def rendering(
|
|||||||
previous=previous,
|
previous=previous,
|
||||||
status=chat.status,
|
status=chat.status,
|
||||||
media_base=chat.media_base,
|
media_base=chat.media_base,
|
||||||
headline=headline
|
headline=headline,
|
||||||
|
msg_map={m.key_id: m for m in msgs}.get
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -66,18 +66,13 @@ def _parse_vcard_line(line: str) -> tuple[str, dict[str, str], str] | None:
|
|||||||
value = line[colon_index + 1:].strip()
|
value = line[colon_index + 1:].strip()
|
||||||
|
|
||||||
# Split property name from parameters
|
# Split property name from parameters
|
||||||
property_part, *params = prop_and_params.split(';')
|
parts = prop_and_params.split(';')
|
||||||
|
property_name = parts[0].upper()
|
||||||
# We only care about property name for now, but the grouping mechanism may be
|
|
||||||
# useful in the future if we want to associate multiple properties together.
|
|
||||||
parts = property_part.split('.')
|
|
||||||
_, property_name = parts if len(parts) == 2 else (None, parts[0])
|
|
||||||
property_name = property_name.upper()
|
|
||||||
|
|
||||||
parameters = {}
|
parameters = {}
|
||||||
for param in params:
|
for part in parts[1:]:
|
||||||
if '=' in param:
|
if '=' in part:
|
||||||
key, val = param.split('=', 1)
|
key, val = part.split('=', 1)
|
||||||
parameters[key.upper()] = val.strip('"') # Remove potential quotes from value
|
parameters[key.upper()] = val.strip('"') # Remove potential quotes from value
|
||||||
|
|
||||||
return property_name, parameters, value
|
return property_name, parameters, value
|
||||||
@@ -103,9 +98,8 @@ def get_vcard_value(entry: str, field_name: str) -> list[str]:
|
|||||||
values.append(decode_quoted_printable(cached_line + line, charset))
|
values.append(decode_quoted_printable(cached_line + line, charset))
|
||||||
cached_line = ""
|
cached_line = ""
|
||||||
else:
|
else:
|
||||||
# Skip empty lines or lines that don't start with the target
|
# Skip empty lines or lines that don't start with the target field (after stripping)
|
||||||
# field (after stripping), considering potential grouping prefixes
|
if not line or not line.upper().startswith(target_name):
|
||||||
if not line or (not line.upper().startswith(target_name) and f".{target_name}" not in line.upper().split(':')[0]):
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
parsed = _parse_vcard_line(line)
|
parsed = _parse_vcard_line(line)
|
||||||
|
|||||||
@@ -355,7 +355,7 @@
|
|||||||
{% endif %}
|
{% endif %}
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
{% set replied_msg = msgs | selectattr('key_id', 'equalto', msg.reply) | first %}
|
{% set replied_msg = msg_map(msg.reply) %}
|
||||||
{% if replied_msg and replied_msg.media == true %}
|
{% if replied_msg and replied_msg.media == true %}
|
||||||
<div class="flex-shrink-0">
|
<div class="flex-shrink-0">
|
||||||
{% if "image/" in replied_msg.mime %}
|
{% if "image/" in replied_msg.mime %}
|
||||||
@@ -381,26 +381,7 @@
|
|||||||
</a>
|
</a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<p class="text-[#111b21] text-sm message-text">
|
<p class="text-[#111b21] text-sm message-text">
|
||||||
{% if msg.poll %}
|
{% if msg.meta == true or msg.media == false and msg.data is none %}
|
||||||
<div class="mb-1">
|
|
||||||
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
|
|
||||||
{% for option in msg.poll.options %}
|
|
||||||
<div class="mb-1.5">
|
|
||||||
<div class="flex justify-between text-xs mb-0.5">
|
|
||||||
<span>{{ option.text }}</span>
|
|
||||||
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
|
|
||||||
</div>
|
|
||||||
<div class="w-full bg-gray-200 rounded-full h-1.5">
|
|
||||||
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
|
|
||||||
</div>
|
|
||||||
{% if option.voters %}
|
|
||||||
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
|
|
||||||
{% endif %}
|
|
||||||
</div>
|
|
||||||
{% endfor %}
|
|
||||||
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
|
|
||||||
</div>
|
|
||||||
{% elif msg.meta == true or msg.media == false and msg.data is none %}
|
|
||||||
<div class="flex justify-center mb-2">
|
<div class="flex justify-center mb-2">
|
||||||
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
||||||
{% if msg.safe %}
|
{% if msg.safe %}
|
||||||
@@ -480,7 +461,7 @@
|
|||||||
{% endif %}
|
{% endif %}
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
{% set replied_msg = msgs | selectattr('key_id', 'equalto', msg.reply) | first %}
|
{% set replied_msg = msg_map(msg.reply) %}
|
||||||
{% if replied_msg and replied_msg.media == true %}
|
{% if replied_msg and replied_msg.media == true %}
|
||||||
<div class="flex-shrink-0">
|
<div class="flex-shrink-0">
|
||||||
{% if "image/" in replied_msg.mime %}
|
{% if "image/" in replied_msg.mime %}
|
||||||
@@ -506,26 +487,7 @@
|
|||||||
</a>
|
</a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<p class="text-[#111b21] text-sm">
|
<p class="text-[#111b21] text-sm">
|
||||||
{% if msg.poll %}
|
{% if msg.meta == true or msg.media == false and msg.data is none %}
|
||||||
<div class="mb-1">
|
|
||||||
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
|
|
||||||
{% for option in msg.poll.options %}
|
|
||||||
<div class="mb-1.5">
|
|
||||||
<div class="flex justify-between text-xs mb-0.5">
|
|
||||||
<span>{{ option.text }}</span>
|
|
||||||
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
|
|
||||||
</div>
|
|
||||||
<div class="w-full bg-gray-200 rounded-full h-1.5">
|
|
||||||
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
|
|
||||||
</div>
|
|
||||||
{% if option.voters %}
|
|
||||||
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
|
|
||||||
{% endif %}
|
|
||||||
</div>
|
|
||||||
{% endfor %}
|
|
||||||
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
|
|
||||||
</div>
|
|
||||||
{% elif msg.meta == true or msg.media == false and msg.data is none %}
|
|
||||||
<div class="flex justify-center mb-2">
|
<div class="flex justify-center mb-2">
|
||||||
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
||||||
{% if msg.safe %}
|
{% if msg.safe %}
|
||||||
|
|||||||
@@ -42,12 +42,3 @@ VERSION:2.1
|
|||||||
TEL;CELL:8889990001
|
TEL;CELL:8889990001
|
||||||
ORG:AAA Car Service
|
ORG:AAA Car Service
|
||||||
END:VCARD
|
END:VCARD
|
||||||
|
|
||||||
BEGIN:VCARD
|
|
||||||
VERSION:2.1
|
|
||||||
item1.TEL;CELL:7777777778
|
|
||||||
item2.TEL;CELL:7777777779
|
|
||||||
item1.FN:Racing Team
|
|
||||||
item2.FN:Racing Team
|
|
||||||
END:VCARD
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,55 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from Whatsapp_Chat_Exporter.data_model import TimeZone, Timing
|
|
||||||
from datetime import timedelta
|
|
||||||
|
|
||||||
|
|
||||||
class TestTimeZone:
|
|
||||||
def test_utcoffset(self):
|
|
||||||
tz = TimeZone(5.5)
|
|
||||||
assert tz.utcoffset(None) == timedelta(hours=5.5)
|
|
||||||
|
|
||||||
def test_dst(self):
|
|
||||||
tz = TimeZone(2)
|
|
||||||
assert tz.dst(None) == timedelta(0)
|
|
||||||
|
|
||||||
|
|
||||||
class TestTiming:
|
|
||||||
@pytest.mark.parametrize("offset, expected_hour", [
|
|
||||||
(8, "08:00"), # Integer (e.g., Hong Kong Standard Time)
|
|
||||||
(-8, "16:00"), # Negative Integer (e.g., PST)
|
|
||||||
(5.5, "05:30"), # Positive Float (e.g., IST)
|
|
||||||
(-3.5, "20:30"), # Negative Float (e.g., Newfoundland)
|
|
||||||
])
|
|
||||||
|
|
||||||
def test_format_timestamp_various_offsets(self, offset, expected_hour):
|
|
||||||
"""Verify that both int and float offsets calculate time correctly."""
|
|
||||||
t = Timing(offset)
|
|
||||||
result = t.format_timestamp(1672531200, "%H:%M")
|
|
||||||
assert result == expected_hour
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("ts_input", [
|
|
||||||
1672531200, # Unix timestamp as int
|
|
||||||
1672531200.0, # Unix timestamp as float
|
|
||||||
])
|
|
||||||
|
|
||||||
def test_timestamp_input_types(self, ts_input):
|
|
||||||
"""Verify the method accepts both int and float timestamps."""
|
|
||||||
t = Timing(0)
|
|
||||||
result = t.format_timestamp(ts_input, "%Y")
|
|
||||||
assert result == "2023"
|
|
||||||
|
|
||||||
def test_timing_none_offset(self):
|
|
||||||
"""Verify initialization with None doesn't crash and uses system time."""
|
|
||||||
t = Timing(None)
|
|
||||||
assert t.tz is None
|
|
||||||
# Should still return a valid string based on local machine time without crashing
|
|
||||||
result = t.format_timestamp(1672531200, "%Y")
|
|
||||||
assert result == "2023"
|
|
||||||
|
|
||||||
def test_millisecond_scaling(self):
|
|
||||||
"""Verify that timestamps in milliseconds are correctly scaled down."""
|
|
||||||
t = Timing(0)
|
|
||||||
# Milliseconds as int
|
|
||||||
assert t.format_timestamp(1672531200000, "%Y") == "2023"
|
|
||||||
# Milliseconds as float
|
|
||||||
assert t.format_timestamp(1672531200000.0, "%Y") == "2023"
|
|
||||||
@@ -9,7 +9,6 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore
|
|||||||
BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
||||||
chat_data_1 = {
|
chat_data_1 = {
|
||||||
"12345678@s.whatsapp.net": {
|
"12345678@s.whatsapp.net": {
|
||||||
'aliases': [],
|
|
||||||
"name": "Friend",
|
"name": "Friend",
|
||||||
"type": "ios",
|
"type": "ios",
|
||||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||||
@@ -45,7 +44,6 @@ chat_data_1 = {
|
|||||||
|
|
||||||
chat_data_2 = {
|
chat_data_2 = {
|
||||||
"12345678@s.whatsapp.net": {
|
"12345678@s.whatsapp.net": {
|
||||||
'aliases': [],
|
|
||||||
"name": "Friend",
|
"name": "Friend",
|
||||||
"type": "ios",
|
"type": "ios",
|
||||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||||
@@ -82,7 +80,6 @@ chat_data_2 = {
|
|||||||
# Expected merged data - should contain all messages with all fields initialized as they would be by Message class
|
# Expected merged data - should contain all messages with all fields initialized as they would be by Message class
|
||||||
chat_data_merged = {
|
chat_data_merged = {
|
||||||
"12345678@s.whatsapp.net": {
|
"12345678@s.whatsapp.net": {
|
||||||
'aliases': [],
|
|
||||||
"name": "Friend",
|
"name": "Friend",
|
||||||
"type": "ios",
|
"type": "ios",
|
||||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||||
@@ -110,8 +107,7 @@ chat_data_merged = {
|
|||||||
"sticker": False,
|
"sticker": False,
|
||||||
"message_type": None,
|
"message_type": None,
|
||||||
"received_timestamp": None,
|
"received_timestamp": None,
|
||||||
"read_timestamp": None,
|
"read_timestamp": None
|
||||||
"poll": None
|
|
||||||
},
|
},
|
||||||
"24691": {
|
"24691": {
|
||||||
"from_me": False,
|
"from_me": False,
|
||||||
@@ -132,8 +128,7 @@ chat_data_merged = {
|
|||||||
"sticker": False,
|
"sticker": False,
|
||||||
"message_type": None,
|
"message_type": None,
|
||||||
"received_timestamp": None,
|
"received_timestamp": None,
|
||||||
"read_timestamp": None,
|
"read_timestamp": None
|
||||||
"poll": None
|
|
||||||
},
|
},
|
||||||
"24692": {
|
"24692": {
|
||||||
"from_me": False,
|
"from_me": False,
|
||||||
@@ -154,8 +149,7 @@ chat_data_merged = {
|
|||||||
"sticker": False,
|
"sticker": False,
|
||||||
"message_type": None,
|
"message_type": None,
|
||||||
"received_timestamp": None,
|
"received_timestamp": None,
|
||||||
"read_timestamp": None,
|
"read_timestamp": None
|
||||||
"poll": None
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# from contacts_names_from_vcards import readVCardsFile
|
# from contacts_names_from_vcards import readVCardsFile
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from Whatsapp_Chat_Exporter.vcards_contacts import normalize_number, read_vcards_file, get_vcard_value
|
from Whatsapp_Chat_Exporter.vcards_contacts import normalize_number, read_vcards_file
|
||||||
|
|
||||||
|
|
||||||
def test_readVCardsFile():
|
def test_readVCardsFile():
|
||||||
@@ -17,7 +17,7 @@ def test_readVCardsFile():
|
|||||||
# Print the count and the name
|
# Print the count and the name
|
||||||
print(f"{count}. {name}")
|
print(f"{count}. {name}")
|
||||||
print(data)
|
print(data)
|
||||||
assert len(data) == 8
|
assert len(data) == 6
|
||||||
# Test simple contact name
|
# Test simple contact name
|
||||||
assert data[0][1] == "Sample Contact"
|
assert data[0][1] == "Sample Contact"
|
||||||
# Test complex name
|
# Test complex name
|
||||||
@@ -30,31 +30,6 @@ def test_readVCardsFile():
|
|||||||
assert data[4][1] == "James Peacock Elementary"
|
assert data[4][1] == "James Peacock Elementary"
|
||||||
# Test business entry using ORG but not F/FN
|
# Test business entry using ORG but not F/FN
|
||||||
assert data[5][1] == "AAA Car Service"
|
assert data[5][1] == "AAA Car Service"
|
||||||
# Test grouped entry
|
|
||||||
assert data[6][1] == "Racing Team (1)"
|
|
||||||
assert data[7][1] == "Racing Team (2)"
|
|
||||||
|
|
||||||
|
|
||||||
def test_grouping_mechanism():
|
|
||||||
no_group_vcf = """
|
|
||||||
BEGIN:VCARD
|
|
||||||
VERSION:2.1
|
|
||||||
TEL;CELL:7777777778
|
|
||||||
TEL;CELL:7777777779
|
|
||||||
TEL;CELL:7777777780
|
|
||||||
ORG:Racing Team
|
|
||||||
END:VCARD"""
|
|
||||||
group_vcf = """
|
|
||||||
BEGIN:VCARD
|
|
||||||
VERSION:2.1
|
|
||||||
item1.TEL;CELL:7777777778
|
|
||||||
item2.TEL;CELL:7777777779
|
|
||||||
item3.TEL;CELL:7777777780
|
|
||||||
ORG:Racing Team
|
|
||||||
END:VCARD"""
|
|
||||||
assert get_vcard_value(no_group_vcf, "TEL") == ["7777777778", "7777777779", "7777777780"]
|
|
||||||
assert get_vcard_value(group_vcf, "TEL") == ["7777777778", "7777777779", "7777777780"]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_number_to_name_dicts():
|
def test_create_number_to_name_dicts():
|
||||||
|
|||||||
Reference in New Issue
Block a user