Skip to content

Commit

Permalink
Use max tracks for audio features stream to define track stream chunk…
Browse files Browse the repository at this point in the history
… size
  • Loading branch information
ReubenFrankel committed Feb 26, 2024
1 parent 8bbe81f commit f065b39
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions tap_spotify/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Stream type classes for tap-spotify."""

from __future__ import annotations

from datetime import datetime
from typing import Iterable

Expand Down Expand Up @@ -39,29 +41,6 @@ def post_process(self, row, context):
return row


class _TracksStream(SpotifyStream):
    """Define a track stream.

    Track records are requested, split into chunks, and each chunk is merged
    with the audio features records requested for that same chunk.
    """

    chunk_size = 100

    def get_records(self, context):
        """Yield post-processed track rows merged with their audio features.

        Args:
            context: Stream context, passed through to the record requests
                and to `post_process`.

        Yields:
            One row per track. Where keys collide, the track's values take
            precedence over the audio features values.
        """
        # chunk all track records
        track_records = super().request_records(context)
        track_chunks = self.chunk_records(track_records)

        for track_chunk in track_chunks:
            # get audio features records
            # instantiate audio features stream inline and request records
            # NOTE(review): assumes each chunk is re-iterable (e.g. a list), as
            # it is handed to the audio features stream and iterated again below
            audio_features_stream = _AudioFeaturesStream(self, track_chunk)
            audio_features_records = audio_features_stream.request_records(context)

            # merge chunked track and audio features records
            # pair against the current chunk rather than the full record stream
            # so each track lines up with the audio features requested for it
            # (presumably returned in the same order as the chunk - confirm)
            for track, audio_features in zip(track_chunk, audio_features_records):
                # account for tracks with `null` audio features
                row = {**(audio_features or {}), **track}
                yield self.post_process(row, context)


class _AudioFeaturesStream(SpotifyStream):
"""Define an audio features stream."""

Expand All @@ -86,6 +65,29 @@ def get_url_params(self, *args, **kwargs):
return {"ids": ",".join([track["id"] for track in self._track_records])}


class _TracksStream(SpotifyStream):
    """Define a track stream.

    Track records are requested, split into chunks, and each chunk is merged
    with the audio features records requested for that same chunk.
    """

    # chunk tracks by the audio features stream's maximum track count
    chunk_size = _AudioFeaturesStream.max_tracks

    def get_records(self, context):
        """Yield post-processed track rows merged with their audio features.

        Args:
            context: Stream context, passed through to the record requests
                and to `post_process`.

        Yields:
            One row per track. Where keys collide, the track's values take
            precedence over the audio features values.
        """
        # chunk all track records
        track_records = super().request_records(context)
        track_chunks = self.chunk_records(track_records)

        for track_chunk in track_chunks:
            # get audio features records
            # instantiate audio features stream inline and request records
            # NOTE(review): assumes each chunk is re-iterable (e.g. a list), as
            # it is handed to the audio features stream and iterated again below
            audio_features_stream = _AudioFeaturesStream(self, track_chunk)
            audio_features_records = audio_features_stream.request_records(context)

            # merge chunked track and audio features records
            # pair against the current chunk rather than the full record stream
            # so each track lines up with the audio features requested for it
            # (presumably returned in the same order as the chunk - confirm)
            for track, audio_features in zip(track_chunk, audio_features_records):
                # account for tracks with `null` audio features
                row = {**(audio_features or {}), **track}
                yield self.post_process(row, context)


class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream):
"""Define user top items stream."""

Expand Down Expand Up @@ -225,3 +227,4 @@ class UserSavedTracksStream(_SyncedAtStream, SpotifyStream):
limit = 50
schema = TrackObject.extend_with(SyncedAt).schema
records_jsonpath = "$.items[*].track"
records_jsonpath = "$.items[*].track"

0 comments on commit f065b39

Please sign in to comment.