Skip to content

Commit

Permalink
Fix assertion error when an entry is deleted while being updated. #335
Browse files Browse the repository at this point in the history
  • Loading branch information
lemon24 committed Jun 15, 2024
1 parent a362623 commit fab96b0
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Unreleased
The fix restores the pre-3.12 first-entry-wins / last-write-wins behavior.
Thanks to `Joakim Hellsén`_ for reporting and helping debug this issue.
(:issue:`335`)

* Fix assertion error when an entry is deleted while being updated.

* Make it possible to re-run the :mod:`~reader.plugins.mark_as_read` plugin
for existing entries.
Thanks to `Michael Han`_ for the pull request.
Expand Down
36 changes: 18 additions & 18 deletions src/reader/_storage/_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,22 @@ def _get_entries_for_update_page(
# See e39b0134cb3a2fe2bb346d42355a764181926a82 for a single query version.

def row_factory(_: sqlite3.Cursor, row: sqlite3.Row) -> EntryForUpdate:
updated, published, data_hash, data_hash_changed = row
fu, fu_epoch, recent_sort, updated, data_hash, data_hash_changed = row
return EntryForUpdate(
convert_timestamp(fu),
convert_timestamp(fu_epoch),
convert_timestamp(recent_sort),
convert_timestamp(updated) if updated else None,
convert_timestamp(published) if published else None,
data_hash,
data_hash_changed,
)

query = """
SELECT
first_updated,
first_updated_epoch,
recent_sort,
updated,
published,
data_hash,
data_hash_changed
FROM entries
Expand Down Expand Up @@ -209,26 +213,26 @@ def _add_or_update_entries(self, intents: Iterable[EntryUpdateIntent]) -> None:
with self.get_db() as db:
try:
for intent in intents:
# we cannot rely on the EntryForUpdate the updater got,
# the entry may have been added by different update since then
# we cannot rely on the updater getting an EntryForUpdate
# to tell if the entry is new at *this* point in time,
# the entry may have been added/deleted by a parallel update
#
# as a consequence, EntryUpdateIntent must set all fields,
# including the ones which have a single value
# for the entire lifetime of the entry (like first_updated)

new = not list(
db.execute(
"SELECT 1 FROM entries WHERE (feed, id) = (?, ?)",
intent.entry.resource_id,
)
)
# if the entry is new, it must have been new for the updater too;
# there's still a race condition when the entry was not new
# to the updater, but was deleted before we got here;
# this can be fixed by not making first_updated{_epoch} modal
# (return on EntryForUpdate, make required on intent)
# TODO: round-trip first_updated{_epoch} after #332 is merged

if new:
assert intent.first_updated is not None, intent
assert intent.first_updated_epoch is not None, intent
self._insert_entry(db, intent)
else:
self._update_entry(db, intent)

except sqlite3.IntegrityError as e:
e_msg = str(e).lower()
if "foreign key constraint failed" in e_msg:
Expand Down Expand Up @@ -295,11 +299,7 @@ def _update_entry(self, db: sqlite3.Connection, intent: EntryUpdateIntent) -> No
enclosures = :enclosures,
last_updated = :last_updated,
feed_order = :feed_order,
recent_sort = coalesce(:recent_sort, (
SELECT recent_sort
FROM entries
WHERE id = :id AND feed = :feed_url
)),
recent_sort = :recent_sort,
original_feed = NULL,
data_hash = :data_hash,
data_hash_changed = :data_hash_changed,
Expand Down
26 changes: 16 additions & 10 deletions src/reader/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,18 @@ class FeedForUpdate(NamedTuple):
class EntryForUpdate(NamedTuple):
"""Update-relevant information about an existing entry, from Storage."""

#: From the last :attr:`EntryUpdateIntent.first_updated`.
first_updated: datetime

#: From the last :attr:`EntryUpdateIntent.first_updated_epoch`.
first_updated_epoch: datetime

#: From the last :attr:`EntryUpdateIntent.recent_sort`.
recent_sort: datetime

#: The date the entry was last updated, according to the entry.
updated: datetime | None

#: The date the entry was published, according to the entry.
published: datetime | None

#: The :attr:`~EntryData.hash` of the corresponding EntryData.
hash: bytes | None

Expand Down Expand Up @@ -364,18 +370,18 @@ class EntryUpdateIntent(NamedTuple):
last_updated: datetime

#: First :attr:`last_updated` (sets :attr:`.Entry.added`).
#: :const:`None` if the entry already exists.
first_updated: datetime | None
#: The value from :class:`EntryForUpdate` if the entry already exists.
first_updated: datetime

#: The time at the start of updating this batch of feeds
#: (start of :meth:`~.Reader.update_feed` in :meth:`~.Reader.update_feed`,
#: start of :meth:`~.Reader.update_feeds` in :meth:`~.Reader.update_feeds`).
#: :const:`None` if the entry already exists.
first_updated_epoch: datetime | None
#: The value from :class:`EntryForUpdate` if the entry already exists.
first_updated_epoch: datetime

#: Sort key for the :meth:`~.Reader.get_entries` ``recent`` sort order.
#: If :const:`None`, keep the previous value.
recent_sort: datetime | None
#: The value from :class:`EntryForUpdate` if the entry already exists.
recent_sort: datetime

#: The index of the entry in the feed (zero-based).
feed_order: int = 0
Expand All @@ -389,7 +395,7 @@ class EntryUpdateIntent(NamedTuple):
@property
def new(self) -> bool:
"""Whether the entry is new or not."""
return self.first_updated_epoch is not None
return self.first_updated == self.last_updated


#: Like the ``tags`` argument of :meth:`.Reader.get_feeds`, except:
Expand Down
6 changes: 3 additions & 3 deletions src/reader/_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ def get_entries_to_update(self, pairs: EntryPairs) -> Iterable[EntryUpdateIntent
else:
recent_sort = self.global_now
else:
recent_sort = None
recent_sort = old.recent_sort

yield EntryUpdateIntent(
new,
self.now,
self.now if not old else None,
self.global_now if not old else None,
self.now if not old else old.first_updated,
self.global_now if not old else old.first_updated_epoch,
recent_sort,
feed_order,
should_update.hash_changed,
Expand Down
69 changes: 68 additions & 1 deletion tests/test_reader_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def test_concurrent_update(monkeypatch, db_path, make_reader, new):
reader.add_feed(parser.feed(1))
if not new:
entry = parser.entry(1, 1, title='zero')
reader._now = lambda: datetime(2010, 1, 1)
reader.update_feeds()
entry = parser.entry(1, 1, title='one')

Expand All @@ -71,6 +72,7 @@ def target():
reader = make_reader(db_path)
reader._parser = Parser.from_parser(parser)
reader._parser.entry(1, 1, title='two')
reader._now = lambda: datetime(2010, 1, 1, 1)
reader.update_feeds()
can_return_from_make_intents.set()

Expand All @@ -89,8 +91,73 @@ def update_feed(*args, **kwargs):
t = threading.Thread(target=target)
t.start()
try:
reader._now = lambda: datetime(2010, 1, 1, 2)
reader.update_feeds()
finally:
t.join()

assert reader.get_entry(entry).title == 'one'
entry = reader.get_entry(entry)
assert entry.title == 'one'
assert entry.added == (datetime(2010, 1, 1, 1) if new else datetime(2010, 1, 1))
assert entry.last_updated == datetime(2010, 1, 1, 2)


@pytest.mark.slow
def test_entry_deleted_during_update(monkeypatch, db_path, make_reader):
"""If an entry is deleted while being updated, the update should not fail.
Additionally, first_updated/added, first_updated_epoch, and recent_sort
should be preserved from the old (just deleted) entry.
This is arbitrary, but should be consistent.
"""
reader = make_reader(db_path)
reader._parser = parser = Parser()

reader.add_feed(parser.feed(1))
entry = parser.entry(1, 1, title='zero')
reader._now = lambda: datetime(2010, 1, 1)
reader.update_feeds()

entry = parser.entry(1, 1, title='one')

in_make_intents = threading.Event()
can_return_from_make_intents = threading.Event()

def target():
in_make_intents.wait()
reader = make_reader(db_path)
reader._storage.delete_entries([entry.resource_id])
can_return_from_make_intents.set()

from reader._update import Pipeline

def update_feed(*args, **kwargs):
monkeypatch.undo()
in_make_intents.set()
can_return_from_make_intents.wait()
return Pipeline.update_feed(*args, **kwargs)

# TODO: this would have been easier if Pipeline were a reader attribute
monkeypatch.setattr(Pipeline, 'update_feed', update_feed)

before_entry = reader.get_entry(entry)
(before_efu,) = reader._storage.get_entries_for_update([entry.resource_id])

t = threading.Thread(target=target)
t.start()
try:
reader._now = lambda: datetime(2010, 1, 1, 2)
reader.update_feeds()
finally:
t.join()

after_entry = reader.get_entry(entry)
assert after_entry.title == 'one'
assert after_entry.added == before_entry.added
assert after_entry.last_updated == datetime(2010, 1, 1, 2)

(after_efu,) = reader._storage.get_entries_for_update([entry.resource_id])
assert before_efu.first_updated == after_efu.first_updated
assert before_efu.first_updated_epoch == after_efu.first_updated_epoch
assert before_efu.recent_sort == after_efu.recent_sort

0 comments on commit fab96b0

Please sign in to comment.