refactor: remove TagBoxField and TagField (NOT WORKING)

This commit is contained in:
python357-1
2024-12-01 22:25:30 -06:00
committed by Travis Abendshien
parent 29c0dfdb2d
commit 260a4cf12e
7 changed files with 246 additions and 270 deletions

View File

@@ -11,7 +11,7 @@ from .db import Base
from .enums import FieldTypeEnum
if TYPE_CHECKING:
from .models import Entry, Tag, ValueType
from .models import Entry, ValueType
class BaseField(Base):
@@ -75,31 +75,32 @@ class TextField(BaseField):
def __eq__(self, value) -> bool:
if isinstance(value, TextField):
return self.__key() == value.__key()
elif isinstance(value, (TagBoxField, DatetimeField)):
elif isinstance(value, DatetimeField):
return False
raise NotImplementedError
class TagBoxField(BaseField):
__tablename__ = "tag_box_fields"
tags: Mapped[set[Tag]] = relationship(secondary="tag_fields")
def __key(self):
return (
self.entry_id,
self.type_key,
)
@property
def value(self) -> None:
"""For interface compatibility with other field types."""
return None
def __eq__(self, value) -> bool:
if isinstance(value, TagBoxField):
return self.__key() == value.__key()
raise NotImplementedError
# TODO: Remove
# class TagBoxField(BaseField):
# __tablename__ = "tag_box_fields"
#
# tags: Mapped[set[Tag]] = relationship(secondary="tag_fields")
#
# def __key(self):
# return (
# self.entry_id,
# self.type_key,
# )
#
# @property
# def value(self) -> None:
# """For interface compatibility with other field types."""
# return None
#
# def __eq__(self, value) -> bool:
# if isinstance(value, TagBoxField):
# return self.__key() == value.__key()
# raise NotImplementedError
class DatetimeField(BaseField):
@@ -133,9 +134,10 @@ class _FieldID(Enum):
URL = DefaultField(id=3, name="URL", type=FieldTypeEnum.TEXT_LINE)
DESCRIPTION = DefaultField(id=4, name="Description", type=FieldTypeEnum.TEXT_LINE)
NOTES = DefaultField(id=5, name="Notes", type=FieldTypeEnum.TEXT_BOX)
TAGS = DefaultField(id=6, name="Tags", type=FieldTypeEnum.TAGS)
TAGS_CONTENT = DefaultField(id=7, name="Content Tags", type=FieldTypeEnum.TAGS, is_default=True)
TAGS_META = DefaultField(id=8, name="Meta Tags", type=FieldTypeEnum.TAGS, is_default=True)
# TODO: Remove (i think)
# TAGS = DefaultField(id=6, name="Tags", type=FieldTypeEnum.TAGS)
# TAGS_CONTENT = DefaultField(id=7, name="Content Tags", type=FieldTypeEnum.TAGS, is_default=True)
# TAGS_META = DefaultField(id=8, name="Meta Tags", type=FieldTypeEnum.TAGS, is_default=True)
COLLATION = DefaultField(id=9, name="Collation", type=FieldTypeEnum.TEXT_LINE)
DATE = DefaultField(id=10, name="Date", type=FieldTypeEnum.DATETIME)
DATE_CREATED = DefaultField(id=11, name="Date Created", type=FieldTypeEnum.DATETIME)

View File

@@ -11,8 +11,16 @@ class TagSubtag(Base):
child_id: Mapped[int] = mapped_column(ForeignKey("tags.id"), primary_key=True)
class TagField(Base):
__tablename__ = "tag_fields"
# TODO: Remove
# class TagField(Base):
# __tablename__ = "tag_fields"
#
# field_id: Mapped[int] = mapped_column(ForeignKey("tag_box_fields.id"), primary_key=True)
# tag_id: Mapped[int] = mapped_column(ForeignKey("tags.id"), primary_key=True)
class TagEntry(Base):
__tablename__ = "tag_entries"
field_id: Mapped[int] = mapped_column(ForeignKey("tag_box_fields.id"), primary_key=True)
tag_id: Mapped[int] = mapped_column(ForeignKey("tags.id"), primary_key=True)
entry_id: Mapped[int] = mapped_column(ForeignKey("entries.id"), primary_key=True)

View File

