Skip to content

Commit

Permalink
Added start_date and end_date to all stream queries that can use them
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielPDWalker committed Nov 29, 2021
1 parent 2078cf1 commit de4b9ac
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 26 deletions.
27 changes: 25 additions & 2 deletions tap_googleads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import OAuthAuthenticator
from datetime import datetime, timedelta
from dateutil import parser

from tap_googleads.auth import GoogleAdsAuthenticator, ProxyGoogleAdsAuthenticator

Expand All @@ -28,6 +30,10 @@ class GoogleAdsStream(RESTStream):
next_page_token_jsonpath = "$.nextPageToken" # Or override `get_next_page_token`.
_LOG_REQUEST_METRIC_URLS: bool = True

_end_date = "'" + datetime.now().strftime("%Y-%m-%d") + "'"
_start_date = datetime.now() - timedelta(days=365)
_start_date = "'" + _start_date.strftime("%Y-%m-%d") + "'"

@property
@cached
def authenticator(self) -> OAuthAuthenticator:
Expand All @@ -41,7 +47,7 @@ def authenticator(self) -> OAuthAuthenticator:
auth_url = auth_url + f"&client_secret={self.config.get('client_secret')}"
auth_url = auth_url + "&grant_type=refresh_token"

oauth_credentials = self.config.get("oauth_credentials", {})
oauth_credentials = self.config.get("oauth_credentials.refresh_proxy_url", {})

if not oauth_credentials:
return GoogleAdsAuthenticator(stream=self, auth_endpoint=auth_url)
Expand Down Expand Up @@ -70,7 +76,7 @@ def http_headers(self) -> dict:
if "user_agent" in self.config:
headers["User-Agent"] = self.config.get("user_agent")
headers["developer-token"] = self.config["developer_token"]
headers["login-customer-id"] = self.config["login_customer_id"]
headers["login-customer-id"] = self.config["customer_id"]
return headers

