From e5a4e538cbc7ab43862a6e8f75b55d9b6774b12d Mon Sep 17 00:00:00 2001
From: Alex Revetchi
Date: Wed, 22 Sep 2021 00:47:10 +0100
Subject: [PATCH 1/2] Use the streaming list API: buffering the full listing
 of a big bucket eventually runs out of memory

subcmd_bucket_list() used to fetch the complete bucket listing with
s3.bucket_list() before printing anything, so on very large buckets the
accumulated response exhausts memory. Iterate over
s3.bucket_list_streaming() instead and print each page as it arrives.
---
 s3cmd | 86 ++++++++++++++++++++++++++++++++---------------------------
 1 file changed, 47 insertions(+), 39 deletions(-)

diff --git a/s3cmd b/s3cmd
index 77b4e74f..5b748f7a 100755
--- a/s3cmd
+++ b/s3cmd
@@ -157,7 +157,7 @@ def cmd_ls(args):
     if len(args) > 0:
         uri = S3Uri(args[0])
         if uri.type == "s3" and uri.has_bucket():
-            subcmd_bucket_list(s3, uri, cfg.limit)
+            subcmd_bucket_list(s3, uri, cfg.marker, cfg.limit)
             return EX_OK
 
     # If not a s3 type uri or no bucket was provided, list all the buckets
@@ -183,7 +183,7 @@ def cmd_all_buckets_list_all_content(args):
         output(u"")
     return EX_OK
 
-def subcmd_bucket_list(s3, uri, limit):
+def subcmd_bucket_list(s3, uri, marker, limit):
     cfg = Config()
 
     bucket = uri.bucket()
@@ -192,12 +192,6 @@ def subcmd_bucket_list(s3, uri, limit):
     debug(u"Bucket 's3://%s':" % bucket)
     if prefix.endswith('*'):
         prefix = prefix[:-1]
-    try:
-        response = s3.bucket_list(bucket, prefix = prefix, limit = limit)
-    except S3Error as e:
-        if e.info["Code"] in S3.codes:
-            error(S3.codes[e.info["Code"]] % bucket)
-        raise
 
     # md5 are 32 char long, but for multipart there could be a suffix
     if Config().human_readable_sizes:
@@ -214,38 +208,51 @@ def subcmd_bucket_list(s3, uri, limit):
     else:
         format_string = u"%(timestamp)16s %(size)s %(uri)s"
 
-    for prefix in response['common_prefixes']:
-        output(format_string % {
-            "timestamp": "",
-            "size": dir_str,
-            "md5": "",
-            "storageclass": "",
-            "uri": uri.compose_uri(bucket, prefix["Prefix"])})
-
-    for object in response["list"]:
-        md5 = object.get('ETag', '').strip('"\'')
-        storageclass = object.get('StorageClass','')
-
-        if cfg.list_md5:
-            if '-' in md5: # need to get md5 from the object
-                object_uri = uri.compose_uri(bucket, object["Key"])
-                info_response = s3.object_info(S3Uri(object_uri))
-                try:
-                    md5 = info_response['s3cmd-attrs']['md5']
-                except KeyError:
-                    pass
+    truncated = False
+    common_prefixes = []
+
+    try:
+        for _truncated, _common_prefixes, _objects in s3.bucket_list_streaming(bucket, prefix = prefix, marker=marker, limit = limit):
+            truncated = _truncated
+
+            if common_prefixes != _common_prefixes:
+                common_prefixes = _common_prefixes
+                for prefix in common_prefixes:
+                    output(format_string % {
+                        "timestamp": "",
+                        "size": dir_str,
+                        "md5": "",
+                        "storageclass": "",
+                        "uri": uri.compose_uri(bucket, prefix["Prefix"])})
+
+            for object in _objects:
+                md5 = object.get('ETag', '').strip('"\'')
+                storageclass = object.get('StorageClass','')
+
+                if cfg.list_md5:
+                    if '-' in md5: # need to get md5 from the object
+                        object_uri = uri.compose_uri(bucket, object["Key"])
+                        info_response = s3.object_info(S3Uri(object_uri))
+                        try:
+                            md5 = info_response['s3cmd-attrs']['md5']
+                        except KeyError:
+                            pass
+
+                size_and_coeff = formatSize(object["Size"],
+                                            Config().human_readable_sizes)
+                output(format_string % {
+                    "timestamp": formatDateTime(object["LastModified"]),
+                    "size" : format_size % size_and_coeff,
+                    "md5" : md5,
+                    "storageclass" : storageclass,
+                    "uri": uri.compose_uri(bucket, object["Key"]),
+                    })
+    except S3Error as e:
+        if e.info["Code"] in S3.codes:
+            error(S3.codes[e.info["Code"]] % bucket)
+        raise
 
