forked from rdio/playlist-import-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
playlistcreator.py
148 lines (125 loc) · 5.12 KB
/
playlistcreator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
'''A Python class for creating and updating playlists based on track and artist names'''
import shelve, re, logging, json, time
from rdioapi import Rdio
def uniq(seq):
    '''Return the items of seq with later duplicates dropped, keeping first-seen order.

    Duplicates are detected with the `in` operator on a list, i.e. by
    equality rather than by hash, so unhashable items and items with a
    custom __eq__ are supported.
    '''
    distinct = []
    for item in seq:
        if item in distinct:
            continue
        distinct.append(item)
    return distinct
class Fuzzy(unicode):
    '''A unicode string whose equality test is approximate.

    Two strings compare equal when their case-insensitive edit distance,
    expressed as a whole-number percentage of the combined length of both
    strings, is <= 25.

    NOTE: only == and != are fuzzy; hashing is not overridden here, so
    strings that compare equal may still hash differently.
    '''

    def __eq__(self, other):
        '''Return True when other is within the 25% edit-distance threshold.'''
        combined_length = len(self + other)
        if not combined_length:
            # both strings are empty: they are identical, and the percentage
            # below would otherwise divide by zero
            return True
        # kept inside the method so the helper module is only imported when
        # a comparison is actually performed
        from levenshtein_distance import levenshtein_distance as distance
        d = distance(self.lower(), other.lower())
        # int() truncates, so any percentage below 26 passes the test
        return int(100 * float(d) / combined_length) <= 25

    def __ne__(self, other):
        '''Return the negation of ==.

        Python 2 does not derive != from __eq__, so without this method
        != would fall back to exact unicode comparison.
        '''
        return not self.__eq__(other)
class Term(unicode):
    '''A search term that compares fuzzily against simplified forms of itself.'''

    # matches a parenthesised span, e.g. "(Live)"
    PAREN_RE = re.compile(r'\([^)]*\)')
    # matches " & ", " Feat. " or " feat. " together with the text after it
    FEATURE_RE = re.compile(r' (&|Feat\.|feat\.) .*')

    @property
    def forms(self):
        '''Return a tuple of the variants of this term to try.

        In order: the term itself, the term without parenthesised text,
        the term without a trailing "& ..." / "Feat. ..." / "feat. ..."
        part, and the term with every '!' replaced by a space.
        '''
        return (
            self,
            Term.PAREN_RE.sub('', self),
            Term.FEATURE_RE.sub('', self),
            self.replace('!', ' '),  # for Wakey Wakey!
        )

    def __eq__(self, other):
        '''Return True when other fuzzily matches any of this term's forms.'''
        fuzz = Fuzzy(other)
        return any(fuzz == form for form in self.forms)

    def __ne__(self, other):
        '''Return the negation of ==.

        Python 2 does not derive != from __eq__, so without this method
        != would fall back to exact unicode comparison.
        '''
        return not self.__eq__(other)
