Skip to content

Commit

Permalink
refactor: Click view report stream state (#59)
Browse files Browse the repository at this point in the history
* Keep accepting additional OAuth properties for compatibility with Matatika

* Define config defaults for start/end dates

* Refactor `ClickViewReportStream` logic to partition state by customer ID

* Fix name of URL param used for pagination

* Fix default start date representation and date range calculation

* Correct date config schema types and update descriptions

* Forcibly update state for customers in `ClickViewReportStream` where no records are present, to prevent additional requests in successive runs

* Prefer `get_starting_replication_key_value` over `get_starting_timestamp`, since the replication key is technically not a timestamp
  • Loading branch information
ReubenFrankel authored Oct 22, 2024
1 parent f83321f commit 19c5e98
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 110 deletions.
28 changes: 16 additions & 12 deletions tap_googleads/client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
"""REST client handling, including GoogleAdsStream base class."""

from datetime import datetime, timedelta
from datetime import datetime
from functools import cached_property
from typing import Any, Dict, Optional

import requests
from singer_sdk.authenticators import OAuthAuthenticator
from singer_sdk.streams import RESTStream

from tap_googleads.auth import GoogleAdsAuthenticator, ProxyGoogleAdsAuthenticator


class ResumableAPIError(Exception):
    """API error that carries the HTTP response which triggered it.

    Elsewhere in this file the error is raised from ``validate_response``
    and caught in ``get_records``, where it is logged as a warning instead
    of being propagated.
    """

    def __init__(self, message: str, response: requests.Response) -> None:
        """Create the error.

        Args:
            message: Human-readable description; becomes ``str(error)``.
            response: The HTTP response associated with the failure, kept
                on the instance as ``self.response``.
        """
        super().__init__(message)
        # Keep the response so handlers can inspect status code / payload.
        self.response = response


class GoogleAdsStream(RESTStream):
"""GoogleAds stream class."""

Expand All @@ -19,9 +26,6 @@ class GoogleAdsStream(RESTStream):
next_page_token_jsonpath = "$.nextPageToken" # Or override `get_next_page_token`.
_LOG_REQUEST_METRIC_URLS: bool = True

_end_date = datetime.now()
_start_date = _end_date - timedelta(days=91)

@cached_property
def authenticator(self) -> OAuthAuthenticator:
"""Return a new authenticator object."""
Expand Down Expand Up @@ -89,6 +93,12 @@ def get_url_params(
params["order_by"] = self.replication_key
return params

def get_records(self, context):
    """Yield records for ``context``, tolerating resumable API errors.

    Delegates to the parent class's ``get_records``. If a
    ``ResumableAPIError`` is raised while iterating, it is logged as a
    warning and this generator simply ends; records yielded before the
    error have already been handed to the caller.

    Args:
        context: Stream partition/context passed through unchanged to the
            parent implementation.

    Yields:
        Whatever the parent ``get_records`` yields.
    """
    try:
        yield from super().get_records(context)
    except ResumableAPIError as e:
        # Deliberate best-effort: report the failure and stop this
        # context's iteration rather than raising.
        self.logger.warning(e)

@property
def gaql(self):
raise NotImplementedError
Expand All @@ -101,17 +111,11 @@ def path(self) -> str:

@cached_property
def start_date(self):
date = self.config.get("start_date")
date = datetime.fromisoformat(date) if date else self._start_date

return date.strftime(r"'%Y-%m-%d'")
return datetime.fromisoformat(self.config["start_date"]).strftime(r"'%Y-%m-%d'")

@cached_property
def end_date(self):
date = self.config.get("end_date")
date = datetime.fromisoformat(date) if date else self._end_date

return date.strftime(r"'%Y-%m-%d'")
return datetime.fromisoformat(self.config["end_date"]).strftime(r"'%Y-%m-%d'")

@cached_property
def customer_ids(self):
Expand Down
127 changes: 35 additions & 92 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from __future__ import annotations

from datetime import datetime, timedelta
import datetime
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional
from typing import TYPE_CHECKING, Any, Dict, Iterable

from singer_sdk import typing as th # JSON Schema typing helpers

from tap_googleads.client import GoogleAdsStream
from tap_googleads.client import GoogleAdsStream, ResumableAPIError

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
Expand Down Expand Up @@ -99,6 +100,13 @@ def gaql(self):

seen_customer_ids = set()

def validate_response(self, response):
    """Validate an HTTP response, treating 403 as a resumable failure.

    Args:
        response: The HTTP response to check; only ``status_code`` is read
            here before deferring to the parent implementation.

    Raises:
        ResumableAPIError: If the status code is 403 (Forbidden). The
            message comes from ``self.response_error_message(response)``
            and the response is attached to the error.
    """
    if response.status_code == HTTPStatus.FORBIDDEN:
        msg = self.response_error_message(response)
        raise ResumableAPIError(msg, response)

    # Every other status goes through the parent's validation.
    super().validate_response(response)

def generate_child_contexts(self, record, context):
customer_ids = self.config.get("customer_ids")

Expand Down Expand Up @@ -165,9 +173,11 @@ def get_records(self, context: Context) -> Iterable[Dict[str, Any]]:


class ClickViewReportStream(ReportsStream):
date: datetime.date

@property
def gaql(self):
return """
return f"""
SELECT
click_view.gclid
, customer.id
Expand All @@ -185,7 +195,7 @@ def gaql(self):
, click_view.keyword
, click_view.keyword_info.match_type
FROM click_view
WHERE segments.date = {date}
WHERE segments.date = '{self.date.isoformat()}'
"""

records_jsonpath = "$.results[*]"
Expand All @@ -204,7 +214,6 @@ def gaql(self):
]
replication_key = "date"
schema_filepath = SCHEMAS_DIR / "click_view_report.json"
state_partitioning_keys = []

def post_process(self, row, context):
row["date"] = row["segments"].pop("date")
Expand All @@ -228,102 +237,36 @@ def get_url_params(self, context, next_page_token):
"""
params: dict = {}
if next_page_token:
params["page"] = next_page_token
params["pageToken"] = next_page_token
return params

def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
date_list = []
def request_records(self, context):
start_value = self.get_starting_replication_key_value(context)

replication_values = []
start_date = datetime.date.fromisoformat(start_value)
end_date = datetime.date.fromisoformat(self.config["end_date"])

value = self.stream_state.get("replication_key_value", False)
if value:
replication_values.append(value)
delta = end_date - start_date
dates = (start_date + datetime.timedelta(days=i) for i in range(delta.days))

# Get the maximum replication_key_value
if len(replication_values) > 0:
last_replication_date = max(replication_values)
else:
last_replication_date = None
for self.date in dates:
records = list(super().request_records(context))

yesterdays_date = datetime.now() - timedelta(days=1)
if not records:
self._increment_stream_state({"date": self.date.isoformat()}, context=self.context)

# if last_replication_date is today or greater, set the date to yesterday (last full days data)
if last_replication_date:
if last_replication_date >= datetime.now().strftime("%Y-%m-%d"):
last_replication_date = yesterdays_date.strftime("%Y-%m-%d")
yield from records

# This is if the last_replication_date defaults back to the start date (timestamp)
if "T" in last_replication_date:
last_replication_date, _ = last_replication_date.split("T")

current_date = datetime.strptime(self.start_date.replace("'", ""), "%Y-%m-%d")

if last_replication_date:
current_date = datetime.strptime(
last_replication_date.replace("'", ""),
"%Y-%m-%d",
def validate_response(self, response):
if response.status_code == HTTPStatus.FORBIDDEN:
error = response.json()["error"]["details"][0]["errors"][0]
msg = (
"Click view report not accessible to customer "
f"'{self.context['customer_id']}': {error['message']}"
)
raise ResumableAPIError(msg, response)

# Generate a list of dates up to yesterday
if current_date < datetime.now() - timedelta(days=1):
while current_date < datetime.now() - timedelta(days=1):
date_list.append("'" + current_date.strftime("%Y-%m-%d") + "'")
current_date += timedelta(days=1)
else:
date_list.append("'" + yesterdays_date.strftime("%Y-%m-%d") + "'")

for date in date_list:
context["date"] = date
# Call the parent get_records with the modified context (date value)
for record in super().get_records(context):
yield record

def sync(self, context):
"""Sync this stream.
This method is internal to the SDK and should not need to be overridden.
Args:
context: Stream partition or context dictionary.
Raises:
Exception: Any exception raised by the sync process.
"""
msg = f"Beginning {self.replication_method.lower()} sync of '{self.name}'"
if context:
msg += f" with context: {context}"
self.logger.info("%s...", msg)

# Send a SCHEMA message to the downstream target:
if self.selected:
self._write_schema_message()

try:
batch_config = self.get_batch_config(self.config)
if batch_config:
self._sync_batches(batch_config, context=context)
else:
# Sync the records themselves:
for _ in self._sync_records(context=context):
pass
except Exception as ex:
if hasattr(ex, "response") and ex.response is not None:
status_code = ex.response.status_code
if status_code not in [400, 403]:
# Raise the exception if it's not 400 or 403
self.logger.exception(
"An unhandled error occurred while syncing '%s'",
self.name,
)
raise ex
else:
# Log the 400 or 403 error and continue
self.logger.exception(
"An unhandled error occurred while syncing '%s'",
self.name,
)
super().validate_response(response)


class CampaignsStream(ReportsStream):
Expand Down
16 changes: 10 additions & 6 deletions tap_googleads/tap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""GoogleAds tap class."""

from datetime import datetime, timedelta, timezone
from typing import List

from singer_sdk import Stream, Tap
Expand All @@ -14,10 +15,10 @@
CampaignPerformanceByGenderAndDevice,
CampaignPerformanceByLocation,
CampaignsStream,
ClickViewReportStream,
CustomerHierarchyStream,
GeoPerformance,
GeotargetsStream,
ClickViewReportStream,
)

STREAM_TYPES = [
Expand Down Expand Up @@ -46,6 +47,8 @@ class TapGoogleAds(Tap):
required=True,
secret=True,
)
_end_date = datetime.now(timezone.utc).date()
_start_date = _end_date - timedelta(days=90)

# TODO: Add Descriptions
config_jsonschema = th.PropertiesList(
Expand All @@ -65,7 +68,6 @@ class TapGoogleAds(Tap):
secret=True,
),
_refresh_token,
additional_properties=False,
),
th.ObjectType(
th.Property(
Expand Down Expand Up @@ -106,13 +108,15 @@ class TapGoogleAds(Tap):
),
th.Property(
"start_date",
th.DateTimeType,
description="Start date for all of the streams that use date based filtering.",
th.DateType,
description="ISO start date for all of the streams that use date-based filtering. Defaults to 90 days before the current day.",
default=_start_date.isoformat(),
),
th.Property(
"end_date",
th.DateTimeType,
description="End date for all of the streams that use date based filtering.",
th.DateType,
description="ISO end date for all of the streams that use date-based filtering. Defaults to the current day.",
default=_end_date.isoformat(),
),
th.Property(
"enable_click_view_report_stream",
Expand Down

0 comments on commit 19c5e98

Please sign in to comment.