Skip to content

Commit

Permalink
Merge pull request #42 from Matatika/feature/new-click-view-stream-ne…
Browse files Browse the repository at this point in the history
…w-customer-array-setting

Feature - New array of customer ids setting & New Click View Report Stream
  • Loading branch information
DanielPDWalker authored Sep 3, 2024
2 parents 04b0e32 + 3e2aeb7 commit e4ba01d
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 12 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ Settings required to run this tap.
- `login_customer_id` (optional)
- `start_date` (optional)
- `end_date` (optional)
- `comma_separated_string_of_customer_ids` (optional) Comma-separated string of customer IDs, e.g. `123, 456, 789`
- `enable_click_view_report_stream` (optional) Boolean, Default is `False`

If using a manager account, `login_customer_id` should be set to the customer ID of the manager account while `customer_id` should be set to the customer ID of the account you want to sync.

If you provide `comma_separated_string_of_customer_ids`, the tap syncs data only for the customer IDs listed there instead of looking up every customer available to the account.

How to get these settings can be found in the following Google Ads documentation:

https://developers.google.com/adwords/api/docs/guides/authentication
Expand Down
2 changes: 1 addition & 1 deletion tap_googleads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def http_headers(self) -> dict:
if "user_agent" in self.config:
headers["User-Agent"] = self.config.get("user_agent")
headers["developer-token"] = self.config["developer_token"]
headers["login-customer-id"] = self.config.get("login_customer_id", self.config["customer_id"])
headers["login-customer-id"] = self.config.get("login_customer_id")
return headers

def get_next_page_token(
Expand Down
87 changes: 87 additions & 0 deletions tap_googleads/schemas/click_view_report.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"clickView": {
"type": "object",
"properties": {
"gclid": {
"type": "string"
},
"adGroupAd": {
"type": "string"
},
"keyword": {
"type": "string"
},
"keywordInfo": {
"type": "object",
"properties": {
"matchType": {
"type": "string"
}
}
}
}
},
"customer": {
"type": "object",
"properties": {
"id": {
"type": "string"
}
}
},
"adGroup": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
}
},
"campaign": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
}
},
"date": {
"type": "string",
"format": "date"
},
"segments": {
"type": "object",
"properties": {
"adNetworkType": {
"type": "string"
},
"device": {
"type": "string"
},
"slot": {
"type": "string"
},
"clickType": {
"type": "string"
}
}
},
"metrics": {
"type": "object",
"properties": {
"clicks": {
"type": "string"
}
}
}
}
}
186 changes: 175 additions & 11 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pathlib import Path
from typing import Any, Dict, Iterable, Optional
from datetime import datetime, timedelta

from singer_sdk import typing as th # JSON Schema typing helpers

Expand All @@ -26,7 +27,7 @@ class AccessibleCustomers(GoogleAdsStream):

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"resourceNames": ["customers/" + self.config.get("customer_id")]}
return {"resourceNames": ["customers/" + self.config.get('customer_id')]}


class CustomerHierarchyStream(GoogleAdsStream):
Expand Down Expand Up @@ -67,7 +68,7 @@ def gaql(self):

