diff --git a/killscreens.sh b/killscreens.sh new file mode 100755 index 0000000..65bb4e5 --- /dev/null +++ b/killscreens.sh @@ -0,0 +1,3 @@ +#!/bin/zsh +screen -ls | grep Detached | cut -d. -f1 | awk '{print $1}' | xargs kill + diff --git a/launch_stations.py b/launch_stations.py new file mode 100644 index 0000000..16f34cc --- /dev/null +++ b/launch_stations.py @@ -0,0 +1,27 @@ +from config import * + +from flask import Flask +from flask.ext.sqlalchemy import SQLAlchemy + +from rootio.extensions import db + +import zmq +from utils import ZMQ, init_logging + +from rootio.telephony.models import * +from rootio.radio.models import * + +import time +app = Flask("ResponseServer") +app.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI +db = SQLAlchemy(app) + +logger = init_logging('launch_stations') + +daemons = [] +from station_daemon import StationDaemon +stations = db.session.query(Station).all() +for i in stations: + daemons.append(StationDaemon(i.id)) +time.sleep(1) + diff --git a/launcher.zsh b/launcher.zsh new file mode 100755 index 0000000..bc790f4 --- /dev/null +++ b/launcher.zsh @@ -0,0 +1,32 @@ +#!/bin/bash +screen -S redis_server -dm sudo /usr/local/Cellar/redis/2.8.13/bin/redis-server /usr/local/etc/redis.conf +echo "launched redis... sleeping for a second..." +sleep 1 +screen -S freeswitch -dm /usr/local/freeswitch/freeswitch +echo "launched freeswitch" + +cd /usr/local/plivo +screen -S plivo-rest -dm /usr/local/plivo/bin/plivo-rest +screen -S plivo-outbound -dm /usr/local/plivo/bin/plivo-outbound +screen -S plivo-cache -dm /usr/local/plivo/bin/plivo-cache +echo "launced three plivo processes -- rest, outbout, cache" + +cd /Users/csik/Documents/code/rootio/rootio_web/scheduler +source vnv/bin/activate +screen -S forwarder.py -dm /Users/csik/Documents/code/rootio/rootio_web/vnv/bin/python forwarder_device.py +echo "Launched forwarder.py" + +screen -S run.py -dm /Users/csik/Documents/code/rootio/rootio_web/vnv/bin/python run.py +echo "Launched run.py" + +cd /Users/csik/Documents/code/rootio/rootio_telephony +source vnv/bin/activate +screen -S telephony_server.py -dm /Users/csik/Documents/code/rootio/rootio_telephony/vnv/bin/python telephony_server.py +echo "launched telephony server" + +screen -S launch_stations.py -dm /Users/csik/Documents/code/rootio/rootio_telephony/vnv/bin/python launch_stations.py +echo "launched stations" + +set -x #echo on +screen -ls + diff --git a/news_report.py b/news_report.py index 06b5fa8..dcf192b 100644 --- a/news_report.py +++ b/news_report.py @@ -1,10 +1,11 @@ -from fluidity import StateMachine, state, transition import plivohelper +from fluidity import StateMachine, state, transition """ -Sketch of news show. +Sketch of news show. -At most basic, the show consists of making outgoing calls and playing a news report. +At most basic, the show consists of making outgoing calls and playing a news +report. Options: Have news report read live from a location @@ -13,25 +14,26 @@ Advertisements in-line """ -import zmq from config import * import redis +from program_utils import sleep_advance_on_wake from utils import call, init_logging +from sound_utils import soundLength -from zmq.eventloop import ioloop, zmqstream -ioloop.install() -#TODO: Is above just an artifact, should be in station_daemon? +# TODO: Is above just an artifact, should be in station_daemon? -import os,sys +import os +import sys sys.path.insert(1, os.path.join(sys.path[0], '..')) -from yapsy.IPlugin import IPlugin import time +from datetime import datetime, timedelta +from apscheduler.scheduler import Scheduler # get access to telephony & web database +# TODO News Report should have no understanding of flask or database from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy -from rootio.extensions import db telephony_server = Flask("ResponseServer") telephony_server.debug = True @@ -42,159 +44,231 @@ telephony_server.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI db = SQLAlchemy(telephony_server) - logger = init_logging('news_report') # redis is used for flagging is_master if program is across multiple stations +# TODO move setup to config file r = redis.StrictRedis(host='localhost', port=6379, db=0) + class News(StateMachine): initial_state = 'setup' - def __init__(self, episode_id, station_id): + def __init__(self, episode_id, station): self.caller_list = "caller_list-{0}".format(episode_id) - self.sound_url = "{}{}{}{}".format(TELEPHONY_SERVER_IP,'/~csik/sounds/programs/',episode_id,'/current.mp3') - self.conference = "plivo" #"news_report_conference-{}".format(episode_id) - testme = db.session.query(Station).filter(Station.id == station_id).first() - logger.info("testme : {} type : {}".format(testme, type(testme))) - self.station = testme - self.episode_id = episode_id + # self.sound_url = "{}{}{}{}".format(TELEPHONY_SERVER_IP,'/~csik/sounds/programs/',episode_id,'/current.mp3') + # "news_report_conference-{}".format(episode_id) + self.conference = episode_id + #self.conference = "plivo" + + logger.info("station : {} type : {}".format(station, type(station))) + # the database representation of the station + self.station = station.station + # the actual python object in memory + self.station_daemon = station + self.episode_id = episode_id self.is_master = False self.fnumber = None + self.on_success_next = self.go_intro + self.sched = Scheduler() + self.sched.start() super(News, self).__init__() def setup(self): logger.info("News_Report: In setup") - logger.info("... for station {}".format(self.station.name)) - # Check if this instance should be master - if r.get('is_master_'+str(self.episode_id)) == 'none': - r.set('is_master_'+str(self.episode_id),self.station.id) + logger.info("... for station {}".format(self.station.name)) + ######################################################################## + # This whole section should be moved to station, should just be called# + # with a reference to self.on_success_next, which telephony # + # should call when it successfully connects # + ######################################################################## + # Check if this instance should be master + # TODO move this to station utilities function + # None for if first run + if r.get('is_master_'+str(self.episode_id)) == 'none' or r.get('is_master_'+str(self.episode_id)) is None: + r.set('is_master_'+str(self.episode_id), self.station.id) self.is_master = True logger.info("{} is master for news report".format(str(self))) - #check soundfile - import requests - response = requests.head(self.sound_url) - if response.status_code != 200: - logger.error('No sound file available at url:'.format(self.sound_url)) + # check soundfile -- works but left out for now + # TODO move to sound utilities + # import requests + # response = requests.head(self.sound_url) + # if response.status_code != 200: + # logger.error('No sound file available at url:'.format(self.sound_url)) - #check to see if this is a simple outgoing gateway or a multi-line one - logger.info('Station name: {}'.format(self.station.name)) + # check to see if this is a simple outgoing gateway or a multi-line one + logger.info('Station name: {}'.format(self.station.name)) + # move this to station try: - top_gateway = self.station.outgoing_gateways[0] + top_gateway = self.station.outgoing_gateways[0] if top_gateway.number_top == 0: - logger.info(str("Looks like the gateway does not need to acquire a line.")) - fnumber='3124680992' #make this a database field? - self.fnumber=fnumber + logger.info( + str("Looks like the gateway does not need to acquire a line.")) + fnumber = '3124680992' # make this a database field? + self.fnumber = fnumber else: - #allocate outgoing line - logger.info(str(r.llen('outgoing_unused'))+" free phone lines available") - fnumber = str(r.rpoplpush('outgoing_unused','outgoing_busy')) + # allocate outgoing line + logger.info( + str(r.llen('outgoing_unused')) + + " free phone lines available") + fnumber = str(r.rpoplpush('outgoing_unused', 'outgoing_busy')) self.fnumber = fnumber logger.info("Allocating line {}".format(fnumber)) - except Exception, e: - logger.debug("Exception: {} -- {}".format(Exception, e)) - #place calls - #GATEWAY_PREFIX='951' # This is a hack -- make this part of station or similar db field - logger.info("Trying to get transmitter_phone.raw_number for station {}".format(self.station.name)) + except Exception as e: + logger.debug("Exception: {} -- {}".format(Exception, e)) + # place calls + # GATEWAY_PREFIX='951' # This is a hack -- make this part of station or similar db field + # TODO move this to station as well + logger.info( + "Trying to get transmitter_phone.raw_number for station {}".format( + self.station.name)) + try: + logger.info( + "self.station.transmitter_phone.raw_number = {}".format( + self.station.transmitter_phone)) + except Exception as e: + logger.info( + "Failed to get transmitter_phone.raw_number", + exc_info=True) try: - logger.info("self.station.transmitter_phone.raw_number = {}".format(self.station.transmitter_phone)) - except Exception, e: - logger.info("Failed to get transmitter_phone.raw_number", exc_info = True) - try: - call_result = call( to_number="{}".format(self.station.transmitter_phone.raw_number), - from_number=fnumber, - gateway=top_gateway.sofia_string, - answered='http://127.0.0.1:5000/confer/'+str(self.episode_id)+'/', - #extra_dial_string="bridge_early_media=true,hangup_after_bridge=true,origination_caller_id_name=rootio,caller_name=rootio,origination_caller_id_number="+fnumber, - extra_dial_string=top_gateway.extra_string, - ) - except Exception, e: + # TODO use self.station.start_program(self) or similar + call_result = call( + to_number="{}".format(self.station.transmitter_phone.raw_number), + from_number=fnumber, gateway=top_gateway.sofia_string, + answered='http://127.0.0.1:5000/confer/' + str(self.episode_id) + + '/', extra_dial_string=top_gateway.extra_string,) + except Exception as e: + # TODO change this to a station function as above logger.error('Failed to place call', exc_info=True) call_result = 'Error' - - if call_result !='Error': - if call_result.get('Success') == True: + if call_result != 'Error': + # TODO as above + if call_result.get('Success') is True: self.RequestUUID = call_result.get('RequestUUID') logger.info(str(call_result)) - time.sleep(10) - #count successful calls, - #if not successful, plan otherwise + # Should now return to station, station should save its next state, and + # a successful call return should call that state - #launch show-wide listeners + # count successful calls, + # if not successful, plan otherwise def intro(self): logger.info("News_Report: In intro") - #play music - if self.is_master == True: + # play music + if self.is_master: logger.info("In INTRO to news report {}".format(self.conference)) - #wait until intro music is finished - + # wait until intro music is finished + self.go_report() def report(self): logger.info("In REPORT of news report {}".format(self.conference)) - #play report sound - if self.is_master == True: + # play report sound + if self.is_master: logger.info("In conference: {}".format(self.conference)) # Create a REST object - plivo = plivohelper.REST(REST_API_URL, SID, AUTH_TOKEN, API_VERSION) - call_params = {'ConferenceName':'plivo', 'MemberID':'all', 'FilePath':'/home/csik/public_html/sounds/programs/5/current.mp3'} try: + # TODO remove plivo references here, move to station object + plivo = plivohelper.REST( + REST_API_URL, + SID, + AUTH_TOKEN, + API_VERSION) + # TODO remove hard coded ref to sound file + mp3len = soundLength(SOUND_FILE_ROOT+'programs/5/current.mp3') + if mp3len == 'error': + logger.error('Aborting Program', exc_info=True) + self.teardown() + return + except Exception as e: + logger.info( + "Exception in news_report REPORT conference play: {},{}".format( + Exception, + e)) + # TODO remove hardcoded ref to sound file + call_params = {'ConferenceName': self.conference, + 'MemberID': 'all', + 'FilePath': SOUND_FILE_ROOT+'programs/5/current.mp3' + } + try: + # TODO remove plivo call with return from Station object result = plivo.conference_play(call_params) - logger.info("Result of conference play: {}".format(result)) - except Exception, e: - logger.info("Exception in news_report REPORT: {},{}".format(Exception, e)) - #check on calls? - # - time.sleep(120) + logger.info("Result of conference play: {}".format(result)) + if result.get('Success') is True: + constant = 1 + logger.info( + "Playing sound file, sleeping for {} seconds.".format( + mp3len + + constant)) + # print "job scheduled was: {}".format(self.advance_on_wake(mp3len+constant,self.go_outro)) + print "job scheduled was: {}".format(sleep_advance_on_wake(self.sched, mp3len+constant, self.go_outro)) + # time.sleep(mp3len+constant) + except Exception as e: + logger.info( + "Exception in news_report REPORT conference play: {},{}".format( + Exception, + e)) + # check on calls? + + #self.go_outro() def outro(self): logger.info("In OUTRO to news report {}".format(self.conference)) - - #log - #play outgoing music + # log + # play outgoing music + self.go_teardown() - # This among all others should be "blocking", i.e. how do we assure it has + # This among all others should be "blocking", i.e. how do we assure it has # executed before trying another show? - def teardown(self): logger.info("In TEARDOWN of news report {}".format(self.conference)) - #hang up calls if they have not been hung up + # hang up calls if they have not been hung up + # TODO remove all refs to plivo in this function, replace + # with station calls plivo = plivohelper.REST(REST_API_URL, SID, AUTH_TOKEN, API_VERSION) - hangup_call_params = {'RequestUUID' : self.RequestUUID} # CallUUID for Hangup + hangup_call_params = { + 'RequestUUID': self.RequestUUID} # CallUUID for Hangup try: result = plivo.hangup_call(hangup_call_params) logger.info(str(result)) - except Exception, e: + except Exception as e: logger.error('Failed to hangup in new_report', exc_info=True) - #clear conference + # clear conference - #clear is_master semaphore - if self.is_master == True: - r.set('is_master_'+str(self.episode_id),'none') - #return numbers to stack + # clear is_master semaphore + # TODO replace with station function + if self.is_master: + r.set('is_master_'+str(self.episode_id), 'none') + # return numbers to stack top_gateway = self.station.outgoing_gateways[0] - if top_gateway>0: + if top_gateway > 0: logger.info("Returning phone numbers to stack.") - r.lrem('outgoing_busy', 0, self.fnumber) # remove all instances for somenumber - r.rpush('outgoing_unused', self.fnumber) # add them back to the queue - + # TODO replace with call to station function + r.lrem( + 'outgoing_busy', + 0, + self.fnumber) # remove all instances for somenumber + r.rpush( + 'outgoing_unused', + self.fnumber) # add them back to the queue # Set up states - state('setup',enter=setup) - state('intro',enter=intro) - state('report',enter=report) - state('outro',enter=outro) - state('teardown',enter=teardown) - # Set up transitions, note they expect serial progression except for teardown + state('setup', enter=setup) + state('intro', enter=intro) + state('report', enter=report) + state('outro', enter=outro) + state('teardown', enter=teardown) + # Set up transitions, note they expect serial progression except for + # teardown transition(from_='setup', event='go_intro', to='intro') transition(from_='intro', event='go_report', to='report') transition(from_='report', event='go_outro', to='outro') - transition(from_=['outro','report','intro','setup'], event='go_teardown', to='teardown') - - - - - - + transition( + from_=[ + 'outro', + 'report', + 'intro', + 'setup'], + event='go_teardown', + to='teardown') diff --git a/program_utils.py b/program_utils.py new file mode 100644 index 0000000..2ddff98 --- /dev/null +++ b/program_utils.py @@ -0,0 +1,52 @@ +"""Program Utilities + Tools available to programs +""" + +from datetime import datetime, timedelta + + +def get_redis_connection(): + """Pass a redis url and receive an active redis object -- should be called once on program creation. + """ + pass + +def is_master(): + """Pass a redis url and one's own info, and receive a boolean. Should check for type of passed url to make sure it is redis, allowing other mechanisms in the future. + State of mastery should be kept in program object from then on. + """ + pass + +def sleep_advance_on_wake(scheduler, sleep_time, wake_function): + execdate = datetime.now()+timedelta(seconds=sleep_time) + print "Function = {}".format(wake_function) + job = scheduler.add_date_job(wake_function,execdate) + return job + + +def connect_caller(): + pass + + +def disconnect_caller(): + pass + + +def get_number_of_waiting_callers(): + pass + + +def play_message(): + pass + +#these should be station functions, no? +def call_transmitter(): + pass + + +def disconnect_transmitter(): + pass + + + +def make_self_master(): + pass diff --git a/requirements.txt b/requirements.txt index 0505a05..7ce64e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,58 +1,81 @@ +alembic==0.6.0 +anyjson==0.3.3 +APScheduler==2.1.2 +argparse==1.2.1 Babel==1.3 +backports.ssl-match-hostname==3.4.0.2 +beautifulsoup4==4.3.2 +BeautifulSoup==3.2.1 +blinker==1.3 +certifi==14.05.14 +coaster==0.4.2 +docflow==0.3.1 +ecdsa==0.10 Fabric==1.7.0 -Flask==0.10.1 +feedparser==5.1.3 Flask-Admin==1.0.8 +Flask-Assets==0.10 Flask-Babel==0.9 Flask-Cache==0.12 Flask-Login==0.2.6 Flask-Mail==0.9.0 Flask-OpenID==1.1.1 Flask-Restless==0.12.0 -Flask-SQLAlchemy==1.0 Flask-Script==0.5.3 +Flask-SQLAlchemy==1.0 Flask-Testing==0.4 Flask-WTF==0.9.1 +Flask==0.10.1 +FlexGet==1.2 +fluidity-sm==0.2.0 +html5lib==0.999 +ipython==1.1.0 +isodate==0.5.0 +itsdangerous==0.23 Jinja2==2.7.1 +jsonschema==2.4.0 Mako==0.8.1 +Markdown==2.4.1 MarkupSafe==0.18 -SQLAlchemy==0.8.2 -SQLAlchemy-Utils==0.19.0 -WTForms==1.0.4 -WTForms-Alchemy==0.7.12 -WTForms-Components==0.8.0 -Werkzeug==0.9.3 -Yapsy==1.10.2-pythons2n3 -alembic==0.6.0 -argparse==1.2.1 -blinker==1.3 -ecdsa==0.10 -ipython==1.1.0 -itsdangerous==0.23 +mutagen==1.24 nose==1.3.0 paramiko==1.12.0 +path.py==5.2 plivohelper==0.3.5 +plumbum==1.4.2 +progressbar==2.2 psycopg2==2.5.1 pycrypto==2.6.1 +PyExecJS==1.0.4 +Pygments==1.6 +pynzb==0.1.0 +PyRSS2Gen==1.1 python-dateutil==2.1 +python-magic==0.4.6 python-openid==2.2.5 +python-tvrage==0.4.1 pytz==2013.7 +PyYAML==3.11 +pyzmq==14.3.0 +readline==6.2.4.1 redis==2.8.0 requests==2.0.1 +rpyc==3.3.0 +semantic-version==2.3.0 +simplejson==3.5.2 six==1.4.1 speaklater==1.3 -twill==0.9 -wsgiref==0.1.2 -APScheduler==2.1.2 -BeautifulSoup==3.2.1 -Flask-Assets==0.9 -Markdown==2.4.1 -PyExecJS==1.0.4 -Pygments==1.6 +SQLAlchemy-Utils==0.22.1 +SQLAlchemy==0.8.2 +tmdb3==0.6.17 +toolz==0.7.0 +tornado==4.0.1 +twill==1.8.0 UgliPyJS==0.2.5 -coaster==0.4.2 -docflow==0.3.1 -isodate==0.5.0 -pyzmq==14.3.0 -semantic-version==2.3.0 -simplejson==3.5.2 webassets==0.9 +Werkzeug==0.9.3 +wsgiref==0.1.2 +WTForms-Alchemy==0.7.12 +WTForms-Components==0.8.0 +WTForms==1.0.4 +Yapsy==1.10.2-pythons2n3 diff --git a/sound_utils.py b/sound_utils.py new file mode 100644 index 0000000..8b1b0c3 --- /dev/null +++ b/sound_utils.py @@ -0,0 +1,23 @@ + + + + + +def soundLength(file): + """ Takes local file url, returns time in seconds. """ + import subprocess + + length_string = subprocess.check_output("""mp3info -p "%S" '{}'""".format(file), shell=True).strip('%') + length = sum(float(x) * 60 ** i for i,x in enumerate(reversed(length_string.split(":")))) + return length + +# Mutagen proved unable to correctly parse, remove this function if soundLength +# continues to work correctly. +def mutagenLength(file): + import mutagen + try: + file = mutagen.File(file) + return file.info.length #time in seconds + except Exception, e: + logger.error('Could not calculate mp3 length', exc_info=True) + return "error" diff --git a/station_daemon.py b/station_daemon.py index c8285c6..a9f58f7 100644 --- a/station_daemon.py +++ b/station_daemon.py @@ -1,7 +1,8 @@ -#SEE https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html -#for info on listeners +# SEE https://learning-0mq-with-pyzmq.readthedocs.org/en/ +# latest/pyzmq/multisocket/tornadoeventloop.html +# for info on listeners from config import * @@ -37,46 +38,64 @@ logger = init_logging('station_daemon') + # Daemon class class StationDaemon(Station): + + """Docstring test.""" + def __init__(self, station_id): logger.info("Hello World") - self.gateway = 'sofia/gateway/utl' self.caller_queue = [] self.active_workers = [] + self.listener_reponses = { + "call": self.process_call, + "sms": self.process_sms, + "program": self.process_program, + "db": self.process_db, + "telephony_callback": self.process_telephony_callback, + "digits": self.process_digits, + } + try: - self.station = db.session.query(Station).filter(Station.id == station_id).one() - except Exception, e: + self.station = db.session.query(Station).filter( + Station.id == station_id).one() + except Exception as e: logger.error('Could not load one unique station', exc_info=True) - logger.info("Initializing station: {}".format(self.station.name)) - # This is for UTL outgoing ONLY. Should be moved to a utility just for the gateway, or such. + logger.info("Initializing station: {}".format(self.station.name)) + # This is for UTL outgoing ONLY. Should be moved to a utility just for + # the gateway, or such. try: - self.r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=OUTGOING_NUMBERS_REDIS_DB) - except Exception, e: + self.r = redis.StrictRedis( + host=REDIS_HOST, + port=REDIS_PORT, + db=OUTGOING_NUMBERS_REDIS_DB) + except Exception as e: logger.error('Could not open redis connection', exc_info=True) # INITIATE OUTGOING NUMBERS HERE # Hereafter, stations can do a r.rpoplpush('outgoing_unused','outgoing_busy') to get a number - # or a r.lrem('outgoing_busy', 0, somenumber) to return it -- SHOULD be atomic :( - while self.r.rpop('outgoing_unused') != None: + # or a r.lrem('outgoing_busy', 0, somenumber) to return it -- SHOULD be + # atomic :( + while self.r.rpop('outgoing_unused') is not None: pass - while self.r.rpop('outgoing_busy') != None: + while self.r.rpop('outgoing_busy') is not None: pass for i in range(OUTGOING_NUMBER_BOTTOM, OUTGOING_NUMBER_TOP+1): self.r.rpush('outgoing_unused', '0'+str(i)) # INITIATE IS_MASTER KEYS for k in self.r.keys('is_master_*'): - self.r.set(k,'none') + self.r.set(k, 'none') # start listeners self.start_listeners() ######################################################################## -# Listeners for messages on calls, sms, +# Listeners for messages on calls, sms, # program changes, and db updates ######################################################################## - # Listener function, running + # Listener function, running def listener(self, channel, function): port = MESSAGE_QUEUE_PORT_WEB context = zmq.Context() @@ -85,61 +104,85 @@ def listener(self, channel, function): socket_sub.setsockopt(zmq.SUBSCRIBE, str(channel)) stream_sub = zmqstream.ZMQStream(socket_sub) stream_sub.on_recv(function) - print "Connected to publisher with port %s" % port + logger.info("Station daemon listener connected to publisher with port {}".format(port)) ioloop.IOLoop.instance().start() - print "Worker has stopped processing messages." + logger.info("Station daemon listener has stopped processing messages.") - #https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html + # https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html def start_listeners(self): - call_listener = Process(target=self.listener, args=(str('station.'+str(self.station.id)+'.call'), self.process_call)) - call_listener.start() - self.active_workers.append(call_listener) - - sms_listener = Process(target=self.listener, args=(str('station.'+str(self.station.id)+'.sms'), self.process_sms)) - sms_listener.start() - self.active_workers.append(sms_listener) - - program_listener = Process(target=self.listener, args=(str('station.'+str(self.station.id)+'.program'), self.process_program)) - program_listener.start() - self.active_workers.append(program_listener) + generic_listener = Process( + target=self.listener, + args=( + str('station.' + str(self.station.id) + '.'), + self.process_generic + ) + ) + generic_listener.start() + self.active_workers.append(generic_listener) + + def process_generic(self, msg): + logger.info("Processing message to station: {}".format(msg)) + message_body = json.loads(msg[1]) + logger.info("Message body: {}".format(message_body)) + message_topic = msg[0] + logger.info("Message topic: {}".format(message_topic)) - db_listener = Process(target=self.listener, args=(str('station.'+str(self.station.id)+'.db'), self.process_db)) - db_listener.start() - self.active_workers.append(db_listener) + try: + # message_specific is everything after station.number, as a list + message_specific = message_topic.split('.')[2:] + logger.info("Message specific: {}".format(message_specific)) + self.listener_reponses[ + message_specific[0]]( + message_specific, + message_body) + except Exception as e: + logger.error( + 'Could not jump to listener_response in station_daemon.process_generic', + exc_info=True) # respond to call-related messages - def process_call(self, msg): - logger.info("Processing call: {}".format(msg)) + def process_call(self, topic, body): + logger.info("Processing call: {}".format(body)) # respond to sms messages - def process_sms(self, msg): - logger.info("Processing sms: {}".format(msg)) + def process_sms(self, topic, body): + logger.info("Processing sms: {}".format(body)) # respond to program changes - def process_program(self, msg): - logger.info("Processing program: {}".format(msg)) + def process_program(self, topic, body): + logger.info("Processing program: {}".format(body)) import news_report - logger.info("for station {}".format(self.station.name)) - self.program = news_report.News(3, self.station) + logger.info("for station {}".format(self.station.name)) + self.program = news_report.News(3, self) # respond to db changes - def process_db(self, msg): - change_dict = json.loads(msg[1]) #pull payload from message + def process_db(self, topic, body): + change_dict = body logger.info("Processing db change: {}".format(change_dict)) - logger.info("about to test if conditions right to launch a program") - if (change_dict['operation'] == 'insert' or change_dict['operation'] == 'update') and isodate.parse_datetime(change_dict['start_time']) <= datetime.now(): - logger.info("We have successful conditions to launch a program!") - import news_report - self.program = news_report.News(3, self.station.id) - time.sleep(3) #this should really be a callback! see below for process_connected_transmitter() - self.program.report() - self.program.teardown() + logger.info("about to test if conditions right to launch a program") + if (change_dict['operation'] == 'insert' or change_dict['operation'] == 'update') and isodate.parse_datetime(change_dict['start_time']) <= datetime.now(): + logger.info("We have successful conditions to launch a program!") + import news_report + conf_name = "{}_{}".format(change_dict.get('obj_id'),change_dict.get('program_id')) + self.program = news_report.News(conf_name, self) # Chang to change_dict.get('obj_id')+'.'+change_dict.get('program_id') + # this should really be a callback! see below for + # process_connected_transmitter() + time.sleep(13) + # This used to call the difference sections of the news report + # but now they are daisy-chaining to each other + self.program.go_intro() # respond to successful connect to transmitter phone - def process_connected_transmitter(self, msg): - pass + def process_telephony_callback(self, topic, body): + pass + + # respond to interactive digits (probably from program host) + def process_digits(self, topic, body): + pass # self test of message server to see if daemons are receiving + + def test_receivers(): port = MESSAGE_QUEUE_PORT_WEB context = zmq.Context() @@ -147,9 +190,15 @@ def test_receivers(): socket.bind("tcp://*:%s" % port) while True: - message_topics = ['station.7.program', 'station.7.call','station.7.program', 'station.7.program','sms.station.6', 'call.station.6'] + message_topics = [ + 'station.7.program', + 'station.7.call', + 'station.7.program', + 'station.7.program', + 'sms.station.6', + 'call.station.6'] topic = message_topics[random.randrange(0, len(message_topics))] - dicked = {'this':'that',"if":'then', "1":1,"2":2} + dicked = {'this': 'that', "if": 'then', "1": 1, "2": 2} messagedata = json.dumps(dicked) print "%s %s" % (topic, messagedata) socket.send("%s %s" % (topic, messagedata)) @@ -158,7 +207,6 @@ def test_receivers(): # Silly launch of fake daemons #stations = db.session.query(Station).all() #daemons = [] -#for i in stations: +# for i in stations: # daemons.append(StationDaemon(i.id)) -#test_receivers() - +# test_receivers() diff --git a/station_utils.py b/station_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/telephony_server.py b/telephony_server.py index b72cbeb..2fa8279 100644 --- a/telephony_server.py +++ b/telephony_server.py @@ -32,7 +32,7 @@ telephony_server.config['SECRET_KEY'] = SECRET_KEY -#prep the socket type, address for zmq +# prep the socket type, address for zmq telephony_server.config['ZMQ_SOCKET_TYPE'] = zmq.PUB telephony_server.config['ZMQ_BIND_ADDR'] = ZMQ_FORWARDER_SPITS_OUT @@ -57,7 +57,6 @@ admin.add_view(ModelView(Gateway, db.session)) - def get_or_create(session, model, **kwargs): instance = session.query(model).filter_by(**kwargs).first() if instance: @@ -69,8 +68,8 @@ def get_or_create(session, model, **kwargs): return instance -def debug(request, url="url"): - logger.info( "#####ENTERING {0} #####".format(url)) +def debug(request, url="url"): + logger.info("#####ENTERING {0} #####".format(url)) if request.method == 'POST': deets = request.form.items() logger.info(str(deets)) @@ -90,100 +89,131 @@ def page_not_found(error): return 'This URL does not exist', 404 - def preload_caller(func): @wraps(func) def inner(*args, **kwargs): logger.info(""" ################################################### - # entering function: --------> {0} + # entering function: --------> {0} ###################################################""".format(func.func_name)) - # Separate request into a single dict called "parameters" to erase the difference between + # Separate request into a single dict called "parameters" to erase the difference between # get and post representations if request.method == 'POST': parameters = dict(request.form.items()) parameters['request_method'] = request.method - else: - parameters = dict(request.args.items()) + else: + parameters = dict(request.args.items()) parameters['request_method'] = request.method - # Print UUID and standardize it to uuid regardless, swap the original kwargs for our version - try: + # Print UUID and standardize it to uuid regardless, swap the original + # kwargs for our version + try: if parameters.get('uuid'): - logger.info(request.method + ", CallUUID: {0}".format(parameters['uuid'])) + logger.info( + request.method + + ", CallUUID: {0}".format( + parameters['uuid'])) else: - logger.info(request.method + ", CallUUID: {0}".format(parameters['CallUUID'])) - parameters['uuid'] = parameters['CallUUID'] + logger.info( + request.method + + ", CallUUID: {0}".format( + parameters['CallUUID'])) + parameters['uuid'] = parameters['CallUUID'] # Here's where we swap the original kwargs for our version kwargs['parameters'] = parameters - except Exception, e: + except Exception as e: logger.error('Failed to get uuid', exc_info=True) pass - # Handle SMS in, different from calls if func.func_name == 'sms_in': m = Message() m.message_uuid = parameters.get('uuid') m.sendtime = parameters.get('edt') m.text = parameters.get('body') - m.from_phonenumber_id = get_or_create(db.session, PhoneNumber, raw_number=parameters.get('from_number'), number=parameters.get('from_number')).id - m.to_phonenumber_id = get_or_create(db.session, PhoneNumber, raw_number=parameters.get('fr'), number=parameters.get('fr')).id + m.from_phonenumber_id = get_or_create( + db.session, + PhoneNumber, + raw_number=parameters.get('from_number'), + number=parameters.get('from_number')).id + m.to_phonenumber_id = get_or_create( + db.session, + PhoneNumber, + raw_number=parameters.get('fr'), + number=parameters.get('fr')).id logger.info("about to commit {}".format(str(m.__dict__))) db.session.add(m) - db.session.commit() + db.session.commit kwargs['parameters']['Message_object_id'] = m.id else: - #todo, add fields to model for different call stages and times, like ringing, etc. - #TODO sent a message to a logger daemon rather than logging this directly, but perhaps increment a variable - #TODO no need to use get_or_create for anything but ringing + # todo, add fields to model for different call stages and times, like ringing, etc. + # TODO sent a message to a logger daemon rather than logging this directly, but perhaps increment a variable + # TODO no need to use get_or_create for anything but ringing if parameters.get('CallStatus') == 'ringing': - c = get_or_create(db.session, Call, call_uuid=parameters.get('CallUUID')) + c = get_or_create( + db.session, + Call, + call_uuid=parameters.get('CallUUID')) c.call_uuid = parameters.get('CallUUID') - c.start_time = datetime.now() - c.from_phonenumber_id = get_or_create(db.session, PhoneNumber, raw_number=parameters.get('From'), number=parameters.get('From')).id - c.to_phonenumber_id = get_or_create(db.session, PhoneNumber, raw_number=parameters.get('To'), number=parameters.get('To')).id + c.start_time = datetime.now() + c.from_phonenumber_id = get_or_create( + db.session, + PhoneNumber, + raw_number=parameters.get('From'), + number=parameters.get('From')).id + c.to_phonenumber_id = get_or_create( + db.session, + PhoneNumber, + raw_number=parameters.get('To'), + number=parameters.get('To')).id logger.info("about to commit {}".format(str(c.__dict__))) db.session.add(c) db.session.commit() kwargs['parameters']['Call_object_id'] = c.id if parameters.get('CallStatus') == 'completed': - c = get_or_create(db.session, Call, call_uuid=parameters.get('CallUUID')) + c = get_or_create( + db.session, + Call, + call_uuid=parameters.get('CallUUID')) kwargs['parameters']['Call_object_id'] = c.id if parameters.get('CallStatus') == 'completed': - c = get_or_create(db.session, Call, call_uuid=parameters.get('CallUUID')) - c.end_time = datetime.now() - logger.info("about to commit {}".format(str(c.__dict__))) - db.session.add(c) + c = get_or_create( + db.session, + Call, + call_uuid=parameters.get('CallUUID')) + c.end_time = datetime.now() + logger.info("about to commit {}".format(str(c.__dict__))) + db.session.add(c) db.session.commit() - - logger.info("Returning Parameters = {}".format(str(kwargs['parameters']))) + + logger.info( + "Returning Parameters = {}".format(str(kwargs['parameters']))) return func(*args, **kwargs) return inner - - -@telephony_server.route('/sms/in', methods=['GET', 'POST']) + + +@telephony_server.route('/sms/in', methods=['GET', 'POST']) @preload_caller def sms_in(parameters): """Receive an sms - { 'uuid': uuid, - 'edt': edt, - 'fr': fr, - 'to': to, - 'from_number': from_number, + { 'uuid': uuid, + 'edt': edt, + 'fr': fr, + 'to': to, + 'from_number': from_number, 'body': body, - } + } """ - logger.info("Parameters =" + str(parameters)) - logger.info("We received an SMS") + logger.info("Parameters =" + str(parameters)) + logger.info("We received an SMS") logger.info(parameters['from_number']) - logger.info(str(parameters['from_number']) == SHOW_HOST) - #look at conferenceplay - #if parameters['from_number'] == SHOW_HOST or parameters['from_number'] == SHOW_HOST[2:]: + logger.info(str(parameters['from_number']) == SHOW_HOST) + # look at conferenceplay + # if parameters['from_number'] == SHOW_HOST or parameters['from_number'] == SHOW_HOST[2:]: # answered_url = "http://127.0.0.1:5000/answered/" - # utils.call("sofia/gateway/switch2voip/",parameters['from_number'], answered_url) - #else: #obviously the below would only happen with approval of host + # utils.call("sofia/gateway/switch2voip/",parameters['from_number'], answered_url) + # else: #obviously the below would only happen with approval of host # answered_url = "http://127.0.0.1:5000/answered/" - # utils.call("sofia/gateway/switch2voip/",parameters['from_number'], answered_url) + # utils.call("sofia/gateway/switch2voip/",parameters['from_number'], answered_url) return "OK" @@ -193,13 +223,15 @@ def waitmusic(): logger.info(str(request.form.items())) else: logger.info(str(request.args.items())) - - r = plivohelper.Response() + + r = plivohelper.Response() r.addPlay("/home/csik/public_html/sounds/programs/3/current.mp3") logger.info("RESTXML Response => {}".format(r)) - #return render_template('response_template.xml', response=r) + # return render_template('response_template.xml', response=r) return "OK" +# Not currently being used. + @telephony_server.route('/hostwait/', methods=['GET', 'POST']) def hostwait(): @@ -209,51 +241,68 @@ def hostwait(): logger.info(str(request.args.items())) r = plivohelper.Response() r.addPlay(TELEPHONY_SERVER_IP+"/~csik/sounds/english/Hello_Host.mp3") - r.addPlay(TELEPHONY_SERVER_IP+"/~csik/sounds/english/You_Have_X_Listeners.mp3") + r.addPlay( + TELEPHONY_SERVER_IP + + "/~csik/sounds/english/You_Have_X_Listeners.mp3") r.addPlay(TELEPHONY_SERVER_IP+"/~csik/sounds/english/Instructions.mp3") logger.info("RESTXML Response => {}".format(r)) return render_template('response_template.xml', response=r) -@telephony_server.route('/confer///', methods=['GET', 'POST']) -@preload_caller +@telephony_server.route( + '/confer///', + methods=[ + 'GET', + 'POST']) +@preload_caller def confer(parameters, schedule_program_id, action): # Post params- 'CallUUID': unique id of call, 'Direction': direction of call, # 'To': Number which was called, 'From': calling number, # If Direction is outbound then 2 additional params: # 'ALegUUID': Unique Id for first leg, # 'ALegRequestUUID': request id given at the time of api call - logger.info("confer is receiveing schedule_program_id: {} action: {}".format(schedule_program_id, action)) + logger.info( + "confer is receiveing schedule_program_id: {} action: {}".format( + schedule_program_id, + action)) if action == "ringing": - logger.info("Ringing for scheduled_program {}".format(schedule_program_id)) + logger.info( + "Ringing for scheduled_program {}".format(schedule_program_id)) return "OK" elif action == "heartbeat": - logger.info("Heartbeat for scheduled_program {}".format(schedule_program_id)) + logger.info( + "Heartbeat for scheduled_program {}".format(schedule_program_id)) return "OK" elif action == "hangup": - # THIS IS WHERE NUMBER IS TRANSFERRED FROM outgoing_busy TO outgoing_unused - logger.info("Hangup for scheduled_program {}".format(schedule_program_id)) + # THIS IS WHERE NUMBER IS TRANSFERRED FROM outgoing_busy TO + # outgoing_unused + logger.info( + "Hangup for scheduled_program {}".format(schedule_program_id)) return "OK" elif action == "answered": - logger.info("*Answered for scheduled_program {}".format(schedule_program_id)) - # This is where station daemons are contacted - r = plivohelper.Response() + logger.info( + "*Answered for scheduled_program {}".format(schedule_program_id)) + # This is where station daemons are contacted + r = plivohelper.Response() from_number = parameters.get('From') try: logger.info("url_for format = {}".format(url_for('confer_events'))) except Exception: logger.info("unable to get url_for") - p = r.addConference("plivo", - muted=False, - enterSound="beep:2", + # TODO: get conference name from time & episode + # TODO: get station number and contact it if addConference worked + + p = r.addConference(schedule_program_id, + muted=False, + enterSound="beep:2", exitSound="beep:1", - startConferenceOnEnter=True, + startConferenceOnEnter=True, endConferenceOnExit=False, - waitSound = ANSWERED+'waitmusic/', - timeLimit = 0, + waitSound=ANSWERED+'waitmusic/', + timeLimit=0, hangupOnStar=True, - callbackUrl=ANSWERED+'confer_events/', - callbackMethod="POST", + callbackUrl=ANSWERED+'confer_events/', + callbackMethod="POST", digitsMatch="#9,#7,#8,7,8,9", ) logger.info("RESTXML Response => {}".format(r)) @@ -262,56 +311,73 @@ def confer(parameters, schedule_program_id, action): logger.info("Could not recognize plivo url variable") return "OK" + @telephony_server.route('/confer_events/', methods=['POST']) -@preload_caller -def confer_events(parameters): +@preload_caller +def confer_events(parameters): if parameters.get('ConferenceDigitsMatch'): - logger.info("Received a digit in conference:{}".format(parameters.get('ConferenceDigitsMatch'))) + logger.info( + "Received a digit in conference:{}".format( + parameters.get('ConferenceDigitsMatch'))) return "OK" -# This function should pretty much only be invoked for unsolicited calls +# This function should pretty much only be invoked for unsolicited calls @telephony_server.route('/answered/', methods=['GET', 'POST']) @telephony_server.route('/ringing/', methods=['GET', 'POST']) @telephony_server.route('/heartbeat/', methods=['GET', 'POST']) @telephony_server.route('/hangup/', methods=['GET', 'POST']) -@preload_caller +@preload_caller def root(parameters): logger.info("Request.path:{}".format(request.path)) debug(request, "root") - + if request.path == "/heartbeat/": - logger.info("Heartbeat for call from {0} to {1}".format(parameters.get('From'), parameters.get('To'))) + logger.info( + "Heartbeat for call from {0} to {1}".format( + parameters.get('From'), + parameters.get('To'))) return "OK" elif parameters.get('CallStatus') == "completed": - logger.info("Hangup of call from {0} to {1}".format(parameters.get('From'), parameters.get('To'))) + logger.info( + "Hangup of call from {0} to {1}".format( + parameters.get('From'), + parameters.get('To'))) return "OK" elif request.path == "/answered/": if parameters.get('CallStatus') == "ringing": - #Check to see if incoming call is TO a station's cloud number -- the public number of the station - phone_id = db.session.query(PhoneNumber).filter(PhoneNumber.raw_number==parameters.get('To')).one().id - station = db.session.query(Station).filter(Station.cloud_phone_id==phone_id).first() + # Check to see if incoming call is TO a station's cloud number -- + # the public number of the station + phone_id = db.session.query(PhoneNumber).filter( + PhoneNumber.raw_number == parameters.get('To')).one().id + station = db.session.query(Station).filter( + Station.cloud_phone_id == phone_id).first() if station: logger.info("Received call from cloud number:") - logger.info("Station: {}, Number:{}".format(station.name, station.cloud_phone.raw_number)) + logger.info( + "Station: {}, Number:{}".format( + station.name, + station.cloud_phone.raw_number)) logger.info("Choosing to not answer") - #should send to relevant station now.... + # should send to relevant station now.... topic = "station.{}.call".format(station.id) - from_id = db.session.query(PhoneNumber).filter(PhoneNumber.raw_number==parameters.get('From')).one().id - messagedata = { - "type":"call", - "from":parameters.get('From'), - "from_id":from_id, - "time":parameters.get('start_time'), + from_id = db.session.query(PhoneNumber).filter( + PhoneNumber.raw_number == parameters.get('From')).one().id + messagedata = { + "type": "call", + "from": parameters.get('From'), + "from_id": from_id, + "time": parameters.get('start_time'), } - #send this to Josh's dispatcher - from telephony_server import telephony_server - telephony_server.config['ZMQ_SOCKET_TYPE']=zmq.PUB - telephony_server.config['ZMQ_BIND_ADDR']=ZMQ_FORWARDER_SUCKS_IN + # send this to Josh's dispatcher + from .telephony_server import telephony_server + telephony_server.config['ZMQ_SOCKET_TYPE'] = zmq.PUB + telephony_server.config[ + 'ZMQ_BIND_ADDR'] = ZMQ_FORWARDER_SUCKS_IN if not telephony_server.extensions.get('zmq'): try: - z=ZMQ(telephony_server) - #send crap message to initiate socket + z = ZMQ(telephony_server) + # send crap message to initiate socket z.send('test me') except: print "address already taken" @@ -323,29 +389,40 @@ def root(parameters): else: logger.info("Received call from non-cloud number") return "OK" - logger.info("Ringing call from {0} to {1}".format(parameters.get('From'), parameters.get('To'))) + logger.info( + "Ringing call from {0} to {1}".format( + parameters.get('From'), + parameters.get('To'))) - # Not ringing means it is answered - else: + # Not ringing means it is answered + else: # This is where station daemons are contacted - r = plivohelper.Response() + r = plivohelper.Response() from_number = parameters.get('From') - p = r.addConference("plivo", muted=False, - enterSound="beep:2", exitSound="beep:1", - startConferenceOnEnter=True, endConferenceOnExit=False, - waitSound = ANSWERED+'waitmusic/', - timeLimit = 0, hangupOnStar=True) + p = r.addConference( + "plivo", + muted=False, + enterSound="beep:2", + exitSound="beep:1", + startConferenceOnEnter=True, + endConferenceOnExit=False, + waitSound=ANSWERED + + 'waitmusic/', + timeLimit=0, + hangupOnStar=True) logger.info("RESTXML Response => {}".format(r)) return render_template('response_template.xml', response=r) else: logger.info("Could not recognize plivo url variable") return "OK" + def main(): plugins() return -def plugins(): + +def plugins(): # Load the plugins from the plugin directory. manager = PluginManager() manager.setPluginPlaces(["plugins"]) @@ -359,7 +436,7 @@ def plugins(): if __name__ == '__main__': if not os.path.isfile("templates/response_template.xml"): - logger.info("Error : Can't find the XML template : templates/response_template.xml") + logger.info( + "Error : Can't find the XML template : templates/response_template.xml") else: telephony_server.run(host='127.0.0.1', port=5000) - diff --git a/utils.py b/utils.py index 4045994..d4ac5c2 100644 --- a/utils.py +++ b/utils.py @@ -4,7 +4,7 @@ from config import * -ANSWERED = 'http://127.0.0.1:5000/' +ANSWERED = 'http://127.0.0.1:5000/' SOUNDS_ROOT = 'http://176.58.125.166/~csik/sounds/' EXTRA_DIAL_STRING = "bridge_early_media=true,hangup_after_bridge=true" @@ -13,18 +13,18 @@ def init_logging(file): import logging logger = logging.getLogger(file) logger.setLevel(logging.INFO) - + # create a file handler handler = logging.FileHandler('logs/telephony.log', mode='a') handler.setLevel(logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.INFO) - + # create a logging format formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) ch.setFormatter(formatter) - + # add the handlers to the logger logger.addHandler(handler) logger.addHandler(ch) @@ -74,19 +74,19 @@ def call(to_number, from_number, gateway, answered=ANSWERED,extra_dial_string=EX logger.error('Failed to make utils.call', exc_info=True) pass return "Error" - + # ONLY CONNECTS FIRST SUCCESSFUL CONNECTION -def group_call(gateway, phone_numbers, answered): +def group_call(gateway, phone_numbers, answered): print "gateway = "+gateway print "phone_numbers = "+phone_numbers print "answered = "+answered # Define Channel Variable - http://wiki.freeswitch.org/wiki/Channel_Variables extra_dial_string = "bridge_early_media=true,hangup_after_bridge=true" - + # Create a REST object plivo = plivohelper.REST(REST_API_URL, SID, AUTH_TOKEN, API_VERSION) - + # Initiate a new outbound call to user/1000 using a HTTP POST # All parameters for bulk calls shall be separated by a delimeter call_params = { @@ -123,11 +123,11 @@ def group_call(gateway, phone_numbers, answered): # CONNECTS MULTIPLES -def bulk_call(to_numbers, from_number, gateway, answered=ANSWERED,extra_dial_string=EXTRA_DIAL_STRING): +def bulk_call(to_numbers, from_number, gateway, answered=ANSWERED,extra_dial_string=EXTRA_DIAL_STRING): # Create a REST object plivo = plivohelper.REST(REST_API_URL, SID, AUTH_TOKEN, API_VERSION) - + # Initiate a new outbound call using a HTTP POST # All parameters for bulk calls shall be separated by a delimeter call_params = { @@ -161,12 +161,12 @@ def bulk_call(to_numbers, from_number, gateway, answered=ANSWERED,extra_dial_str except Exception, e: logger.error('Failed to make utils.bulkcall', exc_info=True) pass - return "Error" + return "Error" class ZMQ(object): - + def __init__(self, app=None): self.zmq = None self.app = None @@ -189,3 +189,4 @@ def __getattr__(self, attr): return getattr(self.zmq, attr) +