Skip to content

Commit

Permalink
Use max tracks for audio features stream to define track stream chunk size
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Feb 26, 2024
1 parent d5c2769 commit 5ae215d
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions tap_spotify/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Stream type classes for tap-spotify."""

from __future__ import annotations

from datetime import datetime
from typing import Iterable

Expand Down Expand Up @@ -39,29 +41,6 @@ def post_process(self, row, context):
return row


class _TracksStream(SpotifyStream):
    """Base stream for track records enriched with their audio features."""

    # Number of track records handed to each audio features request.
    # NOTE(review): presumably mirrors the per-request id limit of the audio
    # features endpoint - confirm against `_AudioFeaturesStream`.
    chunk_size = 100

    def get_records(self, context):
        """Yield post-processed rows combining tracks with audio features.

        Track records are requested from the parent stream implementation,
        split into chunks via `chunk_records`, and each chunk is paired with
        the records returned by an inline `_AudioFeaturesStream`.

        Args:
            context: Stream context, forwarded unchanged to every
                `request_records` and `post_process` call.

        Yields:
            The result of `post_process` for each merged row. Track fields
            take precedence over audio features fields with the same key.
        """
        all_tracks = super().request_records(context)

        for tracks in self.chunk_records(all_tracks):
            # the audio features stream is built on the fly for this chunk
            features_stream = _AudioFeaturesStream(self, tracks)
            features_records = features_stream.request_records(context)

            # pair each track with the audio features at the same position
            for track, features in zip(tracks, features_records):
                # audio features can be `null` for a track; merge nothing then
                base = features or {}
                merged = {**base, **track}
                yield self.post_process(merged, context)


class _AudioFeaturesStream(SpotifyStream):
"""Define an audio features stream."""

Expand All @@ -86,6 +65,32 @@ def get_url_params(self, *args, **kwargs):
return {"ids": ",".join([track["id"] for track in self._track_records])}


class _TracksStream(SpotifyStream):
    """Base stream for track records enriched with their audio features."""

    # Chunk size is taken from the audio features stream's `max_tracks`, so
    # the two values cannot drift apart.
    # NOTE(review): presumably this keeps each chunk within a single audio
    # features request - confirm against `_AudioFeaturesStream`.
    chunk_size = _AudioFeaturesStream.max_tracks

    def get_records(self, context):
        """Yield post-processed rows combining tracks with audio features.

        Track records are requested from the parent stream implementation,
        split into chunks via `chunk_records`, and each chunk is paired with
        the records returned by an inline `_AudioFeaturesStream`.

        Args:
            context: Stream context, forwarded unchanged to every
                `request_records` and `post_process` call.

        Yields:
            The result of `post_process` for each merged row. Track fields
            take precedence over audio features fields with the same key.
        """
        all_tracks = super().request_records(context)

        for tracks in self.chunk_records(all_tracks):
            # the audio features stream is built on the fly for this chunk
            features_stream = _AudioFeaturesStream(self, tracks)
            features_records = features_stream.request_records(context)

            # pair each track with the audio features at the same position
            for track, features in zip(tracks, features_records):
                # audio features can be `null` for a track; merge nothing then
                base = features or {}
                merged = {**base, **track}
                yield self.post_process(merged, context)


class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream):
"""Define user top items stream."""

Expand Down

0 comments on commit 5ae215d

Please sign in to comment.