-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspotifilter.py
284 lines (238 loc) · 10.6 KB
/
spotifilter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""Spotifilter harnesses ChatGPT to effortlessly filter explicit tracks from your Spotify playlists,
ensuring a clean and family-friendly listening experience."""
import asyncio
import os
import re
import time
from typing import Final
import requests
import spotipy
from dotenv import load_dotenv
from lyricsgenius import Genius
from openai import OpenAI
from spotipy.oauth2 import SpotifyClientCredentials
from telegram import Update
from telegram.constants import ChatAction, ParseMode
from telegram.error import TelegramError
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
# Load environment variables
# NOTE(review): load_dotenv comes from python-dotenv; presumably it reads a
# local .env file into the process environment -- confirm deployment setup.
load_dotenv()
# API key handed to the Genius lyrics client in get_song_lyrics().
GENIUS_API_KEY: Final = os.getenv("GENIUS_API_KEY")
# API key handed to the OpenAI client in check_explicitly().
OPENAI_API_KEY: Final = os.getenv("OPENAI_API_KEY")
# Telegram bot token used by main() to build the Application.
BOT_TOKEN: Final = os.getenv("TELEGRAM_TOKEN")
# Bot username; not referenced by any other code visible in this file.
BOT_USERNAME: Final = os.getenv("TELEGRAM_USERNAME")
# Value passed as poll_interval to run_polling() in main(); defaults to 0.0.
BOT_POLLING_INTERVAL: Final = float(os.getenv("POLLING_INTERVAL", "0.0"))
def validate_playlist_id(playlist_id: str) -> bool:
    """Tell whether the input looks like a Spotify playlist reference.

    Three spellings are accepted: a bare alphanumeric ID, an
    ``https://open.spotify.com/playlist/<id>`` link (optionally followed by a
    ``?query`` part) and a ``spotify:playlist:<id>`` URI.
    """
    accepted_form = (
        r'^(?:https://open\.spotify\.com/playlist/|spotify:playlist:)?([0-9a-zA-Z]+)(?:\?.*)?$'
    )
    return re.match(accepted_form, playlist_id) is not None
def get_playlist_info(playlist_id: str) -> tuple[bool, str, list]:
    """Look up a playlist on Spotify and summarise it for the user.

    Returns a ``(found, message, details)`` triple. On success ``found`` is
    True, ``message`` announces the scan and ``details`` is
    ``[name, owner display name, total track count]``. On a Spotify API error
    ``found`` is False, ``message`` explains the failure and ``details`` is
    an empty list.
    """
    client = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
    try:
        playlist = client.playlist(playlist_id)
    except spotipy.SpotifyException as error:
        if error.http_status == 404:
            not_found = "Playlist not found. Please check if the playlist ID / link is correct."
            return False, not_found, []
        return False, "An error occurred. Validate your input and retry, or run /report", []

    name = playlist["name"]
    owner = playlist["owner"]["display_name"]
    total = playlist["tracks"]["total"]
    announcement = f"Looking for explicit content in playlist '{name}' by '{owner}'... 🔍"
    return True, announcement, [name, owner, total]
def get_playlist_tracks(playlist_id):
    """Return every track of a playlist as ``(title, first artist name)`` tuples.

    The playlist response only embeds the first page of its tracks, so the
    remaining pages are fetched by following the paging object's ``next``
    link. Entries without a usable track object are skipped instead of
    raising.

    Returns None (after printing a diagnostic) when the Spotify API reports
    an error, matching the original contract.
    """
    auth_manager = SpotifyClientCredentials()
    sp = spotipy.Spotify(auth_manager=auth_manager)
    track_list = []
    try:
        page = sp.playlist(playlist_id)["tracks"]
        while page:
            for item in page["items"]:
                track = item.get("track")
                # Some playlist entries carry a null track or no artists;
                # there is nothing to look up lyrics for, so skip them.
                if not track or not track.get("artists"):
                    continue
                track_list.append((track["name"], track["artists"][0]["name"]))
            # sp.next() returns None once there is no further page.
            page = sp.next(page)
    except spotipy.SpotifyException as e:
        if e.http_status == 404:
            print("Playlist not found. Please check if the playlist ID / link is correct.")
        else:
            print("An error occurred:", e)
        return None
    return track_list
def get_song_lyrics(title, artist):
    """Return the lyrics Genius has for a song, or None when unavailable.

    None is returned both when the search finds no song and when the request
    to Genius fails. Any ``requests`` failure (timeout, HTTP error,
    connection error) is reported on stdout and treated as "no lyrics", so a
    single bad lookup does not abort the scan of a whole playlist.
    """
    genius = Genius(GENIUS_API_KEY)
    try:
        song = genius.search_song(title, artist)
    except requests.exceptions.RequestException as e:
        # RequestException is the common base of Timeout, HTTPError and
        # ConnectionError, so the previously handled Timeout is still covered.
        print("An error occurred:", e)
        return None
    return song.lyrics if song else None
def check_explicitly(title, artist, lyrics):
    """Ask the OpenAI chat model whether a track's lyrics are explicit.

    The conversation sent to the model is made of two system messages (the
    assistant's role and the step-by-step output format), one worked example
    exchange, and finally the track to judge.

    Returns the model's reply text followed by two newline characters.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    topics = [
        "violence",
        "sex",
        "drugs",
        "alcoholism",
        "addiction",
        "smoking",
        "profanity",
    ]

    # What the assistant is for.
    persona = {
        "role": "system",
        "content": "Your are a helpful assistant designed to determine if a track contains "
                   "explicit content which should be listed by line numbers.",
    }
    # How the answer must be laid out; format_explicit_result() parses this header.
    instructions = {
        "role": "system",
        "content": "For each track lyrics, go through these steps:\n"
                   "1. Check every line containing explicit content related to one or more "
                   f"topics: {topics} with line number.\n"
                   "2. The header of your response should be:\nTITLE: 'title', ARTIST: "
                   "'artist', EXPLICIT: TRUE/FALSE, REASONS: [], EXAMPLES: \n"
                   "TRUE if contains explicit content, FALSE if not.\n"
                   "REASONS should contain why it's considered explicit, with one or "
                   f"more topics from this list: {topics}\n."
                   "3. Provide only 2 examples of explicit lines with line numbers, at least one for each topic.\n"
                   "4. Censor profanity words in the output.\n"
    }
    # One worked example: a request and the reply expected for it.
    example_request = {
        "role": "user",
        "content": "Song: 'Leftovers', Artist: 'Dennis Lloyd', Lyrics:\nI'm a drunk and I "
                   "will always be\nBegging, Baby, take my hand before I fall back down\n"
                   "Fuck, I'm about to lose it all\nFuck, I'm about to lose it all",
    }
    example_reply = {
        "role": "assistant",
        "content": "TITLE: 'Leftovers', ARTIST: 'Dennis Lloyd', EXPLICIT: TRUE, REASONS: "
                   "['alcoholism', 'profanity'], EXAMPLES: \n"
                   "- Line 1 - I'm a drunk and I will always be\n"
                   "- Lines [3-4] - F***, I'm about to lose it all",
    }
    # The track actually being checked.
    actual_request = {
        "role": "user",
        "content": f"Song: '{title}', Artist: '{artist}', Lyrics:\n{lyrics}",
    }

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=[persona, instructions, example_request, example_reply, actual_request],
    )
    reply = completion.choices[0].message.content
    return reply + "\n\n"
