-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPABot.py
564 lines (515 loc) · 23.6 KB
/
PABot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
import ast
import threading, time, random
from os import path
import telepot
from telepot.loop import MessageLoop
from telepot.namedtuple import InlineKeyboardMarkup as InlineMarkup
from telepot.namedtuple import InlineKeyboardButton as InlineBtn
from telepot.namedtuple import ReplyKeyboardMarkup as ReplyMarkup
from telepot.namedtuple import KeyboardButton as Btn
# for broadcast: text-to-speech
from gtts import gTTS
from os import system
# for playing music
import pafy, vlc
from youtube_search import YoutubeSearch
# broadcast message
def broadcast(msg):
    """Speak *msg* aloud using text-to-speech.

    The text is rendered to ``output.mp3`` with gTTS (language ``zh``) and
    played with ``cvlc``. When music is playing it is paused for the
    announcement and resumed afterwards.

    Args:
        msg: The text to broadcast.
    """
    global player
    global playStatus
    print('Broadcasting', msg)
    language = 'zh'
    sound_file = gTTS(text=msg, lang=language, slow=False)
    sound_file.save("output.mp3")
    # `isPlaying` can already be True while `player` is still None (the
    # player object is only created inside play()), so check both before
    # touching the player.
    if isPlaying and player is not None:
        # Flag the pause first so the wait loop in play() does not mistake
        # the paused player for a finished song.
        playStatus = 'pause'
        player.pause()
        system("cvlc --play-and-exit output.mp3")
        # pause() is called a second time to resume the music.
        player.pause()
        time.sleep(1)
        playStatus = 'play'
    else:
        system("cvlc --play-and-exit output.mp3")
    return
# show_plist - show your own personal playlist
def show_plist(user_id):
    """Return the user's personal playlist as a numbered, printable string.

    Reads ``<user_id>_playlist.txt``; every non-blank line is the ``str()``
    of one song dict that holds at least ``title`` and ``url_suffix``.

    Args:
        user_id: Username used as the prefix of the playlist file name.

    Returns:
        One ``N. title`` line followed by the YouTube URL per song, or a
        hint asking the user to add songs when the playlist is empty.

    Raises:
        FileNotFoundError: If the playlist file does not exist.
    """
    with open(user_id + '_playlist.txt', 'r', encoding='utf8') as file:
        # literal_eval only accepts Python literals, so a tampered playlist
        # file cannot run arbitrary code the way eval() would. Blank lines
        # are skipped instead of raising SyntaxError.
        personal_plist = [ast.literal_eval(line) for line in file if line.strip()]
    if not personal_plist:
        return 'You don\'t have any song in your playlist,\nplease add song first'
    return ''.join(
        str(number) + '. ' + song['title']
        + '\nhttps://www.youtube.com' + song['url_suffix'] + '\n'
        for number, song in enumerate(personal_plist, start=1)
    )
# Check whether the song to add is already in the personal playlist
def checkPlistHasExisted(user_id, toAddSong):
    """Tell whether *toAddSong* may be added to the user's playlist.

    A song counts as present when its ``url_suffix`` occurs as a substring
    of any line of ``<user_id>_playlist.txt``.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        toAddSong: Song dict; only ``url_suffix`` is read.

    Returns:
        ``(True, 'OK')`` when the song is not stored yet, otherwise
        ``(False, <message>)``.
    """
    with open(user_id + '_playlist.txt', 'r', encoding='utf8') as plist_file:
        stored_lines = plist_file.readlines()
    already_stored = any(toAddSong['url_suffix'] in line for line in stored_lines)
    if already_stored:
        return False, 'This song is already in your playlist'
    return True, 'OK'
# Check whether the personal playlist has reached its song limit
def checkPlistLen(user_id, max_songs=10):
    """Tell whether the user's playlist still has room for another song.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        max_songs: Maximum number of lines (songs) the playlist file may
            hold. Defaults to 10.

    Returns:
        ``(True, 'OK')`` while the file has fewer than *max_songs* lines,
        otherwise ``(False, 'Your playlist is full!')``.
    """
    with open(user_id + '_playlist.txt', 'r', encoding='utf8') as file:
        song_count = sum(1 for _ in file)
    if song_count < max_songs:
        return True, 'OK'
    return False, 'Your playlist is full!'
