From d29db1a9c20f22b74582189dfdda2811108fb967 Mon Sep 17 00:00:00 2001 From: danielpdwalker Date: Mon, 29 Nov 2021 15:44:56 +0000 Subject: [PATCH] Fixed linting --- tap_googleads/client.py | 1 - tap_googleads/streams.py | 83 +++++++++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/tap_googleads/client.py b/tap_googleads/client.py index 9139ea0..c140f2f 100644 --- a/tap_googleads/client.py +++ b/tap_googleads/client.py @@ -124,4 +124,3 @@ def end_date(self): date = parser.parse(self.config.get("end_date")) date = "'" + date.strftime("%Y-%m-%d") + "'" return date or self._end_date - diff --git a/tap_googleads/streams.py b/tap_googleads/streams.py index f46d711..5b72f0a 100644 --- a/tap_googleads/streams.py +++ b/tap_googleads/streams.py @@ -13,12 +13,14 @@ # TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition. # - Copy-paste as many times as needed to create multiple stream types. + class CustomerStream(GoogleAdsStream): """Define custom stream.""" + @property def path(self): - return "/customers/"+self.config["customer_id"] - + return "/customers/" + self.config["customer_id"] + name = "stream_customers" replication_key = None schema_filepath = SCHEMAS_DIR / "customer.json" @@ -26,19 +28,21 @@ def path(self): class AccessibleCustomers(GoogleAdsStream): """Accessible Customers""" - path="/customers:listAccessibleCustomers" + + path = "/customers:listAccessibleCustomers" name = "stream_accessible_customers" primary_keys = None replication_key = None - #TODO add an assert for one record -# schema_filepath = SCHEMAS_DIR / "customer.json" + # TODO add an assert for one record + # schema_filepath = SCHEMAS_DIR / "customer.json" schema = th.PropertiesList( - th.Property("resourceNames", th.ArrayType(th.StringType)) - ).to_dict() + th.Property("resourceNames", th.ArrayType(th.StringType)) + ).to_dict() def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return a context dictionary for child streams.""" - return { "resourceNames": ["customers/" + self.config.get("customer_id")] } + return {"resourceNames": ["customers/" + self.config.get("customer_id")]} + class CustomerHierarchyStream(GoogleAdsStream): """ @@ -49,18 +53,19 @@ class CustomerHierarchyStream(GoogleAdsStream): know when to query the down stream apps. """ - - #TODO add a seperate stream to get the Customer information and return i + + # TODO add a seperate stream to get the Customer information and return i rest_method = "POST" + @property def path(self): - #Paramas + # Paramas path = "/customers/{client_id}" path = path + "/googleAds:search" path = path + "?pageSize=10000" path = path + f"&query={self.gaql}" return path - + @property def gaql(self): return """ @@ -75,13 +80,16 @@ def gaql(self): FROM customer_client WHERE customer_client.level <= 1 """ + records_jsonpath = "$.results[*]" name = "stream_customer_hierarchy" replication_key = None parent_stream_type = AccessibleCustomers - #schema_filepath = SCHEMAS_DIR / "campaign.json" + # schema_filepath = SCHEMAS_DIR / "campaign.json" schema = th.PropertiesList( - th.Property("customerClient",th.ObjectType( + th.Property( + "customerClient", + th.ObjectType( th.Property("resourceName", th.StringType), th.Property("clientCustomer", th.StringType), th.Property("level", th.StringType), @@ -90,12 +98,12 @@ def gaql(self): th.Property("descriptiveName", th.StringType), th.Property("currencyCode", th.StringType), th.Property("id", th.StringType), - )) - ).to_dict() - + ), + ) + ).to_dict() - #Goal of this stream is to send to children stream a dict of - #login-customer-id:customer-id to query for all queries downstream + # Goal of this stream is to send to children stream a dict of + # login-customer-id:customer-id to query for all queries downstream def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: """Return a generator of row-type dictionary objects. @@ -108,24 +116,27 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: One item per (possibly processed) record in the API. """ - context["client_id"]=self.config.get("customer_id") + context["client_id"] = self.config.get("customer_id") for row in self.request_records(context): row = self.post_process(row, context) - #Don't search Manager accounts as we can't query them for everything - if (row["customerClient"]["manager"] == True): continue + # Don't search Manager accounts as we can't query them for everything + if row["customerClient"]["manager"] == True: + continue yield row - + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return a context dictionary for child streams.""" - return { "client_id":self.config.get("customer_id") } + return {"client_id": self.config.get("customer_id")} + class GeotargetsStream(GoogleAdsStream): """Geotargets, worldwide, constant across all customers""" + rest_method = "POST" - + @property def path(self): - #Paramas + # Paramas path = f"/customers/{self.config.get('customer_id')}" path = path + "/googleAds:search" path = path + "?pageSize=10000" @@ -139,18 +150,20 @@ def path(self): name = "stream_geo_target_constant" replication_key = None schema_filepath = SCHEMAS_DIR / "geo_target_constant.json" - parent_stream_type = None #Override ReportsStream default as this is a constant + parent_stream_type = None # Override ReportsStream default as this is a constant + class ReportsStream(GoogleAdsStream): rest_method = "POST" parent_stream_type = CustomerHierarchyStream + @property def gaql(self): raise NotImplementedError - + @property def path(self): - #Paramas + # Paramas path = "/customers/{client_id}" path = path + "/googleAds:search" path = path + "?pageSize=10000" @@ -158,21 +171,24 @@ def path(self): return path - class CampaignsStream(ReportsStream): """Define custom stream.""" + @property def gaql(self): return """ SELECT campaign.id, campaign.name FROM campaign ORDER BY campaign.id """ + records_jsonpath = "$.results[*]" name = "stream_campaign" replication_key = None schema_filepath = SCHEMAS_DIR / "campaign.json" + class AdGroupsStream(ReportsStream): """Define custom stream.""" + @property def gaql(self): return """ @@ -205,11 +221,13 @@ def gaql(self): ad_group.ad_rotation_mode FROM ad_group """ + records_jsonpath = "$.results[*]" name = "stream_adgroups" replication_key = None schema_filepath = SCHEMAS_DIR / "ad_group.json" + class AdGroupsPerformance(ReportsStream): """AdGroups Performance""" @@ -227,6 +245,7 @@ def gaql(self): replication_key = None schema_filepath = SCHEMAS_DIR / "adgroups_performance.json" + class CampaignPerformance(ReportsStream): """Campaign Performance""" @@ -241,6 +260,7 @@ def gaql(self): replication_key = None schema_filepath = SCHEMAS_DIR / "campaign_performance.json" + class CampaignPerformanceByAgeRangeAndDevice(ReportsStream): """Campaign Performance By Age Range and Device""" @@ -255,6 +275,7 @@ def gaql(self): replication_key = None schema_filepath = SCHEMAS_DIR / "campaign_performance_by_age_range_and_device.json" + class CampaignPerformanceByGenderAndDevice(ReportsStream): """Campaign Performance By Age Range and Device""" @@ -269,6 +290,7 @@ def gaql(self): replication_key = None schema_filepath = SCHEMAS_DIR / "campaign_performance_by_gender_and_device.json" + class CampaignPerformanceByLocation(ReportsStream): """Campaign Performance By Age Range and Device""" @@ -282,4 +304,3 @@ def gaql(self): name = "stream_campaign_performance_by_location" replication_key = None schema_filepath = SCHEMAS_DIR / "campaign_performance_by_location.json" -