-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspoetify.py
126 lines (106 loc) · 4.06 KB
/
spoetify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import gevent
from gevent import monkey; monkey.patch_socket()
from gevent.queue import Queue
from networkx import DiGraph, shortest_path
from util import pairwise, process_text, term_conditioner, gevent_throttle
import requests
import requests_cache
# Transparent HTTP-level response cache (sqlite file 'spotify_cache');
# subsequent identical GETs through `requests` are served from it.
requests_cache.configure('spotify_cache')
# In-process term-level cache; populated by search_track() with
# lowercased track names mapping to the raw track dicts from the API.
_Q_CACHE = {}
def search_track_url(q, page):
    """Return the Spotify metadata-API track-search URL for *q* and *page*.

    The query term is UTF-8 encoded and form-escaped into the query
    string.  This replaces the pre-1.0 `requests.Request(...).full_url`
    idiom, which broke when requests 1.0 removed `full_url` and made the
    first positional argument of `Request` the HTTP method.
    """
    try:
        from urllib.parse import urlencode  # Python 3
    except ImportError:
        from urllib import urlencode        # Python 2
    query = urlencode({'q': q.encode('UTF-8'), 'page': page})
    return "http://ws.spotify.com/search/1/track.json?" + query
# Spotify MetaAPI dictates max rate of 10 requests per second.
# gevent_throttle(10) wraps a no-op callable, so each throttle() call
# blocks the current greenlet just long enough to stay under that rate.
throttle = gevent_throttle(10)(lambda:None)
# TODO timeout and exception handling
# TODO cache only useful stuff | if the track name is a substring of a poem
# TODO there could be multiple title matches - perhaps, should
# make them all available - maybe even tweak by popularity or genre
def search_track(q, page=1):
    """Search the Spotify metadata API for the term *q*.

    Returns a ``(has_tracks, exact_match)`` tuple: *has_tracks* is True
    when the API returned any tracks for a fresh (uncached) query, and
    *exact_match* is the cached track dict whose lowercased name equals
    the conditioned query, or None.

    Results are memoised in the module-level ``_Q_CACHE`` keyed by
    lowercased track name; cache hits skip the network entirely.
    """
    q = term_conditioner(q)
    has_tracks = False
    # Fix: dict.has_key() was removed in Python 3; membership test is
    # the portable (and idiomatic) spelling.
    if q not in _Q_CACHE:
        url = search_track_url(q, page)
        # Only throttle when we will actually hit the network, i.e. the
        # HTTP-level cache has no fresh entry for this URL.
        if not requests_cache.has_url(url):
            throttle()
        # NOTE(review): `.json` as an attribute is the pre-1.0 requests
        # API; 1.0+ made it a method (`.json()`) — confirm the pinned
        # requests version before upgrading.
        response = requests.get(url).json
        tracks = response['tracks']
        has_tracks = bool(tracks)
        for track in tracks:
            name = track['name'].lower()
            _Q_CACHE[name] = track
    exact_match = _Q_CACHE.get(q)
    return has_tracks, exact_match
def slicer(sequence, start):
    """Yield incremental :class:`slice` objects over *sequence*.

    Every slice begins at *start*; the first covers a single element and
    each subsequent one extends the end by one, up to the end of the
    sequence.  Yields nothing when *start* is at or past the end.
    """
    end = len(sequence)
    stop = start + 1
    while stop <= end:
        yield slice(start, stop)
        stop += 1
def _search_boss(graph, words, n):
    """Probe ever-longer word slices starting at index *n*.

    Each candidate substring is looked up via :func:`search_track` in a
    spawned greenlet.  An exact title match becomes an edge in *graph*
    from the slice start to its stop index.  As soon as a slice yields
    neither results nor a match, longer slices from this position are
    pointless and the scan stops.
    """
    for piece in slicer(words, n):
        query = ' '.join(words[piece])
        worker = gevent.spawn(search_track, query)
        worker.join()
        found, match = worker.value
        if match:
            graph.add_edge(piece.start, piece.stop, track=match)
        elif not found:
            break
def _build_graph(words):
    """Build the directed substring-match graph for *words*.

    Nodes are implicit indices into the word list; an edge (i, j) means
    the words[i:j] substring exactly matched a track title (stored in
    the edge's ``track`` attribute).  One '_search_boss' greenlet is
    spawned per starting position and all are awaited before returning.

    Example: ['do', 're', 'mi', 'fa', 'sol'],
    do re mi fa sol
    do re re mi mi fa fa sol
    do re mi re mi fa mi fa sol
    do re mi fa re mi fa sol
    do re mi fa sol
    """
    graph = DiGraph()
    bosses = [gevent.spawn(_search_boss, graph, words, start)
              for start in range(len(words))]
    gevent.joinall(bosses)
    return graph
# TODO better exception handling
# TODO logging
def _build_playlist(graph, words):
    """Return a playlist of ``(name, artist, href)`` tuples.

    Walks the shortest path from node 0 to node ``len(words)`` in
    *graph*; each edge on that path carries the matched track.  Returns
    an empty list for an empty graph; exits via SystemExit when the
    graph is non-empty but no covering path exists.
    """
    playlist = []
    if len(graph) > 0:
        try:
            path = shortest_path(graph, 0, len(words))
        # Narrowed from a bare `except:`, which also swallowed
        # SystemExit/KeyboardInterrupt; networkx raises here when the
        # words cannot be fully covered by matched titles.
        except Exception:
            raise SystemExit
        for start, stop in pairwise(path):
            # NOTE(review): `graph.edge[u][v]` is the networkx 1.x API;
            # 2.x renamed it to `graph.edges[u, v]` — confirm the
            # pinned networkx version.
            track = graph.edge[start][stop]['track']
            playlist.append((track['name'], track['artists'][0]['name'],
                             track['href']))
    return playlist
# TODO turn this thing into a webapp :)
def _print_playlist(playlist):
for track in playlist:
print "{!s:<20} by {!s:<30} {!s:<30}".format(*track)
# TODO improve text conditioning
def spoetify(text):
    """Build and print a playlist whose track titles spell out *text*.

    Raises SystemExit when no playlist covering the text can be built.
    """
    # Bug fix: split the *text* argument.  The original read the
    # module-level global `input_string`, so the function only worked
    # when invoked from this file's __main__ block and ignored its own
    # parameter for any other caller.
    words = text.split()
    graph = _build_graph(words)
    playlist = _build_playlist(graph, words)
    if not playlist:
        raise SystemExit
    _print_playlist(playlist)
if __name__ == '__main__':
    import fileinput
    # Read the poem from the files named on the command line (or stdin
    # when none are given), conditioning each line via
    # util.process_text, then join everything into one space-separated
    # unicode string for spoetify().
    input_parts = []
    for line in fileinput.input():
        input_parts.append(process_text(line))
    input_string = u' '.join(input_parts)
    spoetify(input_string)