# add song to personal playlist from now playing
def addToMyPlist(user_id, nowPlayingSong):
    """Append *nowPlayingSong* to the user's personal playlist file.

    The stored copy is tagged with ``orderUser = '@' + user_id``. The dict
    passed in is left untouched, so the song that is currently playing keeps
    the name of the user who originally ordered it.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        nowPlayingSong: Song dict to store.

    Returns:
        Always ``True``.
    """
    stored_song = dict(nowPlayingSong)
    stored_song['orderUser'] = '@' + user_id
    with open(user_id + '_playlist.txt', 'a+', encoding='utf8') as file:
        file.write(str(stored_song) + '\n')
    return True
def addSongsToMyPlist(user_id, addSongNum):
    """Copy songs from the public playlist into the user's personal playlist.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        addSongNum: Space-separated 1-based positions in the public
            playlist. Tokens that are not numbers or are out of range are
            ignored.

    Returns:
        ``(False, <message>)`` as soon as the personal playlist is full,
        ``(True, 'Some songs has already in your playlist')`` when at least
        one selected song was skipped as a duplicate, otherwise
        ``(True, 'OK')``.
    """
    global playList
    duplicates = 0
    # Several songs may be picked at once; split() without an argument also
    # tolerates repeated spaces.
    for token in addSongNum.split():
        # Skip anything that is not a plain number instead of letting int()
        # raise ValueError.
        if not token.isdecimal():
            continue
        position = int(token)
        if position <= 0 or position > len(playList):
            continue
        # Check the length limit first ...
        status, reply = checkPlistLen(user_id)
        if not status:
            return status, reply
        # ... then check for duplicates.
        status, reply = checkPlistHasExisted(user_id, playList[position - 1])
        if not status:
            duplicates += 1
            continue
        # Tag a copy so the entry in the public playlist keeps its original
        # orderer.
        stored_song = dict(playList[position - 1])
        stored_song['orderUser'] = '@' + user_id
        with open(user_id + '_playlist.txt', 'a+', encoding='utf8') as file:
            file.write(str(stored_song) + '\n')
    if duplicates > 0:
        # Return the warning explicitly; `reply` may have been overwritten
        # by a later successful check.
        return True, 'Some songs has already in your playlist'
    return True, 'OK'
# edit_plist - delete songs from your own personal playlist
def edit_plist(user_id, toDelete):
    """Delete the selected songs from the user's personal playlist file.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        toDelete: Space-separated 1-based positions of the songs to remove.
            Tokens that are not numbers or are out of range are ignored,
            and a position listed twice is deleted only once.
    """
    plist_path = user_id + '_playlist.txt'
    with open(plist_path, 'r', encoding='utf8') as file:
        plist = file.readlines()
    # Convert to integers before sorting: sorted as strings, '9' would come
    # before '10' and a later pop would hit the wrong line or fall outside
    # the list. The set removes repeated positions.
    positions = {int(token) for token in toDelete.split() if token.isdecimal()}
    # Delete from the back so earlier positions stay valid.
    for position in sorted(positions, reverse=True):
        if 1 <= position <= len(plist):
            plist.pop(position - 1)
    with open(plist_path, 'w', encoding='utf8') as file:
        for line in plist:
            print(line)
            # Lines still carry their trailing newline from readlines().
            file.write(line)
# add_from_plist - add songs to public playlist from your playlist
def add_from_plist(user_id, playSongNum):
    """Queue selected songs from the user's personal playlist for playback.

    Args:
        user_id: Username used as the prefix of the playlist file name.
        playSongNum: Space-separated 1-based positions in the personal
            playlist. Tokens that are not numbers or are out of range
            (including 0) are ignored.
    """
    global playList
    # Several songs may be picked at once, separated by spaces.
    tokens = playSongNum.split()
    with open(user_id + '_playlist.txt', 'r', encoding='utf8') as file:
        data = file.readlines()
    print(tokens)
    for token in tokens:
        if not token.isdecimal():
            continue
        position = int(token)
        # Position 0 must be rejected too: data[0 - 1] would silently queue
        # the last song of the playlist.
        if position <= 0 or position > len(data):
            continue
        # literal_eval parses the stored dict without executing code.
        playList.append(ast.literal_eval(data[position - 1]))
