Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Chunk tracks for audio features #30

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion tap_spotify/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""REST client handling, including SpotifyStream base class."""

from typing import Optional
from typing import Iterable, Optional
from urllib.parse import ParseResult, parse_qsl

from memoization import cached
Expand All @@ -15,6 +15,7 @@ class SpotifyStream(RESTStream):

url_base = "https://api.spotify.com/v1"
records_jsonpath = "$.items[*]"
chunk_size = None

@property
@cached
Expand All @@ -27,3 +28,19 @@ def get_new_paginator(self):
def get_url_params(self, context, next_page_token: Optional[ParseResult]):
    """Return the query parameters to send with the next request.

    The parent implementation is always consulted first. When a
    ``next_page_token`` is supplied, the parameters encoded in its query
    string are returned instead of the parent's parameters.
    """
    default_params = super().get_url_params(context, next_page_token)

    if not next_page_token:
        return default_params

    # The token is a parsed URL; its query string becomes the parameters.
    return dict(parse_qsl(next_page_token.query))

def chunk_records(self, records: Iterable[dict]):
    """Split ``records`` into lists of at most ``self.chunk_size`` items.

    This is a generator: chunks are produced lazily as ``records`` is
    consumed, and each yielded chunk is a fresh list.

    Args:
        records: The records to split into chunks.

    Yields:
        Lists of records. When ``self.chunk_size`` is falsy, exactly one
        list containing every record is yielded. Otherwise each list holds
        ``self.chunk_size`` records, except possibly the last, and nothing
        is yielded for empty input.
    """
    if not self.chunk_size:
        # Because this function contains ``yield`` it is a generator, so a
        # plain ``return [records]`` would produce no chunks at all. Yield
        # the single chunk explicitly instead.
        yield list(records)
        return

    chunk = []

    for record in records:
        chunk.append(record)

        # Emit as soon as the chunk is full, then start a new list so the
        # chunk already handed to the caller is never mutated.
        if len(chunk) == self.chunk_size:
            yield chunk
            chunk = []

    # Emit the trailing, partially filled chunk (if any).
    if chunk:
        yield chunk
56 changes: 36 additions & 20 deletions tap_spotify/streams.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Stream type classes for tap-spotify."""

from __future__ import annotations

from datetime import datetime
from typing import Iterable

from requests.models import Response as Response
from singer_sdk.streams.rest import RESTStream

from tap_spotify.client import SpotifyStream
Expand Down Expand Up @@ -39,41 +40,56 @@ def post_process(self, row, context):
return row


class _TracksStream(SpotifyStream):
    """Define a track stream."""

    def get_records(self, context):
        """Yield post-processed track records merged with audio features.

        All track records are fetched first. Audio features are then
        requested in batches of at most ``_AudioFeaturesStream.max_tracks``
        tracks, because a single audio features request cannot cover more
        tracks than that. No audio features request is made when there are
        no tracks.
        """
        # get all track records
        track_records = list(super().request_records(context))

        # resolved at call time: `_AudioFeaturesStream` is defined further
        # down in the module
        batch_size = _AudioFeaturesStream.max_tracks

        for start in range(0, len(track_records), batch_size):
            batch = track_records[start : start + batch_size]

            # get audio features records for this batch
            # instantiate audio features stream inline and request records
            audio_features_stream = _AudioFeaturesStream(self, batch)
            audio_features_records = audio_features_stream.request_records(context)

            # merge track and audio features records
            for track, audio_features in zip(batch, audio_features_records):
                # account for tracks with `null` audio features;
                # track fields win on key collisions
                row = {**(audio_features or {}), **track}
                yield self.post_process(row, context)


class _AudioFeaturesStream(SpotifyStream):
    """Define an audio features stream."""

    name = "_audio_features_stream"
    path = "/audio-features"
    records_jsonpath = "$.audio_features[*]"
    schema = AudioFeaturesObject.schema
    # upper bound on the number of tracks accepted per instance/request
    max_tracks = 100

    def __init__(self, tracks_stream: _TracksStream, track_records: Iterable[dict]):
        """Create an audio features stream for a fixed set of tracks.

        Args:
            tracks_stream: The track stream whose tap is used to initialise
                this stream.
            track_records: Track records to request audio features for; each
                must have an ``"id"`` key. Any iterable is accepted.

        Raises:
            ValueError: If more than ``max_tracks`` records are supplied.
        """
        super().__init__(tracks_stream._tap)

        # Materialise once: the records are counted here and iterated again
        # in `get_url_params`, which a one-shot iterable (e.g. a generator)
        # would not support.
        track_records = list(track_records)
        total_tracks = len(track_records)

        if total_tracks > self.max_tracks:
            msg = f"Cannot get audio features for more than {self.max_tracks} tracks at a time: {total_tracks} requested"
            raise ValueError(msg)

        self._track_records = track_records

    def get_url_params(self, *args, **kwargs):
        """Return the ``ids`` parameter: the comma-separated track IDs."""
        return {"ids": ",".join(track["id"] for track in self._track_records)}


class _TracksStream(SpotifyStream):
    """Define a track stream."""

    # chunk tracks to the largest batch the audio features stream accepts
    chunk_size = _AudioFeaturesStream.max_tracks

    def get_records(self, context):
        """Yield post-processed track records merged with audio features.

        Track records are requested and split with ``chunk_records``; for
        each chunk an ``_AudioFeaturesStream`` is created inline and its
        records are paired positionally with the chunk's tracks.
        """
        all_tracks = super().request_records(context)

        for tracks in self.chunk_records(all_tracks):
            # request the audio features for just this chunk of tracks
            features_stream = _AudioFeaturesStream(self, tracks)
            features = features_stream.request_records(context)

            for track, track_features in zip(tracks, features):
                # a track's audio features may be `null`; treat as empty
                features_or_empty = track_features or {}

                # track fields take precedence on key collisions
                merged = {**features_or_empty, **track}
                yield self.post_process(merged, context)


class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream):
"""Define user top items stream."""

Expand Down