From b8f21dda08c47e25e7b39bee6528b3fe033079da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Burwash?= <35510512+SBurwash@users.noreply.github.com> Date: Thu, 18 May 2023 08:59:49 -0400 Subject: [PATCH] DE-1657 Tap-Hubspot - Sync issues reharding large format of associations (#56) * WIP * Reintroduced strptime_to_utc * Added environment --- meltano.yml | 5 +++++ tap_hubspot/client.py | 14 ++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/meltano.yml b/meltano.yml index 9319961..a549662 100644 --- a/meltano.yml +++ b/meltano.yml @@ -1,6 +1,9 @@ version: 1 send_anonymous_usage_stats: false project_id: 40eb93e3-36f8-492b-bb71-deec50c35380 +default_environment: test +environments: +- name: test plugins: extractors: - name: tap-hubspot @@ -20,6 +23,8 @@ plugins: value: '2010-01-01T00:00:00Z' config: start_date: '2010-01-01T00:00:00Z' + select: + - deals.* loaders: - name: target-jsonl variant: andyh1203 diff --git a/tap_hubspot/client.py b/tap_hubspot/client.py index 9622ffa..6ad7892 100644 --- a/tap_hubspot/client.py +++ b/tap_hubspot/client.py @@ -5,7 +5,7 @@ from typing import Any, Dict, Optional, List, Iterable, Callable import pytz - +from singer_sdk._singerlib.utils import strptime_to_utc from singer_sdk.exceptions import RetriableAPIError from singer_sdk.helpers.jsonpath import extract_jsonpath from singer_sdk.streams import RESTStream @@ -13,8 +13,6 @@ from singer_sdk import typing as th SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") - - class HubspotStream(RESTStream): """Hubspot stream class.""" @@ -87,6 +85,14 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: """Parse the response and return an iterator of result rows.""" yield from extract_jsonpath(self.records_jsonpath, input=response.json()) + def post_process(self, row: dict, context: Optional[dict]) -> dict: + """As needed, append or transform raw data to match expected structure. + Returns row, or None if row is to be excluded""" + if self.replication_key: + if strptime_to_utc(row[self.replication_key]) <= self.get_starting_timestamp(context).astimezone(pytz.utc): + return None + return row + def get_json_schema(self, from_type: str) -> dict: """Return the JSON Schema dict that describes the sql type. @@ -172,7 +178,7 @@ def get_properties(self) -> List[dict]: response.raise_for_status() return data.get("results", []) except requests.exceptions.HTTPError as e: - LOGGER.warning( + self.logger.warning( "Dynamic discovery of properties failed with an exception, " f"continuing gracefully with no dynamic properties: {e}, {data}" )