From 4af24c040f858fd2127656557ab01f0239788bc5 Mon Sep 17 00:00:00 2001 From: Grant Gainey Date: Wed, 8 Mar 2023 16:18:56 -0500 Subject: [PATCH] Fixed data-fixup-during-sync deadlock. fixes #2980 [nocoverage] (cherry picked from commit 0907d0ecaa8357a68d359a9e8c9f954a9b4d1b85) --- CHANGES/2980.bugfix | 1 + pulp_rpm/app/tasks/synchronizing.py | 65 ++++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 11 deletions(-) create mode 100644 CHANGES/2980.bugfix diff --git a/CHANGES/2980.bugfix b/CHANGES/2980.bugfix new file mode 100644 index 000000000..9e129f5de --- /dev/null +++ b/CHANGES/2980.bugfix @@ -0,0 +1 @@ +Fixed a deadlock during concurrent syncs of rpm-repos that need data fixups. diff --git a/pulp_rpm/app/tasks/synchronizing.py b/pulp_rpm/app/tasks/synchronizing.py index ba4675cac..f5ebd666f 100644 --- a/pulp_rpm/app/tasks/synchronizing.py +++ b/pulp_rpm/app/tasks/synchronizing.py @@ -1564,6 +1564,14 @@ class RpmQueryExistingContents(Stage): customization is to address any issues where data may not saved properly in the past, e.g. https://github.com/pulp/pulp_rpm/issues/2643. + Unlike the base-pulpcore workflow, this stage needs to change/fix incoming Content, + **and** make sure all existing-content is touch()'d as part of this sync. This forces + the code to care about ordering, to care about changing-fields, and to do the touch() as + the last thing it does. + + Because pulp_rpm needs to do qualitatively-different things here than what the core-version + does/cares about, maintaining compatibility is going to take effort. + Fixes can be added or removed over time as necessary. 
""" @@ -1584,17 +1592,24 @@ async def run(self): content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q d_content_by_nat_key[d_content.content.natural_key()].append(d_content) - for model_type, content_q in content_q_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(content_q).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) + # For each entry in the batch, determine if we need to "fix up" the data before + # letting it pass down the pipeline. + # + # NOTE that we "remember" entities we've touched, and **which fields** we've touched, + # in order to persist **just the changes we make** below. This helps us avoid some + # nasty deadlock-windows between save() and touch() in this stage. + # + # IF/WHEN/AS YOU ADD TO THIS CODE - make sure you update modified_results_by_pk and + # modified_fields_by_pk, or your changes WILL NOT BE PERSISTED. + + # holds the to-be-saved results by-pk + modified_results_by_pk = {} + # per to-be-saved pk, holds a list of field-names of the things we've changed + modified_fields_by_pk = defaultdict(list) + query_types = content_q_by_type.keys() + for model_type in query_types: async for result in sync_to_async_iterable( - model_type.objects.filter(content_q).iterator() + model_type.objects.filter(content_q_by_type[model_type]).iterator() ): for d_content in d_content_by_nat_key[result.natural_key()]: # ============ The below lines are added vs. 
pulpcore ============ @@ -1612,17 +1627,45 @@ async def run(self): if incorrect_changelogs: # Covers a class of issues with changelog data on the CDN result.changelogs = d_content.content.changelogs + modified_fields_by_pk[result.pk].append("changelogs") if incorrect_modular_relation: # Covers #2643 result.is_modular = True + modified_fields_by_pk[result.pk].append("is_modular") duplicated_files = len(result.files) != len(d_content.content.files) if duplicated_files: d_content.content.files = result.files if incorrect_changelogs or incorrect_modular_relation: log.debug("Updated data for package {}".format(result.nevra)) - await sync_to_async(result.save)() + modified_results_by_pk[result.pk] = result # ================================================================== + + # THIS is the "Important Part" that the pulpcore code is doing d_content.content = result + # Save results in guaranteed-pid-order + # NOTE: we are saving **only** the fields we've changed to avoid a deadlock-collision + # with the touch() call that ends the stage, below. + modified_rslts_pks = sorted(modified_results_by_pk.keys()) + for pk in modified_rslts_pks: + await sync_to_async(modified_results_by_pk[pk].save)( + update_fields=modified_fields_by_pk[pk] + ) + + # touch in order to mark "last seen time" for the batch + # Also part of the pulpcore class - but it happens in a different order here, + # to avoid the ordering/deadlock problems introduced by the individual save() + for query_type in query_types: + try: + await sync_to_async( + query_type.objects.filter(content_q_by_type[query_type]).touch + )() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) + for d_content in batch: await self.put(d_content)