diff --git a/tap_spotify/client.py b/tap_spotify/client.py index ab47fd2..e76d2b9 100644 --- a/tap_spotify/client.py +++ b/tap_spotify/client.py @@ -1,6 +1,6 @@ """REST client handling, including SpotifyStream base class.""" -from typing import Optional +from typing import Iterable, Optional from urllib.parse import ParseResult, parse_qsl from memoization import cached @@ -15,6 +15,7 @@ class SpotifyStream(RESTStream): url_base = "https://api.spotify.com/v1" records_jsonpath = "$.items[*]" + chunk_size = None @property @cached @@ -27,3 +28,19 @@ def get_new_paginator(self): def get_url_params(self, context, next_page_token: Optional[ParseResult]): params = super().get_url_params(context, next_page_token) return dict(parse_qsl(next_page_token.query)) if next_page_token else params + + def chunk_records(self, records: Iterable[dict]): + if not self.chunk_size: + return [records] + + chunk = [] + + for i, record in enumerate(records): + if i and not i % self.chunk_size: + yield list(chunk) + chunk.clear() + + chunk.append(record) + + if chunk: + yield list(chunk) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index 425b8b6..6bb99a6 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -1,9 +1,10 @@ """Stream type classes for tap-spotify.""" +from __future__ import annotations + from datetime import datetime from typing import Iterable -from requests.models import Response as Response from singer_sdk.streams.rest import RESTStream from tap_spotify.client import SpotifyStream @@ -39,25 +40,6 @@ def post_process(self, row, context): return row -class _TracksStream(SpotifyStream): - """Define a track stream.""" - - def get_records(self, context): - # get all track records - track_records = list(super().request_records(context)) - - # get all audio features records - # instantiate audio features stream inline and request records - audio_features_stream = _AudioFeaturesStream(self, track_records) - audio_features_records = audio_features_stream.request_records(context) - - # merge track and audio features records - for track, audio_features in zip(track_records, audio_features_records): - # account for tracks with `null` audio features - row = {**(audio_features or {}), **track} - yield self.post_process(row, context) - - class _AudioFeaturesStream(SpotifyStream): """Define an audio features stream.""" @@ -65,15 +47,49 @@ class _AudioFeaturesStream(SpotifyStream): path = "/audio-features" records_jsonpath = "$.audio_features[*]" schema = AudioFeaturesObject.schema + max_tracks = 100 def __init__(self, tracks_stream: _TracksStream, track_records: Iterable[dict]): super().__init__(tracks_stream._tap) + + total_tracks = len(track_records) + + if total_tracks > self.max_tracks: + msg = f"Cannot get audio features for more than {self.max_tracks} tracks at a time: {total_tracks} requested" + raise ValueError(msg) + self._track_records = track_records def get_url_params(self, *args, **kwargs): return {"ids": ",".join([track["id"] for track in self._track_records])} +class _TracksStream(SpotifyStream): + """Define a track stream.""" + + chunk_size = _AudioFeaturesStream.max_tracks + + def get_records(self, context): + # chunk all track records + track_records = super().request_records(context) + track_records_chunks = self.chunk_records(track_records) + + for track_records_chunk in track_records_chunks: + # get audio features records + # instantiate audio features stream inline and request records + audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) + audio_features_records = audio_features_stream.request_records(context) + + # merge chunked track and audio features records + for track, audio_features in zip( + track_records_chunk, + audio_features_records, + ): + # account for tracks with `null` audio features + row = {**(audio_features or {}), **track} + yield self.post_process(row, context) + + class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream): """Define user top items stream."""