Skip to content

Commit

Permalink
Feature/sdk_paginators_and_oauth_methods (#36)
Browse files Browse the repository at this point in the history
* Added Auth Methods and HATEOAS Request

* Resolving Copy/Paste issues

* Supporting Meltano paginators - Removes deprecated code.

* Updated documentation with Examples

* Adding support for nested offset dictionary

* Updated to support DBT Cloud API

* Adding logic for aws authentication

* Extended prepare_request to pass auth parm

* Refactored auth and pagination

* Cleaning up linting and documentation

* Correcting import error

* Supporting API filtering and authenticator caching

* Documentation and defaulting to no_auth.

* Correcting tap property description.

* Bug fix - missing return

* Adding support for OAuth Headers

* Commenting out current SDK version

* Correcting parameter name

* Add support for expirying OAuth Tokens

* Pointing to Meltano SDK with OAuth Header support

* SDK update to support oauth headers

---------

Co-authored-by: Steve Clarke <[email protected]>
  • Loading branch information
s7clarke10 and Steve Clarke authored Jul 26, 2023
1 parent 69643af commit 05a7f50
Show file tree
Hide file tree
Showing 10 changed files with 1,570 additions and 105 deletions.
456 changes: 429 additions & 27 deletions README.md

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions config.sample.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"pagination_request_style": "jsonpath_paginator",
"pagination_response_style": "hateoas_body",
"api_url": "https://myexample_fhir_api_url/base_folder",
"pagination_page_size": 100,
"next_page_token_path": "$.link[?(@.relation=='next')].url",
"headers": {
"X-API-KEY": "my_secret_hex_string_for_authentication"
},
"streams": [
{
"name": "my_sample_table_name",
"path": "/ExampleService",
"params": {
"services-provided-type": "MY_INITIAL_EXAMPLE_SERVICE"
},
"primary_keys": [
"id"
],
"records_path": "$.entry[*].resource",
"replication_key": "meta_lastUpdated",
"start_date": "2001-01-01T00:00:00.00+12:00",
"source_search_field": "last-updated",
"source_search_query": "gt$last_run_date"
}
]
}
48 changes: 47 additions & 1 deletion meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@ plugins:
kind: string
- name: pagination_response_style
kind: string
- name: use_request_body_not_params
kind: boolean
- name: pagination_page_size
kind: integer
- name: pagination_results_limit
kind: integer
- name: pagination_next_page_param
kind: string
- name: pagination_limit_per_page_param
kind: string
- name: pagination_total_limit_param
kind: string
- name: streams
kind: array
- name: path
Expand All @@ -38,6 +48,42 @@ plugins:
kind: array
- name: num_inference_records
kind: integer
- name: start_date
kind: date_iso8601
- name: source_search_field
kind: string
- name: source_search_query
kind: string
- name: auth_method
kind: string
- name: api_key
kind: object
- name: client_id
kind: password
- name: client_secret
kind: password
- name: username
kind: string
- name: password
kind: password
- name: bearer_token
kind: password
- name: refresh_token
kind: oauth
- name: grant_type
kind: string
- name: scope
kind: string
- name: access_token_url
kind: string
- name: redirect_uri
kind: string
- name: oauth_extras
kind: object
- name: oauth_expiration_secs
kind: integer
- name: aws_credentials
kind: object
config:
api_url: https://earthquake.usgs.gov/fdsnws
records_path: "$.features[*]"
Expand All @@ -57,4 +103,4 @@ plugins:
loaders:
- name: target-jsonl
variant: andyh1203
pip_url: target-jsonl
pip_url: target-jsonl
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "<3.12,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.26.0"
singer-sdk = "^0.30.0"
genson = "^1.2.2"
atomicwrites = "^1.4.0"
requests-aws4auth = "^1.2.3"
boto3 = "^1.26.156"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
257 changes: 257 additions & 0 deletions tap_rest_api_msdk/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
"""REST authentication handling."""

import os
from typing import Any

import boto3
from requests_aws4auth import AWS4Auth
from singer_sdk.authenticators import (
APIKeyAuthenticator,
BasicAuthenticator,
BearerTokenAuthenticator,
OAuthAuthenticator
)

class AWSConnectClient:
"""A connection class to AWS Resources"""

def __init__(
self,
connection_config,
create_signed_credentials: bool = True
):
self.connection_config = connection_config


# Initialise the variables
self.create_signed_credentials = create_signed_credentials
self.aws_auth = None
self.region = None
self.credentials = None
self.aws_service = None
self.aws_session = None

# Establish a AWS Client
self.credentials = self._create_aws_client()

# Store AWS Signed Credentials
self._store_aws4auth_credentials()


def _create_aws_client(self, config=None):
if not config:
config = self.connection_config

# Get the required parameters from config file and/or environment variables
aws_profile = config.get('aws_profile') or os.environ.get('AWS_PROFILE')
aws_access_key_id = config.get('aws_access_key_id') or os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = config.get('aws_secret_access_key') or os.environ.get('AWS_SECRET_ACCESS_KEY')
aws_session_token = config.get('aws_session_token') or os.environ.get('AWS_SESSION_TOKEN')
aws_region = config.get('aws_region') or os.environ.get('AWS_REGION')
self.aws_service = config.get('aws_service',None) or os.environ.get('AWS_SERVICE')

if not config.get('create_signed_credentials',True):
self.create_signed_credentials = False

