diff --git a/README.md b/README.md index 09626f3..61d0e45 100644 --- a/README.md +++ b/README.md @@ -39,9 +39,13 @@ Settings required to run this tap. - `login_customer_id` (optional) - `start_date` (optional) - `end_date` (optional) +- `comma_separated_string_of_customer_ids` (optional) Comma-separated string of customer IDs, e.g. `123, 456, 789` +- `enable_click_view_report_stream` (optional) Boolean; defaults to `False` If using a manager account, `login_customer_id` should be set to the customer ID of the manager account while `customer_id` should be set to the customer ID of the account you want to sync. +If you provide `comma_separated_string_of_customer_ids`, it overrides which customer IDs the tap fetches data for. + How to get these settings can be found in the following Google Ads documentation: https://developers.google.com/adwords/api/docs/guides/authentication diff --git a/tap_googleads/client.py b/tap_googleads/client.py index 27e79a6..4bf61d1 100644 --- a/tap_googleads/client.py +++ b/tap_googleads/client.py @@ -75,7 +75,7 @@ def http_headers(self) -> dict: if "user_agent" in self.config: headers["User-Agent"] = self.config.get("user_agent") headers["developer-token"] = self.config["developer_token"] - headers["login-customer-id"] = self.config.get("login_customer_id", self.config["customer_id"]) + headers["login-customer-id"] = self.config.get("login_customer_id") return headers def get_next_page_token( diff --git a/tap_googleads/schemas/click_view_report.json b/tap_googleads/schemas/click_view_report.json new file mode 100644 index 0000000..2b1ccd4 --- /dev/null +++ b/tap_googleads/schemas/click_view_report.json @@ -0,0 +1,87 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "clickView": { + "type": "object", + "properties": { + "gclid": { + "type": "string" + }, + "adGroupAd": { + "type": "string" + }, + "keyword": { + "type": "string" + }, + "keywordInfo": { + "type": "object", + "properties": { + "matchType": { 
+ "type": "string" + } + } + } + } + }, + "customer": { + "type": "object", + "properties": { + "id": { + "type": "string" + } + } + }, + "adGroup": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "campaign": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "date": { + "type": "string", + "format": "date" + }, + "segments": { + "type": "object", + "properties": { + "adNetworkType": { + "type": "string" + }, + "device": { + "type": "string" + }, + "slot": { + "type": "string" + }, + "clickType": { + "type": "string" + } + } + }, + "metrics": { + "type": "object", + "properties": { + "clicks": { + "type": "string" + } + } + } + } +} \ No newline at end of file diff --git a/tap_googleads/streams.py b/tap_googleads/streams.py index 3c6edfa..19451f1 100644 --- a/tap_googleads/streams.py +++ b/tap_googleads/streams.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Any, Dict, Iterable, Optional +from datetime import datetime, timedelta from singer_sdk import typing as th # JSON Schema typing helpers @@ -26,7 +27,7 @@ class AccessibleCustomers(GoogleAdsStream): def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return a context dictionary for child streams.""" - return {"resourceNames": ["customers/" + self.config.get("customer_id")]} + return {"resourceNames": ["customers/" + self.config.get('customer_id')]} class CustomerHierarchyStream(GoogleAdsStream): @@ -67,7 +68,7 @@ def gaql(self): records_jsonpath = "$.results[*]" name = "stream_customer_hierarchy" - primary_keys = ["customer_client__id"] + primary_keys = ["customerClient__id"] replication_key = None parent_stream_type = AccessibleCustomers schema = th.PropertiesList( @@ -99,17 +100,22 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: Yields: One item per (possibly processed) record in the API. 
""" - - for row in self.request_records(context): - row = self.post_process(row, context) - # Don't search Manager accounts as we can't query them for everything - if row["customerClient"]["manager"] == True: - continue - yield row + if self.config.get("comma_separated_string_of_customer_ids", False): + customer_ids_list = self.config.get("comma_separated_string_of_customer_ids").replace(" ", "").split(",") + for i in filter(None, customer_ids_list): # skip empty ids left by stray commas + yield {"customerClient": {"id": str(i)}} + else: + for row in self.request_records(context): + row = self.post_process(row, context) + # Don't search Manager accounts as we can't query them for everything + if row["customerClient"]["manager"] == True: + continue + yield row def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return a context dictionary for child streams.""" - return {"customer_id": self.config.get("customer_id")} + return {"customer_id": record["customerClient"]["id"]} + class GeotargetsStream(GoogleAdsStream): @@ -148,12 +154,164 @@ def gaql(self): @property def path(self): # Paramas - path = f"/customers/{self.config.get('customer_id')}" + path = "/customers/{customer_id}" path = path + "/googleAds:search" path = path + "?pageSize=10000" path = path + f"&query={self.gaql}" return path +class ClickViewReportStream(ReportsStream): + + @property + def gaql(self): + return """ + SELECT + click_view.gclid + , customer.id + , click_view.ad_group_ad + , ad_group.id + , ad_group.name + , campaign.id + , campaign.name + , segments.ad_network_type + , segments.device + , segments.date + , segments.slot + , metrics.clicks + , segments.click_type + , click_view.keyword + , click_view.keyword_info.match_type + FROM click_view + WHERE segments.date = {date} + """ + + records_jsonpath = "$.results[*]" + name = "stream_click_view_report" + primary_keys = [ + "clickView__gclid", + "clickView__keyword", + "clickView__keywordInfo__matchType", + "customer__id", + "adGroup__id", + "campaign__id", 
+ "segments__device", + "segments__adNetworkType", + "segments__slot", + "date" + ] + replication_key = "date" + schema_filepath = SCHEMAS_DIR / "click_view_report.json" + state_partitioning_keys = [] + + def post_process(self, row, context): + row["date"] = row["segments"].pop("date") + + if row.get("clickView", {}).get("keyword") is None: + row.setdefault("clickView", {})["keyword"] = 'null' # setdefault: the row may lack clickView entirely + row["clickView"]["keywordInfo"] = {"matchType": "null"} + + return row + + + def get_url_params(self, context, next_page_token): + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + params: dict = {} + if next_page_token: + params["page"] = next_page_token + return params + + def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: + date_list = [] + + replication_values = [] + + value = self.stream_state.get("replication_key_value", False) + if value: + replication_values.append(value) + + # Get the maximum replication_key_value + if len(replication_values) > 0: + last_replication_date = max(replication_values) + else: + last_replication_date = None + + yesterdays_date = datetime.now() - timedelta(days=1) + + # if last_replication_date is today or greater, set the date to yesterday (last full days data) + if last_replication_date: + if last_replication_date >= datetime.now().strftime("%Y-%m-%d"): + last_replication_date = yesterdays_date.strftime("%Y-%m-%d") + + # This is if the last_replication_date defaults back to the start date (timestamp) + if 'T' in last_replication_date: + last_replication_date, _ = last_replication_date.split('T') + + current_date = datetime.strptime(self.start_date.replace("'", ""), "%Y-%m-%d") + + if last_replication_date: + current_date = datetime.strptime(last_replication_date.replace("'", ""), "%Y-%m-%d") + + # Generate a list of dates up to yesterday + if current_date < 
datetime.now() - timedelta(days=1): + while current_date < datetime.now() - timedelta(days=1): + date_list.append("'" + current_date.strftime("%Y-%m-%d") + "'") + current_date += timedelta(days=1) + else: + date_list.append("'" + yesterdays_date.strftime("%Y-%m-%d") + "'") + + for date in date_list: + context['date'] = date + # Call the parent get_records with the modified context (date value) + for record in super().get_records(context): + yield record + + def sync(self, context): + """Sync this stream. + + This method is internal to the SDK and should not need to be overridden. + + Args: + context: Stream partition or context dictionary. + + Raises: + Exception: Errors whose HTTP response status is not 400 or 403. + """ + msg = f"Beginning {self.replication_method.lower()} sync of '{self.name}'" + if context: + msg += f" with context: {context}" + self.logger.info("%s...", msg) + + # Send a SCHEMA message to the downstream target: + if self.selected: + self._write_schema_message() + + try: + batch_config = self.get_batch_config(self.config) + if batch_config: + self._sync_batches(batch_config, context=context) + else: + # Sync the records themselves: + for _ in self._sync_records(context=context): + pass + except Exception as ex: + # Always log: errors without an HTTP response used to be dropped silently + self.logger.exception( + "An unhandled error occurred while syncing '%s'", + self.name, + ) + status_code = getattr(getattr(ex, 'response', None), 'status_code', None) + # 400/403 responses (and response-less errors) are tolerated so the sync continues + if status_code is not None and status_code not in [400, 403]: + raise + class CampaignsStream(ReportsStream): """Define custom stream.""" diff --git a/tap_googleads/tap.py b/tap_googleads/tap.py index 47140fe..e8ed8bb 100644 --- a/tap_googleads/tap.py +++ b/tap_googleads/tap.py @@ -17,6 +17,7 @@ CustomerHierarchyStream, GeoPerformance, GeotargetsStream, + ClickViewReportStream, ) STREAM_TYPES = 
[ @@ -66,8 +67,22 @@ class TapGoogleAds(Tap): th.StringType, description="Value to use in the login-customer-id header, if different from the customer_id to sync. Useful if you are syncing using a manager account.", ), + th.Property( + "comma_separated_string_of_customer_ids", + th.StringType, + description="Overrides the taps default get all data for all available customers logic, and will get you the data for only the the provided customer_ids", + ), + th.Property( + "enable_click_view_report_stream", + th.BooleanType, + description="Enables the tap's ClickViewReportStream. This requires setting up / permission on your google ads account(s)", + ), ).to_dict() def discover_streams(self) -> List[Stream]: """Return a list of discovered streams.""" + # Work on a copy: appending to the module-level STREAM_TYPES added a duplicate stream on every call + stream_types = list(STREAM_TYPES) + if self.config.get("enable_click_view_report_stream"): + stream_types.append(ClickViewReportStream) - return [stream_class(tap=self) for stream_class in STREAM_TYPES] + return [stream_class(tap=self) for stream_class in stream_types]