mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-05-30 04:48:50 +00:00
Implement an on-the-fly fix of dot-ending files (#185)
This commit is contained in:
@@ -294,6 +294,10 @@ def setup_argument_parser() -> ArgumentParser:
|
|||||||
"--no-banner", dest="no_banner", default=False, action='store_true',
|
"--no-banner", dest="no_banner", default=False, action='store_true',
|
||||||
help="Do not show the banner"
|
help="Do not show the banner"
|
||||||
)
|
)
|
||||||
|
misc_group.add_argument(
|
||||||
|
"--fix-dot-files", dest="fix_dot_files", default=False, action='store_true',
|
||||||
|
help="Fix files with a dot at the end of their name (allowing the outputs be stored in FAT filesystems)"
|
||||||
|
)
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
@@ -557,7 +561,7 @@ def process_messages(args, data: ChatCollection) -> None:
|
|||||||
# Process media
|
# Process media
|
||||||
message_handler.media(
|
message_handler.media(
|
||||||
db, data, args.media, args.filter_date,
|
db, data, args.media, args.filter_date,
|
||||||
filter_chat, args.filter_empty, args.separate_media
|
filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process vcards
|
# Process vcards
|
||||||
|
|||||||
@@ -485,7 +485,7 @@ def _format_message_text(text):
|
|||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True):
|
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
|
||||||
"""
|
"""
|
||||||
Process WhatsApp media files from the database.
|
Process WhatsApp media files from the database.
|
||||||
|
|
||||||
@@ -513,7 +513,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
|||||||
|
|
||||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||||
while (content := _fetch_row_safely(content_cursor)) is not None:
|
while (content := _fetch_row_safely(content_cursor)) is not None:
|
||||||
_process_single_media(data, content, media_folder, mime, separate_media)
|
_process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||||
@@ -641,7 +641,7 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
|
|||||||
return cursor
|
return cursor
|
||||||
|
|
||||||
|
|
||||||
def _process_single_media(data, content, media_folder, mime, separate_media):
|
def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False):
|
||||||
"""Process a single media file."""
|
"""Process a single media file."""
|
||||||
file_path = f"{media_folder}/{content['file_path']}"
|
file_path = f"{media_folder}/{content['file_path']}"
|
||||||
current_chat = data.get_chat(content["key_remote_jid"])
|
current_chat = data.get_chat(content["key_remote_jid"])
|
||||||
@@ -649,8 +649,6 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
|||||||
message.media = True
|
message.media = True
|
||||||
|
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
message.data = file_path
|
|
||||||
|
|
||||||
# Set mime type
|
# Set mime type
|
||||||
if content["mime_type"] is None:
|
if content["mime_type"] is None:
|
||||||
guess = mime.guess_type(file_path)[0]
|
guess = mime.guess_type(file_path)[0]
|
||||||
@@ -661,6 +659,16 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
|||||||
else:
|
else:
|
||||||
message.mime = content["mime_type"]
|
message.mime = content["mime_type"]
|
||||||
|
|
||||||
|
if fix_dot_files and file_path.endswith("."):
|
||||||
|
extension = mime.guess_extension(message.mime)
|
||||||
|
if message.mime == "application/octet-stream" or not extension:
|
||||||
|
new_file_path = file_path[:-1]
|
||||||
|
else:
|
||||||
|
extension = mime.guess_extension(message.mime)
|
||||||
|
new_file_path = file_path[:-1] + extension
|
||||||
|
os.rename(file_path, new_file_path)
|
||||||
|
file_path = new_file_path
|
||||||
|
|
||||||
# Copy media to separate folder if needed
|
# Copy media to separate folder if needed
|
||||||
if separate_media:
|
if separate_media:
|
||||||
chat_display_name = safe_name(current_chat.name or message.sender
|
chat_display_name = safe_name(current_chat.name or message.sender
|
||||||
@@ -671,6 +679,8 @@ def _process_single_media(data, content, media_folder, mime, separate_media):
|
|||||||
new_path = os.path.join(new_folder, current_filename)
|
new_path = os.path.join(new_folder, current_filename)
|
||||||
shutil.copy2(file_path, new_path)
|
shutil.copy2(file_path, new_path)
|
||||||
message.data = new_path
|
message.data = new_path
|
||||||
|
else:
|
||||||
|
message.data = file_path
|
||||||
else:
|
else:
|
||||||
message.data = "The media is missing"
|
message.data = "The media is missing"
|
||||||
message.mime = "media"
|
message.mime = "media"
|
||||||
|
|||||||
@@ -312,7 +312,7 @@ def process_message_text(message, content):
|
|||||||
message.data = msg
|
message.data = msg
|
||||||
|
|
||||||
|
|
||||||
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False):
|
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False):
|
||||||
"""Process media files from WhatsApp messages."""
|
"""Process media files from WhatsApp messages."""
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
|
|
||||||
@@ -370,13 +370,13 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
|
|||||||
mime = MimeTypes()
|
mime = MimeTypes()
|
||||||
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
|
||||||
while (content := c.fetchone()) is not None:
|
while (content := c.fetchone()) is not None:
|
||||||
process_media_item(content, data, media_folder, mime, separate_media)
|
process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
total_time = pbar.format_dict['elapsed']
|
total_time = pbar.format_dict['elapsed']
|
||||||
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}")
|
||||||
|
|
||||||
|
|
||||||
def process_media_item(content, data, media_folder, mime, separate_media):
|
def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
|
||||||
"""Process a single media item."""
|
"""Process a single media item."""
|
||||||
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
|
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
|
||||||
current_chat = data.get_chat(content["ZCONTACTJID"])
|
current_chat = data.get_chat(content["ZCONTACTJID"])
|
||||||
@@ -387,8 +387,6 @@ def process_media_item(content, data, media_folder, mime, separate_media):
|
|||||||
current_chat.media_base = media_folder + "/"
|
current_chat.media_base = media_folder + "/"
|
||||||
|
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
message.data = '/'.join(file_path.split("/")[1:])
|
|
||||||
|
|
||||||
# Set MIME type
|
# Set MIME type
|
||||||
if content["ZVCARDSTRING"] is None:
|
if content["ZVCARDSTRING"] is None:
|
||||||
guess = mime.guess_type(file_path)[0]
|
guess = mime.guess_type(file_path)[0]
|
||||||
@@ -396,6 +394,16 @@ def process_media_item(content, data, media_folder, mime, separate_media):
|
|||||||
else:
|
else:
|
||||||
message.mime = content["ZVCARDSTRING"]
|
message.mime = content["ZVCARDSTRING"]
|
||||||
|
|
||||||
|
if fix_dot_files and file_path.endswith("."):
|
||||||
|
extension = mime.guess_extension(message.mime)
|
||||||
|
if message.mime == "application/octet-stream" or not extension:
|
||||||
|
new_file_path = file_path[:-1]
|
||||||
|
else:
|
||||||
|
extension = mime.guess_extension(message.mime)
|
||||||
|
new_file_path = file_path[:-1] + extension
|
||||||
|
os.rename(file_path, new_file_path)
|
||||||
|
file_path = new_file_path
|
||||||
|
|
||||||
# Handle separate media option
|
# Handle separate media option
|
||||||
if separate_media:
|
if separate_media:
|
||||||
chat_display_name = safe_name(
|
chat_display_name = safe_name(
|
||||||
@@ -405,7 +413,9 @@ def process_media_item(content, data, media_folder, mime, separate_media):
|
|||||||
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
Path(new_folder).mkdir(parents=True, exist_ok=True)
|
||||||
new_path = os.path.join(new_folder, current_filename)
|
new_path = os.path.join(new_folder, current_filename)
|
||||||
shutil.copy2(file_path, new_path)
|
shutil.copy2(file_path, new_path)
|
||||||
message.data = '/'.join(new_path.split("\\")[1:])
|
message.data = '/'.join(new_path.split("/")[1:])
|
||||||
|
else:
|
||||||
|
message.data = '/'.join(file_path.split("/")[1:])
|
||||||
else:
|
else:
|
||||||
# Handle missing media
|
# Handle missing media
|
||||||
message.data = "The media is missing"
|
message.data = "The media is missing"
|
||||||
|
|||||||
Reference in New Issue
Block a user