Skip to content

Commit

Permalink
enabled capability of added more than one stream per api url. Also en…
Browse files Browse the repository at this point in the history
…abled common parameters among them (#13)

Co-authored-by: Josh Lloyd <[email protected]>
  • Loading branch information
jlloyd-widen and Josh Lloyd authored Feb 18, 2022
1 parent 67d4d62 commit dccbc08
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 116 deletions.
32 changes: 26 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,37 @@ tap is available by running:
tap-rest-api-msdk --about
```

Config Options:
#### Top-level config options.
Parameters that appear at the stream-level will overwrite their top-level
counterparts except where noted in the stream-level params. Otherwise, the values
provided at the top-level will be the default values for each stream.:
- `api_url`: required: the base url/endpoint for the desired api.
- `name`: required: name of the stream.
- `path`: optional: the path appeneded to the `api_url`.
- `params`: optional: an object of objects that provide the `params` in a `requests.get` method.
- `headers`: optional: an object of headers to pass into the api calls.
- `records_path`: optional: a jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`.
- `pagination_request_style`: optional: style for requesting pagination, defaults to `default`, see Pagination below.
- `pagination_response_style`: optional: style of pagination results, defaults to `default`, see Pagination below.
- `pagination_page_size`: optional: limit for size of page, defaults to None.
- `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `$.next_page`.
- `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below.
- `path`: optional: see stream-level params below.
- `params`: optional: see stream-level params below.
- `headers`: optional: see stream-level params below.
- `records_path`: optional: see stream-level params below.
- `primary_keys`: optional: see stream-level params below.
- `replication_key`: optional: see stream-level params below.
- `except_keys`: optional: see stream-level params below.
- `num_inference_keys`: optional: see stream-level params below.

#### Stream level config options.
Parameters that appear at the stream-level
will overwrite their top-level counterparts except where noted below:
- `name`: required: name of the stream.
- `path`: optional: the path appeneded to the `api_url`.
- `params`: optional: an object of objects that provide the `params` in a `requests.get` method.
Stream level params will be merged with top-level params with stream level params overwriting
top-level params with the same key.
- `headers`: optional: an object of headers to pass into the api calls. Stream level
headers will be merged with top-level params with stream level params overwriting
top-level params with the same key
- `records_path`: optional: a jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`.
- `primary_keys`: required: a list of the json keys of the primary key for the stream.
- `replication_key`: optional: the json key of the replication key. Note that this should be an incrementing integer or datetime object.
- `except_keys`: This tap automatically flattens the entire json structure and builds keys based on the corresponding paths.
Expand Down
45 changes: 30 additions & 15 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,47 @@ plugins:
- discover
settings:
- name: api_url
- name: name
- name: path
- name: params
- name: headers
- name: records_path
kind: string
- name: next_page_token_path
kind: string
- name: pagination_request_style
kind: string
- name: pagination_response_style
kind: string
- name: pagination_page_size
kind: integer
- name: streams
kind: array
- name: path
kind: string
- name: params
kind: object
- name: headers
kind: object
- name: records_path
kind: string
- name: primary_keys
kind: array
- name: replication_key
kind: string
- name: except_keys
kind: array
- name: num_inference_records
kind: integer
config:
name: us_earthquakes
api_url: https://earthquake.usgs.gov/fdsnws
path: /event/1/query
params:
format: geojson
starttime: "2014-01-01"
endtime: "2014-01-02"
minmagnitude: 1
primary_keys:
- id
records_path: "$.features[*]"
num_inference_records: 100
streams:
- name: us_earthquakes
path: /event/1/query
params:
format: geojson
starttime: "2014-01-01"
endtime: "2014-01-02"
minmagnitude: 1
primary_keys:
- id
num_inference_records: 100
select:
- '*.*'
loaders:
Expand Down
214 changes: 132 additions & 82 deletions tap_rest_api_msdk/tap.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""rest-api tap class."""

import copy
from typing import Any, List

import requests
from genson import SchemaBuilder
from singer_sdk import Stream, Tap
from singer_sdk import Tap
from singer_sdk import typing as th
from singer_sdk.helpers.jsonpath import extract_jsonpath
from tap_rest_api_msdk.streams import DynamicStream
Expand All @@ -16,47 +17,87 @@ class TapRestApiMsdk(Tap):

name = "tap-rest-api-msdk"

config_jsonschema = th.PropertiesList(
th.Property(
"api_url",
th.StringType,
required=True,
description="the base url/endpoint for the desired api",
),
# th.Property("auth_method", th.StringType, default='no_auth', required=False),
# th.Property("auth_token", th.StringType, required=False),
th.Property(
"name", th.StringType, required=True, description="name of the stream"
),
common_properties = th.PropertiesList(
th.Property(
"path",
th.StringType,
default="",
required=False,
description="the path appeneded to the `api_url`.",
description="the path appended to the `api_url`. Stream-level path will "
"overwrite top-level path",
),
th.Property(
"params",
th.ObjectType(),
default={},
required=False,
description="an object of objects that provide the `params` in a "
"`requests.get` method.",
description="an object providing the `params` in a `requests.get` method. "
"Stream level params will be merged"
"with top-level params with stream level params overwriting"
"top-level params with the same key.",
),
th.Property(
"headers",
th.ObjectType(),
required=False,
description="an object of headers to pass into the api calls.",
description="An object of headers to pass into the api calls. Stream level"
"headers will be merged with top-level params with stream"
"level params overwriting top-level params with the same key",
),
th.Property(
"records_path",
th.StringType,
default="$[*]",
required=False,
description="a jsonpath string representing the path in the requests "
"response that contains the "
"records to process. Defaults to `$[*]`.",
"response that contains the records to process. Defaults "
"to `$[*]`. Stream level records_path will overwrite "
"the top-level records_path",
),
th.Property(
"primary_keys",
th.ArrayType(th.StringType),
required=False,
description="a list of the json keys of the primary key for the stream.",
),
th.Property(
"replication_key",
th.StringType,
required=False,
description="the json key of the replication key. Note that this should "
"be an incrementing integer or datetime object.",
),
th.Property(
"except_keys",
th.ArrayType(th.StringType),
default=[],
required=False,
description="This tap automatically flattens the entire json structure "
"and builds keys based on the corresponding paths.; Keys, "
"whether composite or otherwise, listed in this dictionary "
"will not be recursively flattened, but instead their values "
"will be; turned into a json string and processed in that "
"format. This is also automatically done for any lists within "
"the records; therefore, records are not duplicated for each "
"item in lists.",
),
th.Property(
"num_inference_records",
th.NumberType,
default=50,
required=False,
description="number of records used to infer the stream's schema. "
"Defaults to 50.",
),
)

top_level_properties = th.PropertiesList(
th.Property(
"api_url",
th.StringType,
required=True,
description="the base url/endpoint for the desired api",
),
# th.Property("auth_method", th.StringType, default='no_auth', required=False),
# th.Property("auth_token", th.StringType, required=False),
th.Property(
"next_page_token_path",
th.StringType,
Expand Down Expand Up @@ -86,81 +127,90 @@ class TapRestApiMsdk(Tap):
th.IntegerType,
default=None,
required=False,
description="the size of each page in records. " "Defaults to None",
),
th.Property(
"primary_keys",
th.ArrayType(th.StringType),
required=True,
description="a list of the json keys of the primary key for the stream.",
),
th.Property(
"replication_key",
th.StringType,
required=False,
description="the json key of the replication key. Note that this should "
"be an incrementing integer or datetime object.",
description="the size of each page in records. Defaults to None",
),
)

# add common properties to top-level properties
for prop in common_properties.wrapped:
top_level_properties.append(prop)

# add common properties to the stream schema
stream_properties = th.PropertiesList()
stream_properties.wrapped = copy.copy(common_properties.wrapped)
stream_properties.append(
th.Property(
"except_keys",
th.ArrayType(th.StringType),
default=[],
required=False,
description="This tap automatically flattens the entire json structure "
"and builds keys based on the corresponding paths.; Keys, "
"whether composite or otherwise, listed in this dictionary "
"will not be recursively flattened, but instead their values "
"will be; turned into a json string and processed in that "
"format. This is also automatically done for any lists within "
"the records; therefore, records are not duplicated for each "
"item in lists.",
"name", th.StringType, required=True, description="name of the stream"
),
)

# add streams schema to top-level properties
top_level_properties.append(
th.Property(
"num_inference_records",
th.NumberType,
default=50,
"streams",
th.ArrayType(th.ObjectType(*stream_properties.wrapped)),
required=False,
description="number of records used to infer the stream's schema. "
"Defaults to 50.",
description="An array of streams, designed for separate paths using the"
"same base url.",
),
).to_dict()
)

def discover_streams(self) -> List[Stream]:
config_jsonschema = top_level_properties.to_dict()

def discover_streams(self) -> List[DynamicStream]: # type: ignore
"""Return a list of discovered streams.
Returns:
A list of streams.
"""
return [
DynamicStream(
tap=self,
name=self.config["name"],
path=self.config["path"],
params=self.config.get("params"),
headers=self.config.get("headers"),
records_path=self.config["records_path"],
next_page_token_path=self.config["next_page_token_path"],
primary_keys=self.config["primary_keys"],
replication_key=self.config.get("replication_key"),
except_keys=self.config.get("except_keys"),
schema=self.get_schema(
self.config["records_path"],
self.config.get("except_keys"), # type: ignore
self.config.get("num_inference_records"), # type: ignore
self.config["path"],
self.config.get("params"), # type: ignore
self.config.get("headers"), # type: ignore
),
pagination_request_style=self.config.get( # type: ignore
"pagination_request_style"
),
pagination_response_style=self.config.get( # type: ignore
"pagination_response_style"
),
pagination_page_size=self.config.get("pagination_page_size"),
# print(self.top_level_properties.to_dict())

streams = []
for stream in self.config["streams"]:
# resolve config
records_path = stream.get(
"records_path", self.config.get("records_path", "$[*]")
)
]
except_keys = stream.get("except_keys", self.config.get("except_keys", []))
path = stream.get("path", self.config.get("path", ""))
params = {**self.config.get("params", {}), **stream.get("params", {})}
headers = {**self.config.get("headers", {}), **stream.get("headers", {})}

streams.append(
DynamicStream(
tap=self,
name=stream["name"],
path=path,
params=params,
headers=headers,
records_path=records_path,
primary_keys=stream.get(
"primary_keys", self.config.get("primary_keys", [])
),
replication_key=stream.get(
"replication_key", self.config.get("replication_key", "")
),
except_keys=except_keys,
next_page_token_path=self.config["next_page_token_path"],
pagination_request_style=self.config["pagination_request_style"],
pagination_response_style=self.config["pagination_response_style"],
pagination_page_size=self.config.get("pagination_page_size"),
schema=self.get_schema(
records_path,
except_keys,
stream.get(
"num_inference_records",
self.config["num_inference_records"],
),
path,
params,
headers,
),
)
)

return streams

def get_schema(
self,
Expand Down
Loading

0 comments on commit dccbc08

Please sign in to comment.