Mirror of https://github.com/TagStudioDev/TagStudio.git (synced 2026-01-29 14:20:48 +00:00)

Compare commits: 12 commits
| SHA1 |
|---|
| 50fc325d80 |
| 140645943c |
| 2fc0dd03aa |
| 90b9af48e3 |
| b2dbc5722b |
| 6490cc905d |
| dfa4079b23 |
| 6ff7303321 |
| 4d405b5d77 |
| bf8816f715 |
| 8c9b04d1ec |
| 5995e4d416 |
.github/workflows/apprun.yaml (vendored): 3 changed lines
@@ -33,7 +33,8 @@ jobs:
             libxcb-xinerama0 \
             libopengl0 \
             libxcb-cursor0 \
-            libpulse0
+            libpulse0 \
+            ffmpeg
 
       - name: Install dependencies
         run: |
@@ -1,4 +1,4 @@
-VERSION: str = "9.4.0" # Major.Minor.Patch
+VERSION: str = "9.4.2" # Major.Minor.Patch
 VERSION_BRANCH: str = "" # Usually "" or "Pre-Release"
 
 # The folder & file names where TagStudio keeps its data relative to a library.
@@ -7,6 +7,7 @@
 import datetime
 import logging
 import os
+import platform
 import time
 import traceback
 import xml.etree.ElementTree as ET
@@ -479,7 +480,7 @@ class Library:
 
         return tag_list
 
-    def open_library(self, path: str | Path) -> int:
+    def open_library(self, path: str | Path, is_path_file: bool = False) -> int:
         """
         Opens a TagStudio v9+ Library.
         Returns 0 if library does not exist, 1 if successfully opened, 2 if corrupted.
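The added `is_path_file` flag lets callers hand `open_library()` a path that already points at a save file (for example an auto backup) rather than a library directory. A minimal usage sketch; the import path and the `.TagStudio`/`backups` folder names below are assumptions standing in for TagStudio's own constants:

```python
from pathlib import Path

from src.core.library import Library  # assumed import path

lib = Library()
library_dir = Path("/data/photos")  # hypothetical library location

# Usual call: the argument is a library directory and the save file is
# looked up inside its TagStudio folder.
lib.open_library(library_dir)

# New call style used by the auto-backup fallback: the argument is the save
# file itself, so no folder lookup is performed.
backup = library_dir / ".TagStudio" / "backups" / "ts_library_backup_auto.json"
lib.open_library(backup, is_path_file=True)
```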
@@ -487,242 +488,264 @@ class Library:
|
||||
|
||||
return_code: int = 2
|
||||
|
||||
_path: Path = self._fix_lib_path(path)
|
||||
_path: Path = self._fix_lib_path(path) if not is_path_file else Path(path)
|
||||
lib_path: Path = (
|
||||
_path / TS_FOLDER_NAME / "ts_library.json" if not is_path_file else _path
|
||||
)
|
||||
logging.info(f"[LIBRARY] Library Save File Loaded From: {lib_path}")
|
||||
|
||||
if (_path / TS_FOLDER_NAME / "ts_library.json").exists():
|
||||
try:
|
||||
with open(
|
||||
_path / TS_FOLDER_NAME / "ts_library.json",
|
||||
"r",
|
||||
encoding="utf-8",
|
||||
) as file:
|
||||
json_dump: JsonLibary = ujson.load(file)
|
||||
self.library_dir = Path(_path)
|
||||
self.verify_ts_folders()
|
||||
major, minor, patch = json_dump["ts-version"].split(".")
|
||||
# if (lib_path).exists():
|
||||
# json_dump: JsonLibary = None
|
||||
|
||||
# Load Extension List --------------------------------------
|
||||
start_time = time.time()
|
||||
if "ignored_extensions" in json_dump:
|
||||
self.ext_list = json_dump.get(
|
||||
"ignored_extensions", self.default_ext_exclude_list
|
||||
try:
|
||||
with open(
|
||||
lib_path,
|
||||
"r",
|
||||
encoding="utf-8",
|
||||
) as file:
|
||||
json_dump = ujson.load(file)
|
||||
|
||||
except (ujson.JSONDecodeError, FileNotFoundError):
|
||||
logging.info(
|
||||
"[LIBRARY][ERROR] Blank/Corrupted Library file found. Searching for Auto Backup..."
|
||||
)
|
||||
backup_folder: Path = (
|
||||
self._fix_lib_path(path) / TS_FOLDER_NAME / BACKUP_FOLDER_NAME
|
||||
)
|
||||
if backup_folder.exists():
|
||||
auto_backup: Path = None
|
||||
dir_obj = os.scandir(backup_folder)
|
||||
|
||||
for backup_file in dir_obj:
|
||||
if backup_file.is_file() and "ts_library_backup_auto" in str(
|
||||
backup_file
|
||||
):
|
||||
auto_backup = Path(backup_file)
|
||||
break
|
||||
|
||||
if auto_backup and "ts_library_backup_auto" not in str(path):
|
||||
logging.info(f"[LIBRARY] Loading Auto Backup: {auto_backup}")
|
||||
return self.open_library(auto_backup, is_path_file=True)
|
||||
|
||||
else:
|
||||
self.library_dir = self._fix_lib_path(path)
|
||||
logging.info(f"[LIBRARY] Library Save Target Directory: {self.library_dir}")
|
||||
self.verify_ts_folders()
|
||||
major, minor, patch = json_dump["ts-version"].split(".")
|
||||
|
||||
# Load Extension List --------------------------------------
|
||||
start_time = time.time()
|
||||
if "ignored_extensions" in json_dump:
|
||||
self.ext_list = json_dump.get(
|
||||
"ignored_extensions", self.default_ext_exclude_list
|
||||
)
|
||||
else:
|
||||
self.ext_list = json_dump.get("ext_list", self.default_ext_exclude_list)
|
||||
|
||||
# Sanitizes older lists (v9.2.1) that don't use leading periods.
|
||||
# Without this, existing lists (including default lists)
|
||||
# have to otherwise be updated by hand in order to restore
|
||||
# previous functionality.
|
||||
sanitized_list: list[str] = []
|
||||
for ext in self.ext_list:
|
||||
if not ext.startswith("."):
|
||||
ext = "." + ext
|
||||
sanitized_list.append(ext)
|
||||
self.ext_list = sanitized_list
|
||||
|
||||
self.is_exclude_list = json_dump.get("is_exclude_list", True)
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Extension list loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Parse Tags -----------------------------------------------
|
||||
if "tags" in json_dump.keys():
|
||||
start_time = time.time()
|
||||
|
||||
# Step 1: Verify default built-in tags are present.
|
||||
json_dump["tags"] = self.verify_default_tags(json_dump["tags"])
|
||||
|
||||
for tag in json_dump["tags"]:
|
||||
# Step 2: Create a Tag object and append it to the internal Tags list,
|
||||
# then map that Tag's ID to its index in the Tags list.
|
||||
|
||||
id = int(tag.get("id", 0))
|
||||
|
||||
# Don't load tags with duplicate IDs
|
||||
if id not in {t.id for t in self.tags}:
|
||||
if id >= self._next_tag_id:
|
||||
self._next_tag_id = id + 1
|
||||
|
||||
name = tag.get("name", "")
|
||||
shorthand = tag.get("shorthand", "")
|
||||
aliases = tag.get("aliases", [])
|
||||
subtag_ids = tag.get("subtag_ids", [])
|
||||
color = tag.get("color", "")
|
||||
|
||||
t = Tag(
|
||||
id=id,
|
||||
name=name,
|
||||
shorthand=shorthand,
|
||||
aliases=aliases,
|
||||
subtags_ids=subtag_ids,
|
||||
color=color,
|
||||
)
|
||||
|
||||
# NOTE: This does NOT use the add_tag_to_library() method!
|
||||
# That method is only used for Tags added at runtime.
|
||||
# This process uses the same inner methods, but waits until all of the
|
||||
# Tags are registered in the Tags list before creating the Tag clusters.
|
||||
self.tags.append(t)
|
||||
self._map_tag_id_to_index(t, -1)
|
||||
self._map_tag_strings_to_tag_id(t)
|
||||
else:
|
||||
self.ext_list = json_dump.get(
|
||||
"ext_list", self.default_ext_exclude_list
|
||||
)
|
||||
logging.info(f"[LIBRARY]Skipping Tag with duplicate ID: {tag}")
|
||||
|
||||
# Sanitizes older lists (v9.2.1) that don't use leading periods.
|
||||
# Without this, existing lists (including default lists)
|
||||
# have to otherwise be updated by hand in order to restore
|
||||
# previous functionality.
|
||||
sanitized_list: list[str] = []
|
||||
for ext in self.ext_list:
|
||||
if not ext.startswith("."):
|
||||
ext = "." + ext
|
||||
sanitized_list.append(ext)
|
||||
self.ext_list = sanitized_list
|
||||
# Step 3: Map each Tag's subtags together now that all Tag objects in it.
|
||||
for t in self.tags:
|
||||
self._map_tag_id_to_cluster(t)
|
||||
|
||||
self.is_exclude_list = json_dump.get("is_exclude_list", True)
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Extension list loaded in {(end_time - start_time):.3f} seconds"
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Tags loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Parse Entries --------------------------------------------
|
||||
if entries := json_dump.get("entries"):
|
||||
start_time = time.time()
|
||||
for entry in entries:
|
||||
if "id" in entry:
|
||||
id = int(entry["id"])
|
||||
if id >= self._next_entry_id:
|
||||
self._next_entry_id = id + 1
|
||||
else:
|
||||
# Version 9.1.x+ Compatibility
|
||||
id = self._next_entry_id
|
||||
self._next_entry_id += 1
|
||||
|
||||
filename = entry.get("filename", "")
|
||||
e_path = entry.get("path", "")
|
||||
fields: list = []
|
||||
if "fields" in entry:
|
||||
# Cast JSON str keys to ints
|
||||
|
||||
for f in entry["fields"]:
|
||||
f[int(list(f.keys())[0])] = f[list(f.keys())[0]]
|
||||
del f[list(f.keys())[0]]
|
||||
fields = entry["fields"]
|
||||
|
||||
# Look through fields for legacy Collation data ----
|
||||
if int(major) >= 9 and int(minor) < 1:
|
||||
for f in fields:
|
||||
if self.get_field_attr(f, "type") == "collation":
|
||||
# NOTE: This legacy support will be removed in
|
||||
# a later version, probably 9.2.
|
||||
# Legacy Collation data present in v9.0.x
|
||||
# DATA SHAPE: {name: str, page: int}
|
||||
|
||||
# We'll do an inefficient linear search each
|
||||
# time to convert the legacy data.
|
||||
matched = False
|
||||
collation_id = -1
|
||||
for c in self.collations:
|
||||
if (
|
||||
c.title
|
||||
== self.get_field_attr(f, "content")["name"]
|
||||
):
|
||||
c.e_ids_and_pages.append(
|
||||
(
|
||||
id,
|
||||
int(
|
||||
self.get_field_attr(f, "content")[
|
||||
"page"
|
||||
]
|
||||
),
|
||||
)
|
||||
)
|
||||
matched = True
|
||||
collation_id = c.id
|
||||
if not matched:
|
||||
c = Collation(
|
||||
id=self._next_collation_id,
|
||||
title=self.get_field_attr(f, "content")["name"],
|
||||
e_ids_and_pages=[],
|
||||
sort_order="",
|
||||
)
|
||||
collation_id = self._next_collation_id
|
||||
self._next_collation_id += 1
|
||||
c.e_ids_and_pages.append(
|
||||
(
|
||||
id,
|
||||
int(
|
||||
self.get_field_attr(f, "content")[
|
||||
"page"
|
||||
]
|
||||
),
|
||||
)
|
||||
)
|
||||
self.collations.append(c)
|
||||
self._map_collation_id_to_index(c, -1)
|
||||
f_id = self.get_field_attr(f, "id")
|
||||
f.clear()
|
||||
f[int(f_id)] = collation_id
|
||||
# Collation Field data present in v9.1.x+
|
||||
# DATA SHAPE: int
|
||||
elif int(major) >= 9 and int(minor) >= 1:
|
||||
pass
|
||||
|
||||
e = Entry(
|
||||
id=int(id),
|
||||
filename=filename,
|
||||
path=e_path,
|
||||
fields=fields,
|
||||
)
|
||||
self.entries.append(e)
|
||||
self._map_entry_id_to_index(e, -1)
|
||||
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Entries loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Parse Collations -----------------------------------------
|
||||
if "collations" in json_dump.keys():
|
||||
start_time = time.time()
|
||||
for collation in json_dump["collations"]:
|
||||
# Step 1: Create a Collation object and append it to
|
||||
# the internal Collations list, then map that
|
||||
# Collation's ID to its index in the Collations list.
|
||||
|
||||
id = int(collation.get("id", 0))
|
||||
if id >= self._next_collation_id:
|
||||
self._next_collation_id = id + 1
|
||||
|
||||
title = collation.get("title", "")
|
||||
e_ids_and_pages = collation.get("e_ids_and_pages", [])
|
||||
sort_order = collation.get("sort_order", "")
|
||||
cover_id = collation.get("cover_id", -1)
|
||||
|
||||
c = Collation(
|
||||
id=id,
|
||||
title=title,
|
||||
e_ids_and_pages=e_ids_and_pages,
|
||||
sort_order=sort_order,
|
||||
cover_id=cover_id,
|
||||
)
|
||||
|
||||
# Parse Tags -----------------------------------------------
|
||||
if "tags" in json_dump.keys():
|
||||
start_time = time.time()
|
||||
# NOTE: This does NOT use the add_collation_to_library() method
|
||||
# which is intended to be used at runtime. However, there is
|
||||
# currently no reason why it couldn't be used here, and is
|
||||
# instead not used for consistency.
|
||||
self.collations.append(c)
|
||||
self._map_collation_id_to_index(c, -1)
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Collations loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Step 1: Verify default built-in tags are present.
|
||||
json_dump["tags"] = self.verify_default_tags(json_dump["tags"])
|
||||
|
||||
for tag in json_dump["tags"]:
|
||||
# Step 2: Create a Tag object and append it to the internal Tags list,
|
||||
# then map that Tag's ID to its index in the Tags list.
|
||||
|
||||
id = int(tag.get("id", 0))
|
||||
|
||||
# Don't load tags with duplicate IDs
|
||||
if id not in {t.id for t in self.tags}:
|
||||
if id >= self._next_tag_id:
|
||||
self._next_tag_id = id + 1
|
||||
|
||||
name = tag.get("name", "")
|
||||
shorthand = tag.get("shorthand", "")
|
||||
aliases = tag.get("aliases", [])
|
||||
subtag_ids = tag.get("subtag_ids", [])
|
||||
color = tag.get("color", "")
|
||||
|
||||
t = Tag(
|
||||
id=id,
|
||||
name=name,
|
||||
shorthand=shorthand,
|
||||
aliases=aliases,
|
||||
subtags_ids=subtag_ids,
|
||||
color=color,
|
||||
)
|
||||
|
||||
# NOTE: This does NOT use the add_tag_to_library() method!
|
||||
# That method is only used for Tags added at runtime.
|
||||
# This process uses the same inner methods, but waits until all of the
|
||||
# Tags are registered in the Tags list before creating the Tag clusters.
|
||||
self.tags.append(t)
|
||||
self._map_tag_id_to_index(t, -1)
|
||||
self._map_tag_strings_to_tag_id(t)
|
||||
else:
|
||||
logging.info(
|
||||
f"[LIBRARY]Skipping Tag with duplicate ID: {tag}"
|
||||
)
|
||||
|
||||
# Step 3: Map each Tag's subtags together now that all Tag objects in it.
|
||||
for t in self.tags:
|
||||
self._map_tag_id_to_cluster(t)
|
||||
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Tags loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Parse Entries --------------------------------------------
|
||||
if entries := json_dump.get("entries"):
|
||||
start_time = time.time()
|
||||
for entry in entries:
|
||||
if "id" in entry:
|
||||
id = int(entry["id"])
|
||||
if id >= self._next_entry_id:
|
||||
self._next_entry_id = id + 1
|
||||
else:
|
||||
# Version 9.1.x+ Compatibility
|
||||
id = self._next_entry_id
|
||||
self._next_entry_id += 1
|
||||
|
||||
filename = entry.get("filename", "")
|
||||
e_path = entry.get("path", "")
|
||||
fields: list = []
|
||||
if "fields" in entry:
|
||||
# Cast JSON str keys to ints
|
||||
|
||||
for f in entry["fields"]:
|
||||
f[int(list(f.keys())[0])] = f[list(f.keys())[0]]
|
||||
del f[list(f.keys())[0]]
|
||||
fields = entry["fields"]
|
||||
|
||||
# Look through fields for legacy Collation data ----
|
||||
if int(major) >= 9 and int(minor) < 1:
|
||||
for f in fields:
|
||||
if self.get_field_attr(f, "type") == "collation":
|
||||
# NOTE: This legacy support will be removed in
|
||||
# a later version, probably 9.2.
|
||||
# Legacy Collation data present in v9.0.x
|
||||
# DATA SHAPE: {name: str, page: int}
|
||||
|
||||
# We'll do an inefficient linear search each
|
||||
# time to convert the legacy data.
|
||||
matched = False
|
||||
collation_id = -1
|
||||
for c in self.collations:
|
||||
if (
|
||||
c.title
|
||||
== self.get_field_attr(f, "content")[
|
||||
"name"
|
||||
]
|
||||
):
|
||||
c.e_ids_and_pages.append(
|
||||
(
|
||||
id,
|
||||
int(
|
||||
self.get_field_attr(
|
||||
f, "content"
|
||||
)["page"]
|
||||
),
|
||||
)
|
||||
)
|
||||
matched = True
|
||||
collation_id = c.id
|
||||
if not matched:
|
||||
c = Collation(
|
||||
id=self._next_collation_id,
|
||||
title=self.get_field_attr(f, "content")[
|
||||
"name"
|
||||
],
|
||||
e_ids_and_pages=[],
|
||||
sort_order="",
|
||||
)
|
||||
collation_id = self._next_collation_id
|
||||
self._next_collation_id += 1
|
||||
c.e_ids_and_pages.append(
|
||||
(
|
||||
id,
|
||||
int(
|
||||
self.get_field_attr(
|
||||
f, "content"
|
||||
)["page"]
|
||||
),
|
||||
)
|
||||
)
|
||||
self.collations.append(c)
|
||||
self._map_collation_id_to_index(c, -1)
|
||||
f_id = self.get_field_attr(f, "id")
|
||||
f.clear()
|
||||
f[int(f_id)] = collation_id
|
||||
# Collation Field data present in v9.1.x+
|
||||
# DATA SHAPE: int
|
||||
elif int(major) >= 9 and int(minor) >= 1:
|
||||
pass
|
||||
|
||||
e = Entry(
|
||||
id=int(id),
|
||||
filename=filename,
|
||||
path=e_path,
|
||||
fields=fields,
|
||||
)
|
||||
self.entries.append(e)
|
||||
self._map_entry_id_to_index(e, -1)
|
||||
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Entries loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
# Parse Collations -----------------------------------------
|
||||
if "collations" in json_dump.keys():
|
||||
start_time = time.time()
|
||||
for collation in json_dump["collations"]:
|
||||
# Step 1: Create a Collation object and append it to
|
||||
# the internal Collations list, then map that
|
||||
# Collation's ID to its index in the Collations list.
|
||||
|
||||
id = int(collation.get("id", 0))
|
||||
if id >= self._next_collation_id:
|
||||
self._next_collation_id = id + 1
|
||||
|
||||
title = collation.get("title", "")
|
||||
e_ids_and_pages = collation.get("e_ids_and_pages", [])
|
||||
sort_order = collation.get("sort_order", "")
|
||||
cover_id = collation.get("cover_id", -1)
|
||||
|
||||
c = Collation(
|
||||
id=id,
|
||||
title=title,
|
||||
e_ids_and_pages=e_ids_and_pages, # type: ignore
|
||||
sort_order=sort_order,
|
||||
cover_id=cover_id,
|
||||
)
|
||||
|
||||
# NOTE: This does NOT use the add_collation_to_library() method
|
||||
# which is intended to be used at runtime. However, there is
|
||||
# currently no reason why it couldn't be used here, and is
|
||||
# instead not used for consistency.
|
||||
self.collations.append(c)
|
||||
self._map_collation_id_to_index(c, -1)
|
||||
end_time = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Collations loaded in {(end_time - start_time):.3f} seconds"
|
||||
)
|
||||
|
||||
return_code = 1
|
||||
except ujson.JSONDecodeError:
|
||||
logging.info("[LIBRARY][ERROR]: Empty JSON file!")
|
||||
return_code = 1
|
||||
self.save_library_backup_to_disk(is_auto=True)
|
||||
|
||||
# If the Library is loaded, continue other processes.
|
||||
if return_code == 1:
|
||||
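The long hunk above restructures `open_library()` so that a blank or corrupted `ts_library.json` no longer aborts the load: the method now scans the backup folder for a file whose name contains `ts_library_backup_auto` and reopens it with `is_path_file=True`. A rough standalone sketch of that fallback, using the standard `json` module and assumed folder names in place of TagStudio's own constants and `ujson`:

```python
import json
import logging
from pathlib import Path

# Assumed names; TagStudio keeps these in its own constants.
TS_FOLDER_NAME = ".TagStudio"
BACKUP_FOLDER_NAME = "backups"


def load_save_file(path: Path, is_path_file: bool = False) -> dict | None:
    """Load the library save file, falling back to the auto backup if it is corrupt."""
    lib_path = path if is_path_file else path / TS_FOLDER_NAME / "ts_library.json"
    try:
        with open(lib_path, "r", encoding="utf-8") as file:
            return json.load(file)
    except (json.JSONDecodeError, FileNotFoundError):
        logging.info("[LIBRARY][ERROR] Blank/Corrupted Library file found. Searching for Auto Backup...")
        backup_folder = path / TS_FOLDER_NAME / BACKUP_FOLDER_NAME
        if backup_folder.is_dir() and "ts_library_backup_auto" not in str(path):
            for backup in backup_folder.iterdir():
                # The auto backup is identified purely by its filename.
                if backup.is_file() and "ts_library_backup_auto" in backup.name:
                    return load_save_file(backup, is_path_file=True)
        return None
```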
@@ -736,7 +759,9 @@ class Library:
         """Maps a full filepath to its corresponding Entry's ID."""
         self.filename_to_entry_id_map.clear()
         for entry in self.entries:
-            self.filename_to_entry_id_map[(entry.path / entry.filename)] = entry.id
+            self.filename_to_entry_id_map[
+                (self.library_dir / entry.path / entry.filename)
+            ] = entry.id
 
     # def _map_filenames_to_entry_ids(self):
     #     """Maps the file paths of entries to their index in the library list."""
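With this change `filename_to_entry_id_map` is keyed by the entry's absolute path (`library_dir / entry.path / entry.filename`) instead of a library-relative one, which is why the later hunks can look files up and delete map entries without calling `relative_to()` first. A small illustration of the two keying schemes, with made-up paths:

```python
from pathlib import Path

library_dir = Path("/data/photos")    # hypothetical library root
entry_path = Path("vacation")         # entry.path, relative to the library
entry_filename = Path("beach.jpg")    # entry.filename

# Old scheme: keys are relative to the library, so every lookup of a full
# filesystem path had to strip the prefix first.
relative_key = entry_path / entry_filename
full_path = library_dir / relative_key
assert full_path.relative_to(library_dir) == relative_key

# New scheme: keys are absolute, so a path coming from a directory scan or a
# drag-and-drop import can be used directly.
filename_to_entry_id_map: dict[Path, int] = {library_dir / entry_path / entry_filename: 42}
assert filename_to_entry_id_map.get(full_path) == 42
```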
@@ -795,13 +820,13 @@ class Library:
         filename = "ts_library.json"
 
         self.verify_ts_folders()
-
+        json_library: JsonLibary = self.to_json()
         with open(
             self.library_dir / TS_FOLDER_NAME / filename, "w", encoding="utf-8"
         ) as outfile:
             outfile.flush()
             ujson.dump(
-                self.to_json(),
+                json_library,
                 outfile,
                 ensure_ascii=False,
                 escape_forward_slashes=False,
@@ -812,16 +837,22 @@ class Library:
             f"[LIBRARY] Library saved to disk in {(end_time - start_time):.3f} seconds"
         )
 
-    def save_library_backup_to_disk(self) -> str:
+    def save_library_backup_to_disk(self, is_auto: bool = False) -> str:
         """
         Saves a backup file of the Library to disk at the default TagStudio folder location.
         Returns the filename used, including the date and time."""
 
         logging.info(f"[LIBRARY] Saving Library Backup to Disk...")
         start_time = time.time()
-        filename = f'ts_library_backup_{datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")}.json'
+
+        filename = (
+            "ts_library_backup_auto.json"
+            if is_auto
+            else f'ts_library_backup_{datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")}.json'
+        )
 
         self.verify_ts_folders()
+        json_library: JsonLibary = self.to_json()
         with open(
             self.library_dir / TS_FOLDER_NAME / BACKUP_FOLDER_NAME / filename,
             "w",
@@ -829,7 +860,7 @@ class Library:
         ) as outfile:
             outfile.flush()
             ujson.dump(
-                self.to_json(),
+                json_library,
                 outfile,
                 ensure_ascii=False,
                 escape_forward_slashes=False,
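A side effect of the `is_auto` flag is that automatic backups always reuse one fixed filename, while manual backups keep the timestamped name, so repeated auto saves overwrite each other instead of piling up. A small sketch of just the filename choice; the `strftime`/`replace` chain mirrors the line shown above:

```python
import datetime


def backup_filename(is_auto: bool = False) -> str:
    if is_auto:
        return "ts_library_backup_auto.json"
    # "%F_%T" expands to "YYYY-MM-DD_HH:MM:SS"; stripping the colons keeps the
    # name valid on Windows filesystems.
    stamp = datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")
    return f"ts_library_backup_{stamp}.json"


print(backup_filename())               # e.g. ts_library_backup_2024-05-04_153012.json
print(backup_filename(is_auto=True))   # ts_library_backup_auto.json
```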
@@ -883,54 +914,72 @@ class Library:
|
||||
|
||||
# Scans the directory for files, keeping track of:
|
||||
# - Total file count
|
||||
# - Files without library entries
|
||||
# for type in TYPES:
|
||||
start_time = time.time()
|
||||
# - Files without Library entries
|
||||
start_time_total = time.time()
|
||||
start_time_loop = time.time()
|
||||
ext_set = set(self.ext_list) # Should be slightly faster
|
||||
for f in self.library_dir.glob("**/*"):
|
||||
try:
|
||||
if (
|
||||
"$RECYCLE.BIN" not in f.parts
|
||||
and TS_FOLDER_NAME not in f.parts
|
||||
and "tagstudio_thumbs" not in f.parts
|
||||
and not f.is_dir()
|
||||
):
|
||||
if f.suffix.lower() not in self.ext_list and self.is_exclude_list:
|
||||
self.dir_file_count += 1
|
||||
file = f.relative_to(self.library_dir)
|
||||
if file not in self.filename_to_entry_id_map:
|
||||
self.files_not_in_library.append(file)
|
||||
elif f.suffix.lower() in self.ext_list and not self.is_exclude_list:
|
||||
self.dir_file_count += 1
|
||||
file = f.relative_to(self.library_dir)
|
||||
try:
|
||||
_ = self.filename_to_entry_id_map[file]
|
||||
except KeyError:
|
||||
# print(file)
|
||||
self.files_not_in_library.append(file)
|
||||
except PermissionError:
|
||||
logging.info(
|
||||
f"The File/Folder {f} cannot be accessed, because it requires higher permission!"
|
||||
)
|
||||
end_time = time.time()
|
||||
end_time_loop = time.time()
|
||||
# Yield output every 1/30 of a second
|
||||
if (end_time - start_time) > 0.034:
|
||||
if (end_time_loop - start_time_loop) > 0.034:
|
||||
yield self.dir_file_count
|
||||
start_time = time.time()
|
||||
# Sorts the files by date modified, descending.
|
||||
if len(self.files_not_in_library) <= 100000:
|
||||
start_time_loop = time.time()
|
||||
try:
|
||||
self.files_not_in_library = sorted(
|
||||
self.files_not_in_library,
|
||||
key=lambda t: -(self.library_dir / t).stat().st_ctime,
|
||||
)
|
||||
# Skip this file if it should be excluded
|
||||
ext: str = f.suffix.lower()
|
||||
if (ext in ext_set and self.is_exclude_list) or (
|
||||
ext not in ext_set and not self.is_exclude_list
|
||||
):
|
||||
continue
|
||||
|
||||
# Finish if the file/path is already mapped in the Library
|
||||
if self.filename_to_entry_id_map.get(f) is not None:
|
||||
# No other checks are required.
|
||||
self.dir_file_count += 1
|
||||
continue
|
||||
|
||||
# If the file is new, check for validity
|
||||
if (
|
||||
"$RECYCLE.BIN" in f.parts
|
||||
or TS_FOLDER_NAME in f.parts
|
||||
or "tagstudio_thumbs" in f.parts
|
||||
or f.is_dir()
|
||||
):
|
||||
continue
|
||||
|
||||
# Add the validated new file to the Library
|
||||
self.dir_file_count += 1
|
||||
self.files_not_in_library.append(f)
|
||||
|
||||
except PermissionError:
|
||||
logging.info(f'[LIBRARY] Cannot access "{f}": PermissionError')
|
||||
|
||||
yield self.dir_file_count
|
||||
end_time_total = time.time()
|
||||
logging.info(
|
||||
f"[LIBRARY] Scanned directories in {(end_time_total - start_time_total):.3f} seconds"
|
||||
)
|
||||
# Sorts the files by date modified, descending
|
||||
if len(self.files_not_in_library) <= 150000:
|
||||
try:
|
||||
if platform.system() == "Windows" or platform.system() == "Darwin":
|
||||
self.files_not_in_library = sorted(
|
||||
self.files_not_in_library,
|
||||
key=lambda t: -(t).stat().st_birthtime, # type: ignore[attr-defined]
|
||||
)
|
||||
else:
|
||||
self.files_not_in_library = sorted(
|
||||
self.files_not_in_library,
|
||||
key=lambda t: -(t).stat().st_ctime,
|
||||
)
|
||||
except (FileExistsError, FileNotFoundError):
|
||||
print(
|
||||
"[LIBRARY] [ERROR] Couldn't sort files, some were moved during the scanning/sorting process."
|
||||
logging.info(
|
||||
"[LIBRARY][ERROR] Couldn't sort files, some were moved during the scanning/sorting process."
|
||||
)
|
||||
pass
|
||||
else:
|
||||
print(
|
||||
"[LIBRARY][INFO] Not bothering to sort files because there's OVER 100,000! Better sorting methods will be added in the future."
|
||||
logging.info(
|
||||
"[LIBRARY][INFO] Not bothering to sort files because there's OVER 150,000! Better sorting methods will be added in the future."
|
||||
)
|
||||
|
||||
def refresh_missing_files(self):
|
||||
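The rewritten scan above flips the old logic around: it precomputes an extension set, skips anything already present in `filename_to_entry_id_map`, and only then filters out the recycle bin, the TagStudio folder, thumbnails, and directories; new files are collected as absolute paths and finally sorted by creation time, using `st_birthtime` on Windows and macOS and `st_ctime` elsewhere. A condensed sketch of that flow with hypothetical names and only the standard library:

```python
import platform
from pathlib import Path


def scan_new_files(
    library_dir: Path,
    known_paths: set[Path],
    ext_list: list[str],
    is_exclude_list: bool = True,
) -> list[Path]:
    ext_set = set(ext_list)  # set membership is cheaper than scanning a list
    new_files: list[Path] = []
    for f in library_dir.glob("**/*"):
        ext = f.suffix.lower()
        # Skip files the extension filter rules out.
        if (ext in ext_set and is_exclude_list) or (ext not in ext_set and not is_exclude_list):
            continue
        # Skip anything the library already knows about.
        if f in known_paths:
            continue
        # Skip service folders and directories themselves.
        if "$RECYCLE.BIN" in f.parts or ".TagStudio" in f.parts or f.is_dir():
            continue
        new_files.append(f)

    # Newest first; the diff uses st_birthtime on Windows/macOS and st_ctime elsewhere.
    if platform.system() in ("Windows", "Darwin"):
        new_files.sort(key=lambda p: -p.stat().st_birthtime)  # type: ignore[attr-defined]
    else:
        new_files.sort(key=lambda p: -p.stat().st_ctime)
    return new_files
```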
@@ -950,7 +999,7 @@ class Library:
         # Step [1/2]:
         # Remove this Entry from the Entries list.
         entry = self.get_entry(entry_id)
-        path = entry.path / entry.filename
+        path = self.library_dir / entry.path / entry.filename
         # logging.info(f'Removing path: {path}')
 
         del self.filename_to_entry_id_map[path]
@@ -1080,8 +1129,8 @@ class Library:
             )
         )
         for match in matches:
-            file_1 = files[match[0]].relative_to(self.library_dir)
-            file_2 = files[match[1]].relative_to(self.library_dir)
+            file_1 = files[match[0]]
+            file_2 = files[match[1]]
 
             if (
                 file_1 in self.filename_to_entry_id_map.keys()
@@ -1282,8 +1331,7 @@ class Library:
         """Adds files from the `files_not_in_library` list to the Library as Entries. Returns list of added indices."""
         new_ids: list[int] = []
         for file in self.files_not_in_library:
-            path = Path(file)
-            # print(os.path.split(file))
+            path = Path(*file.parts[len(self.library_dir.parts) :])
             entry = Entry(
                 id=self._next_entry_id, filename=path.name, path=path.parent, fields=[]
             )
@@ -1294,8 +1342,6 @@ class Library:
         self.files_not_in_library.clear()
         return new_ids
 
-        self.files_not_in_library.clear()
-
     def get_entry(self, entry_id: int) -> Entry:
         """Returns an Entry object given an Entry ID."""
         return self.entries[self._entry_id_to_index_map[int(entry_id)]]
@@ -1316,9 +1362,7 @@ class Library:
         """Returns an Entry ID given the full filepath it points to."""
         try:
             if self.entries:
-                return self.filename_to_entry_id_map[
-                    Path(filename).relative_to(self.library_dir)
-                ]
+                return self.filename_to_entry_id_map[filename]
         except KeyError:
             return -1
 
@@ -183,6 +183,8 @@ class MediaCategories:
        ".crw",
        ".dng",
        ".nef",
        ".orf",
        ".raf",
        ".raw",
        ".rw2",
    }
@@ -1,7 +1,7 @@
 # type: ignore
 # Copyright (C) 2022 James Robert (jiaaro).
 # Licensed under the MIT License.
-# Vendored from ffmpeg-python and ffmpeg-python PR#790 by amamic1803
+# Vendored from pydub
 
 from __future__ import division
 
@@ -729,7 +729,10 @@ class _AudioSegment(object):
             info = None
         else:
             # PATCHED
-            info = _mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
+            try:
+                info = _mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
+            except FileNotFoundError:
+                raise ChildProcessError
         if info:
             audio_streams = [x for x in info['streams']
                              if x['codec_type'] == 'audio']
@@ -1400,4 +1403,4 @@ class _AudioSegment(object):
         """
         fh = self.export()
         data = base64.b64encode(fh.read()).decode('ascii')
-        return src.format(base64=data)
+        return src.format(base64=data)
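The vendored-pydub patch converts a missing `ffprobe` binary (a `FileNotFoundError` from the subprocess launch) into a `ChildProcessError`, which the thumbnail renderer further down adds to its handled exception tuple. A tiny illustration of that translation with a hypothetical probe function; the real code calls pydub's `_mediainfo_json` instead:

```python
import subprocess


def probe_media(path: str) -> str:
    """Run ffprobe on a file, signalling a missing binary as ChildProcessError."""
    try:
        result = subprocess.run(
            ["ffprobe", "-hide_banner", "-show_format", path],
            capture_output=True,
            text=True,
            check=True,
        )
    except FileNotFoundError:
        # ffprobe is not on PATH; let callers treat this like any other
        # "couldn't render this file" condition instead of crashing.
        raise ChildProcessError
    return result.stdout
```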
@@ -106,6 +106,7 @@ class DropImport:
                 continue
 
             dest_file = self.get_relative_path(file)
+            full_dest_path: Path = self.driver.lib.library_dir / dest_file
 
             if file in self.duplicate_files:
                 duplicated_files_progress += 1
@@ -115,14 +116,12 @@ class DropImport:
                 if self.choice == 2: # rename
                     new_name = self.get_renamed_duplicate_filename_in_lib(dest_file)
                     dest_file = dest_file.with_name(new_name)
-                    self.driver.lib.files_not_in_library.append(dest_file)
+                    self.driver.lib.files_not_in_library.append(full_dest_path)
                 else: # override is simply copying but not adding a new entry
-                    self.driver.lib.files_not_in_library.append(dest_file)
+                    self.driver.lib.files_not_in_library.append(full_dest_path)
 
-            (self.driver.lib.library_dir / dest_file).parent.mkdir(
-                parents=True, exist_ok=True
-            )
-            shutil.copyfile(file, self.driver.lib.library_dir / dest_file)
+            (full_dest_path).parent.mkdir(parents=True, exist_ok=True)
+            shutil.copyfile(file, full_dest_path)
 
             fileCount += 1
             yield [fileCount, duplicated_files_progress]
tagstudio/src/qt/modals/ffmpeg_checker.py (new file, 65 lines)
@@ -0,0 +1,65 @@
import logging
import math
from pathlib import Path
from shutil import which
import subprocess

from PIL import Image, ImageQt
from PySide6.QtCore import Signal, Qt, QUrl
from PySide6.QtGui import QPixmap, QDesktopServices
from PySide6.QtWidgets import QMessageBox


class FfmpegChecker(QMessageBox):
    """A warning dialog for if FFmpeg is missing."""

    HELP_URL = "https://docs.tagstud.io/help/ffmpeg/"

    def __init__(self):
        super().__init__()

        self.setWindowTitle("Warning: Missing dependency")
        self.setText("Warning: Could not find FFmpeg installation")
        self.setIcon(QMessageBox.Warning)
        # Blocks other application interactions until resolved
        self.setWindowModality(Qt.ApplicationModal)

        self.setStandardButtons(
            QMessageBox.Help | QMessageBox.Ignore | QMessageBox.Cancel
        )
        self.setDefaultButton(QMessageBox.Ignore)
        # Enables the cancel button but hides it to allow for click X to close dialog
        self.button(QMessageBox.Cancel).hide()

        self.ffmpeg = False
        self.ffprobe = False

    def installed(self):
        """Checks if both FFmpeg and FFprobe are installed and in the PATH."""
        if which("ffmpeg"):
            self.ffmpeg = True
        if which("ffprobe"):
            self.ffprobe = True

        logging.info(
            f"[FFmpegChecker] FFmpeg found: {self.ffmpeg}, FFprobe found: {self.ffprobe}"
        )
        return self.ffmpeg and self.ffprobe

    def show_warning(self):
        """Displays the warning to the user and awaits respone."""
        missing = "FFmpeg"
        # If ffmpeg is installed but not ffprobe
        if not self.ffprobe and self.ffmpeg:
            missing = "FFprobe"

        self.setText(f"Warning: Could not find {missing} installation")
        self.setInformativeText(
            f"{missing} is required for multimedia thumbnails and playback"
        )
        # Shows the dialog
        selection = self.exec()

        # Selection will either be QMessageBox.Help or (QMessageBox.Ignore | QMessageBox.Cancel) which can be ignored
        if selection == QMessageBox.Help:
            QDesktopServices.openUrl(QUrl(self.HELP_URL))
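`installed()` relies on `shutil.which()`, which returns the full path of an executable found on `PATH` (or `None`), so the check behaves the same on Windows, macOS, and Linux without spawning a process. A minimal headless version of the same check:

```python
from shutil import which


def ffmpeg_available() -> bool:
    """True only if both the ffmpeg and ffprobe executables are on PATH."""
    return which("ffmpeg") is not None and which("ffprobe") is not None


print(ffmpeg_available())
```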
@@ -92,6 +92,7 @@ from src.qt.modals.fix_unlinked import FixUnlinkedEntriesModal
 from src.qt.modals.fix_dupes import FixDupeFilesModal
 from src.qt.modals.folders_to_tags import FoldersToTagsModal
 from src.qt.modals.drop_import import DropImport
+from src.qt.modals.ffmpeg_checker import FfmpegChecker
 
 # this import has side-effect of import PySide resources
 import src.qt.resources_rc # pylint: disable=unused-import
@@ -639,6 +640,9 @@ class QtDriver(QObject):
         if self.args.ci:
             # gracefully terminate the app in CI environment
             self.thumb_job_queue.put((self.SIGTERM.emit, []))
+        else:
+            # Startup Checks
+            self.check_ffmpeg()
 
         app.exec()
 
@@ -1091,7 +1095,13 @@ class QtDriver(QObject):
             )
         )
         r = CustomRunnable(lambda: iterator.run())
-        r.done.connect(lambda: (pw.hide(), pw.deleteLater(), self.filter_items("")))
+        r.done.connect(
+            lambda: (
+                pw.hide(),
+                pw.deleteLater(),
+                self.filter_items(self.main_window.searchField.text()),
+            )
+        )
         QThreadPool.globalInstance().start(r)
 
     def new_file_macros_runnable(self, new_ids):
@@ -1846,6 +1856,12 @@ class QtDriver(QObject):
         self.filter_items()
         self.main_window.toggle_landing_page(False)
 
+    def check_ffmpeg(self) -> None:
+        """Checks if FFmpeg is installed and displays a warning if not."""
+        self.ffmpeg_checker = FfmpegChecker()
+        if not self.ffmpeg_checker.installed():
+            self.ffmpeg_checker.show_warning()
+
     def create_collage(self) -> None:
         """Generates and saves an image collage based on Library Entries."""
 
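The `r.done.connect(...)` change keeps the same idiom as before: a single `lambda` whose body is a tuple expression, so several calls run in order when the `done` signal fires; the only functional difference is that the grid is re-filtered with the current contents of the search field instead of an empty string. A small Qt-free sketch of the tuple-lambda idiom, with stand-in callables:

```python
# Stand-ins for pw.hide(), pw.deleteLater() and self.filter_items(...).
def hide() -> None:
    print("hide progress widget")

def delete_later() -> None:
    print("schedule widget deletion")

def filter_items(query: str) -> None:
    print(f"re-filter with query: {query!r}")

search_text = "tag:vacation"  # hypothetical contents of the search field

# Evaluating the tuple calls each element left to right, so one slot can
# trigger several actions without defining a named function.
on_done = lambda: (hide(), delete_later(), filter_items(search_text))
on_done()
```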
@@ -492,7 +492,11 @@ class PreviewPanel(QWidget):
     def update_date_label(self, filepath: Path | None = None) -> None:
         """Update the "Date Created" and "Date Modified" file property labels."""
         if filepath and filepath.is_file():
-            created: dt = dt.fromtimestamp(filepath.stat().st_ctime)
+            created: dt = None
+            if platform.system() == "Windows" or platform.system() == "Darwin":
+                created = dt.fromtimestamp(filepath.stat().st_birthtime) # type: ignore[attr-defined]
+            else:
+                created = dt.fromtimestamp(filepath.stat().st_ctime)
             modified: dt = dt.fromtimestamp(filepath.stat().st_mtime)
             self.date_created_label.setText(
                 f"<b>Date Created:</b> {dt.strftime(created, "%a, %x, %X")}"
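`st_ctime` is only a creation time on Windows; on Linux it is the inode change time, and on macOS the true creation time lives in `st_birthtime`, which is why the label code now branches on the platform. A small sketch of the same branch outside the widget:

```python
import platform
from datetime import datetime as dt
from pathlib import Path


def creation_time(filepath: Path) -> dt:
    stat = filepath.stat()
    if platform.system() in ("Windows", "Darwin"):
        # Creation time where the field is available (macOS; newer Python on Windows).
        return dt.fromtimestamp(stat.st_birthtime)  # type: ignore[attr-defined]
    # Fallback: inode change time, the closest portable approximation.
    return dt.fromtimestamp(stat.st_ctime)


print(creation_time(Path(__file__)).strftime("%a, %x, %X"))
```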
@@ -410,7 +410,7 @@ class ThumbRenderer(QObject):
             faded (bool): Whether or not to apply a faded version of the edge.
                 Used for light themes.
         """
-        opacity: float = 0.8 if not faded else 0.6
+        opacity: float = 1.0 if not faded else 0.8
         shade_reduction: float = (
             0
             if QGuiApplication.styleHints().colorScheme() is Qt.ColorScheme.Dark
@@ -565,6 +565,7 @@ class ThumbRenderer(QObject):
             logging.error(
                 f"[ThumbRenderer][WAVEFORM][ERROR]: Couldn't render waveform for {filepath.name} ({type(e).__name__})"
             )
 
         return im
 
     def _blender(self, filepath: Path) -> Image.Image:
@@ -1057,7 +1058,12 @@ class ThumbRenderer(QObject):
                     size=(adj_size, adj_size),
                     pixel_ratio=pixel_ratio,
                 )
-            except (UnidentifiedImageError, DecompressionBombError, ValueError) as e:
+            except (
+                UnidentifiedImageError,
+                DecompressionBombError,
+                ValueError,
+                ChildProcessError,
+            ) as e:
                 logging.info(
                     f"[ThumbRenderer][ERROR]: Couldn't render thumbnail for {_filepath.name} ({type(e).__name__})"
                 )