class PlaylistCreator(object):
    '''Create or update Rdio playlists from (artist name, track name) pairs.

    OAuth state and previously matched tracks are kept in two shelve
    databases named 'oauth_state' and 'found_tracks'. Call close() when
    finished to release them; __del__ does so as a fallback.
    '''

    def __init__(self):
        '''Open the two shelve databases used by this instance.'''
        self.oauth_state = shelve.open('oauth_state')
        self.found_tracks = shelve.open('found_tracks')

    def close(self):
        '''Close both shelve databases.

        Tolerates a partially constructed instance (for example when
        opening one of the shelves failed in __init__).
        '''
        for attr in ('oauth_state', 'found_tracks'):
            db = getattr(self, attr, None)
            if db is not None:
                db.close()

    def __del__(self):
        # fallback only: explicit close() is the reliable way to release
        # the shelves
        self.close()

    # lazily created Rdio client; see the rdio property
    __cached_rdio = None

    @property
    def rdio(self):
        '''The Rdio API client, created on first access and then reused.'''
        if self.__cached_rdio is None:
            # NOTE(review): the consumer key and secret are hard-coded in
            # source; consider loading them from configuration instead.
            self.__cached_rdio = Rdio('7v2443fffahpt4fazmmh3hx7', '2nzyX96YAu', self.oauth_state)
        return self.__cached_rdio

    @property
    def authenticated(self):
        '''True when the client is authenticated and currentUser() returns a user.

        If the currentUser() call raises, the client is logged out and
        False is returned.
        '''
        if not self.rdio.authenticated:
            return False
        try:
            return self.rdio.currentUser() is not None
        except Exception:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit still propagate
            logging.warning('rdio.currentUser failed; logging out', exc_info=True)
            self.rdio.logout()
            return False

    def authenticate(self):
        '''Discard any stored OAuth state and run an interactive PIN-based login.

        Opens the authentication URL in a web browser and reads the PIN
        from standard input.
        '''
        # let's clear our old auth state; iterate over a copy of the keys
        # because entries are deleted inside the loop
        for k in list(self.oauth_state.keys()):
            del self.oauth_state[k]
        self.__cached_rdio = None

        # do a PIN based auth
        import webbrowser
        webbrowser.open(self.rdio.begin_authentication('oob'))
        verifier = raw_input('Enter the PIN from the Rdio site: ').strip()
        self.rdio.complete_authentication(verifier)

    def find_track(self, artist, title):
        '''Try to find a track, applying various transformations to the names.

        The forms of artist and title are paired position by position and
        each distinct pair is searched in turn. Returns the first search
        result whose artist and name fuzzily match the requested ones, or
        None once every form has been tried without a match.
        '''
        artist = Term(artist)
        title = Term(title)

        # for each of the forms, search...
        for a, t in uniq(zip(artist.forms, title.forms)):
            # query the API
            q = ('%s %s' % (a, t)).encode('utf-8')
            result = self.rdio.search(query=q, types='Track', never_or=True)

            # if there were no results then the search failed
            if not result['track_count']:
                logging.warning(' rdio.search failed for: %s', q)
                continue

            # look through the results for a good match
            for track in result['results']:
                if artist == track['artist'] and title == track['name']:
                    return track

            # none of the results matched; fall through to the next form
            logging.warning('rdio.search succeeded but match failed: %s', q)

        return None

    def make_playlist(self, name, desc, tracks):
        '''Make or update a playlist named @name from the given tracks.

        @tracks is a list of (artistname, trackname) pairs. Each pair is
        looked up in the found_tracks cache first and searched for on the
        site otherwise; pairs that cannot be found are skipped.

        If an owned playlist called @name already exists, its tracks are
        removed and replaced with the ones found; @desc is only used when
        a new playlist has to be created. Returns None.
        '''
        tracks_meta = []
        for artistname, trackname in tracks:
            key = json.dumps((artistname, trackname)).encode('utf-8')
            logging.info('Looking for: %s', key)

            if key in self.found_tracks:
                # read the shelf entry once instead of once per use
                track_meta = self.found_tracks[key]
                logging.info(' found it in the cache: %s', track_meta['key'])
                tracks_meta.append(track_meta)
                continue

            track_meta = self.find_track(artistname, trackname)
            if track_meta is None:
                logging.info(' not found')
                continue

            logging.info(' found it in on the site: %s', track_meta['key'])
            tracks_meta.append(track_meta)
            self.found_tracks[key] = track_meta

        # flush newly cached matches now rather than relying on the shelf
        # being closed later
        self.found_tracks.sync()

        logging.info('Found %d / %d tracks', len(tracks_meta), len(tracks))
        track_keys = [track['key'] for track in tracks_meta]

        # ask the server for playlists
        playlists = self.rdio.getPlaylists()
        for playlist in playlists['owned']:
            # look for a playlist with the right name
            if playlist['name'] != name:
                continue
            logging.info('Found the playlist')

            # when we find it, remove all of those tracks...
            playlist = self.rdio.get(keys=playlist['key'], extras='tracks')[playlist['key']]
            keys = [t['key'] for t in playlist['tracks']]
            self.rdio.removeFromPlaylist(playlist=playlist['key'],
                                         index=0, count=playlist['length'],
                                         tracks=','.join(keys))

            # now add all of the tracks we just got
            self.rdio.addToPlaylist(playlist=playlist['key'],
                                    tracks=','.join(track_keys))
            logging.info('Updated the playlist')
            break
        else:
            # didn't find the playlist (the loop ended without break)
            # create it!
            playlist = self.rdio.createPlaylist(name=name,
                                                description=desc,
                                                tracks=','.join(track_keys))
            logging.info('Created the playlist')