Skip to content

Commit

Permalink
Fix duplicate results in non-find-first search
Browse files Browse the repository at this point in the history
Fix an issue where duplicate results could appear in a non-find-first dataset search if the same dataset appeared in multiple collections in a chain.

This was occurring because we were forcing the addition of the collection key field to make the rows distinct.  But on a non-find-first search, we don't have the window function to de-duplicate the rows by dataset ID, so we need to keep the collection key out of the rows and treat dataset ID as a unique key instead.
  • Loading branch information
dhirving committed Feb 6, 2025
1 parent e7f47c9 commit f98563f
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
22 changes: 20 additions & 2 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,15 @@ def build_query(
union_dataset_dimensions=tree.any_dataset.dimensions,
projection_columns=projection_columns,
final_columns=final_columns,
find_first_dataset=find_first_dataset,
)
else:
assert find_first_dataset != qt.AnyDatasetType.ANY_DATASET
builder = SingleSelectQueryBuilder(
tree_analysis=query_tree_analysis,
projection_columns=projection_columns,
final_columns=final_columns,
find_first_dataset=find_first_dataset,
)
# Finish setting up the projection part of the builder.
builder.analyze_projection()
Expand All @@ -590,7 +593,7 @@ def build_query(
builder.joins_analysis.columns.update(builder.projection_columns)
# Set up the find-first part of the builder.
if find_first_dataset is not None:
builder.analyze_find_first(find_first_dataset)
builder.analyze_find_first()
# At this point, analysis is complete, and we can proceed to making
# the select_builder(s) reflect that analysis.
if not analyze_only:
Expand Down Expand Up @@ -1059,6 +1062,7 @@ def apply_query_projection(
# it depends on the kinds of collection(s) we're searching and whether
# it's a find-first query.
for dataset_type, fields_for_dataset in projection_columns.dataset_fields.items():
is_find_first = dataset_type == find_first_dataset
dataset_search: ResolvedDatasetSearch[Any]
if dataset_type is qt.ANY_DATASET:
assert union_datasets is not None
Expand All @@ -1076,7 +1080,7 @@ def apply_query_projection(
derived_fields.append((dataset_type, "collection_key"))
elif dataset_field == "timespan" and dataset_search.is_calibration_search:
# The timespan is also a unique key...
if dataset_type == find_first_dataset:
if is_find_first:
# ...unless we're doing a find-first search on this
# dataset, in which case we need to use ANY_VALUE on
# the timespan and check that _VALIDITY_MATCH_COUNT
Expand All @@ -1095,6 +1099,20 @@ def apply_query_projection(
].apply_any_aggregate(self.db.apply_any_aggregate)
else:
unique_keys.extend(select_builder.joins.timespans[dataset_type].flatten())
elif (
dataset_field == "dataset_id"
and len(dataset_search.collection_records) > 1
and not is_find_first
):
# If we have more than one collection in the search, we can
# find multiple dataset IDs with the same data ID, in
# different collections.
# In a non-find-first search, we have to make dataset ID a
# unique key to prevent de-duplication for rows with the
# same data ID but different dataset IDs.
# We don't do this for a find-first search because the
# window function will take care of it.
unique_keys.append(select_builder.joins.fields[dataset_type]["dataset_id"])
else:
# Other dataset fields derive their uniqueness from key
# fields.
Expand Down
91 changes: 74 additions & 17 deletions python/lsst/daf/butler/direct_query_driver/_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class QueryBuilder(ABC):
or DISTINCT may be performed.
final_columns : `.queries.tree.ColumnSet`
Columns to include in the final query.
find_first_dataset : `str` or ``...`` or None
Name of the dataset type that needs a find-first search. ``...``
is used to indicate the dataset types in a union dataset query.
`None` means find-first is not used.
"""

def __init__(
Expand All @@ -86,13 +90,14 @@ def __init__(
*,
projection_columns: qt.ColumnSet,
final_columns: qt.ColumnSet,
find_first_dataset: str | qt.AnyDatasetType | None,
):
self.joins_analysis = tree_analysis.joins
self.postprocessing = tree_analysis.postprocessing
self.projection_columns = projection_columns
self.final_columns = final_columns
self.needs_dimension_distinct = False
self.find_first_dataset = None
self.find_first_dataset = find_first_dataset

joins_analysis: QueryJoinsAnalysis
"""Description of the "joins" stage of query construction."""
Expand Down Expand Up @@ -161,20 +166,14 @@ def analyze_projection(self) -> None:
self.needs_dimension_distinct = True

@abstractmethod
def analyze_find_first(self, find_first_dataset: str | qt.AnyDatasetType) -> None:
def analyze_find_first(self) -> None:
"""Analyze the "find first" stage of query construction, in which a
Common Table Expression with PARTITION ON may be used to find the first
dataset for each data ID and dataset type in an ordered collection
sequence.
This modifies the builder in place, and should be called immediately
after `analyze_projection`.
Parameters
----------
find_first_dataset : `str` or ``...``
Name of the dataset type that needs a find-first search. ``...``
is used to indicate the dataset types in a union dataset query.
"""
raise NotImplementedError()

Expand Down Expand Up @@ -284,6 +283,47 @@ def finish_nested(self, cte: bool = False) -> SqlSelectBuilder:
"""
raise NotImplementedError()

def _needs_collection_key_field(
    self, dataset_search: ResolvedDatasetSearch, fields_for_dataset: set[str]
) -> bool:
    """Return `True` if the ``collection_key`` dataset field is needed to
    provide uniqueness for rows.

    Parameters
    ----------
    dataset_search : `ResolvedDatasetSearch`
        Resolved dataset search whose ``collection_records`` give the
        collections being searched for this dataset type.
    fields_for_dataset : `set` [ `str` ]
        Names of the dataset fields requested in the projection for this
        dataset type.

    Returns
    -------
    needed : `bool`
        `True` if rows must be kept distinct per collection (so the
        ``collection_key`` field must be added), `False` if one row per
        dataset is sufficient.
    """
    # For a dataset search, we sometimes want just one row for each dataset
    # and sometimes we need multiple rows, one for each collection that
    # the dataset was found in.
    #
    # We need multiple rows if any of the following are true:
    # - This is a find-first dataset search. The rows will be ranked using
    #   a window function to determine the first collection containing a
    #   matching dataset, so we need a row for each collection to feed into
    #   the window.
    # - The user requested dataset fields that differ depending on which
    #   collection the dataset was found in, so we need a row for each
    #   collection to get all the possible values for the dataset fields.
    #
    # To ensure that we keep the necessary rows after DISTINCT or GROUP BY
    # is applied, we add a "collection_key" field that is unique for each
    # collection.

    # If there is only one collection, there will only be one row per
    # dataset, so we don't need to disambiguate.
    if len(dataset_search.collection_records) > 1:
        if (
            # We need a row for each collection, which will later
            # be filtered down using the window function.
            self.find_first_dataset is not None
            # We might have multiple calibration collections containing the
            # same dataset with the same timespan.
            or "timespan" in fields_for_dataset
            # The user specifically asked for a row for each collection we
            # found the dataset in.
            or "collection" in fields_for_dataset
        ):
            return True

    return False


class SingleSelectQueryBuilder(QueryBuilder):
"""An implementation of `QueryBuilder` for queries that are structured as
Expand All @@ -304,6 +344,9 @@ class SingleSelectQueryBuilder(QueryBuilder):
or DISTINCT may be performed.
final_columns : `.queries.tree.ColumnSet`
Columns to include in the final query.
find_first_dataset : `str` or None
Name of the dataset type that needs a find-first search.
`None` means find-first is not used.
"""

def __init__(
Expand All @@ -312,11 +355,13 @@ def __init__(
*,
projection_columns: qt.ColumnSet,
final_columns: qt.ColumnSet,
find_first_dataset: str | None,
) -> None:
super().__init__(
tree_analysis=tree_analysis,
projection_columns=projection_columns,
final_columns=final_columns,
find_first_dataset=find_first_dataset,
)
assert not tree_analysis.union_datasets, "UnionQueryPlan should be used instead."
self._select_builder = tree_analysis.initial_select_builder
Expand Down Expand Up @@ -359,13 +404,16 @@ def analyze_projection(self) -> None:
# or GROUP BY columns.
for dataset_type, fields_for_dataset in self.projection_columns.dataset_fields.items():
assert dataset_type is not qt.ANY_DATASET, "Union dataset in non-dataset-union query."
if len(self.joins_analysis.datasets[dataset_type].collection_records) > 1:
if self._needs_collection_key_field(
self.joins_analysis.datasets[dataset_type], fields_for_dataset
):
fields_for_dataset.add("collection_key")

def analyze_find_first(self, find_first_dataset: str | qt.AnyDatasetType) -> None:
def analyze_find_first(self) -> None:
# Docstring inherited.
assert find_first_dataset is not qt.ANY_DATASET, "No dataset union in this query"
self.find_first = QueryFindFirstAnalysis(self.joins_analysis.datasets[find_first_dataset])
assert self.find_first_dataset is not qt.ANY_DATASET, "No dataset union in this query"
assert self.find_first_dataset is not None
self.find_first = QueryFindFirstAnalysis(self.joins_analysis.datasets[self.find_first_dataset])
# If we're doing a find-first search and there's a calibration
# collection in play, we need to make sure the rows coming out of
# the base query have only one timespan for each data ID +
Expand Down Expand Up @@ -502,6 +550,10 @@ class UnionQueryBuilder(QueryBuilder):
Columns to include in the final query.
union_dataset_dimensions : `DimensionGroup`
Dimensions of the dataset types that comprise the union.
find_first_dataset : `str` or ``...`` or None
Name of the dataset type that needs a find-first search. ``...``
is used to indicate the dataset types in a union dataset query.
`None` means find-first is not used.
Notes
-----
Expand All @@ -522,11 +574,13 @@ def __init__(
projection_columns: qt.ColumnSet,
final_columns: qt.ColumnSet,
union_dataset_dimensions: DimensionGroup,
find_first_dataset: str | qt.AnyDatasetType | None,
):
super().__init__(
tree_analysis=tree_analysis,
projection_columns=projection_columns,
final_columns=final_columns,
find_first_dataset=find_first_dataset,
)
self._initial_select_builder: SqlSelectBuilder | None = tree_analysis.initial_select_builder
self.union_dataset_dimensions = union_dataset_dimensions
Expand Down Expand Up @@ -580,15 +634,18 @@ def analyze_projection(self) -> None:
# If there is more than one collection for one union term,
# we need to add collection_key to all of them to keep the
# SELECT columns uniform.
if len(union_term.datasets.collection_records) > 1:
if self._needs_collection_key_field(union_term.datasets, fields_for_dataset):
fields_for_dataset.add("collection_key")
break
elif len(self.joins_analysis.datasets[dataset_type].collection_records) > 1:
elif self._needs_collection_key_field(
self.joins_analysis.datasets[dataset_type], fields_for_dataset
):
fields_for_dataset.add("collection_key")

def analyze_find_first(self, find_first_dataset: str | qt.AnyDatasetType) -> None:
def analyze_find_first(self) -> None:
# Docstring inherited.
if find_first_dataset is qt.ANY_DATASET:
assert self.find_first_dataset is not None
if self.find_first_dataset is qt.ANY_DATASET:

Check warning on line 648 in python/lsst/daf/butler/direct_query_driver/_query_builder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/direct_query_driver/_query_builder.py#L647-L648

Added lines #L647 - L648 were not covered by tests
for union_term in self.union_terms:
union_term.find_first = QueryFindFirstAnalysis(union_term.datasets)
# If we're doing a find-first search and there's a calibration
Expand All @@ -609,7 +666,7 @@ def analyze_find_first(self, find_first_dataset: str | qt.AnyDatasetType) -> Non
# like it'd be useful, so it's better not to have to maintain that
# logic branch.
raise NotImplementedError(
f"Additional dataset search {find_first_dataset!r} can only be joined into a "
f"Additional dataset search {self.find_first_dataset!r} can only be joined into a "
"union dataset query as a constraint in data IDs, not as a find-first result."
)

Expand Down

0 comments on commit f98563f

Please sign in to comment.