From 10ddd9c7aa963405a8d0ced860956f24cb2b7e05 Mon Sep 17 00:00:00 2001 From: Bubba Chabot <51095715+BubbaTLC@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:40:48 -0300 Subject: [PATCH] feat: added config to select records with jsonpath with simple_offset (#59) --- README.md | 20 ++++++++++++++++++++ tap_rest_api_msdk/pagination.py | 17 ++++++++++++++++- tap_rest_api_msdk/streams.py | 4 ++++ tap_rest_api_msdk/tap.py | 13 +++++++++++++ 4 files changed, 53 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b2f4bb4..00bb400 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,8 @@ plugins: kind: string - name: pagination_initial_offset kind: integer + - name: offset_records_jsonpath + kind: string - name: streams kind: array - name: name @@ -158,6 +160,7 @@ provided at the top-level will be the default values for each stream.: - `pagination_limit_per_page_param`: optional: The name of the param that indicates the limit/per_page. Defaults to None. - `pagination_total_limit_param`: optional: The name of the param that indicates the total limit e.g. total, count. Defaults to total - `pagination_initial_offset`: optional: The initial offset for the first request. Defaults to 1. +- `offset_records_jsonpath`: optional: a jsonpath string representing the path to the records. Defaults to `None`. - `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `'$.next_page'` for the `jsonpath_paginator` paginator only otherwise None. - `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below. - `path`: optional: see stream-level params below. @@ -186,6 +189,7 @@ provided at the top-level will be the default values for each stream.: - `oauth_extras`: optional: see authentication params below. - `oauth_expiration_secs`: optional: see authentication params below. - `aws_credentials`: optional: see authentication params below. +- `offset_records_jsonpath`: optional: see pagination params below. #### Stream level config options. Parameters that appear at the stream-level @@ -327,6 +331,22 @@ There are additional request styles supported as follows for pagination. - `next_page_token_path` - Use to locate an appropriate link in the response. Default `"hasMore"`. - `simple_offset_paginator` - A paginator that uses `offset` and `limit` parameters to page through a collection of resources. Unlike `offset_paginator`, this paginator does not rely on any headers to determine whether it should keep paginating. Instead, it will continue paginating (by sending requests with increasing `offset`) until the API returns 0 results. You can use this paginator if the API returns a JSON array of records rather than a top-level object. - `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records. + - `offset_records_jsonpath` - The JSONPath to the records in the response. Defaults to `None`. In the example below we would select the contacts array with `"offset_records_jsonpath": "$.contacts"`. Once the number of records doe not equal `pagination_page_size` the tap will stop paginating. + + ```json + { + "contacts": [ + { + "id": 52, + "emailBlacklisted": false, + "smsBlacklisted": false, + "createdAt": "2024-09-24T01:00:00.000-00:00", + "modifiedAt": "2024-09-25T01:00:00.000-00:00", + } + ], + "count": 256 + } + ``` ### Additional Response Styles There are additional response styles supported as follows. diff --git a/tap_rest_api_msdk/pagination.py b/tap_rest_api_msdk/pagination.py index af84cea..0f51660 100644 --- a/tap_rest_api_msdk/pagination.py +++ b/tap_rest_api_msdk/pagination.py @@ -80,8 +80,15 @@ def has_more(self, response: requests.Response): class SimpleOffsetPaginator(BaseOffsetPaginator): """Simple Offset Paginator.""" - def __init__(self, *args, pagination_page_size: int = 25, **kwargs): + def __init__( + self, + *args, + offset_records_jsonpath=None, + pagination_page_size: int = 25, + **kwargs + ): super().__init__(*args, **kwargs) + self._offset_records_jsonpath = offset_records_jsonpath self._pagination_page_size = pagination_page_size def has_more(self, response: requests.Response): @@ -94,6 +101,14 @@ def has_more(self, response: requests.Response): Whether there are more pages to fetch. """ + if self._offset_records_jsonpath: + records_left = len( + next( + extract_jsonpath(self._offset_records_jsonpath, response.json()), 0 + ) + ) # type: ignore + return records_left == self._pagination_page_size + return len(response.json()) == self._pagination_page_size diff --git a/tap_rest_api_msdk/streams.py b/tap_rest_api_msdk/streams.py index 3f44a47..e5f34bb 100644 --- a/tap_rest_api_msdk/streams.py +++ b/tap_rest_api_msdk/streams.py @@ -63,6 +63,7 @@ def __init__( pagination_limit_per_page_param: Optional[str] = None, pagination_total_limit_param: Optional[str] = None, pagination_initial_offset: int = 1, + offset_records_jsonpath: Optional[str] = None, start_date: Optional[datetime] = None, source_search_field: Optional[str] = None, source_search_query: Optional[str] = None, @@ -169,6 +170,7 @@ def __init__( self.source_search_query = source_search_query self.pagination_page_size: Optional[int] self.pagination_initial_offset = pagination_initial_offset + self.offset_records_jsonpath = offset_records_jsonpath # Setting Pagination Limits if self.pagination_request_style == "restapi_header_link_paginator": @@ -329,6 +331,8 @@ def get_new_paginator(self): return SimpleOffsetPaginator( start_value=self.pagination_initial_offset, page_size=self.pagination_page_size, + offset_records_jsonpath=self.offset_records_jsonpath, + pagination_page_size=self.pagination_page_size, ) else: self.logger.error( diff --git a/tap_rest_api_msdk/tap.py b/tap_rest_api_msdk/tap.py index 7534155..50c948c 100644 --- a/tap_rest_api_msdk/tap.py +++ b/tap_rest_api_msdk/tap.py @@ -393,6 +393,14 @@ class TapRestApiMsdk(Tap): required=False, description="The initial offset to start pagination from. Defaults to 1", ), + th.Property( + "offset_records_jsonpath", + th.StringType, + default=None, + required=False, + description="Optional jsonpath string representing the path in the results " + "Defaults to `None`.", + ), ) # add common properties to top-level properties @@ -463,6 +471,10 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore source_search_query = stream.get( "source_search_query", self.config.get("source_search_query", "") ) + offset_records_jsonpath = stream.get( + "offset_records_jsonpath", + self.config.get("offset_records_jsonpath", None), + ) schema = {} schema_config = stream.get("schema") @@ -524,6 +536,7 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore "pagination_initial_offset", 1, ), + offset_records_jsonpath=offset_records_jsonpath, schema=schema, start_date=start_date, source_search_field=source_search_field,