DM-41966: Add Butler.transfer_dimension_records_from API #921
@@ -0,0 +1,4 @@
* Added new API ``Butler.transfer_dimension_records_from()`` to copy dimension records out of some refs and add them to the target butler.
* This and ``Butler.transfer_from()`` now copy related dimension records as well as the records associated directly with the refs.
  For example, if visit is being transferred, additional records such as visit_definition will also be copied.
  This requires a full Butler and not a limited Butler (such as the one backed by a quantum graph).
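A minimal usage sketch of the new API (the repository paths, collection, and dataset type below are illustrative assumptions, not part of this change):

    # Sketch only: repository paths, collection, and dataset type are hypothetical.
    from lsst.daf.butler import Butler

    source = Butler("/path/to/source_repo")               # full Butler: related (populated_by) records can be queried
    target = Butler("/path/to/target_repo", writeable=True)

    refs = source.registry.queryDatasets("calexp", collections="some/run")

    # Copy the dimension records for these refs' data IDs (plus related
    # records such as visit_definition) into the target registry;
    # records already present in the target are skipped.
    target.transfer_dimension_records_from(source, refs)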
@@ -37,6 +37,7 @@
import collections.abc
import contextlib
import io
import itertools
import logging
import numbers
import os

@@ -1879,6 +1880,125 @@
        else:
            doImport(filename)  # type: ignore

    def transfer_dimension_records_from(
        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef]
    ) -> None:
        # Allowed dimensions in the target butler.
        elements = frozenset(
            element for element in self.dimensions.elements if element.hasTable() and element.viewOf is None
        )

        data_ids = {ref.dataId for ref in source_refs}

        dimension_records = self._extract_all_dimension_records_from_data_ids(
            source_butler, data_ids, elements
        )

        # Insert order is important.
        for element in self.dimensions.sorted(dimension_records.keys()):
            records = [r for r in dimension_records[element].values()]
            # Assume that if the record is already present that we can
            # use it without having to check that the record metadata
            # is consistent.
            self._registry.insertDimensionData(element, *records, skip_existing=True)
            _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))

    def _extract_all_dimension_records_from_data_ids(
        self,
        source_butler: LimitedButler | Butler,
        data_ids: set[DataCoordinate],
        allowed_elements: frozenset[DimensionElement],
    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
        primary_records = self._extract_dimension_records_from_data_ids(
            source_butler, data_ids, allowed_elements
        )
Review thread:

There's an assumption here that if the destination butler already has records for some of these elements,

Ok. For the populated_by records, wouldn't we have to query the target butler to see if they existed already?

I agree it's less important and trickier to let the user control those. If we expressed the user control as an "opt out" list of elements rather than an opt-in list we could probably still trust it, though.

So you want a

Yes, that's about what I was thinking vaguely of. I don't care deeply about adding it on this ticket if you just want to get somebody else unblocked right now.
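The "opt out" control discussed in this thread is not part of the changeset; purely as an illustration, such a parameter might look like the following sketch (the omit_elements name and default are invented, not an existing API):

    # Hypothetical signature sketch -- not implemented in this PR.
    # An opted-out set of element names would be removed from the allowed
    # elements before any records are extracted.
    def transfer_dimension_records_from(
        self,
        source_butler: LimitedButler | Butler,
        source_refs: Iterable[DatasetRef],
        omit_elements: Iterable[str] = (),  # invented "opt out" list of element names
    ) -> None:
        omitted = set(omit_elements)
        elements = frozenset(
            element
            for element in self.dimensions.elements
            if element.hasTable() and element.viewOf is None and element.name not in omitted
        )
        ...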
        can_query = True if isinstance(source_butler, Butler) else False

        additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
        for original_element, record_mapping in primary_records.items():
            # Get dimensions that depend on this dimension.
            populated_by = self.dimensions.get_elements_populated_by(
                self.dimensions[original_element.name]  # type: ignore
            )

            for data_id in record_mapping.keys():
                for element in populated_by:
                    if element not in allowed_elements:
                        continue
                    if element.name == original_element.name:
                        continue

                    if element.name in primary_records:
                        # If this element has already been stored avoid
                        # re-finding records since that may lead to additional
                        # spurious records. e.g. visit is populated_by
                        # visit_detector_region but querying
                        # visit_detector_region by visit will return all the
                        # detectors for this visit -- the visit dataId does not
                        # constrain this.
                        # To constrain the query the original dataIds would
                        # have to be scanned.
                        continue

                    if not can_query:
Review thread:

Does it make sense to have a flag to not try to copy the derived records? This will break if a quantum graph is used as the source butler but I think that's fine because we shouldn't be enabling records transfer from the graph back to the primary butler because it's got all the records by definition.

I think there's also a use case for making a new repo from a QG, but I'd prefer to explicitly add that (and control what the interface for it is) rather than accidentally make it work one way and then have to continue to support that way. So I don't think we need that flag for that reason. I am a bit more worried about cases where somebody intentionally does not want to transfer something like

I have just realized that the
                        raise RuntimeError(
                            f"Transferring populated_by records like {element.name} requires a full Butler."
                        )

                    records = source_butler.registry.queryDimensionRecords(  # type: ignore
                        element.name, **data_id.mapping  # type: ignore
                    )
Review comment:

I think this is probably as efficient as we can make it now, but it will be really helpful when we can upload tables of data IDs to the query methods and join against them, both for efficiency and for simplifying the logic here (which is effectively a bunch of table joins written in Python). Right now I think we better hope this almost never gets called.
                    for record in records:
                        additional_records[record.definition].setdefault(record.dataId, record)

        # The next step is to walk back through the additional records to
        # pick up any missing content (such as visit_definition needing to
        # know the exposure). Want to ensure we do not request records we
        # already have.
        missing_data_ids = set()
        for name, record_mapping in additional_records.items():
            for data_id in record_mapping.keys():
                if data_id not in primary_records[name]:
                    missing_data_ids.add(data_id)

        # Fill out the new records. Assume that these new records do not
        # also need to carry over additional populated_by records.
        secondary_records = self._extract_dimension_records_from_data_ids(
            source_butler, missing_data_ids, allowed_elements
        )

        # Merge the extra sets of records in with the original.
        for name, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()):
            primary_records[name].update(record_mapping)

        return primary_records
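For context on the populated_by walk above, a small sketch of how the related elements can be inspected (the repository path is a placeholder and the listed element names assume the standard dimension universe):

    # Sketch: inspect which elements are populated by "visit".
    from lsst.daf.butler import Butler

    butler = Butler("/repo/main")  # placeholder path
    visit = butler.dimensions["visit"]
    populated = butler.dimensions.get_elements_populated_by(visit)
    print([element.name for element in populated])
    # Expected to include visit_definition and visit_detector_region, which is
    # why transferring a visit also carries those related records along.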
    def _extract_dimension_records_from_data_ids(
        self,
        source_butler: LimitedButler | Butler,
        data_ids: set[DataCoordinate],
        allowed_elements: frozenset[DimensionElement],
    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
        dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)

        for data_id in data_ids:
            # Need an expanded record, if not expanded that we need a full
            # butler with registry (allow mocks with registry too).
            if not data_id.hasRecords():
                if registry := getattr(source_butler, "registry", None):
                    data_id = registry.expandDataId(data_id)
                else:
                    raise TypeError("Input butler needs to be a full butler to expand DataId.")
            # If this butler doesn't know about a dimension in the source
            # butler things will break later.
            for element_name in data_id.dimensions.elements:
                record = data_id.records[element_name]
                if record is not None and record.definition in allowed_elements:
                    dimension_records[record.definition].setdefault(record.dataId, record)

        return dimension_records

    def transfer_from(
        self,
        source_butler: LimitedButler,

@@ -1972,30 +2092,19 @@
                if element.hasTable() and element.viewOf is None
            )
            dataIds = {ref.dataId for ref in source_refs}
            # This logic comes from saveDataIds.
            for dataId in dataIds:
                # Need an expanded record, if not expanded that we need a full
                # butler with registry (allow mocks with registry too).
                if not dataId.hasRecords():
                    if registry := getattr(source_butler, "registry", None):
                        dataId = registry.expandDataId(dataId)
                    else:
                        raise TypeError("Input butler needs to be a full butler to expand DataId.")
                # If this butler doesn't know about a dimension in the source
                # butler things will break later.
                for element_name in dataId.dimensions.elements:
                    record = dataId.records[element_name]
                    if record is not None and record.definition in elements:
                        dimension_records[record.definition].setdefault(record.dataId, record)
            dimension_records = self._extract_all_dimension_records_from_data_ids(
                source_butler, dataIds, elements
            )

        handled_collections: set[str] = set()

        # Do all the importing in a single transaction.
        with self.transaction():
            if dimension_records:
                _LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
                for element, r in dimension_records.items():
                    records = [r[dataId] for dataId in r]
                # Order matters.
                for element in self.dimensions.sorted(dimension_records.keys()):
                    records = [r for r in dimension_records[element].values()]
                    # Assume that if the record is already present that we can
                    # use it without having to check that the record metadata
                    # is consistent.
Review thread:

Might be better to make this an iterable of DataCoordinate, since that's what's actually holding the records and you can get that from Iterable[DatasetRef] with (ref.dataId for ref in source_refs). And even then I think that's a bit strange for a method with this name; it sounds like it should be taking a bunch of dimension records and transferring those (as well as other less-obvious associated records). But of course that's not actually the interface we need right here, so maybe this is just a naming problem.

One option might be to make this a private method, if the goal is really to support transferring datasets. On the DMTN-249 prototype I wrote a little helper class that I hoped to use to unify the transfer_from and import_/export interfaces, by providing an abstraction over "a bunch of stuff you want to transfer self-consistently". I think we might need that in the transfer APIs to avoid a bunch of methods with names like transfer_dimension_records_from_given_dataset_refs.

This needs to be a public API because the embargo transfers need to be able to call it before they transfer the raws from embargo to public. They can't use Butler.transfer_from() for raws because raws are relocated to public storage outside of butler before being ingested again (but without having to go through calling ingest-raws again since they have refs already from the embargo repo and are building up the FileDataset objects). That's also why refs are the interface and not dataIds.

Ok. I'm not thrilled with it, but if it's got a clear use case, go ahead, since the kind of generalization I want is even more design work (and I'd probably want to be even more cautious about releasing that half-baked), so we can cross the bridge of replacing this if and when we come to it.
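The embargo workflow described in this thread suggests roughly the following sequence. This is only a sketch: the repository paths, bucket URL, collection name, file naming, and transfer mode are assumptions rather than anything specified in the PR.

    # Sketch of the embargo-to-public flow; all names and paths are placeholders.
    from lsst.daf.butler import Butler, FileDataset

    embargo = Butler("/repo/embargo")
    public = Butler("/repo/main", writeable=True)

    raw_refs = list(embargo.registry.queryDatasets("raw", collections="LSSTCam/raw/all"))

    # 1. Ensure exposure/visit/etc. dimension records exist in the public repo
    #    before any files are ingested there.
    public.transfer_dimension_records_from(embargo, raw_refs)

    # 2. Files have been relocated to public storage outside of butler;
    #    re-ingest them using the refs already known from the embargo repo
    #    (assumes the run collections already exist in the target).
    datasets = [
        FileDataset(path=f"s3://public-bucket/raw/{ref.id}.fits", refs=[ref])
        for ref in raw_refs
    ]
    public.ingest(*datasets, transfer="direct")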