# play_all_plist - play all songs from your playlist randomly
def play_all_plist(user_id):
    """Append every song of the user's personal playlist, shuffled, to the public playlist.

    Args:
        user_id: Username used as the prefix of the playlist file name.
    """
    global playList
    with open(user_id + '_playlist.txt', 'r', encoding='utf8') as file:
        personal_plist = file.readlines()
    random.shuffle(personal_plist)
    # literal_eval parses the stored dicts without executing code; blank
    # lines are skipped rather than raising.
    playList.extend(
        ast.literal_eval(line) for line in personal_plist if line.strip()
    )
# show the playlist
def show():
    """Build the text overview of the song being played and the public queue.

    Returns:
        A string with an optional "Now Playing" section (only while
        ``isPlaying`` is true) followed by the numbered upcoming songs, or
        ``No Playlist!`` when the queue is empty.
    """
    sections = []
    if isPlaying:
        sections.append(
            'Now Playing\n==========\n' + nowPlayingSong['title']
            + '(Ordered By ' + nowPlayingSong['orderUser'] + ') /addToMyPlist\n\n'
        )
    sections.append('Next /addSongsToMyPlist\n==========\n ')
    if playList:
        for number, song in enumerate(playList, start=1):
            sections.append(
                str(number) + '. ' + song['title']
                + ' (Ordered By ' + song['orderUser'] + ')\n'
            )
        sections.append('\n')
    else:
        sections.append('No Playlist!')
    return ''.join(sections)
# delete a song from playlist
def delete(num):
    """Remove one song from the public playlist.

    Args:
        num: 1-based position of the song, as a string or int. Values that
            are not numbers or are outside ``1..len(playList)`` are ignored,
            so ``0`` no longer removes the last song.
    """
    global playList
    try:
        position = int(num)
    except (TypeError, ValueError):
        return
    if 1 <= position <= len(playList):
        playList.pop(position - 1)
# search song on youtube
def search(msg):
    """Search YouTube for *msg* and return up to 10 results.

    Args:
        msg: The search keywords.

    Returns:
        Whatever ``YoutubeSearch(...).to_dict()`` produces for the query.
    """
    query = YoutubeSearch(msg, max_results=10)
    found = query.to_dict()
    return found
# stop playing song
def stop():
    """Ask the playback thread to stop the current song and leave the queue alone.

    Sets ``stopCmd`` before ``playStatus``: play() returns as soon as
    ``playStatus`` is ``'stop'`` and to_play() then immediately checks
    ``stopCmd``. Setting the flag first guarantees that check sees it and
    does not start the next song.
    """
    global stopCmd, playStatus
    stopCmd = True
    playStatus = 'stop'
# skip now playing
def skip():
    """Stop the song that is currently playing so playback can move on.

    Does nothing when no song is playing or when the player object has not
    been created yet (``player`` is still ``None``).
    """
    global player, isPlaying
    if isPlaying and player is not None:
        player.stop()
def vol_up():
    """Raise the playback volume by 5.

    Returns:
        ``True`` when the volume was raised, ``False`` when doing so would
        exceed 100 (the volume is then left unchanged).
    """
    global player, volume
    target = volume + 5
    if target > 100:
        return False
    player.audio_set_volume(target)
    volume = target
    return True
def vol_down():
    """Lower the playback volume by 5.

    Returns:
        ``True`` when the volume was lowered, ``False`` when doing so would
        drop it below 5 (the volume is then left unchanged).
    """
    global player, volume
    target = volume - 5
    if target < 5:
        return False
    player.audio_set_volume(target)
    volume = target
    return True
