New Butler API for transferring dimension records
Also copies records for related dimensions populated by the original set.
Butler.transfer_from now uses part of this API.
timj committed Dec 5, 2023
1 parent 3a43e52 commit 564e43e
Showing 5 changed files with 160 additions and 15 deletions.
19 changes: 19 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -1242,6 +1242,25 @@ def import_(
"""
raise NotImplementedError()

@abstractmethod
def transfer_dimension_records_from(
self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef]
) -> None:
"""Transfer dimension records to this Butler from another Butler.
Parameters
----------
source_butler : `LimitedButler`
Butler from which the records are to be transferred. If data IDs
in ``source_refs`` are not expanded then this has to be a full
`Butler` whose registry will be used to expand data IDs.
source_refs : iterable of `DatasetRef`
Datasets defined in the source butler whose dimension records
should be transferred to this butler. In most circumstances.
transfer is faster if the dataset refs are expanded.
"""
raise NotImplementedError()

@abstractmethod
def transfer_from(
self,
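A hypothetical usage sketch of the new API (the repository paths, dataset type, and collection name below are placeholders, not part of this commit):

    # Sketch only: paths, dataset type, and collection are illustrative.
    from lsst.daf.butler import Butler

    source_butler = Butler("/path/to/source/repo")
    target_butler = Butler("/path/to/target/repo", writeable=True)

    # Any iterable of DatasetRef works; refs with expanded data IDs make
    # the transfer faster because no registry expansion is needed.
    refs = source_butler.registry.queryDatasets("raw", collections="some/collection")

    # Copy only the dimension records, without transferring the datasets.
    target_butler.transfer_dimension_records_from(source_butler, refs)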
126 changes: 111 additions & 15 deletions python/lsst/daf/butler/direct_butler.py
@@ -37,6 +37,7 @@
import collections.abc
import contextlib
import io
import itertools
import logging
import numbers
import os
@@ -1875,6 +1876,113 @@ def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
else:
doImport(filename) # type: ignore

def transfer_dimension_records_from(
self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef]
) -> None:
# Allowed dimensions in the target butler.
elements = frozenset(
element for element in self.dimensions.elements if element.hasTable() and element.viewOf is None
)

data_ids = {ref.dataId for ref in source_refs}

dimension_records = self._extract_all_dimension_records_from_data_ids(
source_butler, data_ids, elements
)

for element, element_records in dimension_records.items():
    records = list(element_records.values())
    # Assume that if the record is already present we can
    # use it without having to check that the record metadata
    # is consistent.
self._registry.insertDimensionData(element, *records, skip_existing=True)
_LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))

def _extract_all_dimension_records_from_data_ids(
self,
source_butler: LimitedButler,
data_ids: set[DataCoordinate],
allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
primary_records = self._extract_dimension_records_from_data_ids(
source_butler, data_ids, allowed_elements
)

additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
for name, records in primary_records.items():
# Get dimensions that depend on this dimension.
populated_by = self.dimensions.get_elements_populated_by(self.dimensions[name])

for data_id in records.keys():
for element in populated_by:
if element not in allowed_elements:
continue
if element.name == name:
continue

if element.name in primary_records:
# If this element has already been stored, avoid
# re-finding records, since that may lead to additional
# spurious records. e.g. visit is populated_by
# visit_detector_region, but querying
# visit_detector_region by visit will return all the
# detectors for this visit -- the visit dataId does not
# constrain this. To constrain the query, the original
# dataIds would have to be scanned.
continue

element_records = source_butler.registry.queryDimensionRecords(element.name, **data_id.mapping)
for record in element_records:
    additional_records[record.definition].setdefault(record.dataId, record)

# The next step is to walk back through the additional records to
# pick up any missing content (such as visit_definition needing to
# know the exposure). We want to ensure that we do not request
# records we already have.
missing_data_ids = set()
for name, records in additional_records.items():
for data_id in records.keys():
if data_id not in primary_records[name]:
missing_data_ids.add(data_id)

# Fill out the new records. Assume that these new records do not
# also need to carry over additional populated_by records.
secondary_records = self._extract_dimension_records_from_data_ids(
source_butler, missing_data_ids, allowed_elements
)

# Merge the extra sets of records in with the original.
for name, records in itertools.chain(additional_records.items(), secondary_records.items()):
primary_records[name].update(records)

return primary_records

def _extract_dimension_records_from_data_ids(
self,
source_butler: LimitedButler,
data_ids: set[DataCoordinate],
allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)

for data_id in data_ids:
# We need an expanded record; if it is not expanded then we need
# a full butler with a registry (mocks with a registry are also
# allowed).
if not data_id.hasRecords():
if registry := getattr(source_butler, "registry", None):
data_id = registry.expandDataId(data_id)
else:
raise TypeError("Input butler needs to be a full butler to expand DataId.")
# If this butler doesn't know about a dimension in the source
# butler, things will break later.
for element_name in data_id.dimensions.elements:
record = data_id.records[element_name]
if record is not None and record.definition in allowed_elements:
dimension_records[record.definition].setdefault(record.dataId, record)

return dimension_records

def transfer_from(
self,
source_butler: LimitedButler,
@@ -1968,21 +2076,9 @@ def transfer_from(
if element.hasTable() and element.viewOf is None
)
dataIds = {ref.dataId for ref in source_refs}
-        # This logic comes from saveDataIds.
-        for dataId in dataIds:
-            # Need an expanded record, if not expanded that we need a full
-            # butler with registry (allow mocks with registry too).
-            if not dataId.hasRecords():
-                if registry := getattr(source_butler, "registry", None):
-                    dataId = registry.expandDataId(dataId)
-                else:
-                    raise TypeError("Input butler needs to be a full butler to expand DataId.")
-            # If this butler doesn't know about a dimension in the source
-            # butler things will break later.
-            for element_name in dataId.dimensions.elements:
-                record = dataId.records[element_name]
-                if record is not None and record.definition in elements:
-                    dimension_records[record.definition].setdefault(record.dataId, record)
+        dimension_records = self._extract_all_dimension_records_from_data_ids(
+            source_butler, dataIds, elements
+        )

handled_collections: set[str] = set()

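The populated_by walk in _extract_all_dimension_records_from_data_ids relies on DimensionUniverse.get_elements_populated_by, which this commit calls but does not add. A short sketch of inspecting those relationships directly (the repository path is a placeholder):

    from lsst.daf.butler import Butler

    butler = Butler("/path/to/repo")  # placeholder path
    universe = butler.dimensions

    # Elements whose records are filled in alongside visit records,
    # e.g. visit_detector_region; these are the records the helper
    # follows up on after the primary pass.
    for element in universe.get_elements_populated_by(universe["visit"]):
        print(element.name)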
6 changes: 6 additions & 0 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -399,6 +399,12 @@ def import_(
# Docstring inherited.
raise NotImplementedError()

def transfer_dimension_records_from(
self, source_butler: LimitedButler, source_refs: Iterable[DatasetRef]
) -> None:
# Docstring inherited.
raise NotImplementedError()

def transfer_from(
self,
source_butler: LimitedButler,
5 changes: 5 additions & 0 deletions tests/test_butler.py
@@ -2308,6 +2308,11 @@ def assertButlerTransfers(self, purge: bool = False, storageClassName: str = "St
# Can remove with DM-35498.
self.target_butler.registry.refresh()

# Transfer the records for one ref to test the alternative API.
with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]])
self.assertIn("number of records transferred: 1", ";".join(log_cm.output))

# Now transfer them to the second butler, including dimensions.
with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
transferred = self.target_butler.transfer_from(
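Once transfer_dimension_records_from has run, the transferred records should be visible in the target registry. A hedged verification sketch (the element name and instrument value are illustrative, not taken from this test):

    # Confirm the transferred records exist in the target butler.
    records = list(
        self.target_butler.registry.queryDimensionRecords(
            "exposure", instrument="DummyCamComp"
        )
    )
    self.assertGreater(len(records), 0)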
19 changes: 19 additions & 0 deletions tests/test_simpleButler.py
@@ -603,6 +603,25 @@ def testJson(self):
from_json = type(test_item).from_json(json_str, universe=butler.dimensions)
self.assertEqual(from_json, test_item, msg=f"From JSON '{json_str}' using universe")

def test_populated_by(self):
"""Test that dimension records can find other records."""
butler = self.makeButler(writeable=True)
butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "hsc-rc2-subset.yaml"))

elements = frozenset(
element for element in butler.dimensions.elements if element.hasTable() and element.viewOf is None
)

# Get a visit-based dataId.
data_ids = set(butler.registry.queryDataIds("visit", visit=1232, instrument="HSC"))

# Request all the records related to it.
records = butler._extract_all_dimension_records_from_data_ids(butler, data_ids, elements)

self.assertIn(butler.dimensions["visit_detector_region"], records, f"Keys: {records.keys()}")
self.assertIn(butler.dimensions["visit_system_membership"], records)
self.assertIn(butler.dimensions["visit_system"], records)

def testJsonDimensionRecordsAndHtmlRepresentation(self):
# Dimension Records
butler = self.makeButler(writeable=True)
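test_populated_by feeds the helper data IDs that may still need expanding; the helper falls back to registry.expandDataId when hasRecords() is False. A minimal sketch of that expansion step (the data ID values are illustrative, echoing the HSC subset above):

    # Expand a minimal data ID; the result carries its dimension records.
    data_id = butler.registry.expandDataId(instrument="HSC", visit=1232)
    assert data_id.hasRecords()
    print(data_id.records["visit"])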
