Skip to content

Commit

Permalink
flattened records further
Browse files Browse the repository at this point in the history
  • Loading branch information
jlloyd-widen committed Mar 8, 2024
1 parent e101cc0 commit ae1b210
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 102 deletions.
40 changes: 18 additions & 22 deletions tap_clari/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from __future__ import annotations

import typing as t
from typing import Any, Callable

import requests
from singer_sdk.authenticators import APIKeyAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream

_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]
Expand Down Expand Up @@ -74,24 +76,22 @@ def get_url_params(
"exportFormat": "JSON",
}

def post_process(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
"""Append or transform raw data to match expected structure.
def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
"""Parse the response and return an iterator of result records.
Args:
row: Individual record in the stream.
context: Stream partition or context dictionary.
response: A raw :class:`requests.Response`
Returns:
The resulting record dict, or `None` if the record should be excluded.
Yields:
One item for every item found in the response.
"""
return flatten_record(row)
record = extract_jsonpath(self.records_jsonpath, input=response.json())
yield from flatten_record(next(record))


def get_list_item_values(source_list: list, target_keys: list[str], search_pair: dict) -> dict:
def get_list_item_values(
source_list: list, target_keys: list[str], search_pair: dict
) -> dict:
"""Return target items from a dict with a specific key value pair from an array of dicts."""
if len(search_pair) > 1:
raise ValueError("pair must be a dictionary with a single key value pair")
Expand All @@ -100,7 +100,7 @@ def get_list_item_values(source_list: list, target_keys: list[str], search_pair:
return {k: v for k, v in target_dict.items() if k in target_keys}


def flatten_record(row: dict) -> dict:
def flatten_record(row: dict) -> list:
"""Flatten a nested dictionary."""
entries = row.get("entries", [])
fields = row.get("fields", [])
Expand All @@ -111,19 +111,15 @@ def flatten_record(row: dict) -> dict:
new_entries = []
for entry in entries:
field = get_list_item_values(
fields,
["fieldName"],
{"fieldId": entry["fieldId"]}
fields, ["fieldName"], {"fieldId": entry["fieldId"]}
)
time_frame = get_list_item_values(
time_frames,
["startDate", "endDate"],
{"timeFrameId": entry["timeFrameId"]}
time_frames, ["startDate", "endDate"], {"timeFrameId": entry["timeFrameId"]}
)
time_period = get_list_item_values(
time_periods,
["type", "label", "year", "startDate", "endDate", "crmId"],
{"timePeriodId": entry["timePeriodId"]}
{"timePeriodId": entry["timePeriodId"]},
)
user = get_list_item_values(
users,
Expand All @@ -137,7 +133,7 @@ def flatten_record(row: dict) -> dict:
"parentHierarchyId",
"parentHierarchyName",
],
{"userId": entry["userId"]}
{"userId": entry["userId"]},
)

# prevent key clashes
Expand All @@ -155,4 +151,4 @@ def flatten_record(row: dict) -> dict:
# merge dictionaries
new_entries.append({**entry, **field, **time_frame, **time_period, **user})

return {"entries": new_entries}
return new_entries
60 changes: 35 additions & 25 deletions tap_clari/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,39 @@ def __init__(self, tap, forecast_id: str):
primary_keys: t.ClassVar[list[str]] = ["entries"]
replication_key = None
schema = th.PropertiesList(
th.Property("entries", th.ArrayType(th.ObjectType(
th.Property("fieldId", th.StringType),
th.Property("quotaValue", th.NumberType),
th.Property("forecastValue", th.NumberType),
th.Property("timeFrameId", th.StringType),
th.Property("timePeriodId", th.StringType),
th.Property("userId", th.StringType),
th.Property("fieldName", th.StringType),
th.Property("timeFrameEndDate", th.DateType),
th.Property("timeFrameStartDate", th.DateType),
th.Property("timePeriodCrmId", th.StringType),
th.Property("timePeriodEndDate", th.DateType),
th.Property("timePeriodLabel", th.StringType),
th.Property("timePeriodStartDate", th.DateType),
th.Property("timePeriodType", th.StringType),
th.Property("year", th.StringType),
th.Property("userCrmId", th.StringType),
th.Property("userEmail", th.EmailType),
th.Property("hierarchyId", th.StringType),
th.Property("hierarchyName", th.StringType),
th.Property("userName", th.StringType),
th.Property("parentHierarchyId", th.StringType),
th.Property("parentHierarchyName", th.StringType),
th.Property("scopeId", th.StringType),
))),
th.Property("adjustmentValue", th.NumberType),
th.Property("adjustedBy", th.StringType),
th.Property("adjustedOn", th.NumberType),
th.Property("aggregationTotal", th.NumberType),
th.Property("fieldId", th.StringType),
th.Property(
"currency",
th.ObjectType(
th.Property("code", th.StringType),
th.Property("symbol", th.StringType),
),
),
th.Property("quotaValue", th.NumberType),
th.Property("isUpdated", th.BooleanType),
th.Property("forecastValue", th.NumberType),
th.Property("timeFrameId", th.StringType),
th.Property("timePeriodId", th.StringType),
th.Property("userId", th.StringType),
th.Property("fieldName", th.StringType),
th.Property("timeFrameEndDate", th.DateType),
th.Property("timeFrameStartDate", th.DateType),
th.Property("timePeriodCrmId", th.StringType),
th.Property("timePeriodEndDate", th.DateType),
th.Property("timePeriodLabel", th.StringType),
th.Property("timePeriodStartDate", th.DateType),
th.Property("timePeriodType", th.StringType),
th.Property("year", th.StringType),
th.Property("userCrmId", th.StringType),
th.Property("userEmail", th.EmailType),
th.Property("hierarchyId", th.StringType),
th.Property("hierarchyName", th.StringType),
th.Property("userName", th.StringType),
th.Property("parentHierarchyId", th.StringType),
th.Property("parentHierarchyName", th.StringType),
th.Property("scopeId", th.StringType),
).to_dict()
6 changes: 3 additions & 3 deletions tap_clari/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ class TapClari(Tap):
th.StringType,
required=False,
description="Fiscal Quarter for when you'd like to run your export. Must "
"be passed in as a string (e.g. 'YYYY_QQ'). Defaults to the "
"current quarter.",
"be passed in as a string (e.g. 'YYYY_QQ'). Defaults to the "
"current quarter.",
),
th.Property(
"forecast_ids",
th.ArrayType(th.StringType),
required=True,
description="An array of IDs of the Forecast Tabs you would like to "
"export data from.",
"export data from.",
),
).to_dict()

Expand Down
102 changes: 50 additions & 52 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,55 +101,53 @@ def test_flatten_record():

}
res = flatten_record(record)
assert res == {
"entries": [
{
'fieldId': 'field_id_1',
'fieldName': 'field_name_1',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
{
'fieldId': 'field_id_2',
'fieldName': 'field_name_2',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
]
}
assert res == [
{
'fieldId': 'field_id_1',
'fieldName': 'field_name_1',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
{
'fieldId': 'field_id_2',
'fieldName': 'field_name_2',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
]

0 comments on commit ae1b210

Please sign in to comment.