diff --git a/README.md b/README.md index f637bde..946731d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # tap-rest-api-msdk +![singer_rest_api_tap](https://user-images.githubusercontent.com/84364906/220881634-c0d0145a-ab85-44e9-91b6-e8d365da25f3.png) `tap-rest-api-msdk` is a Singer tap for generic rest-apis. The novelty of this particular tap is that it will auto-discover the stream's schema. @@ -7,9 +8,14 @@ This is particularly useful if you have a stream with a very large and complex s a stream that outputs records with varying schemas for each record. Can also be used for simpler more reliable streams. -Please note that authentication capabilities have not yet been developed for this tap, -unless you are able to pass the authentication through the header. -If you are interested in contributing this, please fork and make a pull request. +There are many forms of Authentication supported by this tap. By default for legacy support, you can pass Authentication via headers. If you want to use the built in support for Authentication, this tap supports +- Basic Authentication +- API Key +- Bearer Token +- OAuth +- AWS + +Please note that OAuthJWTAuthentication has not been developed. If you are interested in contributing this, please fork and make a pull request. Built with the Meltano [SDK](https://gitlab.com/meltano/sdk) for Singer Taps. 
@@ -31,23 +37,81 @@ plugins: - discover settings: - name: api_url + kind: string + - name: next_page_token_path + kind: string + - name: pagination_request_style + kind: string + - name: pagination_response_style + kind: string + - name: use_request_body_not_params + kind: boolean + - name: pagination_page_size + kind: integer + - name: pagination_results_limit + kind: integer + - name: pagination_next_page_param + kind: string + - name: pagination_limit_per_page_param + kind: string + - name: pagination_total_limit_param + kind: string + - name: streams + kind: array - name: name + kind: string - name: path + kind: string - name: params + kind: object - name: headers + kind: object - name: records_path - - name: next_page_token_path - - name: pagination_request_style - - name: pagination_response_style - - name: pagination_page_size + kind: string - name: primary_keys + kind: array - name: replication_key + kind: string - name: except_keys + kind: array - name: num_inference_records - - name: pagination_request_style - - name: pagination_response_style - - name: pagination_page_size - - name: next_page_token_path + kind: integer + - name: start_date + kind: date_iso8601 + - name: source_search_field + kind: string + - name: source_search_query + kind: string + - name: auth_method + kind: string + - name: api_key + kind: object + - name: client_id + kind: password + - name: client_secret + kind: password + - name: username + kind: string + - name: password + kind: password + - name: bearer_token + kind: password + - name: refresh_token + kind: oauth + - name: grant_type + kind: string + - name: scope + kind: string + - name: access_token_url + kind: string + - name: redirect_uri + kind: string + - name: oauth_extras + kind: object + - name: oauth_expiration_secs + kind: integer + - name: aws_credentials + kind: object ``` ```bash @@ -71,10 +135,15 @@ Parameters that appear at the stream-level will overwrite their top-level counterparts except where noted in the 
stream-level params. Otherwise, the values provided at the top-level will be the default values for each stream.: - `api_url`: required: the base url/endpoint for the desired api. -- `pagination_request_style`: optional: style for requesting pagination, defaults to `default`, see Pagination below. -- `pagination_response_style`: optional: style of pagination results, defaults to `default`, see Pagination below. +- `pagination_request_style`: optional: style for requesting pagination, defaults to `default` which is the `jsonpath_paginator`, see Pagination below. +- `pagination_response_style`: optional: style of pagination results, defaults to `default` which is the `page` style response, see Pagination below. +- `use_request_body_not_params`: optional: sends the request parameters in the request body. This is normally not required, but a few APIs such as OpenSearch require it. Defaults to `False`. - `pagination_page_size`: optional: limit for size of page, defaults to None. -- `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `$.next_page`. +- `pagination_results_limit`: optional: limits the max number of records. Note: Will cause an exception if the limit is hit (except for the `restapi_header_link_paginator`). This should be used for development purposes to restrict the total number of records returned by the API. Defaults to None. +- `pagination_next_page_param`: optional: The name of the param that indicates the page/offset. Defaults to None. +- `pagination_limit_per_page_param`: optional: The name of the param that indicates the limit/per_page. Defaults to None. +- `pagination_total_limit_param`: optional: The name of the param that indicates the total limit e.g. total, count. Defaults to `total`. +- `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `'$.next_page'` for the `jsonpath_paginator` only, otherwise None. 
- `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below. - `path`: optional: see stream-level params below. - `params`: optional: see stream-level params below. @@ -83,7 +152,25 @@ provided at the top-level will be the default values for each stream.: - `primary_keys`: optional: see stream-level params below. - `replication_key`: optional: see stream-level params below. - `except_keys`: optional: see stream-level params below. -- `num_inference_keys`: optional: see stream-level params below. +- `num_inference_keys`: optional: see stream-level params below. +- `start_date`: optional: see stream-level params below. +- `source_search_field`: optional: see stream-level params below. +- `source_search_query`: optional: see stream-level params below. +- `auth_method`: optional: see authentication params below. +- `api_key`: optional: see authentication params below. +- `client_id`: optional: see authentication params below. +- `client_secret`: optional: see authentication params below. +- `username`: optional: see authentication params below. +- `password`: optional: see authentication params below. +- `bearer_token`: optional: see authentication params below. +- `refresh_token`: optional: see authentication params below. +- `grant_type`: optional: see authentication params below. +- `scope`: optional: see authentication params below. +- `access_token_url`: optional: see authentication params below. +- `redirect_uri`: optional: see authentication params below. +- `oauth_extras`: optional: see authentication params below. +- `oauth_expiration_secs`: optional: see authentication params below. +- `aws_credentials`: optional: see authentication params below. #### Stream level config options. Parameters that appear at the stream-level @@ -104,31 +191,129 @@ will overwrite their top-level counterparts except where noted below: turned into a json string and processed in that format. 
This is also automatically done for any lists within the records; therefore, records are not duplicated for each item in lists. - `num_inference_keys`: optional: number of records used to infer the stream's schema. Defaults to 50. -- `scheam`: optional: A valid Singer schema or a path-like string that provides +- `schema`: optional: A valid Singer schema or a path-like string that provides the path to a `.json` file that contains a valid Singer schema. If provided, the schema will not be inferred from the results of an api call. +- `start_date`: optional: used by the **offset**, **page**, and **hateoas_body** response styles. This is an initial starting date for an incremental replication if there is no + existing state provided for an incremental replication. Example format 2022-06-10:23:10:10+1200. +- `source_search_field`: optional: used by the **offset**, **page**, and **hateoas_body** response styles. This is a search/query parameter used by the API for an incremental replication. + + The difference between the `replication_key` and the `source_search_field` is that the `source_search_field` is the search field used in the request parameters, whereas the `replication_key` is the name of the field in the API response. For example, if the source_search_field = **last-updated** the generated schema from the api discovery + might be **meta_lastUpdated**. The replication_key is set to meta_lastUpdated, and the source_search_field to last-updated. Note: Please set the `replication_key`, `start_date`, `source_search_field`, and `source_search_query` parameters all together. +- `source_search_query`: optional: used by the **offset**, **page**, and **hateoas_body** response styles. This is a query template to be issued against the API. A simple query template example for FHIR API's is **gt$last_run_date**. + + A more complex example against an Opensearch API, **{\\"bool\\": {\\"filter\\": [{\\"range\\": { \\"meta.lastUpdated\\": { \\"gt\\": \\"$last_run_date\\" }}}] }}**. 
Note: Any required double quotes in the query template must be escaped. + + At run-time, the tap will dynamically replace the value **$last_run_date** with either the defined `start_date` parameter or the last bookmark / state value. + Example: source_search_field=**last-updated**, the + source_search_query = **gt$last_run_date**, and the current replication state = 2022-08-10:23:10:10+1200. At run time this creates a request parameter **last-updated=gt2022-08-10:23:10:10+1200**. + +#### Top-Level Authentication config options. +- `auth_method`: optional: The method of authentication used by the API. Supported options + include: + - **oauth**: for OAuth2 authentication + - **basic**: Basic Header authentication - base64-encoded username + password config items + - **api_key**: for API Keys in the header e.g. X-API-KEY. + - **bearer_token**: for Bearer token authentication. + - **aws**: for AWS authentication. Works with the `aws_credentials` parameter. + - Defaults to no_auth which will take authentication parameters passed via the headers config. +- `api_key`: optional: A dictionary of API Key/Value pairs used by the api_key auth method. + Example: { "X-API-KEY": "my secret value"}. +- `client_id`: optional: Used for the OAuth2 authentication method. The public application ID + that's assigned for Authentication. The **client_id** should accompany a **client_secret**. +- `client_secret`: optional: Used for the OAuth2 authentication method. The client_secret is a + secret known only to the application and the authorization server. It is essentially the + application's own password. +- `username`: optional: Used for a number of authentication methods that use a user + password combination for authentication. +- `password`: optional: Used for a number of authentication methods that use a user password + combination for authentication. 
+- `bearer_token`: optional: Used for the Bearer Authentication method, which uses a token as part + of the authorization header for authentication. +- `refresh_token`: optional: An OAuth2 Refresh Token is a string that the OAuth2 client can use to + get a new access token without the user's interaction. +- `grant_type`: optional: Used for the OAuth2 authentication method. The grant_type is required + to describe the OAuth2 flow. Flows supported by this tap include **client_credentials**, **refresh_token**, **password**. +- `scope`: optional: Used for the OAuth2 authentication method. The scope is optional, it is a + mechanism to limit the amount of access that is granted to an access token. One or more scopes + can be provided delimited by a space. +- `access_token_url`: optional: Used for the OAuth2 authentication method. This is the end-point + for the authentication server used to exchange the authorization codes for an access token. +- `redirect_uri`: optional: Used for the OAuth2 authentication method. This is optional as the + redirect_uri may be part of the token returned by the authentication server. If a redirect_uri + is provided, it determines where the API server redirects the user after the user completes the + authorization flow. +- `oauth_extras`: optional: An object of Key/Value pairs for additional oauth config parameters + which may be required by the authorization server. Example: { "resource": "https://analysis.windows.net/powerbi/api" }. +- `oauth_expiration_secs`: optional: Used for the OAuth2 authentication method. This optional setting + is a timer for the expiration of a token in seconds. If not set the OAuth will use the default + expiration set in the token by the authorization server. +- `aws_credentials`: optional: An object of Key/Value pairs to support AWS authentication when using the AWS authenticator. 
While the tap can use AWS [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html) environment variables and aws_profiles instead of supplying the keys and region, the [AWS service code](https://docs.aws.amazon.com/general/latest/gr/rande.html) needs to be specified e.g. `es` for OpenSearch / Elastic Search. By default, `use_signed_credentials` is set to true. Config example: + ```json + { "aws_access_key_id": "my_aws_key_id", + "aws_secret_access_key": "my_aws_secret_access_key", + "aws_region": "us-east-1", + "aws_service": "es", + "use_signed_credentials": true} + ``` + +#### Complex Authentication + +The previous section showed out-of-the-box methods for a single factor of authentication e.g. x-api-key, basic or oauth. If the API requires multiple forms of authentication, you may need to pass some of the authentication methods via the headers to be combined with the main auth_method. + +Example: +An API may use OAuth2 for authentication but also require an X-API-KEY to be supplied as well. In this situation pass the X-API-KEY as part of the `headers` config, and the rest of the config should be set for OAuth e.g. + +- headers = '{"x-api-key": "my_secret_api_key"}' +- auth_method = "oauth" +- grant_type = "client_credentials" +- access_token_url = "https://auth.example.server/oauth2/token" +- client_id = "my_example_client_id" +- client_secret = "my_example_client_secret" + +Some servers may require additional information like a `Request-Context` which is usually Base64 encoded. If this is the case it should be included in the `headers` config as well. + +Example: + +- headers = '{"x-api-key": "my_secret_api_key", "Request-Context": "my_example_Base64_encoded_json_object"}' ## Pagination -Pagination is a complex topic as there is no real single standard, and many different implementations. 
Unless options are provided, both the request and results stype default to the `default`, which is the pagination style originally implemented. +API Pagination is a complex topic as there is no real single standard, and many different implementations. Unless options are provided, both the request and results style type default to the `default`, which is the pagination style originally implemented. Where possible, this tap utilises the Meltano SDK paginators https://sdk.meltano.com/en/latest/reference.html#pagination . ### Default Request Style -The default request style for pagination is described below: -- Use next_page_token_path if provided to extract the token from response if found; otherwise -- use X-Next-Page header from response +The default request style for pagination is using a `JSONPath Paginator` to locate the next page token. ### Default Response Style The default response style for pagination is described below: - If there is a token, add that as a `page` URL parameter. -### Additional Request Styles -There are additional request styles supported as follows. -- `style1` - This style uses URL parameters named offset and limit +### Additional Request / Paginator Styles +There are additional request styles supported as follows for pagination. +- `jsonpath_paginator` or `default` - This style obtains the token for the next page from a specific location in the response body via JSONPath notation. In many situations the `jsonpath_paginator` is a more appropriate paginator to the `hateoas_paginator`. + - `next_page_token_path` - The jsonpath to next page token. Example: `"$['@odata.nextLink']"`, this locates the token returned via the Microsoft Graph API. Default `'$.next_page'` for the `jsonpath_paginator` paginator only otherwise None. 
+- `offset_paginator` or `style1` - This style uses URL parameters named offset and limit - `offset` is calculated from the previous response, or not set if there is no previous response - - `limit` is set to the `pagination_page_size` value, if specified, or not set + - `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records. + - `pagination_limit_per_page_param` - the name of the API parameter to limit number of records per page. Default parameter name `limit`. + - `pagination_total_limit_param` - The name of the param that indicates the total limit e.g. total, count. Defaults to total + - `next_page_token_path` - Used to locate an appropriate link in the response. Default None - but looks in the `pagination` section of the JSON response by default. Example, jsonpath to get the offset from the NOAA API `'$.metadata.resultset'`. +- `simple_header_paginator` - This style uses links in the Header Response to locate the next page. Example the `x-next-page` link used by the Gitlab API. +- `header_link_paginator` - This style uses the default header link paginator from the Meltano SDK. +- `restapi_header_link_paginator` - This style is a variant on the header_link_paginator. It supports the ability to read from GitHub API. + - `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records. + - `pagination_limit_per_page_param` - the name of the API parameter to limit number of records per page. Default parameter name `per_page`. + - `pagination_results_limit` - Restricts the total number of records returned from the API. Default None i.e. no limit. +- `hateoas_paginator` - This style parses the next_token response for the parameters to pass. It is used by API's utilising the HATEOAS Rest style [HATEOAS](https://en.wikipedia.org/wiki/HATEOAS), including [FHIR API's](https://hl7.org/fhir/http.html). + - `pagination_page_size` - Sets a limit to number of records per page / response. 
Default None. + - `pagination_limit_per_page_param` - the name of the API parameter to limit number of records per page e.g. `_count` for [FHIR API's](https://hl7.org/fhir/http.html). Default None. +- `single_page_paginator` - A paginator that works with single-page endpoints. +- `page_number_paginator` - Paginator class for APIs that use page number. Looks at the response link to determine whether there are more pages. + - `next_page_token_path` - Used to locate an appropriate link in the response. Default `"hasMore"`. ### Additional Response Styles There are additional response styles supported as follows. -- `style1` - This style retrieves pagination information from the `pagination` top-level element in the response. Expected format is as follows: +- `default` or `page` - This style uses page-style offset params to identify the next page. +- `offset` or `style1` - This style retrieves pagination information by default from the `pagination` top-level element in the response. Expected format is as follows: ```json "pagination": { "total": 136, @@ -136,10 +321,221 @@ There are additional response styles supported as follows. "offset": 2 } ``` - The next page token, which in this case is really the next starting record number, is calculated by the limit, current offset, or None is returned to indicate no more data. For this style, the response style _must_ include the limit in the response, even if none is specified in the request, as well as total and offset to calculate the next token value. + The next page token, which in this case is really the next starting record number, is calculated from the limit and the current offset; None is returned to indicate no more data. For this style, the response style _must_ include the limit in the response, even if none is specified in the request, as well as ( `total` or `count` ) and offset to calculate the next token value. + + It is expected that this API Response Style will be used with a request style of `offset_paginator` or `style1`. 
+ - The `next_page_token_path` can be used to provide a JSONPath to the pagination element e.g. `'$.metadata.resultset'`. Default `pagination` from the top-level element in the response. +- `header_link` - This style parses the next page link in the Header Response. It is expected that this response will be used with an appropriate request style e.g. `restapi_header_link_paginator`. + - `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records. + - `pagination_limit_per_page_param` - the name of the API parameter to limit number of records per page. Default parameter name `per_page`. + - `pagination_results_limit` - Restricts the total number of records returned from the API. Default None i.e. no limit. +- `hateoas_body` - This style requires a well crafted `next_page_token_path` configuration + parameter to retrieve the request parameters from the GET request response for a subsequent request. + +### JSON Path for extracting tokens + The `next_page_token_path` and `records_path` use JSONPath to locate sections within the request response. + + The following example extracts the URL for the next pagination page. + ```json + "next_page_token_path": "$.link[?(@.relation=='next')].url" + ``` + + The following example demonstrates the power of JSONPath extensions by further splitting the URL and extracting just the parameters. Note: This is not required for FHIR API's but is provided for illustration of added functionality for complex use cases. + ```json + "next_page_token_path": "$.link[?(@.relation=='next')].url.`split(?, 1, 1)`" + ``` + The [JSONPath Evaluator](https://jsonpath.com/) website is useful to test the correct json path expression to use. + + Example json response from a FHIR API. 
+ + ```json + { + "resourceType": "Bundle", + "id": "44f2zf06-g53c-4218-a3ef-08bb6c2fde4a", + "meta": { + "lastUpdated": "2022-06-28T18:25:01.165+12:00" + }, + "type": "searchset", + "total": 63, + "link": [ + { + "relation": "self", + "url": "https://myexample_fhir_api_url/base_folder/ExampleService?_count=10&_getpageoffset=10&services-provided-type=MY_INITIAL_EXAMPLE_SERVICE" + }, + { + "relation": "next", + "url": "https://myexample_fhir_api_url/base_folder?_getpages=44f2zf06-g53c-4218-a3ef-08bb6c2fde4a&_getpagesoffset=10&_count=10&_pretty=true&_bundletype=searchset" + } + ], + "entry": [ + { + "fullUrl": "https://myexample_fhir_api_url/base_folder/ExampleService/example-service-123456", + "resource": { + "resourceType": "ExampleService", + "id": "example-service-123456" + } + } + ] + } + ``` + + Note: If you wish to extract the body from the example GET request response above, the following configuration parameter `records_path` will return the actual json content. + ```json + "records_path": "$.entry[*].resource" + ``` + +## Example settings for different API's + +This section provides examples of settings for accessing different API's. The tap configuration examples are provided in the form of environment variables. You could easily provide a configuration file [config.json](config.sample.json) instead of environment variables. + +Where config values are shown as `<>` placeholders, replace the text with your Authentication and API config. + +### Microsoft Graph API v1.0 + +This example uses the `jsonpath paginator`. In this example, it requires a Microsoft Azure AD admin to register an APP to obtain an OAuth Token to perform an OAuth flow with the Microsoft Graph API. The details below may be different based on your setup, adjust accordingly. + +Result: Two streamed datasets: the first, `whoami`, is a simple json response about yourself; the second is a sharepoint list, `my_sharepoint_list`. 
+ +``` +# Access MSOFFICE objects via the GraphAPI +export TAP_REST_API_MSDK_API_URL=https://graph.microsoft.com +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="jsonpath_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="hateoas_body" +export TAP_REST_API_MSDK_NEXT_PAGE_TOKEN_PATH="$['@odata.nextLink']" +export TAP_REST_API_MSDK_START_DATE="2001-01-01T00:00:00.00+12:00" +export TAP_REST_API_MSDK_AUTH_METHOD="oauth" +export TAP_REST_API_MSDK_USERNAME="" +export TAP_REST_API_MSDK_PASSWORD="" +export TAP_REST_API_MSDK_GRANT_TYPE="password" +export TAP_REST_API_MSDK_ACCESS_TOKEN_URL="https://login.microsoftonline.com//oauth2/v2.0/token" +export TAP_REST_API_MSDK_CLIENT_ID="" +export TAP_REST_API_MSDK_CLIENT_SECRET="" +export TAP_REST_API_MSDK_SCOPE="" +export TAP_REST_API_MSDK_STREAMS='[{"name": "whoami", "path": "/v1.0/me", "primary_keys": ["id"]},{"name": "my_sharepoint_list", "path": "/v1.0/sites//Lists//items/?expand=columns,items(expand=fields)", "primary_keys": ["id"], "records_path": "$.value[*].fields"}]' +``` + +### Gitlab API + +This example uses the `simple header paginator` and returns 50 records from the Gitlab API for Projects. Note: There is an exception raised due to the 50 record limit - this is an example hence the limit. + +``` +# Access Gitlab projects via the GitLab API +export TAP_REST_API_MSDK_API_URL=https://gitlab.com/api/v4/projects +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="simple_header_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESULTS_LIMIT=50 +export TAP_REST_API_MSDK_STREAMS='[{"name": "gitlab_projects", "primary_keys": ["id"]}]' +``` + +You could authenticate to Gitlab using a Personal Access Token (PAT) by adding this config. +``` +export TAP_REST_API_MSDK_HEADERS='{"Authorization": "Bearer "}' +``` + +### GitHub API + +This example uses the `headerlink paginator` and returns approximately 250 records from the GitHub API for Projects. 
+ +``` +# Access GitHub users via the GitHub API +export TAP_REST_API_MSDK_API_URL=https://api.github.com/users +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="restapi_header_link_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="header_link" +export TAP_REST_API_MSDK_PAGINATION_PAGE_SIZE=50 +export TAP_REST_API_MSDK_PAGINATION_RESULTS_LIMIT=250 +export TAP_REST_API_MSDK_STREAMS='[{"name": "github_users", "primary_keys": ["id"]}]' +``` + +You could authenticate to GitHub using a Personal Access Token (PAT) by adding this config. +``` +export TAP_REST_API_MSDK_HEADERS='{"Authorization": "Bearer "}' +``` + +### FHIR API + +This example uses the `jsonpath paginator` to access a FHIR API. It uses the `hateoas response style` to process the next tokens. + +This particular configuration will do an initial load of all data for a given resource defined in the `streams` config from 01-Jan-2001. In subsequent runs it will incrementally pull changed data based on the lastUpdated timestamp by searching for records greater than the highest last updated timestamp. In this example the PlanDefinition FHIR resource is being extracted. + +You will need appropriate OAuth Token details provided by the Administrator of the API. 
+ +``` +export TAP_REST_API_MSDK_API_URL= +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="jsonpath_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="hateoas_body" +export TAP_REST_API_MSDK_NEXT_PAGE_TOKEN_PATH="$.link[?(@.relation=='next')].url" +export TAP_REST_API_MSDK_START_DATE="2001-01-01T00:00:00.00+12:00" +export TAP_REST_API_MSDK_AUTH_METHOD="oauth" +export TAP_REST_API_MSDK_GRANT_TYPE="client_credentials" +export TAP_REST_API_MSDK_ACCESS_TOKEN_URL="https://login.microsoftonline.com//oauth2/v2.0/token" +export TAP_REST_API_MSDK_CLIENT_ID="" +export TAP_REST_API_MSDK_CLIENT_SECRET="" +export TAP_REST_API_MSDK_SCOPE="" +export TAP_REST_API_MSDK_STREAMS='[{"name":"plan_definition","path":"/PlanDefinition","primary_keys":["id"],"records_path":"$.entry[*].resource","replication_key":"meta_lastUpdated","search_parameter":"_lastUpdated","source_search_query": "gt$last_run_date"}]' +``` + +### NOAA API Example + +This example uses the `offset paginator` to access the NOAA API to return location categories. In this example the offset tokens are not in the default location of `pagination` so the `next_page_token_path` is set to the NOAA API offset location in the json response i.e. `'$.metadata.resultset'`. This example also sets a limit parameter in the `streams` to only return 5 records at a time to prove the pagination is working. 
+ +``` +# Access Locations Categories objects via the NOAA API +export TAP_REST_API_MSDK_API_URL=https://www.ncei.noaa.gov/cdo-web/api/v2 +export TAP_REST_API_MSDK_HEADERS='{"token": ""}' +export TAP_REST_API_MSDK_NEXT_PAGE_TOKEN_PATH='$.metadata.resultset' +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="offset_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="style1" +export TAP_REST_API_MSDK_PAGINATION_TOTAL_LIMIT_PARAM="count" +export TAP_REST_API_MSDK_STREAMS='[{"name": "locationcategories", "params": {"limit": "5"}, "path": "/locationcategories", "primary_keys": ["id"], "records_path": "$.results[*]"}]' +``` + +### dbt Cloud API Example + +This example uses the `offset paginator` to access the dbt Cloud API to return jobs. In this example the offset tokens are not in the default location of `pagination` so the `next_page_token_path` is set to the dbt API offset location in the json response i.e. `'$.extra'`. This example also sets the stream's `records_path` to `"$.data[*]"` which is the location of the data. + +``` +# Access Locations Categories objects via the dbt Cloud API +# Access Gitlab objects via the dbt Cloud API +export TAP_REST_API_MSDK_API_URL=https://.getdbt.com/api/v2/accounts/ +export TAP_REST_API_MSDK_HEADERS='{"Authorization": "Bearer "}' +export TAP_REST_API_MSDK_NEXT_PAGE_TOKEN_PATH='$.extra' +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="offset_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="style1" +export TAP_REST_API_MSDK_PAGINATION_TOTAL_LIMIT_PARAM="total_count" +export TAP_REST_API_MSDK_STREAMS='[{"name": "jobs", "path": "/jobs", "primary_keys": ["id"], "records_path": "$.data[*]"}]' +``` + +### AWS OpenSearch API Example + +This complex example uses the [AWS4Auth](https://github.com/tedder/requests-aws4auth) authenticator to provide signed AWS credentials in the requests to the AWS OpenSearch API endpoint. 
The `auth_method` is set to 'aws', and the required `aws_credentials` are provided. + +Note: The AWS authentication does support [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html) environment variables and aws_profiles. + +For pagination, the next page token is located in the **last** returned record. Using JSON Path the appropriate token response can be extracted via '$.hits.hits[-1:].sort' (-1 selects the last record in the array) and this is set in the `next_page_token_path` config setting. The API parameter used to select the next page is 'search_after' and this is set in the `pagination_next_page_param` config setting. To enable pagination in OpenSearch an API parameter named 'sort' must be set to a unique key e.g. '_id'. The number of records returned per page is controlled via an API parameter called 'size'. + +The OpenSearch API has a complex incremental replication query which must be sent in the request body. This is enabled by setting the `use_request_body_not_params` to True. + +Finally set the replication suite of config settings ( `start_date`, `replication_key`, `source_search_field`, and `source_search_query` ) to enable incremental replication of data since the last run. For OpenSearch there is a complex query template which must be set in the streams `source_search_query` config setting. + +Unlike most API requests, the API query is against an API parameter named `query` rather than the name of the API field. For this reason the `source_search_field` is set to 'query' in the streams array. Additionally, the stream's `records_path` is set to `"$.hits.hits[*]"` which is the location of the records in the request's response. 
+ +``` +# Access AWS objects via the AWS Open/Elastic Search API +export TAP_REST_API_MSDK_API_URL="https://...amazonaws.com" +export TAP_REST_API_MSDK_AWS_CREDENTIALS='{"aws_access_key_id": "", "aws_secret_access_key": "removed aws secret access key>", "aws_region": "", "aws_service": "", "create_signed_credentials": true}' +export TAP_REST_API_MSDK_START_DATE="2001-01-01T00:00:00.00+12:00" +export TAP_REST_API_MSDK_PAGINATION_REQUEST_STYLE="jsonpath_paginator" +export TAP_REST_API_MSDK_PAGINATION_RESPONSE_STYLE="offset" +export TAP_REST_API_MSDK_USE_REQUEST_BODY_NOT_PARAMS=true +export TAP_REST_API_MSDK_NEXT_PAGE_TOKEN_PATH='$.hits.hits[-1:].sort' +export TAP_REST_API_MSDK_PAGINATION_NEXT_PAGE_PARAM="search_after" +export TAP_REST_API_MSDK_AUTH_METHOD='aws' +export TAP_REST_API_MSDK_STREAMS='[{"name": "careplan", "params": {"size": 100, "sort": "_id"}, "path": "/careplan/_search", "primary_keys": [], "records_path": "$.hits.hits[*]", "replication_key": "_source_meta_lastUpdated", "source_search_field": "query", "source_search_query": "{\"bool\": {\"filter\": [{\"range\": { \"meta.lastUpdated\": { \"gt\": \"$last_run_date\" }}}] }}"}]' +``` ## Usage + You can easily run `tap-rest-api-msdk` by itself or in a pipeline using [Meltano](www.meltano.com). ### Executing the Tap Directly @@ -150,6 +546,12 @@ tap-rest-api-msdk --help tap-rest-api-msdk --config CONFIG --discover > ./catalog.json ``` +or + +```bash +bash tap-rest-api-msdk --config=config.sample.json +``` + ## Developer Resources ### Initialize your Development Environment @@ -212,4 +614,4 @@ meltano elt tap-rest-api-msdk target-jsonl ### SDK Dev Guide See the [dev guide](https://sdk.meltano.com/en/latest/dev_guide.html) for more instructions on how to use the SDK to -develop your own taps and targets. +develop your own taps and targets. 
\ No newline at end of file diff --git a/config.sample.json b/config.sample.json new file mode 100644 index 0000000..085f499 --- /dev/null +++ b/config.sample.json @@ -0,0 +1,27 @@ +{ + "pagination_request_style": "jsonpath_paginator", + "pagination_response_style": "hateoas_body", + "api_url": "https://myexample_fhir_api_url/base_folder", + "pagination_page_size": 100, + "next_page_token_path": "$.link[?(@.relation=='next')].url", + "headers": { + "X-API-KEY": "my_secret_hex_string_for_authentication" + }, + "streams": [ + { + "name": "my_sample_table_name", + "path": "/ExampleService", + "params": { + "services-provided-type": "MY_INITIAL_EXAMPLE_SERVICE" + }, + "primary_keys": [ + "id" + ], + "records_path": "$.entry[*].resource", + "replication_key": "meta_lastUpdated", + "start_date": "2001-01-01T00:00:00.00+12:00", + "source_search_field": "last-updated", + "source_search_query": "gt$last_run_date" + } + ] +} \ No newline at end of file diff --git a/meltano.yml b/meltano.yml index 3d1018e..b4058f7 100644 --- a/meltano.yml +++ b/meltano.yml @@ -18,8 +18,18 @@ plugins: kind: string - name: pagination_response_style kind: string + - name: use_request_body_not_params + kind: boolean - name: pagination_page_size kind: integer + - name: pagination_results_limit + kind: integer + - name: pagination_next_page_param + kind: string + - name: pagination_limit_per_page_param + kind: string + - name: pagination_total_limit_param + kind: string - name: streams kind: array - name: path @@ -38,6 +48,42 @@ plugins: kind: array - name: num_inference_records kind: integer + - name: start_date + kind: date_iso8601 + - name: source_search_field + kind: string + - name: source_search_query + kind: string + - name: auth_method + kind: string + - name: api_key + kind: object + - name: client_id + kind: password + - name: client_secret + kind: password + - name: username + kind: string + - name: password + kind: password + - name: bearer_token + kind: password + - name: refresh_token 
+ kind: oauth + - name: grant_type + kind: string + - name: scope + kind: string + - name: access_token_url + kind: string + - name: redirect_uri + kind: string + - name: oauth_extras + kind: object + - name: oauth_expiration_secs + kind: integer + - name: aws_credentials + kind: object config: api_url: https://earthquake.usgs.gov/fdsnws records_path: "$.features[*]" @@ -57,4 +103,4 @@ plugins: loaders: - name: target-jsonl variant: andyh1203 - pip_url: target-jsonl + pip_url: target-jsonl \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 4cf75cc..b4c34e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,9 +20,11 @@ readme = "README.md" [tool.poetry.dependencies] python = "<3.12,>=3.7.1" requests = "^2.25.1" -singer-sdk = "^0.26.0" +singer-sdk = "^0.30.0" genson = "^1.2.2" atomicwrites = "^1.4.0" +requests-aws4auth = "^1.2.3" +boto3 = "^1.26.156" [tool.poetry.dev-dependencies] pytest = "^6.2.5" diff --git a/tap_rest_api_msdk/auth.py b/tap_rest_api_msdk/auth.py new file mode 100644 index 0000000..79661af --- /dev/null +++ b/tap_rest_api_msdk/auth.py @@ -0,0 +1,257 @@ +"""REST authentication handling.""" + +import os +from typing import Any + +import boto3 +from requests_aws4auth import AWS4Auth +from singer_sdk.authenticators import ( + APIKeyAuthenticator, + BasicAuthenticator, + BearerTokenAuthenticator, + OAuthAuthenticator +) + +class AWSConnectClient: + """A connection class to AWS Resources""" + + def __init__( + self, + connection_config, + create_signed_credentials: bool = True + ): + self.connection_config = connection_config + + + # Initialise the variables + self.create_signed_credentials = create_signed_credentials + self.aws_auth = None + self.region = None + self.credentials = None + self.aws_service = None + self.aws_session = None + + # Establish a AWS Client + self.credentials = self._create_aws_client() + + # Store AWS Signed Credentials + self._store_aws4auth_credentials() + + + def _create_aws_client(self, 
config=None): + if not config: + config = self.connection_config + + # Get the required parameters from config file and/or environment variables + aws_profile = config.get('aws_profile') or os.environ.get('AWS_PROFILE') + aws_access_key_id = config.get('aws_access_key_id') or os.environ.get('AWS_ACCESS_KEY_ID') + aws_secret_access_key = config.get('aws_secret_access_key') or os.environ.get('AWS_SECRET_ACCESS_KEY') + aws_session_token = config.get('aws_session_token') or os.environ.get('AWS_SESSION_TOKEN') + aws_region = config.get('aws_region') or os.environ.get('AWS_REGION') + self.aws_service = config.get('aws_service',None) or os.environ.get('AWS_SERVICE') + + if not config.get('create_signed_credentials',True): + self.create_signed_credentials = False + + # AWS credentials based authentication + if aws_access_key_id and aws_secret_access_key: + self.aws_session = boto3.session.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=aws_region, + aws_session_token=aws_session_token + ) + # AWS Profile based authentication + elif aws_profile: + self.aws_session = boto3.session.Session(profile_name=aws_profile) + else: + self.aws_session = None + + if self.aws_session: + self.region = self.aws_session.region_name + return self.aws_session.get_credentials() + else: + return None + + + def _store_aws4auth_credentials(self): + """Stores the AWS Signed Credential for the available AWS credentials. + + Returns: + The None. + + """ + if self.create_signed_credentials and self.credentials: + self.aws_auth = AWS4Auth(self.credentials.access_key, self.credentials.secret_key, self.region, self.aws_service, aws_session=self.credentials.token) + else: + self.aws_auth = None + + + def get_awsauth(self): + """Return the AWS Signed Connection for provided credentials. + + Returns: + The awsauth object. 
+ + """ + return self.aws_auth + + def get_aws_session_client(self): + """Return the AWS Signed Connection for provided credentials. + + Returns: + The an AWS Session Client. + + """ + return self.aws_session.client(self.aws_service, + region_name=self.region) + + +class ConfigurableOAuthAuthenticator(OAuthAuthenticator): + + @property + def oauth_request_body(self) -> dict: + """Build up a list of OAuth2 parameters to use depending + on what configuration items have been set and the type of OAuth + flow set by the grant_type. + """ + + # Test where the config is located in self + if self.config: # Tap Config + my_config = self.config + elif self._config: # Stream Config + my_config = self._config + + client_id = my_config.get('client_id') + client_secret = my_config.get('client_secret') + username = my_config.get('username') + password = my_config.get('password') + refresh_token = my_config.get('refresh_token') + grant_type = my_config.get('grant_type') + scope = my_config.get('scope') + redirect_uri = my_config.get('redirect_uri') + oauth_extras = my_config.get('oauth_extras') + + oauth_params = {} + + # Test mandatory parameters based on grant_type + if grant_type: + oauth_params['grant_type'] = grant_type + else: + raise ValueError("Missing grant type for OAuth Token.") + + if grant_type == 'client_credentials': + if not (client_id and client_secret): + raise ValueError( + "Missing either client_id or client_secret for 'client_credentials' grant_type." 
+ ) + + if grant_type == 'password': + if not (username and password): + raise ValueError("Missing either username or password for 'password' grant_type.") + + if grant_type == 'refresh_token': + if not refresh_token: + raise ValueError("Missing either refresh_token for 'refresh_token' grant_type.") + + # Add parameters if they are set + if scope: + oauth_params['scope'] = scope + if client_id: + oauth_params['client_id'] = client_id + if client_secret: + oauth_params['client_secret'] = client_secret + if username: + oauth_params['username'] = username + if password: + oauth_params['password'] = password + if refresh_token: + oauth_params['refresh_token'] = refresh_token + if redirect_uri: + oauth_params['redirect_uri'] = redirect_uri + if oauth_extras: + for k, v in oauth_extras.items(): + oauth_params[k] = v + + return oauth_params + + +def select_authenticator(self) -> Any: + """Calls an appropriate SDK Authentication method based on the the set auth_method. + If an auth_method is not provided, the tap will call the API using any settings from + the headers and params config. + Note: Each auth method requires certain configuration to be present see README.md + for each auth methods configuration requirements. + + Raises: + ValueError: if the auth_method is unknown. + + Returns: + A SDK Authenticator or None if no auth_method supplied. + """ + + # Test where the config is located in self + if self.config: # Tap Config + my_config = self.config + elif self._config: # Stream Config + my_config = self._config + + auth_method = my_config.get('auth_method', "") + api_keys = my_config.get('api_keys', '') + self.http_auth = None + + # Set http headers if headers are supplied + # Some OAUTH2 API's require headers to be supplied + # In the OAUTH request. 
+ auth_headers = my_config.get('headers',None) + + # Using API Key Authenticator, keys are extracted from api_keys dict + if auth_method == "api_key": + if api_keys: + for k, v in api_keys.items(): + key = k + value = v + return APIKeyAuthenticator( + stream=self, + key=key, + value=value + ) + # Using Basic Authenticator + elif auth_method == "basic": + return BasicAuthenticator( + stream=self, + username=my_config.get('username', ''), + password=my_config.get('password', '') + ) + # Using OAuth Authenticator + elif auth_method == "oauth": + return ConfigurableOAuthAuthenticator( + stream=self, + auth_endpoint=my_config.get('access_token_url', ''), + oauth_scopes=my_config.get('scope', ''), + default_expiration=my_config.get('oauth_expiration_secs', ''), + oauth_headers=auth_headers, + ) + # Using Bearer Token Authenticator + elif auth_method == "bearer_token": + return BearerTokenAuthenticator( + stream=self, + token=my_config.get('bearer_token', ''), + ) + # Using AWS Authenticator + elif auth_method == "aws": + + # Establish an AWS Connection Client and returned Signed Credentials + self.aws_connection = AWSConnectClient(connection_config=my_config.get("aws_credentials",None)) + + if self.aws_connection.aws_auth: + self.http_auth = self.aws_connection.aws_auth + else: + self.http_auth = None + + return self.http_auth + elif auth_method != "no_auth": + self.logger.error(f"Unknown authentication method {auth_method}. Use api_key, basic, oauth, bearer_token, or aws.") + raise ValueError( + f"Unknown authentication method {auth_method}. Use api_key, basic, oauth, bearer_token, or aws." 
+ ) diff --git a/tap_rest_api_msdk/client.py b/tap_rest_api_msdk/client.py index 90ea41b..93ee056 100644 --- a/tap_rest_api_msdk/client.py +++ b/tap_rest_api_msdk/client.py @@ -3,7 +3,10 @@ from pathlib import Path from typing import Any +from memoization import cached +from singer_sdk.authenticators import APIAuthenticatorBase from singer_sdk.streams import RESTStream +from tap_rest_api_msdk.auth import select_authenticator SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") @@ -11,6 +14,11 @@ class RestApiStream(RESTStream): """rest-api stream class.""" + # Intialise self.http_auth used by prepare_request + http_auth = None + # Cache the authenticator using a Smart Singleton pattern + _authenticator = None + @property def url_base(self) -> Any: """Return the API URL root, configurable via tap settings. @@ -20,3 +28,38 @@ def url_base(self) -> Any: """ return self.config["api_url"] + + + @property + def authenticator(self) -> Any: + """Calls an appropriate SDK Authentication method based on the the set auth_method + which is set via the config. + If an authenticator (auth_method) is not specified, REST-based taps will simply pass + `http_headers` as defined in the tap and stream classes. + + Note 1: Each auth method requires certain configuration to be present see README.md + for each auth methods configuration requirements. + + Note 2: Using Singleton Pattern on the autenticator for caching with a check + if an OAuth Token has expired and needs to be refreshed. + + Raises: + ValueError: if the auth_method is unknown. + + Returns: + A SDK Authenticator or APIAuthenticatorBase if no auth_method supplied. 
+ """ + + auth_method = self.config.get("auth_method",None) + + if not self._authenticator: + self._authenticator = select_authenticator(self) + if not self._authenticator: + # No Auth Method, use default Authenticator + self._authenticator = APIAuthenticatorBase(stream=self) + elif auth_method == 'oauth': + if not self._authenticator.is_token_valid(): + # Obtain a new OAuth token as it has expired + self._authenticator = select_authenticator(self) + + return self._authenticator diff --git a/tap_rest_api_msdk/pagination.py b/tap_rest_api_msdk/pagination.py new file mode 100644 index 0000000..866ac8c --- /dev/null +++ b/tap_rest_api_msdk/pagination.py @@ -0,0 +1,181 @@ +"""REST API pagination handling.""" +from typing import Any, Optional, cast +from urllib.parse import urlparse, parse_qs + +import requests +from dateutil.parser import parse +from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.pagination import BaseOffsetPaginator, HeaderLinkPaginator, BasePageNumberPaginator +from tap_rest_api_msdk.utils import unnest_dict + +class RestAPIBasePageNumberPaginator(BasePageNumberPaginator): + def __init__( + self, + *args, + jsonpath: str = None, + **kwargs + ): + super().__init__(*args, **kwargs) + self._jsonpath = jsonpath + + def has_more(self, response: requests.Response): + """Return True if there are more pages to fetch. + + Args: + response: The most recent response object. + jsonpath: An optional jsonpath to where the tokens are located in + the response, defaults to `hasMore` in the response. + + Returns: + Whether there are more pages to fetch. 
+ """ + + if self._jsonpath: + return next(extract_jsonpath(self._jsonpath, response.json()), None) + else: + return response.json().get("hasMore", None) + +class RestAPIOffsetPaginator(BaseOffsetPaginator): + def __init__( + self, + *args, + jsonpath: str = None, + pagination_total_limit_param: str, + **kwargs + ): + super().__init__(*args, **kwargs) + self.jsonpath = jsonpath + self.pagination_total_limit_param = pagination_total_limit_param + + def has_more(self, response: requests.Response): + """Return True if there are more pages to fetch. + + Args: + response: The most recent response object. + jsonpath: An optional jsonpath to where the tokens are located in + the response, defaults to pagination in the response. + + Returns: + Whether there are more pages to fetch. + """ + + if self.jsonpath: + pagination = next(extract_jsonpath(self.jsonpath, response.json()), None) + else: + pagination = response.json().get("pagination", None) + if pagination: + pagination = unnest_dict(pagination) + + if pagination and all(x in pagination for x in ["offset", "limit"]): + record_limit = pagination.get(self.pagination_total_limit_param,0) + records_read = pagination["offset"] + pagination["limit"] + if records_read <= record_limit: + return True + + return False + +class RestAPIHeaderLinkPaginator(HeaderLinkPaginator): + def __init__( + self, + *args, + pagination_page_size: int = 25, + pagination_results_limit: Optional[int] = None, + use_fake_since_parameter: Optional[bool] = False, + replication_key: Optional[str] = None, + **kwargs + ): + super().__init__(*args, **kwargs) + self.pagination_page_size = pagination_page_size + self.pagination_results_limit = pagination_results_limit + self.use_fake_since_parameter = use_fake_since_parameter + self.replication_key = replication_key + + def get_next_url( + self, response: requests.Response + ) -> Optional[Any]: + """Return next page parameter(s) for identifying the next page + or None if no more pages. 
+ + Logic based on https://github.com/MeltanoLabs/tap-github + + Args: + response: The most recent response object. + pagination_page_size: A limit for records per page. Default=25 + pagination_results_limit: A limit to the number of pages returned + use_fake_since_parameter: A work around for GitHub. default=False + replication_key: Key for incremental processing + + Returns: + Page Parameters if there are more pages to fetch, else None. + """ + # Exit if the set Record Limit is reached. + if ( + self._page_count + and self.pagination_results_limit + and ( + cast(int, self._page_count) * self.pagination_page_size >= self.pagination_results_limit + ) + ): + return None + + # Leverage header links returned by the GitHub API. + if "next" not in response.links.keys(): + return None + + # Exit early if there is no URL in the next links + if not response.links.get("next",{}).get("url"): + return None + + resp_json = response.json() + if isinstance(resp_json, list): + results = resp_json + else: + results = resp_json.get("items") + + # Exit early if the response has no items. ? Maybe duplicative the "next" link check. + if not results: + return None + + # Unfortunately endpoints such as /starred, /stargazers, /events and /pulls do not support + # the "since" parameter out of the box. So we use a workaround here to exit early. + # For such streams, we sort by descending dates (most recent first), and paginate + # "back in time" until we reach records before our "fake_since" parameter. 
+ if self.replication_key and self.use_fake_since_parameter: + request_parameters = parse_qs(str(urlparse(response.request.url).query)) + # parse_qs interprets "+" as a space, revert this to keep an aware datetime + try: + since = ( + request_parameters["fake_since"][0].replace(" ", "+") + if "fake_since" in request_parameters + else "" + ) + except IndexError: + return None + + direction = ( + request_parameters["direction"][0] + if "direction" in request_parameters + else None + ) + + # commit_timestamp is a constructed key which does not exist in the raw response + replication_date = ( + results[-1][self.replication_key] + if self.replication_key != "commit_timestamp" + else results[-1]["commit"]["committer"]["date"] + ) + # exit early if the replication_date is before our since parameter + if ( + since + and direction == "desc" + and (parse(replication_date) < parse(since)) + ): + return None + + # Use header links returned by the API to return the query parameters. + parsed_url = urlparse(response.links["next"]["url"]) + + if parsed_url.query: + return(parsed_url.query) + + return None diff --git a/tap_rest_api_msdk/streams.py b/tap_rest_api_msdk/streams.py index 32f9517..ce7f96a 100644 --- a/tap_rest_api_msdk/streams.py +++ b/tap_rest_api_msdk/streams.py @@ -1,11 +1,30 @@ """Stream type classes for tap-rest-api-msdk.""" +import json +from datetime import datetime from typing import Any, Dict, Iterable, Optional +from string import Template +import email.utils import requests from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.pagination import SinglePagePaginator, BaseHATEOASPaginator, JSONPathPaginator, HeaderLinkPaginator, SimpleHeaderPaginator +from urllib.parse import urlparse, parse_qsl, parse_qs from tap_rest_api_msdk.client import RestApiStream -from tap_rest_api_msdk.utils import flatten_json +from tap_rest_api_msdk.pagination import RestAPIHeaderLinkPaginator, RestAPIOffsetPaginator, RestAPIBasePageNumberPaginator +from 
tap_rest_api_msdk.utils import flatten_json, get_start_date + +# Remove commented section to show http_request for debugging +#import logging +#import http.client + +#http.client.HTTPConnection.debuglevel = 1 + +#logging.basicConfig() +#logging.getLogger().setLevel(logging.DEBUG) +#requests_log = logging.getLogger("requests.packages.urllib3") +#requests_log.setLevel(logging.DEBUG) +#requests_log.propagate = True class DynamicStream(RestApiStream): @@ -27,6 +46,14 @@ def __init__( pagination_request_style: str = "default", pagination_response_style: str = "default", pagination_page_size: Optional[int] = None, + pagination_results_limit: Optional[int] = None, + pagination_next_page_param: Optional[str] = None, + pagination_limit_per_page_param: Optional[str] = None, + pagination_total_limit_param: Optional[str] = None, + start_date: Optional[datetime] = None, + source_search_field: Optional[str] = None, + source_search_query: Optional[str] = None, + use_request_body_not_params: Optional[bool] = False, ) -> None: """Class initialization. @@ -45,6 +72,14 @@ def __init__( pagination_request_style: see tap.py pagination_response_style: see tap.py pagination_page_size: see tap.py + pagination_results_limit: see tap.py + pagination_next_page_param: see tap.py + pagination_limit_per_page_param: see tap.py + pagination_total_limit_param: see tap.py + start_date: see tap.py + source_search_field: see tap.py + source_search_query: see tap.py + use_request_body_not_params: see tap.py """ super().__init__(tap=tap, name=tap.name, schema=schema) @@ -60,18 +95,72 @@ def __init__( self.replication_key = replication_key self.except_keys = except_keys self.records_path = records_path - self.next_page_token_jsonpath = ( - next_page_token_path # Or override `get_next_page_token`. 
- ) - self.pagination_page_size = pagination_page_size - get_url_params_styles = {"style1": self._get_url_params_style1} - self.get_url_params = get_url_params_styles.get( # type: ignore - pagination_response_style, self._get_url_params_default - ) - get_next_page_token_styles = {"style1": self._get_next_page_token_style1} - self.get_next_page_token = get_next_page_token_styles.get( # type: ignore - pagination_response_style, self._get_next_page_token_default - ) + if next_page_token_path: + self.next_page_token_jsonpath = next_page_token_path + elif pagination_request_style == 'jsonpath_paginator' or pagination_request_style == 'default': + self.next_page_token_jsonpath = "$.next_page" # Set default for jsonpath_paginator + get_url_params_styles = {"style1": self._get_url_params_offset_style, + "offset": self._get_url_params_offset_style, + "page": self._get_url_params_page_style, + "header_link": self._get_url_params_header_link, + "hateoas_body": self._get_url_params_hateoas_body} + + # Selecting the appropriate method to send Parameters as part of the + # request. If use_request_body_not_params is set the parameters are sent + # in the request body instead of request parameters. The + # pagination_response_style config determines what style of parameter + # processing is invoked. 
+ + self.use_request_body_not_params = use_request_body_not_params + if self.use_request_body_not_params: + self.prepare_request_payload = get_url_params_styles.get( # type: ignore + pagination_response_style, self._get_url_params_page_style + ) # Defaults to page_style url_params + else: + self.get_url_params = get_url_params_styles.get( # type: ignore + pagination_response_style, self._get_url_params_page_style + ) # Defaults to page_style url_params + + self.pagination_request_style = pagination_request_style + self.pagination_results_limit = pagination_results_limit + self.pagination_next_page_param = pagination_next_page_param + self.pagination_limit_per_page_param = pagination_limit_per_page_param + self.pagination_total_limit_param = pagination_total_limit_param + self.start_date = start_date + self.source_search_field = source_search_field + self.source_search_query = source_search_query + + # Setting Pagination Limits + if self.pagination_request_style == 'restapi_header_link_paginator': + if pagination_page_size: + self.pagination_page_size = pagination_page_size + else: + if self.pagination_limit_per_page_param: + page_limit_param = self.pagination_limit_per_page_param + else: + page_limit_param = "per_page" + self.pagination_page_size = int(self.params.get(page_limit_param, 25)) # Default to requesting 25 records + elif self.pagination_request_style == 'style1' or self.pagination_request_style == 'offset_paginator': + if self.pagination_results_limit: + self.ABORT_AT_RECORD_COUNT = self.pagination_results_limit # Will raise an exception. 
+ if pagination_page_size: + self.pagination_page_size = pagination_page_size + else: + if self.pagination_limit_per_page_param: + page_limit_param = self.pagination_limit_per_page_param + else: + page_limit_param = "limit" + self.pagination_page_size = int(self.params.get(page_limit_param, 25)) # Default to requesting 25 records + else: + if self.pagination_results_limit: + self.ABORT_AT_RECORD_COUNT = self.pagination_results_limit # Will raise an exception. + self.pagination_page_size = pagination_page_size + + # GitHub is missing the "since" parameter on a few endpoints + # set this parameter to True if your stream needs to navigate data in descending order + # and try to exit early on its own. + # This only has effect on streams whose `replication_key` is `updated_at`. + self.use_fake_since_parameter = False @property def http_headers(self) -> dict: @@ -93,59 +182,86 @@ def http_headers(self) -> dict: return headers - def _get_next_page_token_default( - self, response: requests.Response, previous_token: Optional[Any] - ) -> Optional[str]: - """Return a token for identifying next page or None if no more pages. - This method follows the default style of getting the next page token from the - default path provided in the config or, if that doesn't exist, the header. - - Args: - response: the requests.Response given by the api call. - previous_token: optional - the token representing the current/previous page - of results. + def get_new_paginator(self): + """Return the requested paginator required to retrieve all data from the API. Returns: - A str representing the next page to be queried or `None`. + Paginator Class. 
""" - if self.next_page_token_jsonpath: - all_matches = extract_jsonpath( - self.next_page_token_jsonpath, response.json() - ) - first_match = next(iter(all_matches), None) - next_page_token = first_match - else: - next_page_token = response.headers.get("X-Next-Page", None) - return next_page_token - - def _get_next_page_token_style1( - self, response: requests.Response, previous_token: Optional[Any] - ) -> Any: - """Return a token for identifying next page or None if no more pages. + self.logger.info(f"the next_page_token_jsonpath = {self.next_page_token_jsonpath}.") + + if self.pagination_request_style == 'jsonpath_paginator' or self.pagination_request_style == 'default': + return JSONPathPaginator(self.next_page_token_jsonpath) + elif self.pagination_request_style == 'simple_header_paginator': # Example Gitlab.com + return SimpleHeaderPaginator('X-Next-Page') + elif self.pagination_request_style == 'header_link_paginator': + return HeaderLinkPaginator() + elif self.pagination_request_style == 'restapi_header_link_paginator': # Example GitHub.com + return RestAPIHeaderLinkPaginator(pagination_page_size=self.pagination_page_size, + pagination_results_limit=self.pagination_results_limit, + replication_key=self.replication_key) + elif self.pagination_request_style == 'style1' or self.pagination_request_style == 'offset_paginator': + return RestAPIOffsetPaginator(start_value=1, + page_size=self.pagination_page_size, + jsonpath=self.next_page_token_jsonpath, + pagination_total_limit_param=self.pagination_total_limit_param) + elif self.pagination_request_style == 'hateoas_paginator': + return BaseHATEOASPaginator() + elif self.pagination_request_style == 'single_page_paginator': + return SinglePagePaginator() + elif self.pagination_request_style == 'page_number_paginator': + return RestAPIBasePageNumberPaginator(jsonpath=self.next_page_token_jsonpath) + else: + self.logger.error(f"Unknown paginator {self.pagination_request_style}. 
def _render_search_query(self, last_run_date: Any) -> Any:
    """Render the `source_search_query` template for an incremental request.

    Args:
        last_run_date: required - the value substituted for `$last_run_date`.

    Returns:
        The rendered query. It is parsed as JSON when the request parameters
        are sent in the request body, otherwise it is returned as a string.

    """
    rendered = Template(self.source_search_query).substitute(
        last_run_date=last_run_date
    )
    if self.use_request_body_not_params:
        return json.loads(rendered)
    return rendered


def _get_url_params_page_style(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Used for page-number pagination: the next page token is sent under
    `pagination_next_page_param`, or `page` when that setting is empty.

    Args:
        context: optional - the singer context object.
        next_page_token: optional - the token for the next page of results.

    Returns:
        An object containing the parameters to add to the request.

    """
    # Initialise Starting Values
    last_run_date = get_start_date(self, context)
    # Copy the configured params so the stream configuration is never mutated.
    params: dict = dict(self.params) if self.params else {}

    if next_page_token:
        params[self.pagination_next_page_param or "page"] = next_page_token

    if self.replication_key:
        # Use incremental replication (if available) via a filter query sent to
        # the API. This assumes a stored replication value and querying records
        # greater than it in subsequent runs. Without a configured search field
        # and query template, fall back to asking for ascending order.
        if self.source_search_field and self.source_search_query and last_run_date:
            params[self.source_search_field] = self._render_search_query(
                last_run_date
            )
        else:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key

    return params


def _get_url_params_offset_style(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Used for offset/limit pagination: the next page token is sent under
    `pagination_next_page_param` (default `offset`) and the page size under
    `pagination_limit_per_page_param` (default `limit`).

    Args:
        context: optional - the singer context object.
        next_page_token: optional - the token for the next page of results.

    Returns:
        An object containing the parameters to add to the request.

    """
    # Initialise Starting Values
    last_run_date = get_start_date(self, context)
    # Copy the configured params so the stream configuration is never mutated.
    params: dict = dict(self.params) if self.params else {}

    if next_page_token:
        params[self.pagination_next_page_param or "offset"] = next_page_token

    # `is not None` on purpose: an explicit page size of 0 is still sent.
    if self.pagination_page_size is not None:
        limit_per_page_param = self.pagination_limit_per_page_param or "limit"
        params[limit_per_page_param] = self.pagination_page_size

    if self.replication_key:
        # Use incremental replication (if available) via a filter query sent to
        # the API. This assumes a stored replication value and querying records
        # greater than it in subsequent runs. Without a configured search field
        # and query template, fall back to asking for ascending order.
        if self.source_search_field and self.source_search_query and last_run_date:
            params[self.source_search_field] = self._render_search_query(
                last_run_date
            )
        else:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key

    return params


def _get_url_params_header_link(
    self, context: Optional[Dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Logic based on https://github.com/MeltanoLabs/tap-github

    Besides building the parameters, this sets the `If-modified-since` entry
    of `self._http_headers` when a replication key and a starting timestamp
    are both available.

    Args:
        context: optional - the singer context object.
        next_page_token: optional - the token for the next page of results.

    Returns:
        An object containing the parameters to add to the request.

    """
    # Copy the configured params so the stream configuration is never mutated.
    params: dict = dict(self.params) if self.params else {}

    # Default to 25 per page if not set
    limit_per_page_param = self.pagination_limit_per_page_param or "per_page"
    params[limit_per_page_param] = self.pagination_page_size or 25

    if next_page_token:
        # The token is a query string; parse_qs yields a list of values per key.
        params.update(parse_qs(str(next_page_token)))

    # NOTE(review): `use_fake_since_parameter` comes from the tap-github logic
    # this method is based on. It is read defensively and treated as False
    # when the stream does not define it - confirm against the stream class.
    use_fake_since = getattr(self, "use_fake_since_parameter", False)

    if self.replication_key == "updated_at":
        params["sort"] = "updated"
        params["direction"] = "desc" if use_fake_since else "asc"

    # Unfortunately the /starred, /stargazers (starred_at) and /events
    # (created_at) endpoints do not support the "since" parameter out of the
    # box. But we use a workaround in 'get_next_page_token'.
    elif self.replication_key in ["starred_at", "created_at"]:
        params["sort"] = "created"
        params["direction"] = "desc"

    # Warning: /commits endpoint accept "since" but results are ordered by
    # descending commit_timestamp
    elif self.replication_key == "commit_timestamp":
        params["direction"] = "desc"

    elif self.replication_key:
        self.logger.warning(
            f"The replication key '{self.replication_key}' is not fully supported by this client yet."
        )

    since = self.get_starting_timestamp(context)
    since_key = "fake_since" if use_fake_since else "since"
    if self.replication_key and since:
        params[since_key] = since
        # Leverage conditional requests to save API quotas
        # https://github.community/t/how-does-if-modified-since-work/139627
        self._http_headers["If-modified-since"] = email.utils.format_datetime(since)

    return params


def _get_url_params_hateoas_body(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    HATEOAS stands for "Hypermedia as the Engine of Application State".
    See https://en.wikipedia.org/wiki/HATEOAS.

    Note: Under the HATEOAS model, the returned token contains all the
    required parameters for the subsequent call. The function splits the
    parameters into Dict key value pairs for subsequent requests. As a side
    effect, `self.path` is replaced by the path component of the token, or by
    an empty string when the token consists of nothing but a path.

    Args:
        context: optional - the singer context object.
        next_page_token: optional - the token for the next page of results.

    Returns:
        An object containing the parameters to add to the request.

    """
    # Initialise Starting Values
    last_run_date = get_start_date(self, context)
    # Copy the configured params so the stream configuration is never mutated.
    params: dict = dict(self.params) if self.params else {}

    # Set Pagination Limits if required.
    if self.pagination_page_size and self.pagination_limit_per_page_param:
        params[self.pagination_limit_per_page_param] = self.pagination_page_size

    if next_page_token:
        # Parse the next_page_token for the path and parameters. A token
        # without a query component is itself treated as the query string.
        url_parsed = urlparse(next_page_token)
        params.update(parse_qsl(url_parsed.query or url_parsed.path))
        if url_parsed.path == next_page_token:
            self.path = ""
        else:
            self.path = url_parsed.path
    elif self.replication_key:
        # Use incremental replication (if available) via a filter query sent to
        # the API. This assumes a stored replication value and querying records
        # greater than it in subsequent runs.
        if self.source_search_field and self.source_search_query and last_run_date:
            params[self.source_search_field] = self._render_search_query(
                last_run_date
            )
        elif self.source_search_field and last_run_date:
            # Formatted rather than concatenated so that a non-string
            # bookmark value (e.g. an integer key) does not raise TypeError.
            params[self.source_search_field] = f"gt{last_run_date}"

    return params
tap_rest_api_msdk.utils import flatten_json - class TapRestApiMsdk(Tap): """rest-api tap class.""" name = "tap-rest-api-msdk" + # Required for Authentication in tap.py - function APIAuthenticatorBase + tap_name = name + common_properties = th.PropertiesList( th.Property( "path", @@ -42,7 +45,7 @@ class TapRestApiMsdk(Tap): required=False, description="An object of headers to pass into the api calls. Stream level" "headers will be merged with top-level params with stream" - "level params overwriting top-level params with the same key", + "level params overwriting top-level params with the same key.", ), th.Property( "records_path", @@ -63,8 +66,8 @@ class TapRestApiMsdk(Tap): "replication_key", th.StringType, required=False, - description="the json key of the replication key. Note that this should " - "be an incrementing integer or datetime object.", + description="the json response field representing the replication key." + "Note that this should be an incrementing integer or datetime object.", ), th.Property( "except_keys", @@ -88,6 +91,37 @@ class TapRestApiMsdk(Tap): description="number of records used to infer the stream's schema. " "Defaults to 50.", ), + th.Property( + "start_date", + th.DateTimeType, + required=False, + description="An optional field. Normally required when using the" + "replication_key. This is the initial starting date when using a" + "date based replication key and there is no state available.", + ), + th.Property( + "source_search_field", + th.StringType, + required=False, + description="An optional field name which can be used for querying specific " + "records from supported API's. The intend for this parameter is to continue " + "incrementally processing from a previous state. Example `last-updated`. " + "Note: You must also set the replication_key, where the replication_key is" + "json response representation of the API `source_search_field`. 
You should" + "also supply the `source_search_query`, `replication_key` and `start_date`.", + ), + th.Property( + "source_search_query", + th.StringType, + required=False, + description="An optional query template to be issued against the API." + "Substitute the query field you are querying against with $last_run_date. At" + "run-time, the tap will dynamically update the token with either the `start_date`" + "or the last bookmark / state value. A simple template Example for FHIR API's: " + "gt$last_run_date. A more complex example against an Opensearch API, " + "{\"bool\": {\"filter\": [{\"range\": { \"meta.lastUpdated\": { \"gt\": \"$last_run_date\" }}}] }} ." + "Note: Any required double quotes in the query template must be escaped.", + ), ) top_level_properties = th.PropertiesList( @@ -97,12 +131,136 @@ class TapRestApiMsdk(Tap): required=True, description="the base url/endpoint for the desired api", ), - # th.Property("auth_method", th.StringType, default='no_auth', required=False), - # th.Property("auth_token", th.StringType, required=False), + th.Property( + "auth_method", + th.StringType, + default='no_auth', + required=False, + description="The method of authentication used by the API. Supported options include " + "oauth: for OAuth2 authentication, basic: Basic Header authorization - base64-encoded " + "username + password config items, api_key: for API Keys in the header e.g. X-API-KEY," + "bearer_token: for Bearer token authorization, aws: for AWS Authentication." + "Defaults to no_auth which will take authentication parameters passed via the headers" + "config." + ), + th.Property( + "api_keys", + th.ObjectType(), + required=False, + description="A object of API Key/Value pairs used by the api_key auth method " + "Example: { ""X-API-KEY"": ""my secret value""}." + ), + th.Property( + "client_id", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. The public application ID that's " + "assigned for Authentication. 
The client_id should accompany a client_secret." + ), + th.Property( + "client_secret", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. The client_secret is a secret " + "known only to the application and the authorization server. It is essential the " + "application's own password." + ), + th.Property( + "username", + th.StringType, + required=False, + description="Used for a number of authentication methods that use a user " + "password combination for authentication." + ), + th.Property( + "password", + th.StringType, + required=False, + description="Used for a number of authentication methods that use a user " + "password combination for authentication." + ), + th.Property( + "bearer_token", + th.StringType, + required=False, + description="Used for the Bearer Authentication method, which uses a token " + "as part of the authorization header for authentication." + ), + th.Property( + "refresh_token", + th.StringType, + required=False, + description="An OAuth2 Refresh Token is a string that the OAuth2 client can use to " + "get a new access token without the user's interaction." + ), + th.Property( + "grant_type", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. The grant_type is required " + "to describe the OAuth2 flow. Flows support by this tap include client_credentials, " + "refresh_token, password." + ), + th.Property( + "scope", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. The scope is optional, " + "it is a mechanism to limit the amount of access that is granted to an access token. " + "One or more scopes can be provided delimited by a space." + ), + th.Property( + "access_token_url", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. This is the end-point for " + "the authentication server used to exchange the authorization codes for a access " + "token." 
+ ), + th.Property( + "redirect_uri", + th.StringType, + required=False, + description="Used for the OAuth2 authentication method. This is optional as the " + "redirect_uri may be part of the token returned by the authentication server. If a " + "redirect_uri is provided, it determines where the API server redirects the user " + "after the user completes the authorization flow." + ), + th.Property( + "oauth_extras", + th.ObjectType(), + required=False, + description="A object of Key/Value pairs for additional oauth config parameters " + "which may be required by the authorization server." + "Example: { ""resource"": ""https://analysis.windows.net/powerbi/api""}." + ), + th.Property( + "oauth_expiration_secs", + th.IntegerType, + default=None, + required=False, + description="Used for OAuth2 authentication method. This optional setting is a " + "timer for the expiration of a token in seconds. If not set the OAuth will use " + "the default expiration set in the token by the authorization server." + ), + th.Property( + "aws_credentials", + th.ObjectType(), + default=None, + required=False, + description="An object of aws credentials to authenticate to access AWS services." + "This example is to access the AWS OpenSearch service." + "Example: { ""aws_access_key_id"": ""my_aws_key_id"", " + " ""aws_secret_access_key"": ""my_aws_secret_access_key"", " + " ""aws_region"": ""us-east-1"", " + " ""aws_service"": ""es"", " + " ""use_signed_credentials"": true} " + + ), th.Property( "next_page_token_path", th.StringType, - default="$.next_page", + default=None, required=False, description="a jsonpath string representing the path to the 'next page' " "token. Defaults to `$.next_page`", @@ -123,6 +281,15 @@ class TapRestApiMsdk(Tap): description="the pagination style to use for response. 
" "Defaults to `default`", ), + th.Property( + "use_request_body_not_params", + th.BooleanType, + default=False, + required=False, + description="sends the request parameters in the request body." + "This is normally not required, a few API's like OpenSearch" + "require this. Defaults to `False`", + ), th.Property( "pagination_page_size", th.IntegerType, @@ -130,6 +297,34 @@ class TapRestApiMsdk(Tap): required=False, description="the size of each page in records. Defaults to None", ), + th.Property( + "pagination_results_limit", + th.IntegerType, + default=None, + required=False, + description="limits the max number of records. Defaults to None", + ), + th.Property( + "pagination_next_page_param", + th.StringType, + default=None, + required=False, + description="The name of the param that indicates the page/offset. Defaults to None", + ), + th.Property( + "pagination_limit_per_page_param", + th.StringType, + default=None, + required=False, + description="The name of the param that indicates the limit/per_page. Defaults to None", + ), + th.Property( + "pagination_total_limit_param", + th.StringType, + default="total", + required=False, + description="The name of the param that indicates the total limit e.g. total, count. 
Defaults to total", + ), ) # add common properties to top-level properties @@ -190,6 +385,16 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore path = stream.get("path", self.config.get("path", "")) params = {**self.config.get("params", {}), **stream.get("params", {})} headers = {**self.config.get("headers", {}), **stream.get("headers", {})} + start_date = stream.get("start_date", self.config.get("start_date", "")) + replication_key=stream.get( + "replication_key", self.config.get("replication_key", "") + ) + source_search_field=stream.get( + "source_search_field", self.config.get("source_search_field", "") + ) + source_search_query=stream.get( + "source_search_query", self.config.get("source_search_query", "") + ) schema = {} schema_config = stream.get("schema") @@ -229,15 +434,21 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore primary_keys=stream.get( "primary_keys", self.config.get("primary_keys", []) ), - replication_key=stream.get( - "replication_key", self.config.get("replication_key", "") - ), + replication_key=replication_key, except_keys=except_keys, - next_page_token_path=self.config["next_page_token_path"], + next_page_token_path=self.config.get("next_page_token_path"), pagination_request_style=self.config["pagination_request_style"], pagination_response_style=self.config["pagination_response_style"], pagination_page_size=self.config.get("pagination_page_size"), + pagination_results_limit=self.config.get("pagination_results_limit"), + pagination_next_page_param=self.config.get("pagination_next_page_param"), + pagination_limit_per_page_param=self.config.get("pagination_limit_per_page_param"), + pagination_total_limit_param=self.config.get("pagination_total_limit_param"), schema=schema, + start_date=start_date, + source_search_field=source_search_field, + source_search_query=source_search_query, + use_request_body_not_params=self.config.get("use_request_body_not_params"), ) ) @@ -253,6 +464,10 @@ def get_schema( 
headers: dict, ) -> Any: """Infer schema from the first records returned by api. Creates a Stream object. + + If auth_method is set, will call select_authenticator to obtain credentials + to issue a request to sample some records. The select_authenticator will + set the self.http_auth if required by the request authenticator. Args: records_path: required - see config_jsonschema. @@ -269,17 +484,33 @@ def get_schema( A schema for the stream. """ - # todo: this request format is not very robust - r = requests.get(self.config["api_url"] + path, params=params, headers=headers) + # TODO: this request format is not very robust + + # Initialise Variables + auth_method = self.config.get("auth_method", "") + self.http_auth = None + + if auth_method and not auth_method == "no_auth": + # Initializing Authenticator for authorisation to obtain a schema. + # Will set the self.http_auth if required by a given authenticator + authenticator = select_authenticator(self) + if hasattr(authenticator, "auth_headers"): + headers.update(authenticator.auth_headers or {}) + if hasattr(authenticator, "auth_params"): + params.update(authenticator.auth_params or {}) + + r = requests.get(self.config["api_url"] + path, auth=self.http_auth, params=params, headers=headers) if r.ok: records = extract_jsonpath(records_path, input=r.json()) else: + self.logger.error(f"Error Connecting, message = {r.text}") raise ValueError(r.text) builder = SchemaBuilder() builder.add_schema(th.PropertiesList().to_dict()) for i, record in enumerate(records): if type(record) is not dict: + self.logger.error("Input must be a dict object.") raise ValueError("Input must be a dict object.") flat_record = flatten_json(record, except_keys) diff --git a/tap_rest_api_msdk/utils.py b/tap_rest_api_msdk/utils.py index 6bae646..85edf47 100644 --- a/tap_rest_api_msdk/utils.py +++ b/tap_rest_api_msdk/utils.py @@ -64,3 +64,43 @@ def flatten(o: Any, exception_keys: list, name: str = "") -> None: flatten(obj, 
def unnest_dict(d: dict) -> dict:
    """Flatten a dict by lifting the entries of nested dicts to the top level.

    Nested dicts are flattened recursively. The key that held a nested dict
    is dropped, and when the same key occurs more than once the value
    encountered last wins.

    Args:
        d: the dict object to be flattened.

    Returns:
        A new, single-level dict object. The input is not modified.

    """
    result: dict = {}
    for key, value in d.items():
        if isinstance(value, dict):
            result.update(unnest_dict(value))
        else:
            result[key] = value
    return result


def get_start_date(self, context: dict) -> Any:
    """Return the value an incremental sync should start from.

    Args:
        self: the stream object; it must provide `get_starting_timestamp` and
            `get_starting_replication_key_value`.
        context: the singer context object, passed through to both calls.

    Returns:
        The starting timestamp formatted as `%Y-%m-%dT%H:%M:%S`. When
        `get_starting_timestamp` raises a ValueError, or its result cannot be
        formatted (AttributeError, e.g. it returned None), the value of
        `get_starting_replication_key_value` is returned unchanged instead.

    """
    try:
        return self.get_starting_timestamp(context).strftime("%Y-%m-%dT%H:%M:%S")
    except (ValueError, AttributeError):
        # No usable timestamp bookmark: fall back to the raw replication value.
        return self.get_starting_replication_key_value(context)