Skip to content

Commit

Permalink
feat: added config to select records with jsonpath with simple_offset (
Browse files Browse the repository at this point in the history
  • Loading branch information
BubbaTLC authored Sep 30, 2024
1 parent cec080f commit 10ddd9c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 1 deletion.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ plugins:
kind: string
- name: pagination_initial_offset
kind: integer
- name: offset_records_jsonpath
kind: string
- name: streams
kind: array
- name: name
Expand Down Expand Up @@ -158,6 +160,7 @@ provided at the top-level will be the default values for each stream.:
- `pagination_limit_per_page_param`: optional: The name of the param that indicates the limit/per_page. Defaults to None.
- `pagination_total_limit_param`: optional: The name of the param that indicates the total limit e.g. total, count. Defaults to total
- `pagination_initial_offset`: optional: The initial offset for the first request. Defaults to 1.
- `offset_records_jsonpath`: optional: a jsonpath string representing the path to the records. Defaults to `None`.
- `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `'$.next_page'` for the `jsonpath_paginator` paginator only otherwise None.
- `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below.
- `path`: optional: see stream-level params below.
Expand Down Expand Up @@ -186,6 +189,7 @@ provided at the top-level will be the default values for each stream.:
- `oauth_extras`: optional: see authentication params below.
- `oauth_expiration_secs`: optional: see authentication params below.
- `aws_credentials`: optional: see authentication params below.
- `offset_records_jsonpath`: optional: see pagination params below.

#### Stream level config options.
Parameters that appear at the stream-level
Expand Down Expand Up @@ -327,6 +331,22 @@ There are additional request styles supported as follows for pagination.
- `next_page_token_path` - Use to locate an appropriate link in the response. Default `"hasMore"`.
- `simple_offset_paginator` - A paginator that uses `offset` and `limit` parameters to page through a collection of resources. Unlike `offset_paginator`, this paginator does not rely on any headers to determine whether it should keep paginating. Instead, it will continue paginating (by sending requests with increasing `offset`) until the API returns 0 results. You can use this paginator if the API returns a JSON array of records rather than a top-level object.
- `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records.
- `offset_records_jsonpath` - The JSONPath to the records in the response. Defaults to `None`. In the example below we would select the contacts array with `"offset_records_jsonpath": "$.contacts"`. Once the number of records returned in a response does not equal `pagination_page_size`, the tap will stop paginating.

```json
{
"contacts": [
{
"id": 52,
"emailBlacklisted": false,
"smsBlacklisted": false,
"createdAt": "2024-09-24T01:00:00.000-00:00",
"modifiedAt": "2024-09-25T01:00:00.000-00:00"
}
],
"count": 256
}
```

### Additional Response Styles
There are additional response styles supported as follows.
Expand Down
17 changes: 16 additions & 1 deletion tap_rest_api_msdk/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,15 @@ def has_more(self, response: requests.Response):
class SimpleOffsetPaginator(BaseOffsetPaginator):
"""Simple Offset Paginator."""

def __init__(self, *args, pagination_page_size: int = 25, **kwargs):
def __init__(
self,
*args,
offset_records_jsonpath=None,
pagination_page_size: int = 25,
**kwargs
):
super().__init__(*args, **kwargs)
self._offset_records_jsonpath = offset_records_jsonpath
self._pagination_page_size = pagination_page_size

def has_more(self, response: requests.Response):
Expand All @@ -94,6 +101,14 @@ def has_more(self, response: requests.Response):
Whether there are more pages to fetch.
"""
if self._offset_records_jsonpath:
records_left = len(
next(
extract_jsonpath(self._offset_records_jsonpath, response.json()), 0
)
) # type: ignore
return records_left == self._pagination_page_size

return len(response.json()) == self._pagination_page_size


Expand Down
4 changes: 4 additions & 0 deletions tap_rest_api_msdk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
pagination_limit_per_page_param: Optional[str] = None,
pagination_total_limit_param: Optional[str] = None,
pagination_initial_offset: int = 1,
offset_records_jsonpath: Optional[str] = None,
start_date: Optional[datetime] = None,
source_search_field: Optional[str] = None,
source_search_query: Optional[str] = None,
Expand Down Expand Up @@ -169,6 +170,7 @@ def __init__(
self.source_search_query = source_search_query
self.pagination_page_size: Optional[int]
self.pagination_initial_offset = pagination_initial_offset
self.offset_records_jsonpath = offset_records_jsonpath

# Setting Pagination Limits
if self.pagination_request_style == "restapi_header_link_paginator":
Expand Down Expand Up @@ -329,6 +331,8 @@ def get_new_paginator(self):
return SimpleOffsetPaginator(
start_value=self.pagination_initial_offset,
page_size=self.pagination_page_size,
offset_records_jsonpath=self.offset_records_jsonpath,
pagination_page_size=self.pagination_page_size,
)
else:
self.logger.error(
Expand Down
13 changes: 13 additions & 0 deletions tap_rest_api_msdk/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ class TapRestApiMsdk(Tap):
required=False,
description="The initial offset to start pagination from. Defaults to 1",
),
th.Property(
"offset_records_jsonpath",
th.StringType,
default=None,
required=False,
description="Optional jsonpath string representing the path in the results "
"Defaults to `None`.",
),
)

# add common properties to top-level properties
Expand Down Expand Up @@ -463,6 +471,10 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
source_search_query = stream.get(
"source_search_query", self.config.get("source_search_query", "")
)
offset_records_jsonpath = stream.get(
"offset_records_jsonpath",
self.config.get("offset_records_jsonpath", None),
)

schema = {}
schema_config = stream.get("schema")
Expand Down Expand Up @@ -524,6 +536,7 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
"pagination_initial_offset",
1,
),
offset_records_jsonpath=offset_records_jsonpath,
schema=schema,
start_date=start_date,
source_search_field=source_search_field,
Expand Down

0 comments on commit 10ddd9c

Please sign in to comment.