# AWS credentials based authentication
if aws_access_key_id and aws_secret_access_key:
self.aws_session = boto3.session.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=aws_region,
aws_session_token=aws_session_token
)
# AWS Profile based authentication
elif aws_profile:
self.aws_session = boto3.session.Session(profile_name=aws_profile)
else:
self.aws_session = None

if self.aws_session:
self.region = self.aws_session.region_name
return self.aws_session.get_credentials()
else:
return None


def _store_aws4auth_credentials(self):
"""Stores the AWS Signed Credential for the available AWS credentials.
Returns:
The None.
"""
if self.create_signed_credentials and self.credentials:
self.aws_auth = AWS4Auth(self.credentials.access_key, self.credentials.secret_key, self.region, self.aws_service, aws_session=self.credentials.token)
else:
self.aws_auth = None


def get_awsauth(self):
"""Return the AWS Signed Connection for provided credentials.
Returns:
The awsauth object.
"""
return self.aws_auth

def get_aws_session_client(self):
"""Return the AWS Signed Connection for provided credentials.
Returns:
The an AWS Session Client.
"""
return self.aws_session.client(self.aws_service,
region_name=self.region)


class ConfigurableOAuthAuthenticator(OAuthAuthenticator):

@property
def oauth_request_body(self) -> dict:
"""Build up a list of OAuth2 parameters to use depending
on what configuration items have been set and the type of OAuth
flow set by the grant_type.
"""

# Test where the config is located in self
if self.config: # Tap Config
my_config = self.config
elif self._config: # Stream Config
my_config = self._config

client_id = my_config.get('client_id')
client_secret = my_config.get('client_secret')
username = my_config.get('username')
password = my_config.get('password')
refresh_token = my_config.get('refresh_token')
grant_type = my_config.get('grant_type')
scope = my_config.get('scope')
redirect_uri = my_config.get('redirect_uri')
oauth_extras = my_config.get('oauth_extras')

oauth_params = {}

# Test mandatory parameters based on grant_type
if grant_type:
oauth_params['grant_type'] = grant_type
else:
raise ValueError("Missing grant type for OAuth Token.")

if grant_type == 'client_credentials':
if not (client_id and client_secret):
raise ValueError(
"Missing either client_id or client_secret for 'client_credentials' grant_type."
)

if grant_type == 'password':
if not (username and password):
raise ValueError("Missing either username or password for 'password' grant_type.")

if grant_type == 'refresh_token':
if not refresh_token:
raise ValueError("Missing either refresh_token for 'refresh_token' grant_type.")

# Add parameters if they are set
if scope:
oauth_params['scope'] = scope
if client_id:
oauth_params['client_id'] = client_id
if client_secret:
oauth_params['client_secret'] = client_secret
if username:
oauth_params['username'] = username
if password:
oauth_params['password'] = password
if refresh_token:
oauth_params['refresh_token'] = refresh_token
if redirect_uri:
oauth_params['redirect_uri'] = redirect_uri
if oauth_extras:
for k, v in oauth_extras.items():
oauth_params[k] = v

return oauth_params


def select_authenticator(self) -> Any:
"""Calls an appropriate SDK Authentication method based on the the set auth_method.
If an auth_method is not provided, the tap will call the API using any settings from
the headers and params config.
Note: Each auth method requires certain configuration to be present see README.md
for each auth methods configuration requirements.
Raises:
ValueError: if the auth_method is unknown.
Returns:
A SDK Authenticator or None if no auth_method supplied.
"""

# Test where the config is located in self
if self.config: # Tap Config
my_config = self.config
elif self._config: # Stream Config
my_config = self._config

auth_method = my_config.get('auth_method', "")
api_keys = my_config.get('api_keys', '')
self.http_auth = None

# Set http headers if headers are supplied
# Some OAUTH2 API's require headers to be supplied
# In the OAUTH request.
auth_headers = my_config.get('headers',None)

# Using API Key Authenticator, keys are extracted from api_keys dict
if auth_method == "api_key":
if api_keys:
for k, v in api_keys.items():
key = k
value = v
return APIKeyAuthenticator(
stream=self,
key=key,
value=value
)
# Using Basic Authenticator
elif auth_method == "basic":
return BasicAuthenticator(
stream=self,
username=my_config.get('username', ''),
password=my_config.get('password', '')
)
# Using OAuth Authenticator
elif auth_method == "oauth":
return ConfigurableOAuthAuthenticator(
stream=self,
auth_endpoint=my_config.get('access_token_url', ''),
oauth_scopes=my_config.get('scope', ''),
default_expiration=my_config.get('oauth_expiration_secs', ''),
oauth_headers=auth_headers,
)
# Using Bearer Token Authenticator
elif auth_method == "bearer_token":
return BearerTokenAuthenticator(
stream=self,
token=my_config.get('bearer_token', ''),
)
# Using AWS Authenticator
elif auth_method == "aws":

# Establish an AWS Connection Client and returned Signed Credentials
self.aws_connection = AWSConnectClient(connection_config=my_config.get("aws_credentials",None))

if self.aws_connection.aws_auth:
self.http_auth = self.aws_connection.aws_auth
else:
self.http_auth = None

return self.http_auth
elif auth_method != "no_auth":
self.logger.error(f"Unknown authentication method {auth_method}. Use api_key, basic, oauth, bearer_token, or aws.")
raise ValueError(
f"Unknown authentication method {auth_method}. Use api_key, basic, oauth, bearer_token, or aws."
)
Loading

0 comments on commit 05a7f50

Please sign in to comment.