diff --git a/tap_freshservice/streams/asset_requests.py b/tap_freshservice/streams/asset_requests.py new file mode 100644 index 0000000..d44bcf1 --- /dev/null +++ b/tap_freshservice/streams/asset_requests.py @@ -0,0 +1,34 @@ +"""Stream type classes for tap-freshservice.""" +from singer_sdk import typing as th # JSON Schema typing helpers + +from tap_freshservice.client import FreshserviceStream +from tap_freshservice.streams.assets import AssetsStream + +class AssetRequestsStream(FreshserviceStream): + name = "asset_requests" + path = "/assets/{display_id}/requests" + records_jsonpath="$.requests[*]" + + def get_url(self, context: dict): + url = super().get_url(context) + return url + + def build_prepared_request(self, *args, **kwargs): + req = super().build_prepared_request(*args, **kwargs) + return req + + schema = th.PropertiesList( + th.Property("id", th.IntegerType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("request_type", th.StringType), + th.Property("request_id", th.StringType), + th.Property("request_details", th.StringType), + th.Property("request_status", th.StringType), + th.Property("workspace_id", th.IntegerType), + th.Property("display_id", th.IntegerType), + th.Property("asset_id", th.IntegerType) + ).to_dict() + + parent_stream_type = AssetsStream + ignore_parent_replication_key = True \ No newline at end of file diff --git a/tap_freshservice/streams/assets.py b/tap_freshservice/streams/assets.py index 88ed5b4..0304a82 100644 --- a/tap_freshservice/streams/assets.py +++ b/tap_freshservice/streams/assets.py @@ -1,6 +1,6 @@ """Stream type classes for tap-freshservice.""" from singer_sdk import typing as th # JSON Schema typing helpers - +from typing import Optional from tap_freshservice.client import FreshserviceStream class AssetsStream(FreshserviceStream): @@ -15,7 +15,14 @@ def get_url(self, context: dict): def build_prepared_request(self, *args, **kwargs): req = super().build_prepared_request(*args, **kwargs) return req - + + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + """Return a context dictionary for the child streams. + Refer to https://sdk.meltano.com/en/latest/parent_streams.html""" + return {"display_id": record["display_id"], + "asset_id": record["id"] + } + schema = th.PropertiesList( th.Property("id", th.IntegerType), th.Property("display_id", th.IntegerType), diff --git a/tap_freshservice/tap.py b/tap_freshservice/tap.py index 8bf07b2..1faa942 100644 --- a/tap_freshservice/tap.py +++ b/tap_freshservice/tap.py @@ -5,7 +5,7 @@ from singer_sdk import Tap from singer_sdk import typing as th # JSON schema typing helpers -from tap_freshservice.streams import (assets, asset_types, groups, tickets) +from tap_freshservice.streams import (assets, asset_types, asset_requests, groups, tickets) class TapFreshservice(Tap): @@ -47,6 +47,7 @@ def discover_streams(self) -> list[tickets.FreshserviceStream]: groups.GroupsStream(self), assets.AssetsStream(self), asset_types.AssetTypesStream(self), + asset_requests.AssetRequestsStream(self), ]