def parse_response(response):
    """Backslash-escape punctuation in *response* before it is sent as MarkdownV2.

    Every occurrence of one of the characters ``_[]()~`>#+-=|{}.!`` gets a
    backslash put in front of it. ``*`` and the backslash itself are not in
    that set, so bold markers and already-escaped asterisks pass through
    unchanged.
    """
    reserved = r'_[]()~`>#+-=|{}.!'
    reserved_char = re.compile("[" + re.escape(reserved) + "]")
    return reserved_char.sub(lambda found: "\\" + found.group(0), response)
def format_explicit_result(explicitly_result):
    """Format explicit content detection result for Telegram message.

    Parses the ``TITLE: '..', ARTIST: '..', EXPLICIT: .., REASONS: [..],
    EXAMPLES: ..`` header out of *explicitly_result* and lays the fields out
    as a message whose labels are wrapped in ``*`` (bold).

    Returns the formatted message, or the string ``"ERROR"`` when the header
    cannot be found.
    """
    pattern = (
        r"TITLE: '(.*?)', ARTIST: '(.*?)', EXPLICIT: (.*?), "
        r"REASONS: \[(.*?)\], EXAMPLES: (.*)"
    )
    match = re.search(pattern, explicitly_result, re.DOTALL)
    if match is None:
        return "ERROR"

    title, artist, _, reasons, examples = match.groups()
    reasons = reasons.replace("'", "")
    # '*' delimits the bold labels below, so a literal asterisk in ANY field
    # (censored words such as "F***" can show up in titles too, not only in
    # the example lines) has to be escaped or it unbalances the markup.
    title, artist, reasons, examples = (
        field.replace("*", "\\*") for field in (title, artist, reasons, examples)
    )
    return (
        f"🔞 *Title:* {title}\n"
        f"🎤 *Artist:* {artist}\n"
        f"📜 *Reasons:* {reasons}\n\n"
        f"*Examples:* {examples}"
    )
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the /start command with a welcome message."""
    greeting = "Welcome to Spotifilter!"
    await update.message.reply_text(greeting)
async def send_typing_action(context, chat_id):
    """Keep sending the "typing" chat action to *chat_id* every five seconds.

    The loop never ends on its own; filter_command runs it as a task and
    cancels that task when the report is ready.
    """
    refresh_seconds = 5
    while True:
        await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
        await asyncio.sleep(refresh_seconds)
def _split_report(report: str, limit: int = 4096) -> list[str]:
    """Cut *report* into pieces of at most *limit* characters, breaking at line ends."""
    chunks = []
    current = ""
    for line in report.splitlines(keepends=True):
        # A single line longer than the limit cannot be kept whole; as a last
        # resort it is cut at the limit (this may split an escape sequence).
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        if len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    # A piece made only of whitespace would be an empty message; drop it.
    return [chunk for chunk in chunks if chunk.strip()]


async def filter_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/filter <playlist ID or link>``.

    Validates the argument, announces the scan, runs the blocking ``logic``
    scan in the default executor while a "typing" indicator is shown, and
    sends the resulting report back as MarkdownV2, split into several
    messages when it exceeds 4096 characters.
    """
    error_text = "An error occurred. Validate your input and retry, or run /report"

    if not context.args:
        await update.message.reply_text(
            "You need to provide a playlist ID / link after the /filter command.")
        return

    playlist_id = context.args[0]
    if not validate_playlist_id(playlist_id):
        await update.message.reply_text("Playlist ID / link is invalid.")
        return

    valid, message, _ = get_playlist_info(playlist_id)
    await update.message.reply_text(message)
    if not valid:
        return

    typing_task = asyncio.create_task(send_typing_action(context, update.effective_chat.id))
    try:
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        report = await loop.run_in_executor(None, logic, playlist_id)
    finally:
        # Always stop the indicator, even when the scan raises; otherwise the
        # typing task would keep running forever.
        typing_task.cancel()

    if not report:
        # logic() produced nothing to send (e.g. the playlist lookup failed).
        await update.message.reply_text(error_text)
        return

    try:
        # Splitting on line boundaries keeps escape sequences and *bold*
        # spans intact, which a blind cut every 4096 characters does not.
        for chunk in _split_report(report):
            await update.message.reply_text(chunk, parse_mode=ParseMode.MARKDOWN_V2)
    except TelegramError as e:
        print(e)
        await update.message.reply_text(error_text)
