[dagster-aws] use paginator in get_s3_keys (#25018)
## Summary & Motivation

- Uses the boto3 `paginator` utility to page through `list_objects_v2`
results, replacing the manual `StartAfter` loop


https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#creating-paginators

## How I Tested These Changes

`pytest`

## Changelog

NO CHANGELOG

- [ ] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_

---------

Co-authored-by: Pedram Navid <[email protected]>
cmpadden and PedramNavid authored Oct 4, 2024
1 parent cc35d71 commit 2e17ac9
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions python_modules/libraries/dagster-aws/dagster_aws/s3/sensor.py
@@ -1,33 +1,35 @@
+from typing import Optional
+
 import boto3
 import dagster._check as check

-MAX_KEYS = 1000
+
+class ClientException(Exception):
+    pass


-def get_s3_keys(bucket, prefix="", since_key=None, s3_session=None):
+def get_s3_keys(
+    bucket: str,
+    prefix: str = "",
+    since_key: Optional[str] = None,
+    s3_session: Optional[boto3.Session] = None,
+):
     check.str_param(bucket, "bucket")
     check.str_param(prefix, "prefix")
     check.opt_str_param(since_key, "since_key")

     if not s3_session:
-        s3_session = boto3.resource("s3", use_ssl=True, verify=True).meta.client
+        s3_session = boto3.client("s3", use_ssl=True, verify=True)

-    cursor = ""
-    contents = []
+    if not s3_session:
+        raise ClientException("Failed to initialize s3 client")
+
+    paginator = s3_session.get_paginator("list_objects_v2")  # type: ignore
+    page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)

-    while True:
-        response = s3_session.list_objects_v2(
-            Bucket=bucket,
-            Delimiter="",
-            MaxKeys=MAX_KEYS,
-            Prefix=prefix,
-            StartAfter=cursor,
-        )
-        contents.extend(response.get("Contents", []))
-        if response["KeyCount"] < MAX_KEYS:
-            break
-
-        cursor = response["Contents"][-1]["Key"]
+    contents = []
+    for page in page_iterator:
+        contents.extend(page.get("Contents", []))

     sorted_keys = [obj["Key"] for obj in sorted(contents, key=lambda x: x["LastModified"])]
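
The paginator pattern in this change can be tried without AWS access. The sketch below is illustrative only and is not part of the commit: `list_keys_by_last_modified`, `FakePaginator`, and `FakeS3Client` are hypothetical names, and the fake client only imitates the `get_paginator(...).paginate(...)` shape that a real `boto3.client("s3")` exposes. It assumes, as the diff does, that a page may omit the `Contents` key.

```python
from datetime import datetime, timedelta


def list_keys_by_last_modified(client, bucket, prefix=""):
    """Collect every listed object, then return keys ordered by LastModified."""
    paginator = client.get_paginator("list_objects_v2")
    contents = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page may omit "Contents" (assumed here for empty listings).
        contents.extend(page.get("Contents", []))
    return [obj["Key"] for obj in sorted(contents, key=lambda o: o["LastModified"])]


class FakePaginator:
    """Hypothetical stand-in: yields pre-built pages like paginate() would."""

    def __init__(self, pages):
        self._pages = pages

    def paginate(self, **kwargs):
        return iter(self._pages)


class FakeS3Client:
    """Hypothetical stand-in for an S3 client; supports only get_paginator."""

    def __init__(self, pages):
        self._pages = pages

    def get_paginator(self, operation_name):
        assert operation_name == "list_objects_v2"
        return FakePaginator(self._pages)


base = datetime(2024, 10, 4)
pages = [
    {
        "Contents": [
            {"Key": "b", "LastModified": base + timedelta(minutes=2)},
            {"Key": "a", "LastModified": base + timedelta(minutes=1)},
        ]
    },
    {"Contents": [{"Key": "c", "LastModified": base}]},
    {},  # a page with no objects
]

keys = list_keys_by_last_modified(FakeS3Client(pages), bucket="example-bucket")
print(keys)  # ['c', 'a', 'b']
```

With a real client, the paginator requests each following page itself, so the caller no longer tracks a cursor or compares `KeyCount` against a page size.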
