DM-31824: Use ResourcePath.mtransfer for Butler.transfer_from #1162

Open · wants to merge 4 commits into main
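In outline, the diff below does two things: it adds a `--dry-run/--no-dry-run` option to the dataset-transfer command, and it replaces the per-file `ResourcePath.transfer_from` loops in `FileDatastore.retrieveArtifacts` and `FileDatastore.transfer_from` with a single bulk `ResourcePath.mtransfer` call over the collected `(source, target)` pairs. A rough sketch of the before/after pattern, using only the call shapes visible in the diff (`mtransfer` comes from the `lsst-resources` branch pinned in `requirements.txt` below; `to_transfer` is a placeholder):

```python
# Sketch only: call shapes are taken from the diff; ``to_transfer`` maps each
# source ResourcePath to its target ResourcePath.
from lsst.resources import ResourcePath


def copy_one_by_one(to_transfer: dict[ResourcePath, ResourcePath]) -> None:
    # Before: one transfer per Python-level loop iteration.
    for source_uri, target_uri in to_transfer.items():
        target_uri.transfer_from(source_uri, transfer="copy", overwrite=False)


def copy_in_bulk(to_transfer: dict[ResourcePath, ResourcePath]) -> None:
    # After: hand every (source, target) pair to a single bulk call, which is
    # free to batch or parallelize the copies internally.
    ResourcePath.mtransfer(
        "copy",
        [(src, dst) for src, dst in to_transfer.items()],
        overwrite=False,
    )
```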
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/cli/cmd/commands.py
@@ -638,6 +638,9 @@ def retrieve_artifacts(**kwargs: Any) -> None:
@transfer_option()
@register_dataset_types_option()
@transfer_dimensions_option()
@click.option(
"--dry-run/--no-dry-run", default=False, help="Enable dry run mode and do not transfer any datasets."
)
@options_file_option()
def transfer_datasets(**kwargs: Any) -> None:
"""Transfer datasets from a source butler to a destination butler.
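The new flag uses click's standard boolean-flag form, so it reaches the command's `**kwargs` as `dry_run`. A self-contained sketch of the same pattern outside the butler CLI machinery (only the `@click.option` declaration mirrors the diff; the command body here is illustrative):

```python
# Standalone illustration of the ``--dry-run/--no-dry-run`` boolean flag.
# Only the option declaration mirrors the diff; the rest is a toy command.
import click


@click.command()
@click.option(
    "--dry-run/--no-dry-run", default=False, help="Enable dry run mode and do not transfer any datasets."
)
def transfer_datasets(dry_run: bool) -> None:
    """Toy stand-in for the real transfer-datasets command."""
    if dry_run:
        click.echo("Dry run: reporting what would be transferred, copying nothing.")
    else:
        click.echo("Transferring datasets.")


if __name__ == "__main__":
    transfer_datasets()
```

Invoked with `--dry-run`, the option arrives as `dry_run=True`; the real command forwards it through `transferDatasets` (see below).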
49 changes: 34 additions & 15 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -2158,10 +2158,19 @@ def retrieveArtifacts(
target_uri = to_transfer[cleaned_source_uri]
artifact_map[target_uri].append(ref.id)

-# In theory can now parallelize the transfer
+# Parallelize the transfer. Re-raise as a single exception if
+# a FileExistsError is encountered anywhere.
log.debug("Number of artifacts to transfer to %s: %d", str(destination), len(to_transfer))
-for source_uri, target_uri in to_transfer.items():
-target_uri.transfer_from(source_uri, transfer=transfer, overwrite=overwrite)
+try:
+ResourcePath.mtransfer(
+transfer,
+((from_uri, to_uri) for from_uri, to_uri in to_transfer.items()),
+overwrite=overwrite,
+)
+except* FileExistsError as egroup:
+raise FileExistsError(
+"Some files already exist in destination directory and overwrite is False"
+) from egroup

# Transfer the Zip files and unpack them.
zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path, overwrite)
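The `except*` form (Python 3.11+) is used here because `mtransfer` can report several failed transfers at once as an `ExceptionGroup`; the handler collapses any `FileExistsError` members into a single exception while keeping the group chained as the cause. A minimal self-contained demonstration of that pattern, with the group built by hand rather than by `mtransfer`:

```python
# Demonstration of the ``except*`` re-raise pattern (requires Python 3.11+).
# The ExceptionGroup is constructed by hand here; in the diff it would come
# out of ResourcePath.mtransfer when some of the transfers fail.
def do_transfers() -> None:
    errors = [FileExistsError(f"file{i}.fits already exists") for i in range(2)]
    raise ExceptionGroup("some transfers failed", errors)


try:
    try:
        do_transfers()
    except* FileExistsError as egroup:
        # Collapse the group into one user-facing exception, chaining the
        # original group so the individual failures remain inspectable.
        raise FileExistsError(
            "Some files already exist in destination directory and overwrite is False"
        ) from egroup
except FileExistsError as exc:
    print(exc)
    print("caused by:", exc.__cause__)
```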
@@ -2748,6 +2757,9 @@ def transfer_from(
# Record each time we have done a "direct" transfer.
direct_transfers = []

# Keep track of all the file transfers that are required.
from_to: list[tuple[ResourcePath, ResourcePath]] = []

# Now can transfer the artifacts
log.verbose("Transferring artifacts")
for ref in refs:
@@ -2796,21 +2808,28 @@ def transfer_from(
info = info.update(path=target_location.pathInStore.path)

# Need to transfer it to the new location.
-# Assume we should always overwrite. If the artifact
-# is there this might indicate that a previous transfer
-# was interrupted but was not able to be rolled back
-# completely (eg pre-emption) so follow Datastore default
-# and overwrite. Do not copy if we are in dry-run mode.
-if not dry_run:
-target_location.uri.transfer_from(
-source_location.uri,
-transfer=transfer,
-overwrite=True,
-transaction=self._transaction,
-)
+from_to.append((source_location.uri, target_location.uri))

artifacts.append((ref, info))

# Do the file transfers in bulk.
# Assume we should always overwrite. If the artifact
# is there this might indicate that a previous transfer
# was interrupted but was not able to be rolled back
# completely (eg pre-emption) so follow Datastore default
# and overwrite. Do not copy if we are in dry-run mode.
if dry_run:
log.info("Would be copying %d file artifacts", len(from_to))
else:
log.verbose("Copying %d file artifacts", len(from_to))
with time_this(log, msg="Transferring datasets into datastore", level=VERBOSE):
ResourcePath.mtransfer(
transfer,
from_to,
overwrite=True,
transaction=self._transaction,
)

if direct_transfers:
log.info(
"Transfer request for an outside-datastore artifact with absolute URI done %d time%s",
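The point of accumulating `from_to` instead of transferring inside the per-ref loop is that one bulk call can overlap the copies; the old comment in `retrieveArtifacts` already noted "In theory can now parallelize the transfer". Purely as an illustration of what a bulk API enables, and not the `lsst-resources` implementation, a naive bulk copy might look like this:

```python
# Illustration only: shows why handing all (source, target) pairs to one call
# helps -- the copies can be fanned out to a worker pool instead of running
# one at a time in a Python loop. This is NOT how lsst-resources does it.
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def naive_bulk_copy(from_to: list[tuple[Path, Path]], max_workers: int = 8) -> None:
    """Copy (source, target) file pairs concurrently, grouping any failures."""

    def _copy(pair: tuple[Path, Path]) -> None:
        src, dst = pair
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)

    errors: list[Exception] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_copy, pair) for pair in from_to]
        for future in futures:
            try:
                future.result()
            except OSError as exc:
                errors.append(exc)
    if errors:
        # Mirrors the grouped-failure reporting that ``except*`` handles above.
        raise ExceptionGroup("some copies failed", errors)
```

With the copies batched this way, the dry-run branch in `transfer_from` only has to report `len(from_to)` before returning.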
5 changes: 5 additions & 0 deletions python/lsst/daf/butler/script/transferDatasets.py
@@ -49,6 +49,7 @@ def transferDatasets(
transfer: str,
register_dataset_types: bool,
transfer_dimensions: bool = True,
dry_run: bool = False,
) -> int:
"""Transfer datasets from run in source to dest.

@@ -84,6 +85,9 @@ def transferDatasets(
Indicate whether dimensions should be transferred along with
datasets. It can be more efficient to disable this if it is known
that all dimensions exist.
dry_run : `bool`, optional
If `True` no transfers are done but the number of transfers that
would be done is reported.
"""
source_butler = Butler.from_config(source, writeable=False)
dest_butler = Butler.from_config(dest, writeable=True)
@@ -112,5 +116,6 @@ def transferDatasets(
transfer=transfer,
register_dataset_types=register_dataset_types,
transfer_dimensions=transfer_dimensions,
dry_run=dry_run,
)
return len(transferred)
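For context, a hedged sketch of how the new `dry_run` flag reaches `Butler.transfer_from`; repository paths and the dataset query are placeholders, and the call assumes the lines hidden by the collapsed hunk above invoke `dest_butler.transfer_from` with these keyword arguments:

```python
# Hypothetical end-to-end use of the dry-run mode; repo paths, the dataset
# query, and the assumption that the script calls dest_butler.transfer_from
# are placeholders inferred from this diff, not verified against it.
from lsst.daf.butler import Butler

source_butler = Butler.from_config("SOURCE_REPO", writeable=False)
dest_butler = Butler.from_config("DEST_REPO", writeable=True)

source_refs = source_butler.query_datasets("raw", collections="SOURCE/COLLECTION")

transferred = dest_butler.transfer_from(
    source_butler,
    source_refs,
    transfer="auto",
    register_dataset_types=True,
    transfer_dimensions=True,
    dry_run=True,  # report what would be copied without writing anything
)
print(f"{len(transferred)} dataset(s) would be transferred")
```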
2 changes: 1 addition & 1 deletion requirements.txt
@@ -3,5 +3,5 @@

lsst-sphgeom @ git+https://github.com/lsst/sphgeom@main
lsst-utils @ git+https://github.com/lsst/utils@main
-lsst-resources[https,s3] @ git+https://github.com/lsst/resources@main
+lsst-resources[https,s3] @ git+https://github.com/lsst/resources@tickets/DM-31824
lsst-daf-relation @ git+https://github.com/lsst/daf_relation@main