Skip to content

Commit

Permalink
Fixed linting
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielPDWalker committed Nov 29, 2021
1 parent de4b9ac commit d29db1a
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
1 change: 0 additions & 1 deletion tap_googleads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,3 @@ def end_date(self):
date = parser.parse(self.config.get("end_date"))
date = "'" + date.strftime("%Y-%m-%d") + "'"
return date or self._end_date

83 changes: 52 additions & 31 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,36 @@
# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition.
# - Copy-paste as many times as needed to create multiple stream types.


class CustomerStream(GoogleAdsStream):
"""Define custom stream."""

@property
def path(self):
return "/customers/"+self.config["customer_id"]
return "/customers/" + self.config["customer_id"]

name = "stream_customers"
replication_key = None
schema_filepath = SCHEMAS_DIR / "customer.json"


class AccessibleCustomers(GoogleAdsStream):
"""Accessible Customers"""
path="/customers:listAccessibleCustomers"

path = "/customers:listAccessibleCustomers"
name = "stream_accessible_customers"
primary_keys = None
replication_key = None
#TODO add an assert for one record
# schema_filepath = SCHEMAS_DIR / "customer.json"
# TODO add an assert for one record
# schema_filepath = SCHEMAS_DIR / "customer.json"
schema = th.PropertiesList(
th.Property("resourceNames", th.ArrayType(th.StringType))
).to_dict()
th.Property("resourceNames", th.ArrayType(th.StringType))
).to_dict()

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return { "resourceNames": ["customers/" + self.config.get("customer_id")] }
return {"resourceNames": ["customers/" + self.config.get("customer_id")]}


class CustomerHierarchyStream(GoogleAdsStream):
"""
Expand All @@ -49,18 +53,19 @@ class CustomerHierarchyStream(GoogleAdsStream):
know when to query the down stream apps.
"""
#TODO add a seperate stream to get the Customer information and return i

# TODO add a seperate stream to get the Customer information and return i
rest_method = "POST"

@property
def path(self):
#Paramas
# Paramas
path = "/customers/{client_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path

@property
def gaql(self):
return """
Expand All @@ -75,13 +80,16 @@ def gaql(self):
FROM customer_client
WHERE customer_client.level <= 1
"""

records_jsonpath = "$.results[*]"
name = "stream_customer_hierarchy"
replication_key = None
parent_stream_type = AccessibleCustomers
#schema_filepath = SCHEMAS_DIR / "campaign.json"
# schema_filepath = SCHEMAS_DIR / "campaign.json"
schema = th.PropertiesList(
th.Property("customerClient",th.ObjectType(
th.Property(
"customerClient",
th.ObjectType(
th.Property("resourceName", th.StringType),
th.Property("clientCustomer", th.StringType),
th.Property("level", th.StringType),
Expand All @@ -90,12 +98,12 @@ def gaql(self):
th.Property("descriptiveName", th.StringType),
th.Property("currencyCode", th.StringType),
th.Property("id", th.StringType),
))
).to_dict()

),
)
).to_dict()

#Goal of this stream is to send to children stream a dict of
#login-customer-id:customer-id to query for all queries downstream
# Goal of this stream is to send to children stream a dict of
# login-customer-id:customer-id to query for all queries downstream
def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
"""Return a generator of row-type dictionary objects.
Expand All @@ -108,24 +116,27 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
One item per (possibly processed) record in the API.
"""

context["client_id"]=self.config.get("customer_id")
context["client_id"] = self.config.get("customer_id")
for row in self.request_records(context):
row = self.post_process(row, context)
#Don't search Manager accounts as we can't query them for everything
if (row["customerClient"]["manager"] == True): continue
# Don't search Manager accounts as we can't query them for everything
if row["customerClient"]["manager"] == True:
continue
yield row

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return { "client_id":self.config.get("customer_id") }
return {"client_id": self.config.get("customer_id")}


class GeotargetsStream(GoogleAdsStream):
"""Geotargets, worldwide, constant across all customers"""

rest_method = "POST"

@property
def path(self):
#Paramas
# Paramas
path = f"/customers/{self.config.get('customer_id')}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
Expand All @@ -139,40 +150,45 @@ def path(self):
name = "stream_geo_target_constant"
replication_key = None
schema_filepath = SCHEMAS_DIR / "geo_target_constant.json"
parent_stream_type = None #Override ReportsStream default as this is a constant
parent_stream_type = None # Override ReportsStream default as this is a constant


class ReportsStream(GoogleAdsStream):
rest_method = "POST"
parent_stream_type = CustomerHierarchyStream

@property
def gaql(self):
raise NotImplementedError

@property
def path(self):
#Paramas
# Paramas
path = "/customers/{client_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path



class CampaignsStream(ReportsStream):
"""Define custom stream."""

@property
def gaql(self):
return """
SELECT campaign.id, campaign.name FROM campaign ORDER BY campaign.id
"""

records_jsonpath = "$.results[*]"
name = "stream_campaign"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign.json"


class AdGroupsStream(ReportsStream):
"""Define custom stream."""

@property
def gaql(self):
return """
Expand Down Expand Up @@ -205,11 +221,13 @@ def gaql(self):
ad_group.ad_rotation_mode
FROM ad_group
"""

records_jsonpath = "$.results[*]"
name = "stream_adgroups"
replication_key = None
schema_filepath = SCHEMAS_DIR / "ad_group.json"


class AdGroupsPerformance(ReportsStream):
"""AdGroups Performance"""

Expand All @@ -227,6 +245,7 @@ def gaql(self):
replication_key = None
schema_filepath = SCHEMAS_DIR / "adgroups_performance.json"


class CampaignPerformance(ReportsStream):
"""Campaign Performance"""

Expand All @@ -241,6 +260,7 @@ def gaql(self):
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance.json"


class CampaignPerformanceByAgeRangeAndDevice(ReportsStream):
"""Campaign Performance By Age Range and Device"""

Expand All @@ -255,6 +275,7 @@ def gaql(self):
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_age_range_and_device.json"


class CampaignPerformanceByGenderAndDevice(ReportsStream):
"""Campaign Performance By Age Range and Device"""

Expand All @@ -269,6 +290,7 @@ def gaql(self):
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_gender_and_device.json"


class CampaignPerformanceByLocation(ReportsStream):
"""Campaign Performance By Age Range and Device"""

Expand All @@ -282,4 +304,3 @@ def gaql(self):
name = "stream_campaign_performance_by_location"
replication_key = None
schema_filepath = SCHEMAS_DIR / "campaign_performance_by_location.json"

0 comments on commit d29db1a

Please sign in to comment.