diff --git a/README.md b/README.md index 0d7baa9..f637bde 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ provided at the top-level will be the default values for each stream.: Parameters that appear at the stream-level will overwrite their top-level counterparts except where noted below: - `name`: required: name of the stream. -- `path`: optional: the path appeneded to the `api_url`. +- `path`: optional: the path appended to the `api_url`. - `params`: optional: an object of objects that provide the `params` in a `requests.get` method. Stream level params will be merged with top-level params with stream level params overwriting top-level params with the same key. @@ -104,6 +104,9 @@ will overwrite their top-level counterparts except where noted below: turned into a json string and processed in that format. This is also automatically done for any lists within the records; therefore, records are not duplicated for each item in lists. - `num_inference_keys`: optional: number of records used to infer the stream's schema. Defaults to 50. +- `scheam`: optional: A valid Singer schema or a path-like string that provides + the path to a `.json` file that contains a valid Singer schema. If provided, + the schema will not be inferred from the results of an api call. ## Pagination Pagination is a complex topic as there is no real single standard, and many different implementations. Unless options are provided, both the request and results stype default to the `default`, which is the pagination style originally implemented. diff --git a/tap_rest_api_msdk/tap.py b/tap_rest_api_msdk/tap.py index e6cdbe3..d92b9f1 100644 --- a/tap_rest_api_msdk/tap.py +++ b/tap_rest_api_msdk/tap.py @@ -1,6 +1,7 @@ """rest-api tap class.""" import copy +import json from typing import Any, List import requests @@ -143,6 +144,19 @@ class TapRestApiMsdk(Tap): "name", th.StringType, required=True, description="name of the stream" ), ) + stream_properties.append( + th.Property( + "schema", + th.CustomType( + {"anyOf": [{"type": "string"}, {"type": "null"}, {"type:": "object"}]} + ), + required=False, + description="A valid Singer schema or a path-like string that provides " + "the path to a `.json` file that contains a valid Singer " + "schema. If provided, the schema will not be inferred from " + "the results of an api call.", + ), + ) # add streams schema to top-level properties top_level_properties.append( @@ -177,6 +191,33 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore params = {**self.config.get("params", {}), **stream.get("params", {})} headers = {**self.config.get("headers", {}), **stream.get("headers", {})} + schema = {} + schema_config = stream.get("schema") + if isinstance(schema_config, str): + self.logger.info("Found path to a schema, not doing discovery.") + with open(schema_config, "r") as f: + schema = json.load(f) + + elif isinstance(schema_config, dict): + self.logger.info("Found schema in config, not doing discovery.") + builder = SchemaBuilder() + builder.add_schema(schema_config) + schema = builder.to_schema() + + else: + self.logger.info("No schema found. Inferring schema from API call.") + schema = self.get_schema( + records_path, + except_keys, + stream.get( + "num_inference_records", + self.config["num_inference_records"], + ), + path, + params, + headers, + ) + streams.append( DynamicStream( tap=self, @@ -196,17 +237,7 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore pagination_request_style=self.config["pagination_request_style"], pagination_response_style=self.config["pagination_response_style"], pagination_page_size=self.config.get("pagination_page_size"), - schema=self.get_schema( - records_path, - except_keys, - stream.get( - "num_inference_records", - self.config["num_inference_records"], - ), - path, - params, - headers, - ), + schema=schema, ) ) diff --git a/tests/schema.json b/tests/schema.json new file mode 100644 index 0000000..dd15f58 --- /dev/null +++ b/tests/schema.json @@ -0,0 +1,26 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "key1": { + "type": "string" + }, + "key2": { + "type": "string" + }, + "key3": { + "type": "string" + }, + "field1": { + "type": "string" + }, + "field2": { + "type": "integer" + } + }, + "required": [ + "key1", + "key2", + "key3" + ] +} diff --git a/tests/test_tap.py b/tests/test_tap.py index 57ef4e7..bee27ab 100644 --- a/tests/test_tap.py +++ b/tests/test_tap.py @@ -1,7 +1,12 @@ +import json + from tap_rest_api_msdk.tap import TapRestApiMsdk from tests.test_streams import config, setup_api +with open("tests/schema.json", "r") as f: + BASIC_SCHEMA = json.load(f) + def test_schema_inference(requests_mock): setup_api(requests_mock) @@ -10,18 +15,25 @@ def test_schema_inference(requests_mock): 0 ] - assert stream0.schema == { - "$schema": "http://json-schema.org/schema#", - "required": ["key1", "key2", "key3"], - "type": "object", - "properties": { - "field1": {"type": "string"}, - "field2": {"type": "integer"}, - "key1": {"type": "string"}, - "key2": {"type": "string"}, - "key3": {"type": "string"}, - }, - } + assert stream0.schema == BASIC_SCHEMA + + +def test_schema_from_file(): + configs = config() + configs["streams"][0]["schema"] = "tests/schema.json" + + s0 = TapRestApiMsdk(config=configs, parse_env_config=True).discover_streams()[0] + + assert s0.schema == BASIC_SCHEMA + + +def test_schema_from_object(): + configs = config() + configs["streams"][0]["schema"] = BASIC_SCHEMA + + s0 = TapRestApiMsdk(config=configs, parse_env_config=True).discover_streams()[0] + + assert s0.schema == BASIC_SCHEMA def test_multiple_streams(requests_mock):