# play song
def play(msg):
    """Play the audio of one YouTube video through VLC and block until it is over.

    Replaces the global ``player``, resets ``volume`` to 60, sets
    ``playStatus`` to ``'play'`` while the song runs and to ``'stop'`` on
    exit.

    Args:
        msg: URL suffix of the video; it is appended to
            ``https://www.youtube.com``.
    """
    global player
    global playStatus
    global volume
    url = "https://www.youtube.com" + msg
    video = pafy.new(url)
    best = video.getbestaudio()
    playurl = best.url
    Instance = vlc.Instance("prefer-insecure")
    player = Instance.media_player_new()
    Media = Instance.media_new(playurl)
    # NOTE(review): the return value is unused - confirm whether this call
    # is still needed.
    Media.get_mrl()
    player.set_media(Media)
    volume = 60
    player.audio_set_volume(volume)
    player.play()
    # Presumably gives VLC time to start before is_playing() is polled.
    time.sleep(5)
    playStatus = 'play'
    # Wait until stop() sets playStatus to 'stop' or the player is no longer
    # playing (song ended or skip() stopped it). A 'pause' status keeps the
    # loop alive even though is_playing() is 0 then. Sleeping between polls
    # avoids spinning a CPU core for the whole length of the song.
    while playStatus != 'stop' and (player.is_playing() == 1 or playStatus == 'pause'):
        time.sleep(0.2)
    player.stop()
    playStatus = 'stop'
    return
def to_play():
    """Playback thread body: play queued songs until the queue is empty or a stop is requested.

    Each song is popped from the front of ``playList`` and becomes
    ``nowPlayingSong``. On exit, including exit through an unexpected error,
    ``stopCmd``, ``isPlaying`` and ``playing_thread`` are reset so that a
    new playback thread can be started later.
    """
    global isPlaying, playing_thread, nowPlayingSong
    global playList, stopCmd
    try:
        while len(playList) > 0 and not stopCmd:
            nowPlayingSong = playList.pop(0)
            print('Now Playing =', nowPlayingSong['title'])
            try:
                play(nowPlayingSong['url_suffix'])
            except Exception as err:
                # This is the thread's top level: report the failure and go
                # on with the next song instead of dying silently.
                print('Failed to play', nowPlayingSong['title'], '-', err)
    finally:
        # Always release the "playing" state; otherwise the bot would
        # believe a dead thread is still playing.
        stopCmd = False
        isPlaying = False
        playing_thread = None
    return
def on_callback_query(msg):
    """Handle a tap on one of the inline search-result buttons.

    When the user has a pending ``['await_add', search_result]`` state, the
    tapped result is tagged with the user as orderer and appended to the
    public playlist, the button message is deleted and the new playlist is
    sent. In every other case (no username, no pending state, or a
    different pending state) the user is asked to search again.

    Args:
        msg: The callback-query message delivered by telepot.
    """
    global user_status, playList
    query_id, from_id, query_data = telepot.glance(msg, flavor='callback_query')
    # Users without a username cannot have a pending state; .get() avoids a
    # KeyError for them.
    user_id = msg['from'].get('username')
    pending = user_status.get(user_id)
    if not isinstance(pending, list) or pending[0] != 'await_add':
        bot.sendMessage(from_id, 'you need to search again')
        return
    search_result = pending[1]
    addSong = search_result[int(query_data)]
    addSong['orderUser'] = '@' + user_id
    playList += [addSong]
    msg_id = int(msg['message']['message_id'])
    bot.deleteMessage((from_id, msg_id))
    show_str = show()
    bot.sendMessage(from_id, 'Done. Following is new playlist')
    time.sleep(1)
    bot.sendMessage(from_id, show_str)
    # The command is finished, so clear the pending state.
    del user_status[user_id]
def _parse_numbers(text):
    """Return the integers of a space-separated reply, or None if any token is not a number."""
    tokens = text.split(' ')
    if not all(token.isdecimal() for token in tokens):
        return None
    return [int(token) for token in tokens]