records_jsonpath = "$.results[*]"
name = "stream_customer_hierarchy"
primary_keys = ["customer_client__id"]
primary_keys = ["customerClient__id"]
replication_key = None
parent_stream_type = AccessibleCustomers
schema = th.PropertiesList(
Expand Down Expand Up @@ -99,17 +100,22 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
Yields:
One item per (possibly processed) record in the API.
"""

for row in self.request_records(context):
row = self.post_process(row, context)
# Don't search Manager accounts as we can't query them for everything
if row["customerClient"]["manager"] == True:
continue
yield row
if self.config.get("comma_separated_string_of_customer_ids", False):
customer_ids_list = self.config.get("comma_separated_string_of_customer_ids").replace(" ", "").split(",")
for i in customer_ids_list:
yield {"customerClient": {"id": str(i)}}
else:
for row in self.request_records(context):
row = self.post_process(row, context)
# Don't search Manager accounts as we can't query them for everything
if row["customerClient"]["manager"] == True:
continue
yield row

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"customer_id": self.config.get("customer_id")}
return {"customer_id": record["customerClient"]["id"]}



class GeotargetsStream(GoogleAdsStream):
Expand Down Expand Up @@ -148,12 +154,170 @@ def gaql(self):
@property
def path(self):
# Paramas
path = f"/customers/{self.config.get('customer_id')}"
path = "/customers/{customer_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path

class ClickViewReportStream(ReportsStream):
    """Click view report stream, queried one day at a time.

    The GAQL query filters on a single ``segments.date`` value, so
    ``get_records`` builds a list of dates and runs the parent stream's
    ``get_records`` once per date, handing the date over via the context.
    """

    @property
    def gaql(self):
        """Return the GAQL query for this report.

        ``{date}`` is left as a placeholder; ``get_records`` stores the value
        for each request under ``context['date']``.
        NOTE(review): presumably the SDK's URL templating substitutes it, the
        same way ``{customer_id}`` is filled in the path - confirm.
        """
        return """
SELECT
click_view.gclid
, customer.id
, click_view.ad_group_ad
, ad_group.id
, ad_group.name
, campaign.id
, campaign.name
, segments.ad_network_type
, segments.device
, segments.date
, segments.slot
, metrics.clicks
, segments.click_type
, click_view.keyword
, click_view.keyword_info.match_type
FROM click_view
WHERE segments.date = {date}
"""

    records_jsonpath = "$.results[*]"
    name = "stream_click_view_report"
    primary_keys = [
        "clickView__gclid",
        "clickView__keyword",
        "clickView__keywordInfo__matchType",
        "customer__id",
        "adGroup__id",
        "campaign__id",
        "segments__device",
        "segments__adNetworkType",
        "segments__slot",
        "date",
    ]
    replication_key = "date"
    schema_filepath = SCHEMAS_DIR / "click_view_report.json"
    state_partitioning_keys = []

    def post_process(self, row, context):
        """Move the segment date to the top level and fill missing keyword fields.

        Args:
            row: One result row from the API response.
            context: The stream context (unused here).

        Returns:
            The same ``row`` dict, modified in place.
        """
        # "date" is the replication key, so it has to live at the top level.
        row["date"] = row["segments"].pop("date")

        # "keyword" and "keywordInfo.matchType" are part of the primary key;
        # use the literal string "null" when the API returned no keyword.
        # setdefault avoids a KeyError when "clickView" itself is missing.
        click_view = row.setdefault("clickView", {})
        if click_view.get("keyword") is None:
            click_view["keyword"] = "null"
            click_view["keywordInfo"] = {"matchType": "null"}

        return row

    def get_url_params(self, context, next_page_token):
        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context.
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}
        if next_page_token:
            params["page"] = next_page_token
        return params

    def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
        """Yield records for every day from the bookmark (or start date) up to yesterday.

        Args:
            context: The stream context. Its ``date`` key is overwritten with
                the quoted date of each request.

        Yields:
            The records returned by the parent ``get_records`` for each date.
        """
        date_format = "%Y-%m-%d"
        # Take "now" once so the whole date range is computed against one clock reading.
        now = datetime.now()
        yesterday = now - timedelta(days=1)

        bookmark = self.stream_state.get("replication_key_value")
        # A bookmark of today or later is pulled back to yesterday, the last
        # full day of data.
        if bookmark and bookmark >= now.strftime(date_format):
            bookmark = yesterday.strftime(date_format)

        # Fall back to the configured start date when there is no bookmark.
        # Values may be wrapped in single quotes or carry a time part
        # ("2024-01-01T00:00:00"); keep only the date.
        raw_start = bookmark or self.start_date
        current_date = datetime.strptime(
            raw_start.replace("'", "").split("T", 1)[0], date_format
        )

        # Dates are single-quoted because they are used as GAQL literals.
        date_list = []
        while current_date < yesterday:
            date_list.append("'" + current_date.strftime(date_format) + "'")
            current_date += timedelta(days=1)
        if not date_list:
            # Start is already at/after yesterday: still request yesterday.
            date_list.append("'" + yesterday.strftime(date_format) + "'")

        if context is None:
            context = {}

        for date in date_list:
            context["date"] = date
            # Call the parent get_records with the modified context (date value)
            yield from super().get_records(context)

    def sync(self, context):
        """Sync this stream, tolerating HTTP 400/403 failures.

        Overrides the SDK implementation so that an exception carrying a
        response with status 400 or 403 is logged and the stream is skipped
        instead of aborting the run.

        Args:
            context: Stream partition or context dictionary.

        Raises:
            Exception: Any exception that carries a ``response`` whose status
                code is not 400 or 403.
        """
        msg = f"Beginning {self.replication_method.lower()} sync of '{self.name}'"
        if context:
            msg += f" with context: {context}"
        self.logger.info("%s...", msg)

        # Send a SCHEMA message to the downstream target:
        if self.selected:
            self._write_schema_message()

        try:
            batch_config = self.get_batch_config(self.config)
            if batch_config:
                self._sync_batches(batch_config, context=context)
            else:
                # Sync the records themselves:
                for _ in self._sync_records(context=context):
                    pass
        except Exception as ex:
            response = getattr(ex, "response", None)
            if response is None:
                # Previously these were dropped without a trace. Control flow
                # is unchanged (the stream is still skipped) but the failure
                # is now visible in the log.
                # NOTE(review): consider re-raising here once it is confirmed
                # which exceptions the client raises without a response.
                self.logger.exception(
                    "Skipping '%s' after an error that carried no HTTP response",
                    self.name,
                )
                return

            self.logger.exception(
                "An unhandled error occurred while syncing '%s'",
                self.name,
            )
            if response.status_code not in (400, 403):
                # Anything other than 400/403 is fatal; bare raise keeps the
                # original traceback.
                raise


class CampaignsStream(ReportsStream):
"""Define custom stream."""
Expand Down
14 changes: 14 additions & 0 deletions tap_googleads/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CustomerHierarchyStream,
GeoPerformance,
GeotargetsStream,
ClickViewReportStream,
)

STREAM_TYPES = [
Expand Down Expand Up @@ -66,8 +67,21 @@ class TapGoogleAds(Tap):
th.StringType,
description="Value to use in the login-customer-id header, if different from the customer_id to sync. Useful if you are syncing using a manager account.",
),
th.Property(
"comma_separated_string_of_customer_ids",
th.StringType,
description="Overrides the taps default get all data for all available customers logic, and will get you the data for only the the provided customer_ids",
),
th.Property(
"enable_click_view_report_stream",
th.BooleanType,
description="Enables the tap's ClickViewReportStream. This requires setting up / permission on your google ads account(s)",
),
).to_dict()

def discover_streams(self) -> List[Stream]:
    """Return a list of discovered streams.

    ``ClickViewReportStream`` is included only when the
    ``enable_click_view_report_stream`` setting is true.
    """
    # Work on a copy: appending to the module-level STREAM_TYPES would add
    # another ClickViewReportStream every time this method is called.
    stream_types = list(STREAM_TYPES)
    if (
        self.config.get("enable_click_view_report_stream") is True
        and ClickViewReportStream not in stream_types
    ):
        stream_types.append(ClickViewReportStream)
    return [stream_class(tap=self) for stream_class in stream_types]

0 comments on commit e4ba01d

Please sign in to comment.