def get_next_page_token(
Expand Down Expand Up @@ -102,3 +108,20 @@ def get_url_params(
params["sort"] = "asc"
params["order_by"] = self.replication_key
return params

@property
def start_date(self):
date = self.config.get("start_date")
if date:
date = parser.parse(self.config.get("start_date"))
date = "'" + date.strftime("%Y-%m-%d") + "'"
return date or self._start_date

@property
def end_date(self):
date = self.config.get("end_date")
if date:
date = parser.parse(self.config.get("end_date"))
date = "'" + date.strftime("%Y-%m-%d") + "'"
return date or self._end_date

64 changes: 40 additions & 24 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@
# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition.
# - Copy-paste as many times as needed to create multiple stream types.


class CustomerStream(GoogleAdsStream):
"""Define custom stream."""
@property
def path(self):
return "/customers/"+self.config["customer_id"]

name = "customers"
name = "stream_customers"
replication_key = None
schema_filepath = SCHEMAS_DIR / "customer.json"


class AccessibleCustomers(GoogleAdsStream):
"""Accessible Customers"""
path="/customers:listAccessibleCustomers"
name = "accessible_customers"
name = "stream_accessible_customers"
primary_keys = None
replication_key = None
#TODO add an assert for one record
Expand Down Expand Up @@ -76,7 +76,7 @@ def gaql(self):
WHERE customer_client.level <= 1
"""
records_jsonpath = "$.results[*]"
name = "customer_hierarchystream"
name = "stream_customer_hierarchy"
replication_key = None
parent_stream_type = AccessibleCustomers
#schema_filepath = SCHEMAS_DIR / "campaign.json"
Expand Down Expand Up @@ -108,7 +108,6 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
One item per (possibly processed) record in the API.
"""


context["client_id"]=self.config.get("customer_id")
for row in self.request_records(context):
row = self.post_process(row, context)
Expand All @@ -127,7 +126,7 @@ class GeotargetsStream(GoogleAdsStream):
@property
def path(self):
#Paramas
path = "/customers/{login_customer_id}"
path = f"/customers/{self.config.get('customer_id')}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
Expand All @@ -137,7 +136,7 @@ def path(self):
SELECT geo_target_constant.canonical_name, geo_target_constant.country_code, geo_target_constant.id, geo_target_constant.name, geo_target_constant.status, geo_target_constant.target_type FROM geo_target_constant
"""
records_jsonpath = "$.results[*]"
name = "geo_target_constant"
name = "stream_geo_target_constant"
replication_key = None
schema_filepath = SCHEMAS_DIR / "geo_target_constant.json"
parent_stream_type = None #Override ReportsStream default as this is a constant
Expand All @@ -157,6 +156,8 @@ def path(self):
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path



class CampaignsStream(ReportsStream):
"""Define custom stream."""
Expand All @@ -166,7 +167,7 @@ def gaql(self):
SELECT campaign.id, campaign.name FROM campaign ORDER BY campaign.id
"""
records_jsonpath = "$.results[*]"
name = "campaign"
name = "stream_campaign"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign.json"

Expand Down Expand Up @@ -205,65 +206,80 @@ def gaql(self):
FROM ad_group
"""
records_jsonpath = "$.results[*]"
name = "adgroups"
name = "stream_adgroups"
replication_key = None
schema_filepath = SCHEMAS_DIR / "ad_group.json"

class AdGroupsPerformance(ReportsStream):
"""AdGroups Performance"""

gaql = """
@property
def gaql(self):
return f"""
SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks,
metrics.cost_micros
FROM ad_group
WHERE segments.date DURING LAST_7_DAYS
WHERE segments.date >= {self.start_date} and segments.date <= {self.end_date}
"""

records_jsonpath = "$.results[*]"
name = "adgroupsperformance"
name = "stream_adgroupsperformance"
replication_key = None
schema_filepath = SCHEMAS_DIR / "adgroups_performance.json"

class CampaignPerformance(ReportsStream):
"""Campaign Performance"""

gaql = """
SELECT campaign.name, campaign.status, segments.device, segments.date, metrics.impressions, metrics.clicks, metrics.ctr, metrics.average_cpc, metrics.cost_micros FROM campaign WHERE segments.date DURING LAST_7_DAYS
@property
def gaql(self):
return f"""
SELECT campaign.name, campaign.status, segments.device, segments.date, metrics.impressions, metrics.clicks, metrics.ctr, metrics.average_cpc, metrics.cost_micros FROM campaign WHERE segments.date >= {self.start_date} and segments.date <= {self.end_date}
"""

records_jsonpath = "$.results[*]"
name = "campaign_performance"
name = "stream_campaign_performance"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance.json"

class CampaignPerformanceByAgeRangeAndDevice(ReportsStream):
"""Campaign Performance By Age Range and Device"""

gaql = """
SELECT ad_group_criterion.age_range.type, campaign.name, campaign.status, ad_group.name, segments.date, segments.device, ad_group_criterion.system_serving_status, ad_group_criterion.bid_modifier, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros, campaign.advertising_channel_type FROM age_range_view WHERE segments.date DURING LAST_7_DAYS
@property
def gaql(self):
return f"""
SELECT ad_group_criterion.age_range.type, campaign.name, campaign.status, ad_group.name, segments.date, segments.device, ad_group_criterion.system_serving_status, ad_group_criterion.bid_modifier, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros, campaign.advertising_channel_type FROM age_range_view WHERE segments.date >= {self.start_date} and segments.date <= {self.end_date}
"""

records_jsonpath = "$.results[*]"
name = "campaign_performance_by_age_range_and_device"
name = "stream_campaign_performance_by_age_range_and_device"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_age_range_and_device.json"

class CampaignPerformanceByGenderAndDevice(ReportsStream):
"""Campaign Performance By Age Range and Device"""

gaql = """
SELECT ad_group_criterion.gender.type, campaign.name, campaign.status, ad_group.name, segments.date, segments.device, ad_group_criterion.system_serving_status, ad_group_criterion.bid_modifier, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros, campaign.advertising_channel_type FROM gender_view WHERE segments.date DURING LAST_7_DAYS
@property
def gaql(self):
return f"""
SELECT ad_group_criterion.gender.type, campaign.name, campaign.status, ad_group.name, segments.date, segments.device, ad_group_criterion.system_serving_status, ad_group_criterion.bid_modifier, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros, campaign.advertising_channel_type FROM gender_view WHERE segments.date >= {self.start_date} and segments.date <= {self.end_date}
"""

records_jsonpath = "$.results[*]"
name = "campaign_performance_by_gender_and_device"
name = "stream_campaign_performance_by_gender_and_device"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_gender_and_device.json"

class CampaignPerformanceByLocation(ReportsStream):
"""Campaign Performance By Age Range and Device"""

gaql = """
SELECT campaign_criterion.location.geo_target_constant, campaign.name, campaign_criterion.bid_modifier, segments.date, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros FROM location_view WHERE segments.date DURING LAST_7_DAYS AND campaign_criterion.status != 'REMOVED'
@property
def gaql(self):
return f"""
SELECT campaign_criterion.location.geo_target_constant, campaign.name, campaign_criterion.bid_modifier, segments.date, metrics.clicks, metrics.impressions, metrics.ctr, metrics.average_cpc, metrics.cost_micros FROM location_view WHERE segments.date >= {self.start_date} and segments.date <= {self.end_date} AND campaign_criterion.status != 'REMOVED'
"""

records_jsonpath = "$.results[*]"
name = "campaign_performance_by_location"
name = "stream_campaign_performance_by_location"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_location.json"

0 comments on commit de4b9ac

Please sign in to comment.