From 0fd2ee4aee7f3a98c7fc0dbc5dabf72ca7addea2 Mon Sep 17 00:00:00 2001 From: wlerin Date: Thu, 5 Jan 2017 02:10:47 -0800 Subject: [PATCH] Simple keyboard controls --- showroom.py | 276 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 188 insertions(+), 88 deletions(-) diff --git a/showroom.py b/showroom.py index e955eab..33163d3 100755 --- a/showroom.py +++ b/showroom.py @@ -22,7 +22,7 @@ Find the next live for a room https://www.showroom-live.com/api/room/next_live?room_id=61576 """ -from sys import stdout +from sys import stdout, stdin, exit import os import glob import time @@ -33,7 +33,15 @@ import argparse from heapq import heapify, heappush, heappop, heappushpop import itertools -from operator import attrgetter +# from operator import attrgetter + +# take basic input +# adapted from http://stackoverflow.com/a/19655992/3380530 +import threading +# asyncio also has a Queue +# why am I using both Queue and heapq? +from queue import Queue, Empty as QueueEmpty + try: from announce import Announcer @@ -53,6 +61,11 @@ def send_message(self, msg): Announcer = DefaultAnnouncer + +class DummyAnnouncer(object): + def send_message(self, *args, **kwargs): + pass + import pytz from requests import Session from requests.exceptions import ConnectionError, ChunkedEncodingError, Timeout @@ -132,7 +145,7 @@ def __init__(self): self.room_dict = {} self.room_url_lookup = {} self._filter_list = [] - self._filter_ids = [] + self._filter_ids = [] def update(self): # checks if index files (*.jdex) have been updated @@ -289,7 +302,6 @@ def rebuild(self): class WatchSession(Session): def __init__(self, *args, **kwargs): - # TODO: super(WatchSession, self).__init__(*args, **kwargs) def get(self, url, params=None, **kwargs): @@ -307,7 +319,7 @@ class WatchQueue(object): def __init__(self): self.queue = [] self.entry_map = {} - self.REMOVED = None + self.REMOVED = None self.counter = itertools.count() self.dirty = False @@ -341,7 +353,7 @@ def keys(self): def add(self, item): if item.room_id in self.entry_map: - return False# do nothing + return False # do nothing else: count = next(self.counter) entry = [item.priority, count, item] @@ -438,19 +450,36 @@ def prune(self, priority): else: return False + def list(self): + return sorted(self.entry_map[x][2].description for x in self.keys()) + + def get_links(self): + entries = sorted([self.entry_map[x][2] for x in self.keys()], key=lambda x: x.formatted_time) + return sum([(e.description, e.web_url, e.url) for e in entries], ()) + + def quit(self): + while self.queue: + # note that if the process is hung this won't kill it + # this function shouldn't need to worry about indexing into the returned entry + self.queue.pop()[2].kill() + class Downloader(object): - def __init__(self, member, session, outdir, logging): + def __init__(self, member, session, outdir, logging, noisy): self.session = session self._member = member self.process = None # self.failures = 0 - self.rootdir = outdir # set by WatchManager + self.rootdir = outdir # set by WatchManager self.destdir, self.tempdir, self.outfile = "", "", "" self._url = "" self._logging = logging - self._announcer = Announcer() + if noisy: + self._announcer = Announcer() + else: + self._announcer = DummyAnnouncer() self.sent_quit = False + self.start_time = None @property def name(self): @@ -476,13 +505,22 @@ def logging(self): def web_url(self): return self._member['web_url'] + @property + def description(self): + return ' '.join((self.formatted_time, self.name)) + + @property + def formatted_time(self): + return self.start_time.strftime('%H:%M') + def announce(self, msg): self._announcer.send_message(msg) def is_live(self): while True: try: - status = self.session.get('https://www.showroom-live.com/room/is_live', params={"room_id": self.room_id}).json()['ok'] + status = self.session.get('https://www.showroom-live.com/room/is_live', + params={"room_id": self.room_id}).json()['ok'] except JSONDecodeError: continue @@ -507,12 +545,12 @@ def check(self): return False return True # how to respond to failed exits? - def kill(self): + def kill(self, hard=False): if not self.sent_quit: print('Quitting {}'.format(self.name)) self.process.terminate() self.sent_quit = True - else: + elif hard: self.process.kill() self.sent_quit = False @@ -537,20 +575,25 @@ def move_to_dest(self): self.destdir, self.tempdir, self.outfile = ("", "", "") def start(self): - data = self.session.get('https://www.showroom-live.com/room/get_live_data', params={'room_id': self.room_id}).json() + data = self.session.get('https://www.showroom-live.com/room/get_live_data', + params={'room_id': self.room_id}).json() stream_name = data['streaming_name_rtmp'] stream_url = data["streaming_url_rtmp"] tokyo_time = datetime.datetime.now(tz=TOKYO_TZ) new_url = '{}/{}'.format(stream_url, stream_name) - self.tempdir, self.destdir, self.outfile = format_name(self.rootdir, tokyo_time.strftime('%Y-%m-%d %H%M%S'), self.member) + self.tempdir, self.destdir, self.outfile = format_name(self.rootdir, + tokyo_time.strftime('%Y-%m-%d %H%M%S'), self.member) self.sent_quit = False - + + if not self.start_time: + self.start_time = tokyo_time + if new_url != self.url: - self._url = new_url + self._url = new_url print('Downloading {}\'s Showroom'.format(self.name, self.url)) self.announce((self.web_url, self.url)) - + if self.logging is True: log_file = os.path.normpath('{}/logs/{}.log'.format(self.destdir, self.outfile)) ENV = {'FFREPORT': 'file={}:level=40'.format(log_file)} @@ -568,7 +611,7 @@ def start(self): ], stdin=subprocess.PIPE, env=ENV) - + @property def url(self): return self._url @@ -604,9 +647,9 @@ def format_name(rootdir, time_str, member): class Watcher(object): - def __init__(self, member, start_time = None): + def __init__(self, member, start_time=None): self._member = member - self.session = WatchSession() + self.session = WatchSession() self.start_time = start_time def check(self): @@ -625,8 +668,8 @@ def check(self): elif status == 1: return True - def download(self, outdir, logging): - return Downloader(self._member, self.session, outdir, logging) + def download(self, outdir, logging, noisy): + return Downloader(self._member, self.session, outdir, logging, noisy) @property def name(self): @@ -725,6 +768,10 @@ def room_id(self): def formatted_time(self): return self.start_time.strftime('%H:%M') + @property + def description(self): + return self.formatted_time + ' ' + self.name + (' (LIVE)' if self.is_live() else '') + class DownloadManager(object): def __init__(self, scheduled): @@ -774,6 +821,21 @@ def scheduled(self): def rebuild(self): self.downloads.rebuild() + @property + def list(self): + return self.downloads.list() + + @property + def list_with_links(self): + return self.downloads.get_links() + + def quit(self, soft_exit): + if soft_exit: + a = Announcer() + a.send_message(('The following downloads will continue:',) + self.list) + else: + self.downloads.quit() + class WatchManager(object): def __init__(self, scheduled, live, settings): @@ -782,8 +844,8 @@ def __init__(self, scheduled, live, settings): self.watches = WatchQueue() self._inqueue = WatchQueue() self._time = datetime.datetime.now(tz=TOKYO_TZ) - self._scheduled = scheduled # from Scheduler, tracks watches added to _inqueue - self._live = live # from Scheduler, tracks live streams + self._scheduled = scheduled # from Scheduler, tracks watches added to _inqueue + self._live = live # from Scheduler, tracks live streams self.downloads = DownloadManager(scheduled=scheduled) self.settings = settings @@ -809,7 +871,9 @@ def tick(self, new_time): # add to downloads if len(self.downloads) < self.max_downloads or self.downloads.prune(watch.member): watch = self.watches.dirty_pop(watch) - self.downloads.add(watch.download(self.outdir, self.settings['logging'])) + self.downloads.add(watch.download(self.outdir, + self.settings['logging'], + self.settings['noisy'])) except TimeoutError: print('{}\'s Watch expired'.format(watch.name)) if watch.room_id in self._scheduled: @@ -931,7 +995,7 @@ def update_live(self): new = self.upcoming.pop(room_id) new.go_live() else: - new = Schedule(start_time, self.index, room_id=room_id, is_live=True, dt = self._time) + new = Schedule(start_time, self.index, room_id=room_id, is_live=True, dt=self._time) if new.priority <= self.settings['max_priority']: self.live.update({new.room_id: new}) @@ -963,6 +1027,12 @@ def watchmanager(self): def scheduled(self): return self._scheduled + @property + def list(self): + """Returns a list of all currently scheduled and live rooms""" + return sorted([self.upcoming[x].description for x in self.upcoming] + + [self._scheduled[x].description for x in self._scheduled]) + # @property # def rooms(self): # # TODO: remove this entirely, the index should be just as fast as a frozenset @@ -972,10 +1042,17 @@ def scheduled(self): class Controller(object): + # slavish adaptation of + # http://stackoverflow.com/a/19655992/3380530 + @staticmethod + def add_input(input_queue): + while True: + input_queue.put(stdin.read(1)) + def __init__(self, index=None, outdir=OUTDIR, max_downloads=MAX_DOWNLOADS, max_priority=MAX_PRIORITY, max_watches=MAX_WATCHES, live_rate=LIVE_RATE, schedule_ticks=SCHEDULE_TICKS, end_hour=END_HOUR, resume_hour=RESUME_HOUR, - new_index=True, index_loc=NEW_INDEX_LOC, logging=False): + new_index=True, index_loc=NEW_INDEX_LOC, logging=False, exit_behaviour='hard', noisy=False): self.session = WatchSession() if new_index: @@ -989,7 +1066,8 @@ def __init__(self, index=None, outdir=OUTDIR, 'max_priority': max_priority, 'live_rate': live_rate, 'schedule_ticks': schedule_ticks, - 'logging': logging} + 'logging': logging, + 'noisy': noisy} self.end_time = datetime.time(hour=end_hour, minute=10, tzinfo=TOKYO_TZ) self.resume_time = datetime.time(hour=resume_hour-1, minute=50, tzinfo=TOKYO_TZ) @@ -1002,13 +1080,28 @@ def __init__(self, index=None, outdir=OUTDIR, self.downloaders = None self.time = None + self.input_queue = Queue() + self.input_thread = threading.Thread(target=self.add_input, args=(self.input_queue,)) + + self.input_thread.daemon = True + self.input_thread.start() + + if exit_behaviour == 'soft': + self.soft_exit = True + else: + self.soft_exit = False + + self.quitting = False + + self._announcer = Announcer() + def filter(self, names_filter): self.index.filter(names_filter) def run(self): # why are these defined here? self.scheduler = Scheduler(index=self.index, settings=self.settings) - self.watchers = self.scheduler.watchmanager + self.watchers = self.scheduler.watchmanager self.downloaders = self.watchers.downloads # self.downloaders # sleep_minutes = 20 @@ -1017,8 +1110,11 @@ def run(self): self.time = datetime.datetime.now(tz=TOKYO_TZ) if self.resume_time > self.time.time() > self.end_time: - sleep_seconds = (datetime.datetime.combine(self.time, self.resume_time) - self.time).total_seconds() + 1.0 - print('Time is {}, sleeping for {} seconds, until {}'.format(self.time.strftime('%H:%M'), sleep_seconds, self.resume_time.strftime('%H:%M'))) + sleep_seconds = (datetime.datetime.combine(self.time, self.resume_time) + - self.time).total_seconds() + 1.0 + print('Time is {}, sleeping for {} seconds, until {}'.format(self.time.strftime('%H:%M'), + sleep_seconds, + self.resume_time.strftime('%H:%M'))) self.scheduler.reset_ticks() time.sleep(sleep_seconds) @@ -1030,63 +1126,57 @@ def run(self): if len(self.watchers) == 0 and len(self.downloaders) == 0: time.sleep(self.live_rate) else: - time.sleep(1.0) - - # TODO: allow soft exit i.e. on user input, rather than ctrl+c + time.sleep(0.5) + + while not self.input_queue.empty(): + try: + key_press = self.input_queue.get(block=False) + except QueueEmpty: + break + else: + self.heed_command(key_press) + + def quit(self): + self.downloaders.quit(self.soft_exit) + print("Exiting...") + exit() + + def heed_command(self, key): + """Responds to (single) key presses: + q/Q == Quit (asks for confirmation) + s/S == Print full schedule (including current downloads) + d/D == Print all current downloads + l/L == Print all current downloads with rtmp and http links""" + if self.quitting and key.lower() == 'y': + self.quit() + else: + self.quitting = False + + if key.lower() == 'q': + self._clear_input_queue() + print('Are you sure you want to quit? (y/N)') + if self.soft_exit: + print('(Active downloads will continue until finished) ') + else: + print('(Active downloads will be stopped) ') + self.quitting = True + # TODO: encapsulate these more sanely so that other types of Announcer will work + elif key.lower() == 's': + self._clear_input_queue() + self._announcer.send_message(('Current Schedule:',) + tuple(self.scheduler.list)) + elif key.lower() == 'd': + self._clear_input_queue() + self._announcer.send_message(('Current Downloads:',) + tuple(self.downloaders.list)) + elif key.lower() == 'l': + self._clear_input_queue() + self._announcer.send_message(('Current Downloads with Links:',) + tuple(self.downloaders.list_with_links)) + + def _clear_input_queue(self): + with self.input_queue.mutex: + self.input_queue.queue.clear() + self.input_queue.all_tasks_done.notify_all() + self.input_queue.unfinished_tasks = 0 -# obsolete -""" -def watch(member, outdir): - s = WatchSession() - - params = {'room_id': member['showroom_id']} - member_name = member['engName'] - print('Watching {}\'s Room'.format(member_name)) - count = 0 - while True: - count+=1 - try: - status = s.get('https://www.showroom-live.com/room/is_live', params=params).json()['ok'] - except JSONDecodeError: - continue - - if count % 30 == 0: - print('Still watching {}\'s Room'.format(member_name)) - if status == 0: - time.sleep(2) - continue - elif status == 1: - data = s.get('https://www.showroom-live.com/room/get_live_data', params=params).json() - stream_name = data['streaming_name_rtmp'] - stream_url = data["streaming_url_rtmp"] - normed_path = os.path.normpath('{}/{}.mp4'.format(outdir, member_name.lower())) - subprocess.call(['ffmpeg', '-i', '{}/{}'.format(stream_url, stream_name), - '-user-agent', 'User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 \ - (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36', - '-headers', 'Referer: {}'.format(member['web_url']), - '-c', 'copy', normed_path]) - break - - - -def find_member(target, index): - for e in index: - if target.lower() == e['engName'].lower(): - return e - - return None - -# def merge_fragments(fragment_list, ): -# see concat.py - -# TODO: implement this, probably using WatchQueue + concurrent.futures -def simple_watcher(members, max_downloads): - # threadpool for watches, list of active Popen objects once the watches complete - # with concurrent.futures.ThreadPoolExecutor() as executor: - # result = executor.map(function, iterable) - while True: - pass -""" if __name__ == "__main__": """ @@ -1135,6 +1225,12 @@ def simple_watcher(members, max_downloads): help='Hour to resume recording (will actually start 10 minutes earlier). \ Defaults to %(default)s') parser.add_argument('--logging', action='store_true', help="Turns on ffmpeg logging.") + parser.add_argument('--noisy', action='store_true', help="Print download links when downloads start") + # NOT WORKING + # parser.add_argument('--exit-behaviour', '--exit-behavior', '-E', + # choices=['hard', 'soft'], type=str, default='soft', + # help='Behaviour on pressing q to quit. A "soft" exit leaves any current downloads running,\ + # while a "hard" exit attempts to stop them before closing.') args = parser.parse_args() # will raise an exception if not found but that's probably best @@ -1152,9 +1248,13 @@ def simple_watcher(members, max_downloads): end_hour=args.end_hour, resume_hour=args.resume_hour, logging=args.logging, - index_loc=args.index) + index_loc=args.index, + noisy=args.noisy, + # exit_behaviour=args.exit_behaviour + ) # TODO: allow the filter list to come from a file c.filter(args.names) + print('Show (S)chedule, Show (D)ownloads, Show (L)inks, (Q)uit') c.run() '''