Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
DE-1657 Tap-Hubspot - Sync issues reharding large format of associati…
Browse files Browse the repository at this point in the history
…ons (#56)

* WIP

* Reintroduced strptime_to_utc

* Added environment
  • Loading branch information
SBurwash authored May 18, 2023
1 parent 27f4fa6 commit b8f21dd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
5 changes: 5 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
version: 1
send_anonymous_usage_stats: false
project_id: 40eb93e3-36f8-492b-bb71-deec50c35380
default_environment: test
environments:
- name: test
plugins:
extractors:
- name: tap-hubspot
Expand All @@ -20,6 +23,8 @@ plugins:
value: '2010-01-01T00:00:00Z'
config:
start_date: '2010-01-01T00:00:00Z'
select:
- deals.*
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
14 changes: 10 additions & 4 deletions tap_hubspot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
from typing import Any, Dict, Optional, List, Iterable, Callable

import pytz

from singer_sdk._singerlib.utils import strptime_to_utc
from singer_sdk.exceptions import RetriableAPIError
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk import typing as th

SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")


class HubspotStream(RESTStream):
"""Hubspot stream class."""

Expand Down Expand Up @@ -87,6 +85,14 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""As needed, append or transform raw data to match expected structure.
Returns row, or None if row is to be excluded"""
if self.replication_key:
if strptime_to_utc(row[self.replication_key]) <= self.get_starting_timestamp(context).astimezone(pytz.utc):
return None
return row

def get_json_schema(self, from_type: str) -> dict:
"""Return the JSON Schema dict that describes the sql type.
Expand Down Expand Up @@ -172,7 +178,7 @@ def get_properties(self) -> List[dict]:
response.raise_for_status()
return data.get("results", [])
except requests.exceptions.HTTPError as e:
LOGGER.warning(
self.logger.warning(
"Dynamic discovery of properties failed with an exception, "
f"continuing gracefully with no dynamic properties: {e}, {data}"
)
Expand Down

0 comments on commit b8f21dd

Please sign in to comment.