Below you will find a collection of code samples which can be used for inspiration.
Below are full project samples, contributed by members in the community. Use these for inspiration or to get more information on what an SDK-based tap or target will look like.
- tap-bamboohr by Auto IDM
- tap-confluence by @edgarrmondragon
- tap-investing by @DouweM
- tap-parquet by AJ
- tap-powerbi-metadata by Slalom
- target-athena, Community Project led by Andrew Stewart
To add your project to this list, please submit an issue.
These are code samples taken from other projects. Use these as a reference if you get stuck.
- A simple Tap class definition with two streams
- Define a simple GraphQL-based stream with schema defined in a file
- Define a REST-based stream with a JSONPath expression
- Use a JSONPath expression to extract the next page URL from a HATEOAS response
- Dynamically discovering schema for a stream
- Initialize a collection of tap streams with differing types
- Run the standard built-in tap tests
- Make all streams reuse the same authenticator instance
- Make a stream reuse the same authenticator instance for all requests
- Custom response validation
- Custom Backoff
class TapCountries(Tap):
    """Sample tap for the Countries GraphQL API.

    The tap declares no config options and does not require authentication.
    """

    name = "tap-countries"
    # Settings schema built from an empty property list.
    config_jsonschema = th.PropertiesList([]).to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return one instance of each of the tap's two stream types."""
        countries = CountriesStream(tap=self)
        continents = ContinentsStream(tap=self)
        return [countries, continents]
class ContinentsStream(GraphQLStream):
    """Stream of continents served by the Countries API."""

    # Stream identity
    name = "continents"
    primary_keys = ["code"]
    replication_key = None  # no incremental bookmark is needed

    # GraphQL API endpoint and the query text sent to it.
    # The query literal is kept verbatim, hence the flush-left lines.
    url_base = "https://countries.trevorblades.com/"
    query = """
continents {
code
name
}
"""

    # The JSON Schema definition is read from a text file.
    schema_filepath = SCHEMAS_DIR / "continents.json"
class LOTRCharactersStream(RESTStream):
    """Stream of characters from the Lord of the Rings 'The One' API."""

    # Endpoint configuration
    name = "characters"
    path = "/character"
    records_jsonpath = "$.docs[*]"  # selects every element of `docs`

    # Base REST API configuration
    url_base = "https://the-one-api.dev/v2"
    primary_keys = ["_id"]

    @property
    def authenticator(self):
        """Return an authenticator sending the configured `api_key` as a bearer token."""
        bearer_header = {
            "Authorization": f"Bearer {self.config.get('api_key')}",
        }
        return SimpleAuthenticator(stream=self, auth_headers=bearer_header)
Use a JSONPath expression to extract the next page URL from a HATEOAS response
class MyStream(RESTStream):
    """Custom stream that reads its next-page URL from the response body."""

    # Selects the `href` of the entry in `links` whose `rel` equals "next".
    next_page_token_jsonpath = "$.links[?(@.rel=='next')].href"
Here is an example which parses schema from a CSV file:
FAKECSV = """
Header1,Header2,Header3
val1,val2,val3
val1,val2,val3
val1,val2,val3
"""


class ParquetStream(Stream):
    """Stream whose schema is derived from the header row of ``FAKECSV``."""

    @property
    def schema(self):
        """Dynamically detect the JSON schema for the stream.

        This is evaluated prior to any records being retrieved.

        Returns:
            A JSON Schema dict with one string-typed property per CSV header
            (no properties when the CSV text is empty).
        """
        # FAKECSV starts with a newline, so taking the first element of a plain
        # split on newlines would yield an empty string. Strip the surrounding
        # whitespace first so that the first row really is the header row.
        rows = FAKECSV.strip().splitlines()
        headers = rows[0].split(",") if rows else []

        # Assume string type for all fields
        properties: List[th.Property] = [
            th.Property(header, th.StringType()) for header in headers
        ]
        return th.PropertiesList(*properties).to_dict()
Here is another example from the Parquet tap. This sample uses a custom get_jsonschema_type() function to return the data type.
class ParquetStream(Stream):
    """Stream class for Parquet streams."""

    #...

    @property
    def schema(self) -> dict:
        """Dynamically detect the JSON schema for the stream.

        This is evaluated prior to any records being retrieved.

        Returns:
            A JSON Schema dict with one property per column of the Parquet
            file at ``self.filepath``.
        """
        # Get a schema object using the parquet and pyarrow libraries
        parquet_schema = pq.ParquetFile(self.filepath).schema_arrow

        # Walk column names and column types in lockstep, translating each
        # Parquet type to a JSON Schema type via get_jsonschema_type().
        properties: List[th.Property] = [
            th.Property(name, get_jsonschema_type(str(arrow_type)))
            for name, arrow_type in zip(parquet_schema.names, parquet_schema.types)
        ]

        # Return the list as a JSON Schema dictionary object
        return th.PropertiesList(*properties).to_dict()
class TapCountries(Tap):
    # ...

    def discover_streams(self) -> List[Stream]:
        """Return a list holding one instance of each of the two stream types."""
        countries = CountriesStream(tap=self)
        continents = ContinentsStream(tap=self)
        return [countries, continents]
Or equivalently:
# Declare list of types here at the top of the file
STREAM_TYPES = [
    CountriesStream,
    ContinentsStream,
]