def _handle_pending_reply(msg, chat_id, user_id):
    """Process a plain-text reply to the command stored in ``user_status[user_id]``."""
    text = msg['text']
    state = user_status[user_id]
    invalid_reply = 'Please send valid song numbers separated by a single space'

    if state == 'broadcast':
        bot.sendMessage(chat_id, 'Broadcasting...')
        # Speak the text aloud.
        broadcast(text)
        bot.sendMessage(chat_id, 'Broadcast over')
        # Clear the pending state once the command is done.
        del user_status[user_id]
        return

    if state == 'add':
        search_result = search(text)
        songList = [
            [InlineBtn(text=str(i + 1) + '. ' + song['title'], callback_data=i)]
            for i, song in enumerate(search_result)
        ]
        time.sleep(1)
        bot.sendMessage(
            chat_id,
            'Following is search result, please choose a number of songs,\nwhich will be add to playlist',
            reply_markup=InlineMarkup(inline_keyboard=songList),
            reply_to_message_id=msg['message_id']
        )
        # Remember the results so the next reply (or button tap) can pick one.
        user_status[user_id] = ['await_add', search_result]
        return

    # Every remaining state expects song numbers. Validate them here so a
    # typo gets an answer instead of an exception; the state is kept so the
    # user can simply retry.
    expects_numbers = (
        state in ('delete', 'add_from_plist', 'addSongs', 'edit_plist')
        or state[0] == 'await_add'
    )
    if not expects_numbers:
        return
    numbers = _parse_numbers(text)
    if numbers is None:
        bot.sendMessage(chat_id, invalid_reply)
        return

    if state == 'delete':
        if len(numbers) != 1 or not 1 <= numbers[0] <= len(playList):
            bot.sendMessage(chat_id, invalid_reply)
            return
        delete(text)
        show_str = show()
        bot.sendMessage(chat_id, 'Done. Following is new playlist')
        time.sleep(1)
        bot.sendMessage(chat_id, show_str)
        del user_status[user_id]
    elif state[0] == 'await_add':
        search_result = state[1]
        if len(numbers) != 1 or not 1 <= numbers[0] <= len(search_result):
            bot.sendMessage(chat_id, invalid_reply)
            return
        addSong = search_result[numbers[0] - 1]
        addSong['orderUser'] = '@' + user_id
        playList.append(addSong)
        show_str = show()
        bot.sendMessage(chat_id, 'Done. Following is new playlist')
        time.sleep(1)
        bot.sendMessage(chat_id, show_str)
        del user_status[user_id]
    elif state == 'add_from_plist':
        add_from_plist(user_id, text)
        show_str = show()
        bot.sendMessage(chat_id, 'Done. Following is new playlist')
        time.sleep(1)
        bot.sendMessage(chat_id, show_str)
        del user_status[user_id]
    elif state == 'addSongs':
        status, reply = addSongsToMyPlist(user_id, text)
        reply_str = show_plist(user_id)
        if status and reply == 'OK':
            bot.sendMessage(chat_id, 'Done. Following is your new playlist')
            time.sleep(1)
            bot.sendMessage(chat_id, reply_str, disable_web_page_preview=True)
        else:
            bot.sendMessage(chat_id, reply)
        del user_status[user_id]
    elif state == 'edit_plist':
        edit_plist(user_id, text)
        reply_str = show_plist(user_id)
        bot.sendMessage(chat_id, 'Done. Following is your new playlist')
        time.sleep(1)
        bot.sendMessage(chat_id, reply_str, disable_web_page_preview=True)
        del user_status[user_id]


