Skip to content

Commit

Permalink
Fixed data-fixup-during-sync deadlock.
Browse files Browse the repository at this point in the history
fixes #2980
[nocoverage]

(cherry picked from commit 0907d0e)
  • Loading branch information
ggainey authored and dralley committed Jun 13, 2023
1 parent 83cd0a6 commit 4af24c0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES/2980.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a deadlock during concurrent syncs of rpm-repos that need data fixups.
65 changes: 54 additions & 11 deletions pulp_rpm/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,14 @@ class RpmQueryExistingContents(Stage):
customization is to address any issues where data may not saved properly in the past,
e.g. https://github.com/pulp/pulp_rpm/issues/2643.
Unlike the base-pulpcore workflow, this stage needs to change/fix incoming Content,
**and** make sure all existing-content is touch()'d as part of this sync. This forces
the code to care about ordering, to care about changing-fields, and to do the touch() as
the last thing it does.
Because pulp_rpm needs to do qualitatively-different things here than what the core-version
does/cares about, maintaining compatibility is going to take effort.
Fixes can be added or removed over time as necessary.
"""

Expand All @@ -1584,17 +1592,24 @@ async def run(self):
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[d_content.content.natural_key()].append(d_content)

for model_type, content_q in content_q_by_type.items():
try:
await sync_to_async(model_type.objects.filter(content_q).touch)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)
# For each entry in the batch, determine if we need to "fix up" the data before
# letting it pass down the pipeline.
#
# NOTE that we "remember" entities we've touched, and **which fields** we've touched,
# in order to persist **just the changes we make** below. This helps us avoid some
# nasty deadlock-windows between save() and touch() in this stage.
#
# IF/WHEN/AS YOU ADD TO THIS CODE - make sure you update modified_results_by_pk and
# modified_fields_by_pk, or your changes WILL NOT BE PERSISTED.

# holds the to-be-saved results by-pk
modified_results_by_pk = {}
# per to-be-saved pk, holds a list of field-names of the things we've changed'
modified_fields_by_pk = defaultdict(list)
query_types = content_q_by_type.keys()
for model_type in query_types:
async for result in sync_to_async_iterable(
model_type.objects.filter(content_q).iterator()
model_type.objects.filter(content_q_by_type[model_type]).iterator()
):
for d_content in d_content_by_nat_key[result.natural_key()]:
# ============ The below lines are added vs. pulpcore ============
Expand All @@ -1612,17 +1627,45 @@ async def run(self):
if incorrect_changelogs:
# Covers a class of issues with changelog data on the CDN
result.changelogs = d_content.content.changelogs
modified_fields_by_pk[result.pk].append("changelogs")
if incorrect_modular_relation:
# Covers #2643
result.is_modular = True
modified_fields_by_pk[result.pk].append("is_modular")
duplicated_files = len(result.files) != len(d_content.content.files)
if duplicated_files:
d_content.content.files = result.files
if incorrect_changelogs or incorrect_modular_relation:
log.debug("Updated data for package {}".format(result.nevra))
await sync_to_async(result.save)()
modified_results_by_pk[result.pk] = result
# ==================================================================

# THIS is the "Important Part" that the pulpcore code is doing
d_content.content = result

# Save results in guaranteed-pid-order
# NOTE: we are saving **only** the fields we've changed to avoid a deadlock-collision
# with the touch() call that ends the stage, below.
modified_rslts_pks = sorted(modified_results_by_pk.keys())
for pk in modified_rslts_pks:
await sync_to_async(modified_results_by_pk[pk].save)(
update_fields=modified_fields_by_pk[pk]
)

# touch in order to mark "last seen time" for the batch
# Also part of the pulpcore class - but it happens in a different order here,
# to avoid the ordering/deadlock problems introduced by the individual save()
for query_type in query_types:
try:
await sync_to_async(
query_type.objects.filter(content_q_by_type[query_type]).touch
)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)

for d_content in batch:
await self.put(d_content)

0 comments on commit 4af24c0

Please sign in to comment.