class TapCountries(Tap):
    # ...

    def discover_streams(self) -> List[Stream]:
        """Return one instance of every stream type listed in STREAM_TYPES."""
        # Each stream class is instantiated with this tap as its owner.
        return [stream_class(tap=self) for stream_class in STREAM_TYPES]
# Import the tests
from singer_sdk.testing import get_standard_tap_tests
# Import our tap class
from tap_parquet.tap import TapParquet
SAMPLE_CONFIG = {
    # ...
}


def test_sdk_standard_tap_tests():
    """Execute every built-in SDK tap test against TapParquet."""
    # get_standard_tap_tests() yields callables; invoke each one in turn.
    for run_test in get_standard_tap_tests(TapParquet, config=SAMPLE_CONFIG):
        run_test()
from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta
from singer_sdk.streams import RESTStream
class SingletonAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta):
    """OAuth authenticator built with the SingletonMeta metaclass."""


class SingletonAuthStream(RESTStream):
    """Stream whose authenticator is a SingletonAuthenticator."""

    @property
    def authenticator(self) -> SingletonAuthenticator:
        """Return the stream authenticator."""
        auth = SingletonAuthenticator(stream=self)
        return auth
from memoization import cached
from singer_sdk.authenticators import APIAuthenticatorBase
from singer_sdk.streams import RESTStream
class CachedAuthStream(RESTStream):
    """A stream whose authenticator property is memoized with ``cached``."""

    @property
    @cached
    def authenticator(self) -> APIAuthenticatorBase:
        """Return the stream authenticator."""
        auth = APIAuthenticatorBase(stream=self)
        return auth
from requests.auth import HTTPDigestAuth
from singer_sdk.streams import RESTStream
class DigestAuthStream(RESTStream):
    """Stream that authenticates with HTTP digest authentication."""

    @property
    def authenticator(self) -> HTTPDigestAuth:
        """Build the digest authenticator from the `username` and `password` config values."""
        credentials = {
            "username": self.config["username"],
            "password": self.config["password"],
        }
        return HTTPDigestAuth(**credentials)
HTTPBasicAuth and HTTPProxyAuth are also available in requests.auth. In addition to requests.auth classes, the community has published a few packages with custom authenticator classes, which are compatible with the SDK. For example:
- requests-aws4auth: AWS v4 authentication
- requests_auth: A collection of authenticators for various services and protocols including Azure, Okta and NTLM.
Some APIs deviate from HTTP status codes to report failures. For those cases, you can override RESTStream.validate_response() and raise FatalAPIError if an unrecoverable error is detected. If the API also has transient errors, either client-side like rate limits, or server-side like temporary 5xx, you can raise RetriableAPIError and the request will be retried with back-off:
from enum import Enum
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.streams.rest import RESTStream
class CustomResponseValidationStream(RESTStream):
    """Stream with non-conventional error response."""

    url_base = "https://badapi.test"
    name = "non_conventional"
    schema = {"type": "object", "properties": {}}
    path = "/dummy"

    class StatusMessage(str, Enum):
        """Possible status messages."""

        OK = "OK"
        ERROR = "ERROR"
        UNAVAILABLE = "UNAVAILABLE"

    def validate_response(self, response):
        """Validate the response, including the ``status`` field of its JSON body.

        Args:
            response: The HTTP response; its JSON body must contain a
                "status" key.

        Raises:
            FatalAPIError: If the body's status is ``StatusMessage.ERROR``.
            RetriableAPIError: If the body's status is
                ``StatusMessage.UNAVAILABLE``.
        """
        # Still catch error status codes
        super().validate_response(response)

        status = response.json()["status"]
        # StatusMessage mixes in str, so members compare equal to plain strings.
        if status == self.StatusMessage.ERROR:
            raise FatalAPIError("Error message found :(")
        if status == self.StatusMessage.UNAVAILABLE:
            raise RetriableAPIError("API is unavailable")
Custom backoff and retry behaviour can be added by overriding the stream's backoff methods, such as backoff_wait_generator().
For example, to use a constant retry:
def backoff_wait_generator(self) -> Callable[..., Generator[int, Any, None]]:
    """Use a constant wait between retries.

    Intended as a stream method override, so it takes ``self``.

    Returns:
        The wait generator produced by ``backoff.constant`` with an interval
        of 10.
    """
    return backoff.constant(interval=10)
To utilise a response header as a wait value you can use backoff_runtime, and pass a method that returns a wait value:
def backoff_wait_generator(self) -> Callable[..., Generator[int, Any, None]]:
    """Take the retry wait value from the failed response's Retry-After header.

    Intended as a stream method override; the body calls
    ``self.backoff_runtime``, so ``self`` must be a parameter.

    Returns:
        Whatever ``self.backoff_runtime`` returns for the header-reading
        callback defined below.
    """

    def _backoff_from_headers(retriable_api_error):
        """Return the Retry-After header as an int, or 0 when it is absent."""
        response_headers = retriable_api_error.response.headers
        return int(response_headers.get("Retry-After", 0))

    return self.backoff_runtime(value=_backoff_from_headers)
More links, resources, and example solutions are available from community members in the #singer-tap-development and #singer-target-development channels on Meltano Slack.