@@ -46,11 +46,10 @@ from .enums import FieldTypeEnum, FilterState, TagColor
from .fields import (
BaseField,
DatetimeField,
TagBoxField,
TextField,
_FieldID,
)
from .joins import TagField, TagSubtag
from .joins import TagSubtag
from .models import Entry, Folder, Preferences, Tag, TagAlias, ValueType
from .visitors import SQLBoolExpressionBuilder
@@ -356,42 +355,43 @@ class Library:
session.delete(item)
session.commit()
def remove_field_tag(self, entry: Entry, tag_id: int, field_key: str) -> bool:
assert isinstance(field_key, str), f"field_key is {type(field_key)}"
with Session(self.engine) as session:
# find field matching entry and field_type
field = session.scalars(
select(TagBoxField).where(
and_(
TagBoxField.entry_id == entry.id,
TagBoxField.type_key == field_key,
)
)
).first()
if not field:
logger.error("no field found", entry=entry, field=field)
return False
try:
# find the record in `TagField` table and delete it
tag_field = session.scalars(
select(TagField).where(
and_(
TagField.tag_id == tag_id,
TagField.field_id == field.id,
)
)
).first()
if tag_field:
session.delete(tag_field)
session.commit()
return True
except IntegrityError as e:
logger.exception(e)
session.rollback()
return False
# TODO: Remove (i think)
# def remove_field_tag(self, entry: Entry, tag_id: int, field_key: str) -> bool:
# assert isinstance(field_key, str), f"field_key is {type(field_key)}"
# with Session(self.engine) as session:
# # find field matching entry and field_type
# field = session.scalars(
# select(TagBoxField).where(
# and_(
# TagBoxField.entry_id == entry.id,
# TagBoxField.type_key == field_key,
# )
# )
# ).first()
#
# if not field:
# logger.error("no field found", entry=entry, field=field)
# return False
#
# try:
# # find the record in `TagField` table and delete it
# tag_field = session.scalars(
# select(TagField).where(
# and_(
# TagField.tag_id == tag_id,
# TagField.field_id == field.id,
# )
# )
# ).first()
# if tag_field:
# session.delete(tag_field)
# session.commit()
#
# return True
# except IntegrityError as e:
# logger.exception(e)
# session.rollback()
# return False
def get_entry(self, entry_id: int) -> Entry | None:
"""Load entry without joins."""
@@ -438,14 +438,13 @@ class Library:
if with_joins:
# load Entry with all joins and all tags
stmt = (
stmt.outerjoin(Entry.text_fields)
.outerjoin(Entry.datetime_fields)
.outerjoin(Entry.tag_box_fields)
stmt.outerjoin(Entry.text_fields).outerjoin(Entry.datetime_fields)
# .outerjoin(Entry.tag_box_fields)
)
stmt = stmt.options(
contains_eager(Entry.text_fields),
contains_eager(Entry.datetime_fields),
contains_eager(Entry.tag_box_fields).selectinload(TagBoxField.tags),
# contains_eager(Entry.tag_box_fields).selectinload(TagBoxField.tags),
)
stmt = stmt.distinct()
@@ -567,7 +566,7 @@ class Library:
selectinload(Entry.text_fields),
selectinload(Entry.datetime_fields),
selectinload(Entry.tag_box_fields)
.joinedload(TagBoxField.tags)
# .joinedload(TagBoxField.tags)
.options(selectinload(Tag.aliases), selectinload(Tag.subtags)),
)
@@ -697,16 +696,6 @@ class Library:
return None
def remove_tag_from_field(self, tag: Tag, field: TagBoxField) -> None:
with Session(self.engine) as session:
field_ = session.scalars(select(TagBoxField).where(TagBoxField.id == field.id)).one()
tag = session.scalars(select(Tag).where(Tag.id == tag.id)).one()
field_.tags.remove(tag)
session.add(field_)
session.commit()
def update_field_position(
self,
field_class: type[BaseField],
@@ -836,24 +825,24 @@ class Library:
field_id = field_id.name
field = self.get_value_type(field_id)
field_model: TextField | DatetimeField | TagBoxField
field_model: TextField | DatetimeField # | TagBoxField
if field.type in (FieldTypeEnum.TEXT_LINE, FieldTypeEnum.TEXT_BOX):
field_model = TextField(
type_key=field.key,
value=value or "",
)
elif field.type == FieldTypeEnum.TAGS:
field_model = TagBoxField(
type_key=field.key,
)
if value:
assert isinstance(value, list)
with Session(self.engine) as session:
for tag_id in list(set(value)):
tag = session.scalar(select(Tag).where(Tag.id == tag_id))
field_model.tags.add(tag)
session.flush()
# elif field.type == FieldTypeEnum.TAGS:
# field_model = TagBoxField(
# type_key=field.key,
# )
#
# if value:
# assert isinstance(value, list)
# with Session(self.engine) as session:
# for tag_id in list(set(value)):
# tag = session.scalar(select(Tag).where(Tag.id == tag_id))
# field_model.tags.add(tag)
# session.flush()
elif field.type == FieldTypeEnum.DATETIME:
field_model = DatetimeField(
@@ -934,60 +923,61 @@ class Library:
session.rollback()
return None
def add_field_tag(
self,
entry: Entry,
tag: Tag,
field_key: str = _FieldID.TAGS.name,
create_field: bool = False,
) -> bool:
assert isinstance(field_key, str), f"field_key is {type(field_key)}"
with Session(self.engine) as session:
# find field matching entry and field_type
field = session.scalars(
select(TagBoxField).where(
and_(
TagBoxField.entry_id == entry.id,
TagBoxField.type_key == field_key,
)
)
).first()
if not field and not create_field:
logger.error("no field found", entry=entry, field_key=field_key)
return False
try:
if not field:
field = TagBoxField(
type_key=field_key,
entry_id=entry.id,
position=0,
)
session.add(field)
session.flush()
# create record for `TagField` table
if not tag.id:
session.add(tag)
session.flush()
tag_field = TagField(
tag_id=tag.id,
field_id=field.id,
)
session.add(tag_field)
session.commit()
logger.info("tag added to field", tag=tag, field=field, entry_id=entry.id)
return True
except IntegrityError as e:
logger.exception(e)
session.rollback()
return False
# TODO: Delete
# def add_field_tag(
# self,
# entry: Entry,
# tag: Tag,
# field_key: str = _FieldID.TAGS.name,
# create_field: bool = False,
# ) -> bool:
# assert isinstance(field_key, str), f"field_key is {type(field_key)}"
#
# with Session(self.engine) as session:
# # find field matching entry and field_type
# field = session.scalars(
# select(TagBoxField).where(
# and_(
# TagBoxField.entry_id == entry.id,
# TagBoxField.type_key == field_key,
# )
# )
# ).first()
#
# if not field and not create_field:
# logger.error("no field found", entry=entry, field_key=field_key)
# return False
#
# try:
# if not field:
# field = TagBoxField(
# type_key=field_key,
# entry_id=entry.id,
# position=0,
# )
# session.add(field)
# session.flush()
#
# # create record for `TagField` table
# if not tag.id:
# session.add(tag)
# session.flush()
#
# tag_field = TagField(
# tag_id=tag.id,
# field_id=field.id,
# )
#
# session.add(tag_field)
# session.commit()
# logger.info("tag added to field", tag=tag, field=field, entry_id=entry.id)
#
# return True
# except IntegrityError as e:
# logger.exception(e)
# session.rollback()
#
# return False
def save_library_backup_to_disk(self) -> Path:
assert isinstance(self.library_dir, Path)

View File

@@ -11,9 +11,7 @@ from .fields import (
BooleanField,
DatetimeField,
FieldTypeEnum,
TagBoxField,
TextField,
_FieldID,
)
from .joins import TagSubtag
@@ -125,6 +123,8 @@ class Entry(Base):
path: Mapped[Path] = mapped_column(PathType, unique=True)
suffix: Mapped[str] = mapped_column()
tags: Mapped[set[Tag]] = relationship(secondary="tag_entries")
text_fields: Mapped[list[TextField]] = relationship(
back_populates="entry",
cascade="all, delete",
@@ -133,43 +133,27 @@ class Entry(Base):
back_populates="entry",
cascade="all, delete",
)
tag_box_fields: Mapped[list[TagBoxField]] = relationship(
back_populates="entry",
cascade="all, delete",
)
@property
def fields(self) -> list[BaseField]:
fields: list[BaseField] = []
fields.extend(self.tag_box_fields)
fields.extend(self.text_fields)
fields.extend(self.datetime_fields)
fields = sorted(fields, key=lambda field: field.type.position)
return fields
@property
def tags(self) -> set[Tag]:
tag_set: set[Tag] = set()
for tag_box_field in self.tag_box_fields:
tag_set.update(tag_box_field.tags)
return tag_set
@property
def is_favorited(self) -> bool:
for tag_box_field in self.tag_box_fields:
if tag_box_field.type_key == _FieldID.TAGS_META.name:
for tag in tag_box_field.tags:
if tag.id == TAG_FAVORITE:
return True
for tag in self.tags:
if tag.id == TAG_FAVORITE:
return True
return False
@property
def is_archived(self) -> bool:
for tag_box_field in self.tag_box_fields:
if tag_box_field.type_key == _FieldID.TAGS_META.name:
for tag in tag_box_field.tags:
if tag.id == TAG_ARCHIVED:
return True
for tag in self.tags:
if tag.id == TAG_ARCHIVED:
return True
return False
def __init__(
@@ -189,27 +173,15 @@ class Entry(Base):
self.text_fields.append(field)
elif isinstance(field, DatetimeField):
self.datetime_fields.append(field)
elif isinstance(field, TagBoxField):
self.tag_box_fields.append(field)
else:
raise ValueError(f"Invalid field type: {field}")
def has_tag(self, tag: Tag) -> bool:
return tag in self.tags
def remove_tag(self, tag: Tag, field: TagBoxField | None = None) -> None:
"""Removes a Tag from the Entry.
If given a field index, the given Tag will
only be removed from that index. If left blank, all instances of that
Tag will be removed from the Entry.
"""
if field:
field.tags.remove(tag)
return
for tag_box_field in self.tag_box_fields:
tag_box_field.tags.remove(tag)
def remove_tag(self, tag: Tag) -> None:
"""Removes a Tag from the Entry."""
self.tags.remove(tag)
class ValueType(Base):
@@ -237,7 +209,6 @@ class ValueType(Base):
datetime_fields: Mapped[list[DatetimeField]] = relationship(
"DatetimeField", back_populates="type"
)
tag_box_fields: Mapped[list[TagBoxField]] = relationship("TagBoxField", back_populates="type")
boolean_fields: Mapped[list[BooleanField]] = relationship("BooleanField", back_populates="type")
@property
@@ -245,7 +216,7 @@ class ValueType(Base):
FieldClass = { # noqa: N806
FieldTypeEnum.TEXT_LINE: TextField,
FieldTypeEnum.TEXT_BOX: TextField,
FieldTypeEnum.TAGS: TagBoxField,
# FieldTypeEnum.TAGS: TagBoxField,
FieldTypeEnum.DATETIME: DatetimeField,
FieldTypeEnum.BOOLEAN: BooleanField,
}

View File

@@ -17,15 +17,19 @@ from PySide6.QtWidgets import (
QVBoxLayout,
QWidget,
)
from sqlalchemy import and_, select
from sqlalchemy import select
from sqlalchemy.orm import Session
from src.core.constants import TS_FOLDER_NAME
from src.core.enums import LibraryPrefs
from src.core.library.alchemy.enums import FieldTypeEnum, TagColor
from src.core.library.alchemy.fields import TagBoxField, _FieldID
from src.core.library.alchemy.joins import TagField, TagSubtag
from src.core.library.alchemy.enums import TagColor
# from src.core.library.alchemy.fields import TagBoxField, _FieldID
from src.core.library.alchemy.fields import _FieldID
# from src.core.library.alchemy.joins import TagField, TagSubtag
from src.core.library.alchemy.joins import TagSubtag
from src.core.library.alchemy.library import Library as SqliteLibrary
from src.core.library.alchemy.models import Entry, Tag, TagAlias
from src.core.library.alchemy.models import Entry, TagAlias
from src.core.library.json.library import Library as JsonLibrary # type: ignore
from src.qt.helpers.custom_runnable import CustomRunnable
from src.qt.helpers.function_iterator import FunctionIterator
@@ -501,25 +505,25 @@ class JsonMigrationModal(QObject):
"""Check if all JSON field data matches the new SQL field data."""
def sanitize_field(session, entry: Entry, value, type, type_key):
if type is FieldTypeEnum.TAGS:
tags = list(
session.scalars(
select(Tag.id)
.join(TagField)
.join(TagBoxField)
.where(
and_(
TagBoxField.entry_id == entry.id,
TagBoxField.id == TagField.field_id,
TagBoxField.type_key == type_key,
)
)
)
)
return set(tags) if tags else None
else:
return value if value else None
# if type is FieldTypeEnum.TAGS:
# tags = list(
# session.scalars(
# select(Tag.id)
# .join(TagField)
# .join(TagBoxField)
# .where(
# and_(
# TagBoxField.entry_id == entry.id,
# TagBoxField.id == TagField.field_id,
# TagBoxField.type_key == type_key,
# )
# )
# )
# )
#
# return set(tags) if tags else None
# else:
return value if value else None
def sanitize_json_field(value):
if isinstance(value, list):

View File

@@ -40,7 +40,7 @@ from src.core.library.alchemy.fields import (
BaseField,
DatetimeField,
FieldTypeEnum,
TagBoxField,
# TagBoxField,
TextField,
_FieldID,
)
@@ -56,7 +56,6 @@ from src.qt.translations import Translations
from src.qt.widgets.fields import FieldContainer
from src.qt.widgets.media_player import MediaPlayer
from src.qt.widgets.panel import PanelModal
from src.qt.widgets.tag_box import TagBoxWidget
from src.qt.widgets.text import TextWidget
from src.qt.widgets.text_box_edit import EditTextBox
from src.qt.widgets.text_line_edit import EditTextLine
@@ -877,59 +876,59 @@ class PreviewPanel(QWidget):
else:
container = self.containers[index]
# TODO this is in severe need of refactoring due to exessive code duplication
if isinstance(field, TagBoxField):
container.set_title(field.type.name)
container.set_inline(False)
title = f"{field.type.name} (Tag Box)" # TODO translate
if not is_mixed:
inner_container = container.get_inner_widget()
if isinstance(inner_container, TagBoxWidget):
inner_container.set_field(field)
inner_container.set_tags(list(field.tags))
try:
inner_container.updated.disconnect()
except RuntimeError:
logger.error("Failed to disconnect inner_container.updated")
else:
inner_container = TagBoxWidget(
field,
title,
self.driver,
)
container.set_inner_widget(inner_container)
inner_container.updated.connect(
lambda: (
self.write_container(index, field),
self.update_widgets(),
)
)
# NOTE: Tag Boxes have no Edit Button (But will when you can convert field types)
container.set_remove_callback(
lambda: self.remove_message_box(
prompt=self.remove_field_prompt(field.type.name),
callback=lambda: (
self.remove_field(field),
self.update_selected_entry(self.driver),
# reload entry and its fields
self.update_widgets(),
),
)
)
else:
text = "<i>Mixed Data</i>" # TODO translate
title = f"{field.type.name} (Wacky Tag Box)" # TODO translate
inner_container = TextWidget(title, text)
container.set_inner_widget(inner_container)
self.tags_updated.emit()
# self.dynamic_widgets.append(inner_container)
elif field.type.type == FieldTypeEnum.TEXT_LINE:
# if isinstance(field, TagBoxField):
# container.set_title(field.type.name)
# container.set_inline(False)
# title = f"{field.type.name} (Tag Box)"
#
# if not is_mixed:
# inner_container = container.get_inner_widget()
# if isinstance(inner_container, TagBoxWidget):
# inner_container.set_field(field)
# inner_container.set_tags(list(field.tags))
#
# try:
# inner_container.updated.disconnect()
# except RuntimeError:
# logger.error("Failed to disconnect inner_container.updated")
#
# else:
# inner_container = TagBoxWidget(
# field,
# title,
# self.driver,
# )
#
# container.set_inner_widget(inner_container)
#
# inner_container.updated.connect(
# lambda: (
# self.write_container(index, field),
# self.update_widgets(),
# )
# )
# # NOTE: Tag Boxes have no Edit Button (But will when you can convert field types)
# container.set_remove_callback(
# lambda: self.remove_message_box(
# prompt=self.remove_field_prompt(field.type.name),
# callback=lambda: (
# self.remove_field(field),
# self.update_selected_entry(self.driver),
# # reload entry and its fields
# self.update_widgets(),
# ),
# )
# )
# else:
# text = "<i>Mixed Data</i>"
# title = f"{field.type.name} (Wacky Tag Box)"
# inner_container = TextWidget(title, text)
# container.set_inner_widget(inner_container)
#
# self.tags_updated.emit()
# # self.dynamic_widgets.append(inner_container)
# elif field.type.type == FieldTypeEnum.TEXT_LINE:
if field.type.type == FieldTypeEnum.TEXT_LINE:
container.set_title(field.type.name)
container.set_inline(False)
@@ -1078,7 +1077,8 @@ class PreviewPanel(QWidget):
def update_field(self, field: BaseField, content: str) -> None:
"""Update a field in all selected Entries, given a field object."""
assert isinstance(
field, (TextField, DatetimeField, TagBoxField)
field,
(TextField, DatetimeField), # , TagBoxField)
), f"instance: {type(field)}"
entry_ids = []

View File

@@ -12,7 +12,8 @@ from PySide6.QtWidgets import QPushButton
from src.core.constants import TAG_ARCHIVED, TAG_FAVORITE
from src.core.library import Entry, Tag
from src.core.library.alchemy.enums import FilterState
from src.core.library.alchemy.fields import TagBoxField
# from src.core.library.alchemy.fields import TagBoxField
from src.qt.flowlayout import FlowLayout
from src.qt.modals.build_tag import BuildTagPanel
from src.qt.modals.tag_search import TagSearchPanel