From 5ae215d800bfa70311b56d9fb5336f084543a2cf Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 26 Feb 2024 04:12:13 +0000 Subject: [PATCH] Use max tracks for audio features stream to define track stream chunk size --- tap_spotify/streams.py | 51 +++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index 459e13b..a0a991d 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -1,5 +1,7 @@ """Stream type classes for tap-spotify.""" +from __future__ import annotations + from datetime import datetime from typing import Iterable @@ -39,29 +41,6 @@ def post_process(self, row, context): return row -class _TracksStream(SpotifyStream): - """Define a track stream.""" - - chunk_size = 100 - - def get_records(self, context): - # chunk all track records - track_records = super().request_records(context) - track_records_chunks = self.chunk_records(track_records) - - for track_records_chunk in track_records_chunks: - # get audio features records - # instantiate audio features stream inline and request records - audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) - audio_features_records = audio_features_stream.request_records(context) - - # merge chunked track and audio features records - for track, audio_features in zip(track_records_chunk, audio_features_records): - # account for tracks with `null` audio features - row = {**(audio_features or {}), **track} - yield self.post_process(row, context) - - class _AudioFeaturesStream(SpotifyStream): """Define an audio features stream.""" @@ -86,6 +65,32 @@ def get_url_params(self, *args, **kwargs): return {"ids": ",".join([track["id"] for track in self._track_records])} +class _TracksStream(SpotifyStream): + """Define a track stream.""" + + chunk_size = _AudioFeaturesStream.max_tracks + + def get_records(self, context): + # chunk all track records + track_records = super().request_records(context) + track_records_chunks = self.chunk_records(track_records) + + for track_records_chunk in track_records_chunks: + # get audio features records + # instantiate audio features stream inline and request records + audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) + audio_features_records = audio_features_stream.request_records(context) + + # merge chunked track and audio features records + for track, audio_features in zip( + track_records_chunk, + audio_features_records, + ): + # account for tracks with `null` audio features + row = {**(audio_features or {}), **track} + yield self.post_process(row, context) + + class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream): """Define user top items stream."""