diff --git a/CHANGES.rst b/CHANGES.rst index 9394b0dc..08d4461b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -18,6 +18,8 @@ Unreleased instead of feed :attr:`~Feed.user_title` or :attr:`~Feed.title`. * Allow filtering entries by the entry source. +* Add :meth:`~Reader.copy_entry`. (:issue:`290`) + * Fix :meth:`~Reader.enable_search` / :meth:`~Reader.update_search` not working when the search database is missing but change tracking is enabled (e.g. when restoring the main database from backup). diff --git a/src/reader/_storage/_entries.py b/src/reader/_storage/_entries.py index 043f3d89..2227c8b8 100644 --- a/src/reader/_storage/_entries.py +++ b/src/reader/_storage/_entries.py @@ -259,6 +259,7 @@ def _insert_entry(self, db: sqlite3.Connection, intent: EntryUpdateIntent) -> No first_updated_epoch, feed_order, recent_sort, + original_feed, data_hash, data_hash_changed, added_by @@ -280,6 +281,7 @@ def _insert_entry(self, db: sqlite3.Connection, intent: EntryUpdateIntent) -> No :first_updated_epoch, :feed_order, :recent_sort, + :original_feed, :data_hash, :data_hash_changed, :added_by @@ -303,7 +305,7 @@ def _update_entry(self, db: sqlite3.Connection, intent: EntryUpdateIntent) -> No last_updated = :last_updated, feed_order = :feed_order, recent_sort = :recent_sort, - original_feed = NULL, + original_feed = :original_feed, data_hash = :data_hash, data_hash_changed = :data_hash_changed, added_by = :added_by @@ -681,6 +683,7 @@ def entry_update_intent_to_dict(intent: EntryUpdateIntent) -> dict[str, Any]: else None ), recent_sort=adapt_datetime(intent.recent_sort) if intent.recent_sort else None, + original_feed=intent.original_feed_url, data_hash=entry.hash, data_hash_changed=context.pop('hash_changed'), ) diff --git a/src/reader/_types.py b/src/reader/_types.py index 379cf28a..f115ee83 100644 --- a/src/reader/_types.py +++ b/src/reader/_types.py @@ -13,6 +13,7 @@ from functools import cached_property from types import SimpleNamespace from typing import Any +from typing import cast from typing import get_args from typing import Literal from typing import NamedTuple @@ -196,6 +197,20 @@ def entry_data_from_obj(obj: object) -> EntryData: ) +def entry_update_intent_from_obj(obj: object) -> EntryUpdateIntent: + if isinstance(obj, Mapping): # pragma: no cover + obj = SimpleNamespace(**obj) + return EntryUpdateIntent( + entry=entry_data_from_obj(obj), + last_updated=_getattr_datetime(obj, 'last_updated'), + first_updated=_getattr_datetime(obj, 'added'), + first_updated_epoch=_getattr_datetime(obj, 'added'), + recent_sort=_getattr_datetime(obj, 'recent_sort'), + added_by=_getattr_entry_added_by(obj, 'added_by'), + original_feed_url=_getattr_optional(obj, 'original_feed_url', str), + ) + + def content_from_obj(obj: object) -> Content: if isinstance(obj, Mapping): obj = SimpleNamespace(**obj) @@ -250,6 +265,11 @@ def _getattr_optional(obj: object, name: str, type: type[_T]) -> _T | None: return value +def _getattr_datetime(obj: object, name: str) -> datetime: + value = _getattr(obj, name, datetime) + return value.astimezone(timezone.utc) + + def _getattr_optional_datetime(obj: object, name: str) -> datetime | None: value = _getattr_optional(obj, name, datetime) if value is None: @@ -257,6 +277,16 @@ def _getattr_optional_datetime(obj: object, name: str) -> datetime | None: return value.astimezone(timezone.utc) +def _getattr_entry_added_by(obj: object, name: str) -> EntryAddedBy: + value = _getattr(obj, name, str) + values = get_args(EntryAddedBy) + if value not in values: # pragma: no cover + raise ValueError( + f"bad value for {name}; expected one of {values!r}, got {value!r}" + ) + return cast(EntryAddedBy, value) + + class FeedForUpdate(NamedTuple): """Update-relevant information about an existing feed, from Storage.""" @@ -376,6 +406,10 @@ class EntryUpdateIntent(NamedTuple): #: Same as :attr:`.Entry.added_by`. added_by: EntryAddedBy = 'feed' + #: Same as :attr:`.Entry.original_feed_url`. + #: Usually does not need to be set. + original_feed_url: str | None = None + # using a proxy like `first_updated == last_updated` instead of new # doesn't work because it can be true for modified entries sometimes # (e.g. repeated updates on platforms with low-precision time, diff --git a/src/reader/_update.py b/src/reader/_update.py index 9bb3ebf6..721ddd78 100644 --- a/src/reader/_update.py +++ b/src/reader/_update.py @@ -206,6 +206,7 @@ def get_entries_to_update(self, pairs: EntryPairs) -> Iterable[EntryUpdateIntent if not old: if not self.old_feed.last_updated: + # WARNING: keep in sync _update and add_entry recent_sort = new.published or new.updated or self.global_now else: recent_sort = self.global_now diff --git a/src/reader/core.py b/src/reader/core.py index 787c229a..a2bd864c 100644 --- a/src/reader/core.py +++ b/src/reader/core.py @@ -24,6 +24,7 @@ from ._storage import Storage from ._types import BoundSearchStorageType from ._types import entry_data_from_obj +from ._types import entry_update_intent_from_obj from ._types import EntryData from ._types import EntryFilter from ._types import EntryUpdateIntent @@ -1564,6 +1565,7 @@ def add_entry(self, entry: Any, /) -> None: last_updated=now, first_updated=now, first_updated_epoch=now, + # WARNING: keep in sync _update and add_entry recent_sort=entry_data.published or entry_data.updated or now, added_by='user', ) @@ -1575,8 +1577,9 @@ def add_entry(self, entry: Any, /) -> None: def delete_entry(self, entry: EntryInput, /, missing_ok: bool = False) -> None: """Delete an entry. - Currently, only entries added by :meth:`~Reader.add_entry` - (:attr:`~Entry.added_by` ``'user'``) can be deleted. + Currently, only entries added by :meth:`add_entry` + and :meth:`copy_entry` (:attr:`~Entry.added_by` ``'user'``) + can be deleted. Args: entry (tuple(str, str) or Entry): (feed URL, entry id) tuple. @@ -1604,6 +1607,54 @@ def delete_entry(self, entry: EntryInput, /, missing_ok: bool = False) -> None: if not missing_ok: raise + def copy_entry(self, src: EntryInput, dst: EntryInput, /) -> None: + """Copy an entry from one feed to another. + + All :class:`Entry` attributes that belong to the entry are copied, + including timestamps like :attr:`~Entry.added`, + and hidden attributes that affect behavior (e.g. sorting). + + If the original does not already have a :attr:`~Entry.source`, + the copy's source will be set to the original's :attr:`~Entry.feed`, + with the feed's :attr:`~Feed.user_title` taking precedence + over :attr:`~Feed.title` as the source title. + + The copy entry will be :attr:`~Entry.added_by` ``'user'``. + + Args: + src (tuple(str, str) or Entry): Source (feed URL, entry id) tuple. + dst (tuple(str, str) or Entry): Destination (feed URL, entry id) tuple. + + Raises: + EntryExistsError: If an entry with the same id as dst already exists. + FeedNotFoundError + StorageError + + .. versionadded:: 3.16 + + """ + src_entry = self.get_entry(src) + recent_sort = self._storage.get_entry_recent_sort(src_entry.resource_id) + dst_resource_id = _entry_argument(dst) + + attrs = dict(src_entry.__dict__) + attrs['feed_url'], attrs['id'] = dst_resource_id + if not src_entry.source: + feed = src_entry.feed + attrs['source'] = dict(feed.__dict__) + if feed.user_title: + attrs['source']['title'] = feed.user_title + attrs['recent_sort'] = recent_sort + attrs['added_by'] = 'user' + + intent = entry_update_intent_from_obj(attrs) + + self._storage.add_entry(intent) + + # TODO: not atomic, maybe add to intent later on + self.set_entry_read(dst, src_entry.read, src_entry.read_modified) + self.set_entry_important(dst, src_entry.important, src_entry.important_modified) + def enable_search(self) -> None: """Enable full-text search. diff --git a/tests/test_reader.py b/tests/test_reader.py index f9d22ce8..0af5077f 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -2779,6 +2779,7 @@ def test_add_entry(reader): last_updated=datetime(2010, 1, 2), added=datetime(2010, 1, 2), ) + assert reader._storage.get_entry_recent_sort(('1', '1, 1')) == datetime(2010, 1, 2) # add it by feed (update) @@ -2832,3 +2833,75 @@ def test_delete_entry(reader): assert {(e.id, e.added_by) for e in reader.get_entries()} == { ('1, 2', 'feed'), } + + +@pytest.mark.parametrize('data_file', ['full', 'empty']) +def test_copy_entry(reader, data_dir, data_file): + reader._now = lambda: datetime(2010, 1, 1) + src_url = str(data_dir.joinpath(f'{data_file}.atom')) + src_id = (src_url, 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') + reader.add_feed(src_url) + reader.update_feeds() + + if data_file == 'full': + reader._now = lambda: datetime(2010, 1, 2) + reader.mark_entry_as_read(src_id) + reader.mark_entry_as_unimportant(src_id) + + reader._now = lambda: datetime(2010, 2, 1) + dst_url = 'dst' + dst_id = (dst_url, 'id') + reader.add_feed(dst_url) + reader.copy_entry(src_id, dst_id) + + def clean(entry): + return entry._replace( + id=None, + source=None, + added_by=None, + original_feed_url=None, + _sequence=None, + feed=None, + ) + + src = reader.get_entry(src_id) + dst = reader.get_entry(dst_id) + + assert clean(src) == clean(dst) + + assert dst.id == 'id' + assert dst.added_by == 'user' + assert dst.original_feed_url == src_url + + if src.source: + src_source = src.source.__dict__ + else: + src_source = {k: getattr(src.feed, k) for k in dst.source.__dict__} + assert src_source == dst.source.__dict__ + + get_recent_sort = reader._storage.get_entry_recent_sort + assert get_recent_sort(src_id) == get_recent_sort(dst_id) + + # src does not exist + with pytest.raises(EntryNotFoundError) as excinfo: + reader.copy_entry((src_url, 'inexistent'), dst_id) + assert excinfo.value.resource_id == (src_url, 'inexistent') + + # dst exists + with pytest.raises(EntryExistsError) as excinfo: + reader.copy_entry(src_id, dst_id) + assert excinfo.value.resource_id == dst_id + + # dst feed does not exist + with pytest.raises(FeedNotFoundError) as excinfo: + reader.copy_entry(src_id, ('inexistent', 'id')) + assert excinfo.value.resource_id == ('inexistent',) + + # delete + reader.delete_entry(dst_id) + + # user title has priority in source if original entry does not have source + reader.set_feed_user_title(src_url, 'user') + reader.copy_entry(src_id, dst_id) + dst = reader.get_entry(dst_id) + assert dst.source.title == 'user' if not src.source else src.source.title