From dccbc08507501917c08c9d7022fd5f58b15a5688 Mon Sep 17 00:00:00 2001 From: jlloyd-widen <82222659+jlloyd-widen@users.noreply.github.com> Date: Fri, 18 Feb 2022 13:46:54 -0700 Subject: [PATCH] enabled capability of added more than one stream per api url. Also enabled common parameters among them (#13) Co-authored-by: Josh Lloyd --- README.md | 32 ++++-- meltano.yml | 45 +++++--- tap_rest_api_msdk/tap.py | 214 ++++++++++++++++++++++++--------------- tests/test_streams.py | 27 ++--- tests/test_tap.py | 27 +++++ 5 files changed, 229 insertions(+), 116 deletions(-) diff --git a/README.md b/README.md index cd99541..0d7baa9 100644 --- a/README.md +++ b/README.md @@ -66,17 +66,37 @@ tap is available by running: tap-rest-api-msdk --about ``` -Config Options: +#### Top-level config options. +Parameters that appear at the stream-level will overwrite their top-level +counterparts except where noted in the stream-level params. Otherwise, the values +provided at the top-level will be the default values for each stream.: - `api_url`: required: the base url/endpoint for the desired api. -- `name`: required: name of the stream. -- `path`: optional: the path appeneded to the `api_url`. -- `params`: optional: an object of objects that provide the `params` in a `requests.get` method. -- `headers`: optional: an object of headers to pass into the api calls. -- `records_path`: optional: a jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`. - `pagination_request_style`: optional: style for requesting pagination, defaults to `default`, see Pagination below. - `pagination_response_style`: optional: style of pagination results, defaults to `default`, see Pagination below. - `pagination_page_size`: optional: limit for size of page, defaults to None. - `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `$.next_page`. +- `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below. +- `path`: optional: see stream-level params below. +- `params`: optional: see stream-level params below. +- `headers`: optional: see stream-level params below. +- `records_path`: optional: see stream-level params below. +- `primary_keys`: optional: see stream-level params below. +- `replication_key`: optional: see stream-level params below. +- `except_keys`: optional: see stream-level params below. +- `num_inference_keys`: optional: see stream-level params below. + +#### Stream level config options. +Parameters that appear at the stream-level +will overwrite their top-level counterparts except where noted below: +- `name`: required: name of the stream. +- `path`: optional: the path appeneded to the `api_url`. +- `params`: optional: an object of objects that provide the `params` in a `requests.get` method. + Stream level params will be merged with top-level params with stream level params overwriting + top-level params with the same key. +- `headers`: optional: an object of headers to pass into the api calls. Stream level + headers will be merged with top-level params with stream level params overwriting + top-level params with the same key +- `records_path`: optional: a jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`. - `primary_keys`: required: a list of the json keys of the primary key for the stream. - `replication_key`: optional: the json key of the replication key. Note that this should be an incrementing integer or datetime object. - `except_keys`: This tap automatically flattens the entire json structure and builds keys based on the corresponding paths. diff --git a/meltano.yml b/meltano.yml index b61303c..3d1018e 100644 --- a/meltano.yml +++ b/meltano.yml @@ -11,32 +11,47 @@ plugins: - discover settings: - name: api_url - - name: name - - name: path - - name: params - - name: headers - - name: records_path + kind: string - name: next_page_token_path + kind: string - name: pagination_request_style + kind: string - name: pagination_response_style + kind: string - name: pagination_page_size + kind: integer + - name: streams + kind: array + - name: path + kind: string + - name: params + kind: object + - name: headers + kind: object + - name: records_path + kind: string - name: primary_keys + kind: array - name: replication_key + kind: string - name: except_keys + kind: array - name: num_inference_records + kind: integer config: - name: us_earthquakes api_url: https://earthquake.usgs.gov/fdsnws - path: /event/1/query - params: - format: geojson - starttime: "2014-01-01" - endtime: "2014-01-02" - minmagnitude: 1 - primary_keys: - - id records_path: "$.features[*]" - num_inference_records: 100 + streams: + - name: us_earthquakes + path: /event/1/query + params: + format: geojson + starttime: "2014-01-01" + endtime: "2014-01-02" + minmagnitude: 1 + primary_keys: + - id + num_inference_records: 100 select: - '*.*' loaders: diff --git a/tap_rest_api_msdk/tap.py b/tap_rest_api_msdk/tap.py index b4e28da..e6cdbe3 100644 --- a/tap_rest_api_msdk/tap.py +++ b/tap_rest_api_msdk/tap.py @@ -1,10 +1,11 @@ """rest-api tap class.""" +import copy from typing import Any, List import requests from genson import SchemaBuilder -from singer_sdk import Stream, Tap +from singer_sdk import Tap from singer_sdk import typing as th from singer_sdk.helpers.jsonpath import extract_jsonpath from tap_rest_api_msdk.streams import DynamicStream @@ -16,47 +17,87 @@ class TapRestApiMsdk(Tap): name = "tap-rest-api-msdk" - config_jsonschema = th.PropertiesList( - th.Property( - "api_url", - th.StringType, - required=True, - description="the base url/endpoint for the desired api", - ), - # th.Property("auth_method", th.StringType, default='no_auth', required=False), - # th.Property("auth_token", th.StringType, required=False), - th.Property( - "name", th.StringType, required=True, description="name of the stream" - ), + common_properties = th.PropertiesList( th.Property( "path", th.StringType, - default="", required=False, - description="the path appeneded to the `api_url`.", + description="the path appended to the `api_url`. Stream-level path will " + "overwrite top-level path", ), th.Property( "params", th.ObjectType(), + default={}, required=False, - description="an object of objects that provide the `params` in a " - "`requests.get` method.", + description="an object providing the `params` in a `requests.get` method. " + "Stream level params will be merged" + "with top-level params with stream level params overwriting" + "top-level params with the same key.", ), th.Property( "headers", th.ObjectType(), required=False, - description="an object of headers to pass into the api calls.", + description="An object of headers to pass into the api calls. Stream level" + "headers will be merged with top-level params with stream" + "level params overwriting top-level params with the same key", ), th.Property( "records_path", th.StringType, - default="$[*]", required=False, description="a jsonpath string representing the path in the requests " - "response that contains the " - "records to process. Defaults to `$[*]`.", + "response that contains the records to process. Defaults " + "to `$[*]`. Stream level records_path will overwrite " + "the top-level records_path", + ), + th.Property( + "primary_keys", + th.ArrayType(th.StringType), + required=False, + description="a list of the json keys of the primary key for the stream.", + ), + th.Property( + "replication_key", + th.StringType, + required=False, + description="the json key of the replication key. Note that this should " + "be an incrementing integer or datetime object.", + ), + th.Property( + "except_keys", + th.ArrayType(th.StringType), + default=[], + required=False, + description="This tap automatically flattens the entire json structure " + "and builds keys based on the corresponding paths.; Keys, " + "whether composite or otherwise, listed in this dictionary " + "will not be recursively flattened, but instead their values " + "will be; turned into a json string and processed in that " + "format. This is also automatically done for any lists within " + "the records; therefore, records are not duplicated for each " + "item in lists.", + ), + th.Property( + "num_inference_records", + th.NumberType, + default=50, + required=False, + description="number of records used to infer the stream's schema. " + "Defaults to 50.", + ), + ) + + top_level_properties = th.PropertiesList( + th.Property( + "api_url", + th.StringType, + required=True, + description="the base url/endpoint for the desired api", ), + # th.Property("auth_method", th.StringType, default='no_auth', required=False), + # th.Property("auth_token", th.StringType, required=False), th.Property( "next_page_token_path", th.StringType, @@ -86,81 +127,90 @@ class TapRestApiMsdk(Tap): th.IntegerType, default=None, required=False, - description="the size of each page in records. " "Defaults to None", - ), - th.Property( - "primary_keys", - th.ArrayType(th.StringType), - required=True, - description="a list of the json keys of the primary key for the stream.", - ), - th.Property( - "replication_key", - th.StringType, - required=False, - description="the json key of the replication key. Note that this should " - "be an incrementing integer or datetime object.", + description="the size of each page in records. Defaults to None", ), + ) + + # add common properties to top-level properties + for prop in common_properties.wrapped: + top_level_properties.append(prop) + + # add common properties to the stream schema + stream_properties = th.PropertiesList() + stream_properties.wrapped = copy.copy(common_properties.wrapped) + stream_properties.append( th.Property( - "except_keys", - th.ArrayType(th.StringType), - default=[], - required=False, - description="This tap automatically flattens the entire json structure " - "and builds keys based on the corresponding paths.; Keys, " - "whether composite or otherwise, listed in this dictionary " - "will not be recursively flattened, but instead their values " - "will be; turned into a json string and processed in that " - "format. This is also automatically done for any lists within " - "the records; therefore, records are not duplicated for each " - "item in lists.", + "name", th.StringType, required=True, description="name of the stream" ), + ) + + # add streams schema to top-level properties + top_level_properties.append( th.Property( - "num_inference_records", - th.NumberType, - default=50, + "streams", + th.ArrayType(th.ObjectType(*stream_properties.wrapped)), required=False, - description="number of records used to infer the stream's schema. " - "Defaults to 50.", + description="An array of streams, designed for separate paths using the" + "same base url.", ), - ).to_dict() + ) - def discover_streams(self) -> List[Stream]: + config_jsonschema = top_level_properties.to_dict() + + def discover_streams(self) -> List[DynamicStream]: # type: ignore """Return a list of discovered streams. Returns: A list of streams. """ - return [ - DynamicStream( - tap=self, - name=self.config["name"], - path=self.config["path"], - params=self.config.get("params"), - headers=self.config.get("headers"), - records_path=self.config["records_path"], - next_page_token_path=self.config["next_page_token_path"], - primary_keys=self.config["primary_keys"], - replication_key=self.config.get("replication_key"), - except_keys=self.config.get("except_keys"), - schema=self.get_schema( - self.config["records_path"], - self.config.get("except_keys"), # type: ignore - self.config.get("num_inference_records"), # type: ignore - self.config["path"], - self.config.get("params"), # type: ignore - self.config.get("headers"), # type: ignore - ), - pagination_request_style=self.config.get( # type: ignore - "pagination_request_style" - ), - pagination_response_style=self.config.get( # type: ignore - "pagination_response_style" - ), - pagination_page_size=self.config.get("pagination_page_size"), + # print(self.top_level_properties.to_dict()) + + streams = [] + for stream in self.config["streams"]: + # resolve config + records_path = stream.get( + "records_path", self.config.get("records_path", "$[*]") ) - ] + except_keys = stream.get("except_keys", self.config.get("except_keys", [])) + path = stream.get("path", self.config.get("path", "")) + params = {**self.config.get("params", {}), **stream.get("params", {})} + headers = {**self.config.get("headers", {}), **stream.get("headers", {})} + + streams.append( + DynamicStream( + tap=self, + name=stream["name"], + path=path, + params=params, + headers=headers, + records_path=records_path, + primary_keys=stream.get( + "primary_keys", self.config.get("primary_keys", []) + ), + replication_key=stream.get( + "replication_key", self.config.get("replication_key", "") + ), + except_keys=except_keys, + next_page_token_path=self.config["next_page_token_path"], + pagination_request_style=self.config["pagination_request_style"], + pagination_response_style=self.config["pagination_response_style"], + pagination_page_size=self.config.get("pagination_page_size"), + schema=self.get_schema( + records_path, + except_keys, + stream.get( + "num_inference_records", + self.config["num_inference_records"], + ), + path, + params, + headers, + ), + ) + ) + + return streams def get_schema( self, diff --git a/tests/test_streams.py b/tests/test_streams.py index 17a1dd3..d503abb 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -17,13 +17,15 @@ def config(extras: dict = None) -> dict: """ contents = { "api_url": "https://example.com", - "name": "stream_name", - "auth_method": "no_auth", - "auth_token": "", - "path": "/path_test", - "primary_keys": ["key1", "key2"], - "replication_key": "key3", - "records_path": "$.records[*]", + "streams": [ + { + "name": "stream_name", + "path": "/path_test", + "primary_keys": ["key1", "key2"], + "replication_key": "key3", + "records_path": "$.records[*]", + } + ], } if extras: for k, v in extras.items(): @@ -71,6 +73,7 @@ def url_path(path: str = "/path_test") -> str: def setup_api( requests_mock: Any, + url_path: str = url_path(), json_extras: dict = None, headers_extras: dict = None, matcher: Any = None, @@ -79,6 +82,7 @@ def setup_api( Args: requests_mock: mock object for requests. + url_path: url to mack for mocking. json_extras: extra items to add to the response's results. headers_extras: extra items to add to the API call's header. matcher: a function that checks a request's input for the appropriate @@ -93,12 +97,12 @@ def setup_api( headers_resp[k] = v requests_mock.get( - url_path(), + url_path, headers=headers_resp, json=json_resp(json_extras), additional_matcher=matcher, ) - return requests.Session().get(url_path()) + return requests.Session().get(url_path) def test_get_next_page_token_default_jsonpath(requests_mock: Any): @@ -110,10 +114,7 @@ def test_get_next_page_token_default_jsonpath(requests_mock: Any): stream0 = TapRestApiMsdk(config=config(), parse_env_config=True).discover_streams()[ 0 ] - assert ( - stream0._get_next_page_token_default(resp, "previous_token_example") - == "next_page_token_example" - ) + assert stream0._get_next_page_token_default(resp, "t") == "next_page_token_example" assert stream0.get_next_page_token == stream0._get_next_page_token_default diff --git a/tests/test_tap.py b/tests/test_tap.py index b83e160..57ef4e7 100644 --- a/tests/test_tap.py +++ b/tests/test_tap.py @@ -22,3 +22,30 @@ def test_schema_inference(requests_mock): "key3": {"type": "string"}, }, } + + +def test_multiple_streams(requests_mock): + setup_api(requests_mock) + setup_api(requests_mock, url_path="https://example.com/path_test2") + configs = config({"records_path": "$.records[*]"}) + configs["streams"].append( + { + "name": "stream_name2", + "path": "/path_test2", + "primary_keys": ["key4", "key5"], + "replication_key": "key6", + } + ) + + streams = TapRestApiMsdk(config=configs, parse_env_config=True).discover_streams() + + assert streams[0].name == "stream_name" + assert streams[0].records_path == "$.records[*]" + assert streams[0].path == "/path_test" + assert streams[0].primary_keys == ["key1", "key2"] + assert streams[0].replication_key == "key3" + assert streams[1].name == "stream_name2" + assert streams[1].records_path == "$.records[*]" + assert streams[1].path == "/path_test2" + assert streams[1].primary_keys == ["key4", "key5"] + assert streams[1].replication_key == "key6"