From 2e17ac9a5570e669be0f663fae731ea8790a4815 Mon Sep 17 00:00:00 2001 From: colton Date: Fri, 4 Oct 2024 17:14:04 -0400 Subject: [PATCH] [dagster-aws] use paginator in get_s3_keys (#25018) ## Summary & Motivation - Uses the `paginator` utility for improved pagination in `list_objects_v2` https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#creating-paginators ## How I Tested These Changes `pytest` ## Changelog NO CHANGELOG - [ ] `NEW` _(added new feature or capability)_ - [ ] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_ --------- Co-authored-by: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> --- .../dagster-aws/dagster_aws/s3/sensor.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/python_modules/libraries/dagster-aws/dagster_aws/s3/sensor.py b/python_modules/libraries/dagster-aws/dagster_aws/s3/sensor.py index 686ef9c0f222e..06e609f5555ac 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/s3/sensor.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/s3/sensor.py @@ -1,33 +1,35 @@ +from typing import Optional + import boto3 import dagster._check as check -MAX_KEYS = 1000 + +class ClientException(Exception): + pass -def get_s3_keys(bucket, prefix="", since_key=None, s3_session=None): +def get_s3_keys( + bucket: str, + prefix: str = "", + since_key: Optional[str] = None, + s3_session: Optional[boto3.Session] = None, +): check.str_param(bucket, "bucket") check.str_param(prefix, "prefix") check.opt_str_param(since_key, "since_key") if not s3_session: - s3_session = boto3.resource("s3", use_ssl=True, verify=True).meta.client + s3_session = boto3.client("s3", use_ssl=True, verify=True) - cursor = "" - contents = [] + if not s3_session: + raise ClientException("Failed to initialize s3 client") + + paginator = s3_session.get_paginator("list_objects_v2") # type: ignore + page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix) - while True: - response = s3_session.list_objects_v2( - Bucket=bucket, - Delimiter="", - MaxKeys=MAX_KEYS, - Prefix=prefix, - StartAfter=cursor, - ) - contents.extend(response.get("Contents", [])) - if response["KeyCount"] < MAX_KEYS: - break - - cursor = response["Contents"][-1]["Key"] + contents = [] + for page in page_iterator: + contents.extend(page.get("Contents", [])) sorted_keys = [obj["Key"] for obj in sorted(contents, key=lambda x: x["LastModified"])]