-        size_and_coeff = formatSize(object["Size"],
-                                    Config().human_readable_sizes)
-        output(format_string % {
-            "timestamp": formatDateTime(object["LastModified"]),
-            "size" : format_size % size_and_coeff,
-            "md5" : md5,
- "storageclass" : storageclass, - "uri": uri.compose_uri(bucket, object["Key"]), - }) - - if response["truncated"]: + if truncated: warning(u"The list is truncated because the settings limit was reached.") def cmd_bucket_create(args): @@ -2769,6 +2776,7 @@ def main(): optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="*OBSOLETE* Put all updated files into place at end [sync]") # OBSOLETE optparser.add_option( "--max-delete", dest="max_delete", action="store", help="Do not delete more than NUM files. [del] and [sync]", metavar="NUM") optparser.add_option( "--limit", dest="limit", action="store", help="Limit number of objects returned in the response body (only for [ls] and [la] commands)", metavar="NUM") + optparser.add_option( "--marker", dest="marker", action="store", help="Marker is where you want S3 to start listing from. S3 starts listing after this specified key. Marker can be any key in the bucket.") optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.") optparser.add_option( "--delete-after-fetch", dest="delete_after_fetch", action="store_true", help="Delete remote objects after fetching to local file (only for [get] and [sync] commands).") optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.") From 796aa094f8b6af4e6b09082db5866f0fc14bccd2 Mon Sep 17 00:00:00 2001 From: Alex Revetchi Date: Wed, 22 Sep 2021 03:37:41 +0100 Subject: [PATCH 2/2] - remove marker changes from another PR --- s3cmd | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/s3cmd b/s3cmd index 5b748f7a..4ad686d0 100755 --- a/s3cmd +++ b/s3cmd @@ -157,7 +157,7 @@ def cmd_ls(args): if len(args) > 0: uri = S3Uri(args[0]) if uri.type == "s3" and uri.has_bucket(): - subcmd_bucket_list(s3, uri, cfg.marker, cfg.limit) + subcmd_bucket_list(s3, uri, cfg.limit) return EX_OK # If not a s3 type uri or no bucket was provided, list all the buckets @@ -183,7 +183,7 @@ def cmd_all_buckets_list_all_content(args): output(u"") return EX_OK -def subcmd_bucket_list(s3, uri, marker, limit): +def subcmd_bucket_list(s3, uri, limit): cfg = Config() bucket = uri.bucket() @@ -212,7 +212,7 @@ def subcmd_bucket_list(s3, uri, marker, limit): common_prefixes = [] try: - for _trunkated, _common_prefixes, _objects in s3.bucket_list_streaming(bucket, prefix = prefix, marker=marker, limit = limit): + for _trunkated, _common_prefixes, _objects in s3.bucket_list_streaming(bucket, prefix = prefix, limit = limit): truncated = _trunkated if common_prefixes != _common_prefixes: @@ -2776,7 +2776,6 @@ def main(): optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="*OBSOLETE* Put all updated files into place at end [sync]") # OBSOLETE optparser.add_option( "--max-delete", dest="max_delete", action="store", help="Do not delete more than NUM files. [del] and [sync]", metavar="NUM") optparser.add_option( "--limit", dest="limit", action="store", help="Limit number of objects returned in the response body (only for [ls] and [la] commands)", metavar="NUM") - optparser.add_option( "--marker", dest="marker", action="store", help="Marker is where you want S3 to start listing from. S3 starts listing after this specified key. 
Marker can be any key in the bucket.") optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.") optparser.add_option( "--delete-after-fetch", dest="delete_after_fetch", action="store_true", help="Delete remote objects after fetching to local file (only for [get] and [sync] commands).") optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")