diff --git a/src/tagstudio/qt/cache_manager.py b/src/tagstudio/qt/cache_manager.py index 043c7073..bb3f60f4 100644 --- a/src/tagstudio/qt/cache_manager.py +++ b/src/tagstudio/qt/cache_manager.py @@ -2,189 +2,170 @@ # Licensed under the GPL-3.0 License. # Created for TagStudio: https://github.com/CyanVoxel/TagStudio -import contextlib import math -import typing +from collections.abc import Iterable from datetime import datetime as dt from pathlib import Path +from threading import RLock import structlog from PIL import Image from tagstudio.core.constants import THUMB_CACHE_NAME, TS_FOLDER_NAME -from tagstudio.core.singleton import Singleton - -# Only import for type checking/autocompletion, will not be imported at runtime. -if typing.TYPE_CHECKING: - from tagstudio.core.library import Library logger = structlog.get_logger(__name__) -class CacheManager(metaclass=Singleton): - FOLDER_SIZE = 10000000 # Each cache folder assumed to be 10 MiB - size_limit = 500000000 # 500 MiB default +class CacheEntry: + def __init__(self, path: Path, size: int): + self.path: Path = path + self.size: int = size - folder_dict: dict[Path, int] = {} - def __init__(self): - self.lib: Library | None = None - self.last_lib_path: Path | None = None +class CacheManager: + DEFAULT_MAX_SIZE = 500_000_000 + DEFAULT_MAX_FOLDER_SIZE = 10_000_000 - @staticmethod - def clear_cache(library_dir: Path | None) -> bool: - """Clear all files and folders within the cached folder. + def __init__( + self, + library_dir: Path, + max_size: int = DEFAULT_MAX_SIZE, + max_folder_size: int = DEFAULT_MAX_FOLDER_SIZE, + ): + self._lock = RLock() + self.cache_folder = library_dir / TS_FOLDER_NAME / THUMB_CACHE_NAME + self.max_folder_size = max_folder_size + self.max_size = max(max_size, max_folder_size) - Returns: - bool: True if successfully deleted, else False. - """ - cleared = True + self.folders: list[CacheEntry] = [] + self.current_size = 0 + if self.cache_folder.exists(): + for folder in self.cache_folder.iterdir(): + if not folder.is_dir(): + continue + folder_size = 0 + for file in folder.iterdir(): + folder_size += file.stat().st_size + self.folders.append(CacheEntry(folder, folder_size)) + self.current_size += folder_size - if library_dir: - tree: Path = library_dir / TS_FOLDER_NAME / THUMB_CACHE_NAME + def _set_mru(self, index: int): + """Move entry at index so it's considered the most recently used.""" + with self._lock as _lock: + if index == (len(self.folders) - 1): + return + entry = self.folders.pop(index) + self.folders.append(entry) - for folder in tree.glob("*"): - for file in folder.glob("*"): - # NOTE: On macOS with non-native file systems, this will commonly raise - # FileNotFound errors due to trying to delete "._" files that have - # already been deleted: https://bugs.python.org/issue29699 - with contextlib.suppress(FileNotFoundError): - file.unlink() + def _mru(self) -> Iterable[int]: + """Get each folders index sorted most recently used first.""" + with self._lock as _lock: + return reversed(range(len(self.folders))) + + def _lru(self) -> Iterable[int]: + """Get each folders index sorted least recently used first.""" + with self._lock as _lock: + return range(len(self.folders)) + + def clear_cache(self): + """Clear all files and folders within the cached folder.""" + with self._lock as _lock: + folders = [] + for folder in self.folders: + if not self._remove_folder(folder): + folders.append(folders) + logger.warn("[CacheManager] Failed to remove folder", folder=folder) + self.folders = folders + logger.info("[CacheManager] Cleared cache!") + + def _remove_folder(self, entry: CacheEntry) -> bool: + with self._lock as _lock: + self.current_size -= entry.size + if not entry.path.is_dir(): + return True + + is_empty = True + for file in entry.path.iterdir(): + assert file.is_file() and file.suffix == ".webp" try: - folder.rmdir() - with contextlib.suppress(KeyError): - CacheManager.folder_dict.pop(folder) - except Exception as e: - logger.error( - "[CacheManager] Couldn't unlink empty cache folder!", - error=e, - folder=folder, - tree=tree, - ) + file.unlink(missing_ok=True) + except BaseException as e: + is_empty = False + logger.warn("[CacheManager] Failed to remove file", file=file, error=e) - for _ in tree.glob("*"): - cleared = False - - if cleared: - logger.info("[CacheManager] Cleared cache!") + if is_empty: + entry.path.rmdir() + return True else: - logger.error("[CacheManager] Couldn't delete cache!", tree=tree) + size = 0 + for file in entry.path.iterdir(): + size += file.stat().st_size + entry.size = size + self.current_size += size + return False - return cleared + def get_file_path(self, file_name: Path) -> Path | None: + with self._lock as _lock: + for i in self._mru(): + entry = self.folders[i] + file_path = entry.path / file_name + if file_path.exists(): + self._set_mru(i) + return file_path + return None - def set_library(self, library): - """Set the TagStudio library for the cache manager.""" - self.lib = library - self.last_lib_path = self.lib.library_dir - if library.library_dir: - self.check_folder_status() - - def cache_dir(self) -> Path | None: - """Return the current cache directory, not including folder slugs.""" - if not self.lib.library_dir: - return None - return Path(self.lib.library_dir / TS_FOLDER_NAME / THUMB_CACHE_NAME) - - def save_image(self, image: Image.Image, path: Path, mode: str = "RGBA"): + def save_image(self, image: Image.Image, file_name: Path, mode: str = "RGBA"): """Save an image to the cache.""" - folder = self.get_current_folder() - if folder: - image_path: Path = folder / path - image.save(image_path, mode=mode) - with contextlib.suppress(KeyError): - CacheManager.folder_dict[folder] += image_path.stat().st_size + with self._lock as _lock: + entry = self._get_current_folder() + file_path = entry.path / file_name + image.save(file_path, mode=mode) - def check_folder_status(self): - """Check the status of the cache folders. + size = file_path.stat().st_size + entry.size += size + self.current_size += size + self._cull_folders() - This includes registering existing ones and creating new ones if needed. - """ - if ( - (self.last_lib_path != self.lib.library_dir) - or not self.cache_dir() - or not self.cache_dir().exists() - ): - self.register_existing_folders() - - def create_folder() -> Path | None: - """Create a new cache folder.""" - if not self.lib.library_dir: - return None - folder_path = Path(self.cache_dir() / str(math.floor(dt.timestamp(dt.now())))) - logger.info("[CacheManager] Creating new folder", folder=folder_path) + def _create_folder(self) -> CacheEntry: + with self._lock as _lock: + folder = self.cache_folder / Path(str(math.floor(dt.timestamp(dt.now())))) try: - folder_path.mkdir(exist_ok=True) - except NotADirectoryError: - logger.error("[CacheManager] Not a directory", path=folder_path) - return folder_path + folder.mkdir(parents=True) + except FileExistsError: + for entry in self.folders: + if entry.path == folder: + return entry + entry = CacheEntry(folder, 0) + self.folders.append(entry) + return entry - # Get size of most recent folder, if any exist. - if CacheManager.folder_dict: - last_folder = sorted(CacheManager.folder_dict.keys())[-1] + def _get_current_folder(self) -> CacheEntry: + with self._lock as _lock: + if len(self.folders) == 0: + return self._create_folder() - if CacheManager.folder_dict[last_folder] > CacheManager.FOLDER_SIZE: - new_folder = create_folder() - CacheManager.folder_dict[new_folder] = 0 - else: - new_folder = create_folder() - CacheManager.folder_dict[new_folder] = 0 + for i in self._mru(): + entry = self.folders[i] + if entry.size < self.max_folder_size: + self._set_mru(i) + return entry - def get_current_folder(self) -> Path: - """Get the current cache folder path that should be used.""" - self.check_folder_status() - self.cull_folders() + return self._create_folder() - return sorted(CacheManager.folder_dict.keys())[-1] - - def register_existing_folders(self): - """Scan and register any pre-existing cache folders with the most recent size.""" - self.last_lib_path = self.lib.library_dir - CacheManager.folder_dict.clear() - - if self.last_lib_path: - # Ensure thumbnail cache path exists. - self.cache_dir().mkdir(exist_ok=True) - # Registers any existing folders and counts the capacity of the most recent one. - for f in sorted(self.cache_dir().glob("*")): - if f.is_dir(): - # A folder is found. Add it to the class dict.BlockingIOError - CacheManager.folder_dict[f] = 0 - CacheManager.folder_dict = dict( - sorted(CacheManager.folder_dict.items(), key=lambda kv: kv[0]) - ) - - if CacheManager.folder_dict: - last_folder = sorted(CacheManager.folder_dict.keys())[-1] - for f in last_folder.glob("*"): - if not f.is_dir(): - with contextlib.suppress(KeyError): - CacheManager.folder_dict[last_folder] += f.stat().st_size - - def cull_folders(self): + def _cull_folders(self): """Remove folders and their cached context based on size or age limits.""" - # Ensure that the user's configured size limit isn't less than the internal folder size. - size_limit = max(CacheManager.size_limit, CacheManager.FOLDER_SIZE) + with self._lock as _lock: + if self.current_size < self.max_size: + return - if len(CacheManager.folder_dict) > (size_limit / CacheManager.FOLDER_SIZE): - f = sorted(CacheManager.folder_dict.keys())[0] - folder = self.cache_dir() / f - logger.info("[CacheManager] Removing folder due to size limit", folder=folder) + removed: list[int] = [] + for i in self._lru(): + entry = self.folders[i] + logger.info("[CacheManager] Removing folder due to size limit", folder=entry.path) + if self._remove_folder(entry): + removed.append(i) + if self.current_size < self.max_size: + break - for file in folder.glob("*"): - try: - file.unlink() - except Exception as e: - logger.error( - "[CacheManager] Couldn't cull file inside of folder!", - error=e, - file=file, - folder=folder, - ) - try: - folder.rmdir() - with contextlib.suppress(KeyError): - CacheManager.folder_dict.pop(f) - self.cull_folders() - except Exception as e: - logger.error("[CacheManager] Couldn't cull folder!", error=e, folder=folder) - pass + for index in sorted(removed, reverse=True): + self.folders.pop(index) diff --git a/src/tagstudio/qt/ts_qt.py b/src/tagstudio/qt/ts_qt.py index 3b043158..6c68ec0e 100644 --- a/src/tagstudio/qt/ts_qt.py +++ b/src/tagstudio/qt/ts_qt.py @@ -183,6 +183,7 @@ class QtDriver(DriverMixin, QObject): applied_theme: Theme lib: Library + cache_manager: CacheManager browsing_history: History[BrowsingState] @@ -245,24 +246,6 @@ class QtDriver(DriverMixin, QObject): Translations.change_language(self.settings.language) - # NOTE: This should be a per-library setting rather than an application setting. - thumb_cache_size_limit: int = int( - str( - self.cached_values.value( - SettingItems.THUMB_CACHE_SIZE_LIMIT, - defaultValue=CacheManager.size_limit, - type=int, - ) - ) - ) - - CacheManager.size_limit = thumb_cache_size_limit - self.cached_values.setValue(SettingItems.THUMB_CACHE_SIZE_LIMIT, CacheManager.size_limit) - self.cached_values.sync() - logger.info( - f"[Config] Thumbnail cache size limit: {format_size(CacheManager.size_limit)}", - ) - def __reset_navigation(self) -> None: self.browsing_history = History(BrowsingState.show_all()) @@ -520,7 +503,7 @@ class QtDriver(DriverMixin, QObject): # TODO: Move this to a settings screen. self.main_window.menu_bar.clear_thumb_cache_action.triggered.connect( - lambda: CacheManager.clear_cache(self.lib.library_dir) + lambda: self.cache_manager.clear_cache() ) # endregion @@ -732,6 +715,7 @@ class QtDriver(DriverMixin, QObject): self.__reset_navigation() self.lib.close() + self.cache_manager = None self.thumb_job_queue.queue.clear() if is_shutdown: @@ -1688,6 +1672,16 @@ class QtDriver(DriverMixin, QObject): success=False, library_path=path, message=type(e).__name__, msg_description=str(e) ) + max_size: int = self.cached_values.value( + SettingItems.THUMB_CACHE_SIZE_LIMIT, + defaultValue=CacheManager.DEFAULT_MAX_SIZE, + type=int, + ) # type: ignore + self.cache_manager = CacheManager(path, max_size=max_size) + logger.info( + f"[Config] Thumbnail cache size limit: {format_size(max_size)}", + ) + # Migration is required if open_status.json_migration_req: self.migration_modal = JsonMigrationModal(path) diff --git a/src/tagstudio/qt/view/widgets/preview/preview_thumb_view.py b/src/tagstudio/qt/view/widgets/preview/preview_thumb_view.py index d3615f9a..5a16e0a4 100644 --- a/src/tagstudio/qt/view/widgets/preview/preview_thumb_view.py +++ b/src/tagstudio/qt/view/widgets/preview/preview_thumb_view.py @@ -98,7 +98,7 @@ class PreviewThumbView(QWidget): self.__media_player_page = QWidget() self.__stacked_page_setup(self.__media_player_page, self.__media_player) - self.__thumb_renderer = ThumbRenderer(library, driver) + self.__thumb_renderer = ThumbRenderer(driver, library) self.__thumb_renderer.updated.connect(self.__thumb_renderer_updated_callback) self.__thumb_renderer.updated_ratio.connect(self.__thumb_renderer_updated_ratio_callback) diff --git a/src/tagstudio/qt/widgets/item_thumb.py b/src/tagstudio/qt/widgets/item_thumb.py index f011d726..77fc9eab 100644 --- a/src/tagstudio/qt/widgets/item_thumb.py +++ b/src/tagstudio/qt/widgets/item_thumb.py @@ -201,7 +201,7 @@ class ItemThumb(FlowWidget): self.thumb_layout.addWidget(self.bottom_container) self.thumb_button = ThumbButton(self.thumb_container, thumb_size) - self.renderer = ThumbRenderer(self.lib, self.driver) + self.renderer = ThumbRenderer(driver, self.lib) self.renderer.updated.connect( lambda timestamp, image, size, filename: ( self.update_thumb(image, timestamp), diff --git a/src/tagstudio/qt/widgets/thumb_renderer.py b/src/tagstudio/qt/widgets/thumb_renderer.py index 3d06cd47..6823bfc8 100644 --- a/src/tagstudio/qt/widgets/thumb_renderer.py +++ b/src/tagstudio/qt/widgets/thumb_renderer.py @@ -52,15 +52,12 @@ from PySide6.QtSvg import QSvgRenderer from tagstudio.core.constants import ( FONT_SAMPLE_SIZES, FONT_SAMPLE_TEXT, - THUMB_CACHE_NAME, - TS_FOLDER_NAME, ) from tagstudio.core.exceptions import NoRendererError from tagstudio.core.library.ignore import Ignore from tagstudio.core.media_types import MediaCategories, MediaType from tagstudio.core.palette import UI_COLORS, ColorType, UiColor, get_ui_color from tagstudio.core.utils.encoding import detect_char_encoding -from tagstudio.qt.cache_manager import CacheManager from tagstudio.qt.helpers.blender_thumbnailer import blend_thumb from tagstudio.qt.helpers.color_overlay import theme_fg_overlay from tagstudio.qt.helpers.file_tester import is_readable_video @@ -73,6 +70,7 @@ from tagstudio.qt.helpers.vendored.pydub.audio_segment import ( from tagstudio.qt.resource_manager import ResourceManager if TYPE_CHECKING: + from tagstudio.core.library.alchemy.library import Library from tagstudio.qt.ts_qt import QtDriver ImageFile.LOAD_TRUNCATED_IMAGES = True @@ -93,21 +91,17 @@ class ThumbRenderer(QObject): """A class for rendering image and file thumbnails.""" rm: ResourceManager = ResourceManager() - cache: CacheManager = CacheManager() updated = Signal(float, QPixmap, QSize, Path) updated_ratio = Signal(float) cached_img_res: int = 256 # TODO: Pull this from config cached_img_ext: str = ".webp" # TODO: Pull this from config - last_cache_folder: Path | None = None - - def __init__(self, library, driver: "QtDriver") -> None: + def __init__(self, driver: "QtDriver", library: "Library") -> None: """Initialize the class.""" super().__init__() - self.lib = library self.driver = driver - ThumbRenderer.cache.set_library(self.lib) + self.lib = library # Cached thumbnail elements. # Key: Size + Pixel Ratio Tuple + Radius Scale @@ -739,7 +733,7 @@ class ThumbRenderer(QObject): if QGuiApplication.styleHints().colorScheme() is Qt.ColorScheme.Dark else "#FFFFFF" ) - im: Image.Image = None + im: Image.Image | None = None try: blend_image = blend_thumb(str(filepath)) @@ -782,14 +776,14 @@ class ThumbRenderer(QObject): return im @staticmethod - def _open_doc_thumb(filepath: Path) -> Image.Image: + def _open_doc_thumb(filepath: Path) -> Image.Image | None: """Extract and render a thumbnail for an OpenDocument file. Args: filepath (Path): The path of the file. """ file_path_within_zip = "Thumbnails/thumbnail.png" - im: Image.Image = None + im: Image.Image | None = None with zipfile.ZipFile(filepath, "r") as zip_file: # Check if the file exists in the zip if file_path_within_zip in zip_file.namelist(): @@ -805,14 +799,14 @@ class ThumbRenderer(QObject): return im @staticmethod - def _krita_thumb(filepath: Path) -> Image.Image: + def _krita_thumb(filepath: Path) -> Image.Image | None: """Extract and render a thumbnail for an Krita file. Args: filepath (Path): The path of the file. """ file_path_within_zip = "preview.png" - im: Image.Image = None + im: Image.Image | None = None with zipfile.ZipFile(filepath, "r") as zip_file: # Check if the file exists in the zip if file_path_within_zip in zip_file.namelist(): @@ -1373,40 +1367,21 @@ class ThumbRenderer(QObject): return im_ - def fetch_cached_image(folder: Path): + def fetch_cached_image(file_name: Path): image: Image.Image | None = None - cached_path: Path | None = None + cached_path = self.driver.cache_manager.get_file_path(file_name) - if hash_value and self.lib.library_dir: - cached_path = Path( - self.lib.library_dir - / TS_FOLDER_NAME - / THUMB_CACHE_NAME - / folder - / f"{hash_value}{ThumbRenderer.cached_img_ext}" - ) - if cached_path and cached_path.exists() and not cached_path.is_dir(): + if cached_path and cached_path.is_file(): try: image = Image.open(cached_path) if not image: raise UnidentifiedImageError # pyright: ignore[reportUnreachable] - ThumbRenderer.last_cache_folder = folder except Exception as e: logger.error( "[ThumbRenderer] Couldn't open cached thumbnail!", path=cached_path, error=e, ) - # If the cached thumbnail failed, try rendering a new one - image = self._render( - timestamp, - filepath, - (ThumbRenderer.cached_img_res, ThumbRenderer.cached_img_res), - 1, - is_grid_thumb, - save_to_file=cached_path, - ) - return image image: Image.Image | None = None @@ -1418,29 +1393,8 @@ class ThumbRenderer(QObject): mod_time = str(filepath.stat().st_mtime_ns) hashable_str: str = f"{str(filepath)}{mod_time}" hash_value = hashlib.shake_128(hashable_str.encode("utf-8")).hexdigest(8) - - # Check the last successful folder first. - if ThumbRenderer.last_cache_folder: - image = fetch_cached_image(ThumbRenderer.last_cache_folder) - - # If there was no last folder or the check failed, check all folders. - if not image: - thumb_folders: list[Path] = [] - try: - for f in (self.lib.library_dir / TS_FOLDER_NAME / THUMB_CACHE_NAME).glob("*"): - if f.is_dir() and f is not ThumbRenderer.last_cache_folder: - thumb_folders.append(f) - except TypeError: - logger.error( - "[ThumbRenderer] Couldn't check thumb cache folder, is the library closed?", - library_dir=self.lib.library_dir, - ) - - for folder in thumb_folders: - image = fetch_cached_image(folder) - if image: - ThumbRenderer.last_cache_folder = folder - break + file_name = Path(f"{hash_value}{ThumbRenderer.cached_img_ext}") + image = fetch_cached_image(file_name) if not image and self.driver.settings.generate_thumbs: # Render from file, return result, and try to save a cached version. @@ -1452,7 +1406,7 @@ class ThumbRenderer(QObject): (ThumbRenderer.cached_img_res, ThumbRenderer.cached_img_res), 1, is_grid_thumb, - save_to_file=Path(f"{hash_value}{ThumbRenderer.cached_img_ext}"), + save_to_file=file_name, ) # If the normal renderer failed, fallback the the defaults @@ -1663,7 +1617,7 @@ class ThumbRenderer(QObject): image = self._resize_image(image, (adj_size, adj_size)) if save_to_file and savable_media_type and image: - ThumbRenderer.cache.save_image(image, save_to_file, mode="RGBA") + self.driver.cache_manager.save_image(image, save_to_file, mode="RGBA") except ( UnidentifiedImageError,