async def report_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the /report command with the link to the issue tracker."""
    where_to_report = (
        "Please send any reports or bugs to us here: "
        "https://github.com/doronkg/spotifilter/issues"
    )
    await update.message.reply_text(where_to_report)
async def handle_unknown(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer any message or command the bot has no dedicated handler for."""
    fallback_reply = "I don't know what it is."
    await update.message.reply_text(fallback_reply)
async def handle_error(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Print the update being processed together with the error raised for it."""
    failure = context.error
    print(f"An error occurred in {update}: {failure}")
def main():
    """Build the Telegram application, register its handlers and start polling."""
    builder = Application.builder()
    app = builder.token(BOT_TOKEN).concurrent_updates(True).build()

    # Commands -- registered before the catch-all handlers below.
    command_callbacks = (
        ("start", start_command),
        ("filter", filter_command),
        ("report", report_command),
    )
    for command_name, callback in command_callbacks:
        app.add_handler(CommandHandler(command_name, callback))

    # Messages -- plain text first, then any remaining command.
    catch_all_filters = (filters.TEXT & (~filters.COMMAND), filters.COMMAND)
    for message_filter in catch_all_filters:
        app.add_handler(MessageHandler(message_filter, handle_unknown))

    # Errors
    app.add_error_handler(handle_error)

    # Run bot
    print("Polling...")
    app.run_polling(poll_interval=BOT_POLLING_INTERVAL)
def logic(playlist_id: str) -> str:
    """Scan a playlist for explicit tracks and build the report text.

    For every track the lyrics are fetched and checked by
    ``check_explicitly``. Tracks judged explicit are listed in the report;
    tracks whose lyrics could not be fetched are listed separately at the
    end.

    Returns the report, escaped by ``parse_response``. When the playlist or
    its tracks cannot be retrieved, an (escaped) error message is returned
    instead, so the function always returns a string.
    """
    status, message, result = get_playlist_info(playlist_id)
    if not status:
        # Previously this path fell off the end and returned None.
        return parse_response(message)
    _, _, playlist_total = result

    track_list = get_playlist_tracks(playlist_id)
    if track_list is None:
        # get_playlist_tracks() signals an API failure with None.
        return parse_response(
            "An error occurred. Validate your input and retry, or run /report")

    explicit_tracks = []
    unfetched_tracks = []
    for i, (title, artist) in enumerate(track_list, start=1):
        print(f"\n({i}/{playlist_total})")
        song_lyrics = get_song_lyrics(title, artist)
        if not song_lyrics:
            # parse_response() leaves '*' alone (it marks bold text), so a
            # literal asterisk in a title has to be escaped here.
            safe_title = title.replace("*", "\\*")
            unfetched_tracks.append(f"- {safe_title}\n")
            continue
        explicitly_result = check_explicitly(title, artist, song_lyrics)
        if "EXPLICIT: TRUE" in explicitly_result:
            explicit_tracks.append(format_explicit_result(explicitly_result))
        # Pause after each model call before moving on to the next track.
        time.sleep(3)

    if explicit_tracks:
        parts = [
            f"\n\n🔉🔉🔉\n"
            f"Playlist contains {len(explicit_tracks)} explicit tracks "
            f"and may not fit all audiences‼️\n"
        ]
        parts.extend("\n" + track for track in explicit_tracks)
    else:
        parts = ["\n\nPlaylist is valid, no explicit content was found ✔️\n"]

    if unfetched_tracks:
        parts.append("\nCouldn't fetch lyrics for these tracks:\n")
        parts.extend(unfetched_tracks)

    return parse_response("".join(parts))
if __name__ == "__main__":
main()