def on_chat_message(msg):
    """Handle an incoming chat message: run a bot command or continue a pending one.

    Commands (``/start``, ``/play``, ``/add`` ...) are executed directly.
    Commands that need further input store a state in
    ``user_status[<username>]``; the next plain-text message from that user
    is then treated as the answer. Chats without a username are asked to
    set one, and non-text messages are ignored.

    Args:
        msg: The chat message delivered by telepot.
    """
    global playList, nowPlayingSong, isPlaying
    global playing_thread, user_status
    content_type, chat_type, chat_id = telepot.glance(msg)
    # A username is required: it keys the pending state and names the
    # personal playlist file.
    if 'username' not in msg['chat']:
        bot.sendMessage(chat_id, 'Please set a UserName')
        return
    user_id = msg['chat']['username']
    # Only text messages are handled.
    if content_type != 'text':
        return

    text = msg['text']
    msg_id = msg['message_id']
    plist_file = './' + user_id + '_playlist.txt'
    no_plist_reply = 'You don\'t have playlist, please create one first.'

    if text == '/start':
        # Introduce the bot and show the command keyboard.
        bot.sendMessage(chat_id, 'Order music and play music in MOLi')
        replyBtns = [
            [Btn(text='/help'), Btn(text='/show'), Btn(text='/add'), Btn(text='/play'), Btn(text='/skip'), Btn(text='/stop')],
            [Btn(text='/vol_down'), Btn(text='/show_plist'), Btn(text='/vol_up')],
            [Btn(text='/add_from_plist'), Btn(text='/play_all_plist')],
            [Btn(text='/edit_plist'), Btn(text='/create_plist')],
            [Btn(text='/broadcast')],
        ]
        time.sleep(1)
        bot.sendMessage(chat_id, 'Here\'s all function buttons', reply_markup=ReplyMarkup(keyboard=replyBtns))
    elif text == '/help':
        intro = \
            'show - show all public playlist\n'+\
            'add - search song on youtube and add to public playlist\n'+\
            'play - play music\n'+\
            'skip - skip song which is playing\n'+\
            'stop - stop playing music\n'+\
            'vol_up - turn up the volume\n'+\
            'vol_down - turn down the volume\n'+\
            'show_plist - show your own personal playlist\n'+\
            'add_from_plist - add songs to public playlist from your playlist\n'+\
            'play_all_plist - play all songs from your playlist randomly\n'+\
            'edit_plist - delete songs from your own personal playlist\n'+\
            'create_plist - create a personal playlist\n'+\
            'broadcast - input text, broadcast in MOLi'
        bot.sendMessage(chat_id, intro)
    elif text == '/broadcast':
        # The next text message from this user will be broadcast.
        bot.sendMessage(chat_id, 'Now send me text you want to broadcast')
        user_status[user_id] = 'broadcast'
    elif text == '/play':
        # Snapshot the thread: the playback thread sets the global to None
        # when it finishes.
        current = playing_thread
        if len(playList) == 0:
            bot.sendMessage(chat_id, "No playlist to play!", reply_to_message_id=msg_id)
        elif current is not None and current.is_alive():
            bot.sendMessage(chat_id, "It's playing now!", reply_to_message_id=msg_id)
        else:
            # Also reached when a previous thread object is left over but no
            # longer alive, so playback can always be restarted.
            playing_thread = threading.Thread(target=to_play, args=())
            # Set the flag before starting so a thread that ends immediately
            # cannot have its reset overwritten.
            isPlaying = True
            playing_thread.start()
            reply_str = show()
            bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id)
    elif text == '/stop':
        current = playing_thread
        if current is not None and current.is_alive():
            stop()
            bot.sendMessage(chat_id, 'Now stop')
    elif text == '/skip':
        # Skip the song that is playing now.
        skip()
    elif text == '/vol_up':
        # Turn the volume up.
        if not vol_up():
            bot.sendMessage(chat_id, 'Too loud!', reply_to_message_id=msg_id)
        else:
            bot.sendMessage(chat_id, 'Volumn:' + str(volume))
    elif text == '/vol_down':
        # Turn the volume down.
        if not vol_down():
            bot.sendMessage(chat_id, 'Too low!', reply_to_message_id=msg_id)
        else:
            bot.sendMessage(chat_id, 'Volumn:' + str(volume))
    elif text == '/show':
        # Show the public playlist.
        reply_str = show()
        bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id)
    elif text == '/add':
        # The next text message is used as the YouTube search query.
        bot.sendMessage(chat_id, 'Now please send some words of the song, which you want to add')
        user_status[user_id] = 'add'
    elif text == '/delete':
        # Delete a song from the public playlist.
        if len(playList) > 0:
            bot_reply = 'Please choose a number of songs in playlist, which you want to delete.'
            bot.sendMessage(chat_id, bot_reply, reply_to_message_id=msg_id)
            time.sleep(1)
            list_str = ''.join(
                str(number) + '. ' + song['title'] + '\n'
                for number, song in enumerate(playList, start=1)
            )
            bot.sendMessage(chat_id, list_str)
            user_status[user_id] = 'delete'
        else:
            bot.sendMessage(chat_id, 'No playlist!', reply_to_message_id=msg_id)
    elif text == '/create_plist':
        # Opening in append mode creates the file without truncating an
        # existing playlist.
        with open(user_id + '_playlist.txt', 'a+', encoding='utf8'):
            pass
        reply_str = 'Done, now you have your personal playlist'
        bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id, disable_web_page_preview=True)
    elif text == '/show_plist':
        if path.isfile(plist_file):
            reply_str = show_plist(user_id)
            bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id, disable_web_page_preview=True)
        else:
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
    elif text == '/addToMyPlist':
        if not path.isfile(plist_file):
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
        elif not isPlaying:
            # Without a playing song there is nothing to add.
            bot.sendMessage(chat_id, 'No song is playing now', reply_to_message_id=msg_id)
        else:
            # Check the length limit first ...
            plist_status, reply = checkPlistLen(user_id)
            if plist_status:
                # ... then check for duplicates.
                plist_status, reply = checkPlistHasExisted(user_id, nowPlayingSong)
                if plist_status:
                    if addToMyPlist(user_id, nowPlayingSong):
                        bot.sendMessage(chat_id, 'Ok', reply_to_message_id=msg_id)
                else:
                    bot.sendMessage(chat_id, reply, reply_to_message_id=msg_id)
            else:
                bot.sendMessage(chat_id, reply, reply_to_message_id=msg_id)
    elif text == '/addSongsToMyPlist':
        if path.isfile(plist_file):
            bot.sendMessage(chat_id, 'Please choose numbers of songs in playlist,\nwhich you want to add to your personal playlist.', reply_to_message_id=msg_id)
            reply_str = show()
            time.sleep(1)
            bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id)
            user_status[user_id] = 'addSongs'
        else:
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
    elif text == '/add_from_plist':
        if path.isfile(plist_file):
            bot.sendMessage(chat_id, 'Please choose numbers of songs in playlist,\nwhich you want to play.', reply_to_message_id=msg_id, disable_web_page_preview=True)
            reply_str = show_plist(user_id)
            time.sleep(1)
            bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id, disable_web_page_preview=True)
            user_status[user_id] = 'add_from_plist'
        else:
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
    elif text == '/play_all_plist':
        if path.isfile(plist_file):
            play_all_plist(user_id)
            current = playing_thread
            if current is None or not current.is_alive():
                playing_thread = threading.Thread(target=to_play, args=())
                isPlaying = True
                playing_thread.start()
                reply_str = show()
                bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id)
            else:
                bot.sendMessage(chat_id, 'Add to public playlist successfully!', reply_to_message_id=msg_id)
        else:
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
    elif text == '/edit_plist':
        if path.isfile(plist_file):
            user_status[user_id] = 'edit_plist'
            bot_reply = 'Please choose a number of songs in your playlist, which you want to delete.'
            bot.sendMessage(chat_id, bot_reply, reply_to_message_id=msg_id)
            time.sleep(1)
            reply_str = show_plist(user_id)
            bot.sendMessage(chat_id, reply_str, reply_to_message_id=msg_id, disable_web_page_preview=True)
        else:
            bot.sendMessage(chat_id, no_plist_reply, reply_to_message_id=msg_id)
    elif user_id in user_status:
        # Not a command: treat the text as the answer to the user's pending
        # command. Users without a pending state are ignored.
        _handle_pending_reply(msg, chat_id, user_id)
# global variables
# Pending follow-up state per username: a command name such as 'add', or
# ['await_add', search_result] while a search result is being chosen.
user_status = dict()
search_result = []
# 'stop', 'play' or 'pause'; polled by play() to decide when a song is over.
playStatus = 'stop'
isPlaying = False
# Song dict taken from the queue by to_play(). Initialised here because
# several functions declare it global and would otherwise hit a NameError
# before the first song is played.
nowPlayingSong = None
# Public queue of song dicts waiting to be played.
playList = []
playing_thread = None
# Set by stop() so that to_play() does not start the next song.
stopCmd = False
# VLC media player of the current song; created in play().
player = None
volume = 0
if __name__ == '__main__':
    # Read the bot token; the context manager closes the file again.
    with open('TOKEN.txt') as myFile:
        TOKEN = myFile.read().strip()
    # `bot` is used as a module-level name by the message handlers.
    bot = telepot.Bot(TOKEN)
    MessageLoop(bot, {'chat': on_chat_message, 'callback_query': on_callback_query}).run_as_thread()
    print('Listening ...')
    # Keep the program running.
    while True:
        time.sleep(10)