Mirror of https://github.com/TagStudioDev/TagStudio.git (synced 2026-01-31 15:19:10 +00:00)
fix: refactor and fix bugs with missing_files.py (#739)
* fix: relink unlinked entry to existing entry without sql error (#720)
* edited and added db functions get_entry_full_by_path & merge_entries
* implemented edge case for entry existing on relinking
* added test for merge_entries
* fix: don't remove item while iterating over list
* fix: catch `FileNotFoundError` for unlinked raw files
* refactor: rename methods and variables in missing_files.py
* refactor: rename `missing_files_count` to `missing_file_entries_count`

Co-authored-by: mashed5894 <mashed5894@gmail.com>
Committed by: GitHub
Parent: 458925fbf7
Commit: 20d788af29
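The heart of the change: MissingRegistry in missing_files.py finds entries whose stored path no longer exists on disk and tries to relink them; when the corrected path already belongs to another entry, it now merges the two entries instead of failing with a SQL error. A minimal sketch of that flow, assuming an already-opened `library: Library` whose files were moved on disk (illustration only, not code from this commit):

from src.core.library import Library
from src.core.utils.missing_files import MissingRegistry


def relink_all(library: Library) -> int:
    tracker = MissingRegistry(library=library)

    # Collect entries whose stored path no longer points at a real file.
    for _ in tracker.refresh_missing_files():
        pass

    # Relink each unlinked entry; a path collision is resolved by merging
    # into the entry that already owns the path (see fix_unlinked_entries below).
    for _ in tracker.fix_unlinked_entries():
        pass

    return tracker.files_fixed_count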
@@ -490,6 +490,32 @@ class Library:
                 yield entry
                 session.expunge(entry)
 
+    def get_entry_full_by_path(self, path: Path) -> Entry | None:
+        """Get the entry with the corresponding path."""
+        with Session(self.engine) as session:
+            stmt = select(Entry).where(Entry.path == path)
+            stmt = (
+                stmt.outerjoin(Entry.text_fields)
+                .outerjoin(Entry.datetime_fields)
+                .options(selectinload(Entry.text_fields), selectinload(Entry.datetime_fields))
+            )
+            stmt = (
+                stmt.outerjoin(Entry.tags)
+                .outerjoin(TagAlias)
+                .options(
+                    selectinload(Entry.tags).options(
+                        joinedload(Tag.aliases),
+                        joinedload(Tag.parent_tags),
+                    )
+                )
+            )
+            entry = session.scalar(stmt)
+            if not entry:
+                return None
+            session.expunge(entry)
+            make_transient(entry)
+            return entry
+
     @property
     def entries_count(self) -> int:
         with Session(self.engine) as session:
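get_entry_full_by_path returns a detached copy of the entry (expunged from the session and made transient) with its text fields, datetime fields, and tags eagerly loaded, so callers can use it after the session closes. A hypothetical lookup, assuming an open `library` (the path is illustrative):

from pathlib import Path

entry = library.get_entry_full_by_path(Path("photos/cat.jpg"))  # illustrative path
if entry is not None:
    print(entry.path, [tag.name for tag in entry.tags])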
@@ -698,7 +724,13 @@ class Library:
 
         return res
 
-    def update_entry_path(self, entry_id: int | Entry, path: Path) -> None:
-        """Set the path field of an entry."""
+    def update_entry_path(self, entry_id: int | Entry, path: Path) -> bool:
+        """Set the path field of an entry.
+
+        Returns True if the action succeeded and False if the path already exists.
+        """
+        if self.has_path_entry(path):
+            return False
+
         if isinstance(entry_id, Entry):
             entry_id = entry_id.id
@@ -715,6 +747,7 @@ class Library:
 
             session.execute(update_stmt)
             session.commit()
+            return True
 
     def remove_tag(self, tag: Tag):
         with Session(self.engine, expire_on_commit=False) as session:
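The new bool return matters to callers: False now signals that the target path already belongs to another entry, instead of letting the database raise a UNIQUE-constraint error. A hypothetical caller following the same pattern as fix_unlinked_entries (the helper name is ours, not from the commit):

from pathlib import Path

from src.core.library import Library


def relink_or_merge(library: Library, entry_id: int, new_path: Path) -> None:
    if library.update_entry_path(entry_id, new_path):
        return  # path was free; the entry was simply re-pointed
    # Another entry already owns new_path: fold this one into it instead.
    existing = library.get_entry_full_by_path(new_path)
    moved = library.get_entry_full(entry_id)
    if existing is not None and moved is not None:
        library.merge_entries(moved, existing)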
@@ -1185,6 +1218,18 @@ class Library:
                 value=field.value,
             )
 
+    def merge_entries(self, from_entry: Entry, into_entry: Entry) -> None:
+        """Add fields and tags from the first entry to the second, and then delete the first."""
+        for field in from_entry.fields:
+            self.add_field_to_entry(
+                entry_id=into_entry.id,
+                field_id=field.type_key,
+                value=field.value,
+            )
+        tag_ids = [tag.id for tag in from_entry.tags]
+        self.add_tags_to_entry(into_entry.id, tag_ids)
+        self.remove_entries([from_entry.id])
+
     @property
     def tag_color_groups(self) -> dict[str, list[TagColorGroup]]:
         """Return every TagColorGroup in the library."""
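After merge_entries(from_entry, into_entry) the survivor is into_entry: it keeps its own path and gains the other entry's fields and tags, while from_entry is deleted. A compressed sketch of those expectations (the full version is the test_merge_entries test further down):

library.merge_entries(entry_a, entry_b)          # entry_a folded into entry_b
assert library.has_path_entry(entry_b.path)      # survivor keeps its path
assert not library.has_path_entry(entry_a.path)  # source entry is gone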
@@ -4,10 +4,7 @@ from pathlib import Path
 
 import structlog
 from src.core.library import Entry, Library
-
-IGNORE_ITEMS = [
-    "$recycle.bin",
-]
+from src.core.utils.refresh_dir import GLOBAL_IGNORE_SET
 
 logger = structlog.get_logger()
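The hard-coded IGNORE_ITEMS list gives way to the shared GLOBAL_IGNORE_SET from refresh_dir. The per-part check added to match_missing_file_entry below can be read as this one-liner (a sketch, not the committed code):

from pathlib import Path

from src.core.utils.refresh_dir import GLOBAL_IGNORE_SET


def is_ignored(path: Path) -> bool:
    # True if any folder along the path is in the global ignore set.
    return any(part in GLOBAL_IGNORE_SET for part in path.parts)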
@@ -18,49 +15,73 @@ class MissingRegistry:
 
     library: Library
     files_fixed_count: int = 0
-    missing_files: list[Entry] = field(default_factory=list)
+    missing_file_entries: list[Entry] = field(default_factory=list)
 
     @property
-    def missing_files_count(self) -> int:
-        return len(self.missing_files)
+    def missing_file_entries_count(self) -> int:
+        return len(self.missing_file_entries)
 
     def refresh_missing_files(self) -> Iterator[int]:
-        """Track the number of Entries that point to an invalid file path."""
-        logger.info("refresh_missing_files running")
-        self.missing_files = []
+        """Track the number of entries that point to an invalid filepath."""
+        logger.info("[refresh_missing_files] Refreshing missing files...")
+        self.missing_file_entries = []
         for i, entry in enumerate(self.library.get_entries()):
             full_path = self.library.library_dir / entry.path
             if not full_path.exists() or not full_path.is_file():
-                self.missing_files.append(entry)
+                self.missing_file_entries.append(entry)
             yield i
 
-    def match_missing_file(self, match_item: Entry) -> list[Path]:
-        """Try to find missing entry files within the library directory.
+    def match_missing_file_entry(self, match_entry: Entry) -> list[Path]:
+        """Try and match unlinked file entries with matching results in the library directory.
 
         Works if files were just moved to different subfolders and don't have duplicate names.
         """
         matches = []
-        for item in self.library.library_dir.glob(f"**/{match_item.path.name}"):
-            if item.name == match_item.path.name:  # TODO - implement IGNORE_ITEMS
-                new_path = Path(item).relative_to(self.library.library_dir)
+        for path in self.library.library_dir.glob(f"**/{match_entry.path.name}"):
+            # Ensure matched file isn't in a globally ignored folder
+            skip: bool = False
+            for part in path.parts:
+                if part in GLOBAL_IGNORE_SET:
+                    skip = True
+                    break
+            if skip:
+                continue
+            if path.name == match_entry.path.name:
+                new_path = Path(path).relative_to(self.library.library_dir)
                 matches.append(new_path)
 
+        logger.info("[MissingRegistry] Matches", matches=matches)
         return matches
 
-    def fix_missing_files(self) -> Iterator[int]:
-        """Attempt to fix missing files by finding a match in the library directory."""
+    def fix_unlinked_entries(self) -> Iterator[int]:
+        """Attempt to fix unlinked file entries by finding a match in the library directory."""
        self.files_fixed_count = 0
-        for i, entry in enumerate(self.missing_files, start=1):
-            item_matches = self.match_missing_file(entry)
+        matched_entries: list[Entry] = []
+        for i, entry in enumerate(self.missing_file_entries):
+            item_matches = self.match_missing_file_entry(entry)
             if len(item_matches) == 1:
-                logger.info("fix_missing_files", entry=entry, item_matches=item_matches)
-                self.library.update_entry_path(entry.id, item_matches[0])
+                logger.info(
+                    "[fix_unlinked_entries]",
+                    entry=entry.path.as_posix(),
+                    item_matches=item_matches[0].as_posix(),
+                )
+                if not self.library.update_entry_path(entry.id, item_matches[0]):
+                    try:
+                        match = self.library.get_entry_full_by_path(item_matches[0])
+                        entry_full = self.library.get_entry_full(entry.id)
+                        self.library.merge_entries(entry_full, match)
+                    except AttributeError:
+                        continue
                 self.files_fixed_count += 1
-                # remove fixed file
-                self.missing_files.remove(entry)
+                matched_entries.append(entry)
            yield i
 
-    def execute_deletion(self) -> None:
-        self.library.remove_entries(list(map(lambda missing: missing.id, self.missing_files)))
+        for entry in matched_entries:
+            self.missing_file_entries.remove(entry)
 
-        self.missing_files = []
+    def execute_deletion(self) -> None:
+        self.library.remove_entries(
+            list(map(lambda missing: missing.id, self.missing_file_entries))
+        )
+
+        self.missing_file_entries = []
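The matched_entries list exists because of the "don't remove item while iterating" fix: mutating a list while iterating over it skips elements. A generic illustration (plain Python, not TagStudio code):

nums = [1, 2, 3, 4]
for n in nums:
    nums.remove(n)          # buggy: removal shifts indices, every other element survives
assert nums == [2, 4]

nums = [1, 2, 3, 4]
done = list(nums)           # the commit's pattern: collect first (cf. matched_entries) ...
for n in done:
    nums.remove(n)          # ... then remove after the loop finishes
assert nums == []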
@@ -43,7 +43,7 @@ class DeleteUnlinkedEntriesModal(QWidget):
         Translations.translate_qobject(
             self.desc_widget,
             "entries.unlinked.delete.confirm",
-            count=self.tracker.missing_files_count,
+            count=self.tracker.missing_file_entries_count,
         )
         self.desc_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
@@ -75,12 +75,12 @@ class DeleteUnlinkedEntriesModal(QWidget):
     def refresh_list(self):
         self.desc_widget.setText(
             Translations.translate_formatted(
-                "entries.unlinked.delete.confirm", count=self.tracker.missing_files_count
+                "entries.unlinked.delete.confirm", count=self.tracker.missing_file_entries_count
             )
         )
 
         self.model.clear()
-        for i in self.tracker.missing_files:
+        for i in self.tracker.missing_file_entries:
             item = QStandardItem(str(i.path))
             item.setEditable(False)
             self.model.appendRow(item)
@@ -90,7 +90,7 @@ class DeleteUnlinkedEntriesModal(QWidget):
             return Translations.translate_formatted(
                 "entries.unlinked.delete.deleting_count",
                 idx=x,
-                count=self.tracker.missing_files_count,
+                count=self.tracker.missing_file_entries_count,
             )
 
         pw = ProgressWidget(
@@ -129,7 +129,7 @@ class FixUnlinkedEntriesModal(QWidget):
         if count is not None:
             self.missing_count = count
         else:
-            self.missing_count = self.tracker.missing_files_count
+            self.missing_count = self.tracker.missing_file_entries_count
 
         if self.missing_count < 0:
             self.search_button.setDisabled(True)
@@ -21,7 +21,7 @@ class RelinkUnlinkedEntries(QObject):
             return Translations.translate_formatted(
                 "entries.unlinked.relink.attempting",
                 idx=x,
-                missing_count=self.tracker.missing_files_count,
+                missing_count=self.tracker.missing_file_entries_count,
                 fixed_count=self.tracker.files_fixed_count,
             )
@@ -29,8 +29,8 @@
             label_text="",
             cancel_button_text=None,
             minimum=0,
-            maximum=self.tracker.missing_files_count,
+            maximum=self.tracker.missing_file_entries_count,
         )
         Translations.translate_with_setter(pw.setWindowTitle, "entries.unlinked.relink.title")
 
-        pw.from_iterable_function(self.tracker.fix_missing_files, displayed_text, self.done.emit)
+        pw.from_iterable_function(self.tracker.fix_unlinked_entries, displayed_text, self.done.emit)
@@ -198,6 +198,7 @@ class PreviewThumb(QWidget):
                 except (
                     rawpy._rawpy.LibRawIOError,
                     rawpy._rawpy.LibRawFileUnsupportedError,
+                    FileNotFoundError,
                 ):
                     pass
             elif MediaCategories.is_ext_in_category(
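Adding FileNotFoundError to the except tuple means a preview for an unlinked raw file now falls through quietly instead of raising. A hedged sketch of the surrounding pattern (function name and post-processing are illustrative, not PreviewThumb's actual code):

from pathlib import Path

import rawpy


def load_raw_preview(filepath: Path):
    try:
        with rawpy.imread(str(filepath)) as raw:
            return raw.postprocess()
    except (
        rawpy._rawpy.LibRawIOError,
        rawpy._rawpy.LibRawFileUnsupportedError,
        FileNotFoundError,
    ):
        return None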
@@ -9,6 +9,7 @@ from src.core.utils.missing_files import MissingRegistry
 CWD = pathlib.Path(__file__).parent
 
 
+# NOTE: Does this test actually work?
 @pytest.mark.parametrize("library", [TemporaryDirectory()], indirect=True)
 def test_refresh_missing_files(library: Library):
     registry = MissingRegistry(library=library)
@@ -20,10 +21,10 @@ def test_refresh_missing_files(library: Library):
     assert list(registry.refresh_missing_files()) == [0, 1]
 
     # neither of the library entries exist
-    assert len(registry.missing_files) == 2
+    assert len(registry.missing_file_entries) == 2
 
     # iterate through two files
-    assert list(registry.fix_missing_files()) == [1, 2]
+    assert list(registry.fix_unlinked_entries()) == [0, 1]
 
     # `bar.md` should be relinked to new correct path
     results = library.search_library(FilterState.from_path("bar.md"))
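The expected yield values change from [1, 2] to [0, 1] because the refactored loop drops start=1 from enumerate(), so progress indices are now 0-based:

entries = ["foo.txt", "bar.md"]  # "foo.txt" is illustrative; "bar.md" appears in the test
assert [i for i, _ in enumerate(entries, start=1)] == [1, 2]  # old behaviour
assert [i for i, _ in enumerate(entries)] == [0, 1]           # new behaviour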
@@ -309,6 +309,41 @@ def test_mirror_entry_fields(library: Library, entry_full):
     }
 
 
+def test_merge_entries(library: Library):
+    a = Entry(
+        folder=library.folder,
+        path=Path("a"),
+        fields=[
+            TextField(type_key=_FieldID.AUTHOR.name, value="Author McAuthorson", position=0),
+            TextField(type_key=_FieldID.DESCRIPTION.name, value="test description", position=2),
+        ],
+    )
+    b = Entry(
+        folder=library.folder,
+        path=Path("b"),
+        fields=[TextField(type_key=_FieldID.NOTES.name, value="test note", position=1)],
+    )
+    try:
+        ids = library.add_entries([a, b])
+        entry_a = library.get_entry_full(ids[0])
+        entry_b = library.get_entry_full(ids[1])
+        tag_0 = library.add_tag(Tag(id=1000, name="tag_0"))
+        tag_1 = library.add_tag(Tag(id=1001, name="tag_1"))
+        tag_2 = library.add_tag(Tag(id=1002, name="tag_2"))
+        library.add_tags_to_entry(ids[0], [tag_0.id, tag_2.id])
+        library.add_tags_to_entry(ids[1], [tag_1.id])
+        library.merge_entries(entry_a, entry_b)
+        assert library.has_path_entry(Path("b"))
+        assert not library.has_path_entry(Path("a"))
+        fields = [field.value for field in entry_a.fields]
+        assert "Author McAuthorson" in fields
+        assert "test description" in fields
+        assert "test note" in fields
+        assert b.has_tag(tag_0) and b.has_tag(tag_1) and b.has_tag(tag_2)
+    except AttributeError:
+        AssertionError()
+
+
 def test_remove_tag_from_entry(library, entry_full):
     removed_tag_id = -1
     for tag in entry_full.tags: