feat!: add file date metadata to db

This commit is contained in:
Travis Abendshien
2026-06-14 22:47:02 -07:00
parent 2c85c082b7
commit 75089cea73
6 changed files with 269 additions and 25 deletions

View File

@@ -7,6 +7,7 @@
# pyright: reportDeprecated=false
import platform
import re
import shutil
import sys
@@ -92,6 +93,7 @@ from tagstudio.core.library.alchemy.fields import (
TextFieldTemplate,
)
from tagstudio.core.library.alchemy.joins import TagEntry, TagParent
from tagstudio.core.library.alchemy.metadata import FileMetadata
from tagstudio.core.library.alchemy.models import (
Entry,
Folder,
@@ -104,6 +106,7 @@ from tagstudio.core.library.alchemy.models import (
from tagstudio.core.library.alchemy.visitors import SQLBoolExpressionBuilder
from tagstudio.core.library.ignore import migrate_ext_list
from tagstudio.core.library.json.library import Library as JsonLibrary
from tagstudio.core.utils.stat import get_date_created, get_date_modified
from tagstudio.core.utils.types import unwrap
from tagstudio.qt.translations import Translations
@@ -934,7 +937,11 @@ class Library:
return entry
def get_entry_full(
self, entry_id: int, with_fields: bool = True, with_tags: bool = True
self,
entry_id: int,
with_fields: bool = True,
with_tags: bool = True,
with_metadata: bool = True,
) -> Entry | None:
"""Load entry and join with all joins and all tags."""
# NOTE: TODO: Currently this method makes multiple separate queries to the db and combines
@@ -964,6 +971,11 @@ class Library:
)
)
if with_metadata:
entry_stmt = entry_stmt.outerjoin(Entry.file_metadata).options(
selectinload(Entry.file_metadata),
)
start_time = time.time()
entry = session.scalar(entry_stmt)
if with_tags:
@@ -1152,10 +1164,79 @@ class Library:
session.query(Entry).where(Entry.id.in_(sub_list)).delete()
session.commit()
def has_entry_with_path(self, path: Path) -> bool:
    """Check if an entry with this path is in the library."""
    with Session(self.engine) as session:
        return session.query(exists().where(Entry.path == path)).scalar()

def get_entry_id_from_path(self, path: Path) -> int:
    """Attempt to return an Entry ID given a filepath, else return -1.

    Args:
        path: The entry path to look up, compared against ``Entry.path``.

    Returns:
        The ID of the matching entry, or -1 if no entry has this path.
    """
    with Session(self.engine) as session:
        entry_id = session.scalar(select(Entry.id).where(Entry.path == path).limit(1))
    # Compare against None explicitly: a bare `or -1` would also turn a valid
    # ID of 0 into the "not found" sentinel, while callers test with `>= 0` / `< 0`.
    return -1 if entry_id is None else entry_id
# def update_entry_file_metadata(
# self, entry_id: int, date_created: datetime | None, date_modified: datetime | None
# ):
# with Session(self.engine) as session:
# stmt = update(FileMetadata).where(
# and_(
# FileMetadata.entry_id == entry_id,
# )
# )
# if date_created:
# stmt = stmt.values(date_created=date_created)
# if date_modified:
# stmt = stmt.values(date_modified=date_modified)
# session.execute(stmt)
# session.commit()
def refresh_file_entry_stats(self, entry_id: int, path: Path | None) -> None:
    """Updates a file entry's associated stat() data.

    The creation and modification timestamps currently stored for the entry are
    compared with the values read from the file on disk. The entry's FileMetadata
    row is written only when at least one of them differs, and is created if the
    entry does not have one yet.

    Args:
        entry_id: ID of the entry whose file metadata should be refreshed.
        path: Path of the file relative to the library directory. If None, the
            path stored on the entry is used instead.
    """
    entry = self.get_entry_full(
        entry_id, with_fields=False, with_tags=False, with_metadata=True
    )
    if not entry:
        return

    # Fall back to the path stored on the entry when the caller did not supply one.
    relative_path = entry.path if path is None else path
    full_path = unwrap(self.library_dir) / relative_path
    logger.info(full_path)

    try:
        file_date_created = get_date_created(full_path)
        file_date_modified = get_date_modified(full_path)
    except OSError as e:
        # The file may have been removed or become unreadable since it was scanned;
        # skip this entry rather than aborting the caller's whole refresh.
        logger.warning(f"Unable to stat file, skipping metadata refresh: {full_path} ({e})")
        return

    # Log info
    created_changed = entry.date_created != file_date_created
    if created_changed:
        logger.info(f"Difference in date_created!: {entry.date_created}/{file_date_created}")
    else:
        logger.info("No difference in date_created.")

    modified_changed = entry.date_modified != file_date_modified
    if modified_changed:
        logger.info(f"Difference in date_modified!: {entry.date_modified}/{file_date_modified}")
    else:
        logger.info("No difference in date_modified")

    # Nothing differs from what is stored, so there is nothing to write.
    if not (created_changed or modified_changed):
        return

    logger.info(f"Updating entry file_metadata for {full_path}")
    with Session(self.engine) as session:
        file_metadata = session.get(FileMetadata, entry_id)
        if file_metadata is None:
            # Entries without a metadata row would be silently skipped by a plain
            # UPDATE, so insert the row instead.
            session.add(
                FileMetadata(
                    entry_id=entry_id,
                    date_created=file_date_created,
                    date_modified=file_date_modified,
                )
            )
        else:
            file_metadata.date_created = file_date_created
            file_metadata.date_modified = file_date_modified
        session.commit()
def get_paths(self, limit: int = -1) -> list[str]:
path_strings: list[str] = []
@@ -1317,7 +1398,7 @@ class Library:
Returns True if the action succeeded and False if the path already exists.
"""
if self.has_entry_with_path(path):
if self.get_entry_id_from_path(path) >= 0:
return False
if isinstance(entry_id, Entry):
entry_id = entry_id.id

View File

@@ -0,0 +1,116 @@
# SPDX-FileCopyrightText: (c) TagStudio Contributors
# SPDX-License-Identifier: MIT
from __future__ import annotations
from datetime import datetime as dt
from pathlib import Path
from typing import TYPE_CHECKING, Any, override
from sqlalchemy import ForeignKey, ForeignKeyConstraint, Integer, null
from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship
from tagstudio.core.library.alchemy.db import Base, PathType
from tagstudio.core.library.alchemy.joins import TagParent
if TYPE_CHECKING:
from tagstudio.core.library.alchemy.models import Entry
class FileMetadata(Base):
    """Table that includes file data and metadata obtained from os.stat() for entries.

    Rows are keyed by ``entry_id``, which is both the primary key of this table and
    a foreign key to ``entries.id``.
    """

    __tablename__ = "file_metadata"

    # ID of the entry this row describes.
    entry_id: Mapped[int] = mapped_column(
        ForeignKey("entries.id"), primary_key=True, nullable=False
    )

    # NOTE: These dates are stored as floats because that's their natural form from os.stat()
    # and comparisons are quicker without having to convert to/from datetime objects.
    date_created: Mapped[float | None]
    date_modified: Mapped[float | None]

    def __init__(
        self,
        entry_id: int,
        date_created: float | None = None,
        date_modified: float | None = None,
    ) -> None:
        """Create a metadata row for an entry.

        Args:
            entry_id: ID of the entry this metadata belongs to.
            date_created: File creation timestamp as a float, or None if unknown.
            date_modified: File modification timestamp as a float, or None if unknown.
        """
        super().__init__()
        self.entry_id = entry_id

        # File metadata
        self.date_created = date_created  # st_birthtime on Windows and Mac, st_ctime on Linux
        self.date_modified = date_modified  # st_mtime
class ExifMetadata(Base):
    """Contains Exif metadata for entries.

    Rows are keyed by ``entry_id``, which is both the primary key of this table and
    a foreign key to ``entries.id``.
    """

    __tablename__ = "exif_metadata"

    # ID of the entry this row describes.
    entry_id: Mapped[int] = mapped_column(
        ForeignKey("entries.id"), primary_key=True, nullable=False
    )

    # Optional datetime value; see the Exif tag noted in __init__.
    date_taken: Mapped[dt | None]

    def __init__(
        self,
        entry_id: int,
        date_taken: dt | None = None,
    ) -> None:
        """Create an Exif metadata row for an entry.

        Args:
            entry_id: ID of the entry this metadata belongs to.
            date_taken: The date associated with the entry's Exif data, or None.
        """
        super().__init__()
        self.entry_id = entry_id

        self.date_taken = date_taken  # Exif.Image.DateTime
class DimensionMetadata(Base):
    """Contains dimension metadata for entries (e.g. image and video files).

    Rows are keyed by ``entry_id``, which is both the primary key of this table and
    a foreign key to ``entries.id``. Both dimensions are required (non-nullable).
    """

    __tablename__ = "dimension_metadata"

    # ID of the entry this row describes.
    entry_id: Mapped[int] = mapped_column(
        ForeignKey("entries.id"), primary_key=True, nullable=False
    )

    # NOTE(review): units are presumably pixels — confirm against the code that fills these in.
    width: Mapped[int] = mapped_column(nullable=False)
    height: Mapped[int] = mapped_column(nullable=False)

    def __init__(
        self,
        entry_id: int,
        width: int,
        height: int,
    ) -> None:
        """Create a dimension metadata row for an entry.

        Args:
            entry_id: ID of the entry this metadata belongs to.
            width: Width value to store.
            height: Height value to store.
        """
        super().__init__()
        self.entry_id = entry_id

        self.width = width
        self.height = height
class DurationMetadata(Base):
    """Contains duration metadata for entries (e.g. audio and video files).

    Rows are keyed by ``entry_id``, which is both the primary key of this table and
    a foreign key to ``entries.id``. The duration is required (non-nullable).
    """

    __tablename__ = "duration_metadata"

    # ID of the entry this row describes.
    entry_id: Mapped[int] = mapped_column(
        ForeignKey("entries.id"), primary_key=True, nullable=False
    )

    # NOTE(review): unit is presumably seconds — confirm against the code that fills this in.
    duration: Mapped[float] = mapped_column(nullable=False)

    def __init__(
        self,
        entry_id: int,
        duration: float,
    ) -> None:
        """Create a duration metadata row for an entry.

        Args:
            entry_id: ID of the entry this metadata belongs to.
            duration: Duration value to store.
        """
        super().__init__()
        self.entry_id = entry_id

        self.duration = duration

View File

@@ -17,6 +17,8 @@ from tagstudio.core.library.alchemy.fields import (
TextField,
)
from tagstudio.core.library.alchemy.joins import TagParent
from tagstudio.core.library.alchemy.metadata import FileMetadata
from tagstudio.core.utils.stat import get_date_created, get_date_modified
class Namespace(Base):
@@ -181,6 +183,7 @@ class Tag(Base):
return self.name >= other.name
# TODO: Use or replace these with an actual multi-root implementation
class Folder(Base):
__tablename__ = "folders"
@@ -195,15 +198,16 @@ class Entry(Base):
id: Mapped[int] = mapped_column(primary_key=True)
# TODO: Use or replace these with an actual multi-root implementation
folder_id: Mapped[int] = mapped_column(ForeignKey("folders.id"))
folder: Mapped[Folder] = relationship("Folder")
# TODO: Possibly move to FileMetadata table if Entry is split into Entry/FileEntry (see #588)
path: Mapped[Path] = mapped_column(PathType, unique=True)
filename: Mapped[str] = mapped_column()
suffix: Mapped[str] = mapped_column()
date_created: Mapped[dt | None]
date_modified: Mapped[dt | None]
date_added: Mapped[dt | None]
date_added: Mapped[dt | None] # The date this entry was added to the library
tags: Mapped[set[Tag]] = relationship(secondary="tag_entries")
@@ -216,6 +220,11 @@ class Entry(Base):
cascade="all, delete",
)
file_metadata: Mapped["FileMetadata"] = relationship(
uselist=False,
cascade="all, delete-orphan",
)
@property
def fields(self) -> list[BaseField]:
fields: list[BaseField] = []
@@ -231,30 +240,35 @@ class Entry(Base):
def is_archived(self) -> bool:
return any(tag.id == TAG_ARCHIVED for tag in self.tags)
@property
def date_created(self) -> float | None:
return self.file_metadata.date_created if self.file_metadata else None
@property
def date_modified(self) -> float | None:
return self.file_metadata.date_modified if self.file_metadata else None
def __init__(
self,
path: Path,
folder: Folder,
fields: list[BaseField],
id: int | None = None,
date_created: dt | None = None,
date_modified: dt | None = None,
date_added: dt | None = None,
# date_created: float | None = None,
# date_modified: float | None = None,
path_for_file_metadata: Path | None = None,
) -> None:
super().__init__()
self.path = path
self.folder = folder
self.id = id # pyright: ignore[reportAttributeAccessIssue]
self.folder = folder # NOTE: Currently unused
self.path = path
self.filename = path.name
self.suffix = path.suffix.lstrip(".").lower()
# The date the file associated with this entry was created.
# st_birthtime on Windows and Mac, st_ctime on Linux.
self.date_created = date_created
# The date the file associated with this entry was last modified: st_mtime.
self.date_modified = date_modified
# The date this entry was added to the library.
self.date_added = date_added
self.date_added = date_added # The date this entry was added to the library
for field in fields:
if isinstance(field, TextField):
@@ -264,6 +278,13 @@ class Entry(Base):
else:
raise ValueError(f"Invalid field type: {field}")
if path_for_file_metadata:
self.file_metadata = FileMetadata(
entry_id=self.id,
date_created=get_date_created(path_for_file_metadata),
date_modified=get_date_modified(path_for_file_metadata),
)
def has_tag(self, tag: Tag) -> bool:
return tag in self.tags

View File

@@ -8,6 +8,7 @@ from dataclasses import dataclass, field
from datetime import datetime as dt
from pathlib import Path
from time import time
import platform
import structlog
from wcmatch import pathlib
@@ -38,12 +39,14 @@ class RefreshTracker:
while index < len(self.files_not_in_library):
yield index
end = min(len(self.files_not_in_library), index + batch_size)
lib_dir = unwrap(self.library.library_dir)
entries = [
Entry(
path=entry_path,
folder=unwrap(self.library.folder),
fields=[],
date_added=dt.now(),
path_for_file_metadata=(lib_dir / entry_path),
)
for entry_path in self.files_not_in_library[index:end]
]
@@ -144,8 +147,11 @@ class RefreshTracker:
dir_file_count += 1
self.library.included_files.add(f)
if not self.library.has_entry_with_path(f):
entry_id = self.library.get_entry_id_from_path(f)
if entry_id < 0:
self.files_not_in_library.append(f)
else:
self.library.refresh_file_entry_stats(entry_id, path=f)
end_time_total = time()
yield dir_file_count
@@ -189,8 +195,12 @@ class RefreshTracker:
relative_path = f.relative_to(library_dir)
if not self.library.has_entry_with_path(relative_path):
entry_id = self.library.get_entry_id_from_path(relative_path)
if entry_id < 0:
self.files_not_in_library.append(relative_path)
else:
self.library.refresh_file_entry_stats(entry_id, path=relative_path)
except ValueError:
logger.info("[Refresh]: ValueError when refreshing directory with wcmatch!")

View File

@@ -0,0 +1,16 @@
# SPDX-FileCopyrightText: (c) TagStudio Contributors
# SPDX-License-Identifier: MIT
import platform
from pathlib import Path
def get_date_modified(path: Path) -> float:
    """Return the ``st_mtime`` value reported by ``path.stat()``."""
    stat_result = path.stat()
    return stat_result.st_mtime
def get_date_created(path: Path) -> float:
    """Return the best available creation timestamp for the file at *path*.

    Uses ``st_birthtime`` when the stat result provides it and falls back to
    ``st_ctime`` otherwise.

    Raises:
        OSError: If ``path.stat()`` fails (e.g. the file does not exist).
    """
    stat_result = path.stat()
    # Detect the attribute instead of checking the platform name: st_birthtime is
    # not present on every platform/Python version, and where it exists it is the
    # true creation time.
    try:
        return stat_result.st_birthtime
    except AttributeError:
        return stat_result.st_ctime

View File

@@ -83,9 +83,9 @@ def test_library_add_file(library: Library):
fields=[TextField(name="Title", value="I'm a Test Title")],
)
assert not library.has_entry_with_path(entry.path)
assert not library.get_entry_id_from_path(entry.path)
assert library.add_entries([entry])
assert library.has_entry_with_path(entry.path)
assert library.get_entry_id_from_path(entry.path)
def test_create_tag(library: Library, generate_tag: Callable[..., Tag]):
@@ -345,8 +345,8 @@ def test_merge_entries(library: Library):
entry_b_: Entry = unwrap(library.get_entry_full(entry_b_id))
assert library.merge_entries(entry_a_, entry_b_)
assert not library.has_entry_with_path(Path("a"))
assert library.has_entry_with_path(Path("b"))
assert not library.get_entry_id_from_path(Path("a"))
assert library.get_entry_id_from_path(Path("b"))
entry_b_merged = unwrap(library.get_entry_full(entry_b_id))