Skip to content

Commit

Permalink
Merge pull request #2175 from RTXteam/issue-2170
Browse files Browse the repository at this point in the history
Issue 2170
  • Loading branch information
saramsey authored Oct 19, 2023
2 parents 98ca705 + 3e5a235 commit 938a14a
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 72 deletions.
7 changes: 1 addition & 6 deletions code/ARAX/ARAXQuery/ARAX_background_tasker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,7 @@ def run_tasks(self, config):
eprint("ERROR: NodeSynonymizer state is weird. "
f"file_counter: {file_counter} "
f"link_counter: {link_counter} "
"Recommend running the database_manager and restarting")
# try:
# subprocess.check_call( [ 'python3', node_synonymizer_path + "/../ARAXQuery/ARAX_database_manager.py" ] )
# except Exception as error:
# eprint(f"ERROR: Attempt to run database manager failed with {error}")

"Recommend restarting, which will rerun the database manager")

#### Check in on the databases directory
node_synonymizer_path = os.path.dirname(os.path.abspath(__file__)) + "/../NodeSynonymizer"
Expand Down
2 changes: 0 additions & 2 deletions code/ARAX/ARAXQuery/ARAX_expander.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/bin/env python3
import asyncio
import copy
import logging
import pickle
import sys
import os
Expand Down Expand Up @@ -51,7 +50,6 @@ def trim_to_size(input_list, length):
class ARAXExpander:

def __init__(self):
self.logger = logging.getLogger('log')
self.bh = BiolinkHelper()
self.rtxc = RTXConfiguration()
self.plover_url = self.rtxc.plover_url
Expand Down
52 changes: 28 additions & 24 deletions code/UI/OpenAPI/python-flask-server/KG2/openapi_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import connexion
import flask_cors
import logging
import json
import openapi_server.encoder
import os
import sys
import signal
import atexit
import traceback
import setproctitle


def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as the built-in
    ``print``; the output stream is always ``sys.stderr`` (looked up at
    call time).
    """
    print(*args, file=sys.stderr, **kwargs)

sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
Expand All @@ -22,9 +24,6 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
"/../../../..")
from RTXConfiguration import RTXConfiguration

# can change this to logging.DEBUG for debugging
logging.basicConfig(level=logging.INFO)

child_pid = None


Expand All @@ -34,16 +33,15 @@ def receive_sigterm(signal_number, frame):
try:
os.kill(child_pid, signal.SIGKILL)
except ProcessLookupError:
logging.debug(f"child process {child_pid} is already gone; "
"exiting now")
eprint(f"child process {child_pid} is already gone; "
"exiting now")
sys.exit(0)
else:
assert False, "should not ever have child_pid be None here"


# Exit hook (registered through the atexit decorator below): sets the SIGCHLD
# disposition to SIG_IGN, after emitting a debug message saying so.
# NOTE(review): this text looks like a rendered diff with indentation and +/-
# markers stripped; the logging.debug() line is presumably the line the commit
# removed -- confirm against the real file.
@atexit.register
def ignore_sigchld():
logging.debug("Setting SIGCHLD to SIG_IGN before exiting")
signal.signal(signal.SIGCHLD, signal.SIG_IGN)


Expand All @@ -52,19 +50,19 @@ def receive_sigchld(signal_number, frame):
while True:
try:
pid, _ = os.waitpid(-1, os.WNOHANG)
logging.debug(f"PID returned from call to os.waitpid: {pid}")
eprint(f"PID returned from call to os.waitpid: {pid}")
if pid == 0:
break
except ChildProcessError as e:
logging.debug(repr(e) +
"; this is expected if there are "
"no more child processes to reap")
eprint(repr(e) +
"; this is expected if there are "
"no more child processes to reap")
break


# Signal handler for SIGPIPE: when called with SIGPIPE it reports "pipe error"
# and does nothing else; any other signal number is ignored.
#   signal_number: number of the signal being delivered
#   frame: stack frame passed in by the signal machinery (not used here)
# NOTE(review): both a logging.error() and an eprint() call appear below; this
# text looks like a rendered diff, so the logging call is presumably the
# removed line and eprint its replacement -- confirm against the real file.
def receive_sigpipe(signal_number, frame):
if signal_number == signal.SIGPIPE:
logging.error("pipe error")
eprint("pipe error")


def main():
Expand All @@ -74,8 +72,6 @@ def main():
arguments={'title': 'RTX KG2 Translator KP'},
pythonic_params=True)
flask_cors.CORS(app.app)
signal.signal(signal.SIGCHLD, receive_sigchld)
signal.signal(signal.SIGPIPE, receive_sigpipe)

# Read any load configuration details for this instance
try:
Expand All @@ -88,34 +84,42 @@ def main():

dbmanager = ARAXDatabaseManager(allow_downloads=True)
try:
logging.info("Checking for complete databases")
eprint("Checking for complete databases")
if dbmanager.check_versions():
logging.warning("Databases incomplete; running update_databases")
eprint("Databases incomplete; running update_databases")
dbmanager.update_databases()
else:
logging.info("Databases seem to be complete")
eprint("Databases seem to be complete")
except Exception as e:
logging.error(traceback.format_exc())
eprint(traceback.format_exc())
raise e
del dbmanager

pid = os.fork()
if pid == 0: # I am the child process
sys.stdout = open('/dev/null', 'w')
sys.stdin = open('/dev/null', 'r')

logging.info("Starting background tasker in a child process")
ARAXBackgroundTasker().run_tasks(local_config)
setproctitle.setproctitle("python3 ARAX_background_tasker::run_tasks")
eprint("Starting background tasker in a child process")
try:
ARAXBackgroundTasker().run_tasks(local_config)
except Exception as e:
eprint("Error in ARAXBackgroundTasker.run_tasks()")
eprint(traceback.format_exc())
raise e
eprint("Background tasker child process ended unexpectedly")
elif pid > 0: # I am the parent process
# Start the service
logging.info(f"Background tasker is running in child process {pid}")
eprint(f"Background tasker is running in child process {pid}")
global child_pid
child_pid = pid
signal.signal(signal.SIGCHLD, receive_sigchld)
signal.signal(signal.SIGPIPE, receive_sigpipe)
signal.signal(signal.SIGTERM, receive_sigterm)
logging.info("Starting flask application in the parent process")
eprint("Starting flask application in the parent process")
app.run(port=local_config['port'], threaded=True)
else:
logging.error("[__main__]: fork() unsuccessful")
eprint("[__main__]: fork() unsuccessful")
assert False, "****** fork() unsuccessful in __main__"


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import connexion, flask
import connexion
import flask
import json
import os, sys, signal
import os
import sys
import signal
import resource
import logging
import traceback
from typing import Iterable, Callable
import setproctitle


def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as the built-in
    ``print``; the output stream is always ``sys.stderr`` (looked up at
    call time).
    """
    print(*args, file=sys.stderr, **kwargs)


rlimit_child_process_bytes = 34359738368 # 32 GiB

Expand All @@ -17,15 +24,19 @@

# SIGPIPE handler installed in the forked query child process: on SIGPIPE it
# reports the event and then terminates the process at once with exit status 0
# via os._exit(); any other signal number is ignored.
#   signal_number: number of the signal being delivered
#   frame: stack frame passed in by the signal machinery (not used here)
# NOTE(review): both a logging.info() and an eprint() call appear below; this
# text looks like a rendered diff, so the logging call is presumably the
# removed line and eprint its replacement -- confirm against the real file.
def child_receive_sigpipe(signal_number, frame):
if signal_number == signal.SIGPIPE:
logging.info("[query_controller]: child process detected a SIGPIPE; exiting python")
eprint("[query_controller]: child process detected a "
"SIGPIPE; exiting python")
os._exit(0)


def run_query_dict_in_child_process(query_dict: dict,
query_runner: Callable) -> Iterable[str]:
logging.debug("[query_controller]: Creating pipe and forking a child to handle the query")
eprint("[query_controller]: Creating pipe and "
"forking a child to handle the query")
read_fd, write_fd = os.pipe()

# always flush stdout and stderr before calling fork(); someone could have turned off auto-flushing and we don't want double-output
# always flush stdout and stderr before calling fork(); someone could have
# turned off auto-flushing and we don't want double-output
sys.stderr.flush()
sys.stdout.flush()

Expand All @@ -35,6 +46,7 @@ def run_query_dict_in_child_process(query_dict: dict,
sys.stdout = open('/dev/null', 'w') # parent and child process should not share the same stdout stream object
sys.stdin = open('/dev/null', 'r') # parent and child process should not share the same stdin stream object
os.close(read_fd) # child doesn't read from the pipe, it writes to it
setproctitle.setproctitle("python3 query_controller::run_query_dict_in_child_process")
resource.setrlimit(resource.RLIMIT_AS, (rlimit_child_process_bytes, rlimit_child_process_bytes)) # set a virtual memory limit for the child process
signal.signal(signal.SIGPIPE, child_receive_sigpipe) # get rid of signal handler so we don't double-print to the log on SIGPIPE error
signal.signal(signal.SIGCHLD, signal.SIG_IGN) # disregard any SIGCHLD signal in the child process
Expand All @@ -50,10 +62,10 @@ def run_query_dict_in_child_process(query_dict: dict,
os._exit(0)
elif pid > 0: # I am the parent process
os.close(write_fd) # the parent does not write to the pipe, it reads from it
logging.debug(f"[query_controller]: child process pid={pid}")
eprint(f"[query_controller]: child process pid={pid}")
read_fo = os.fdopen(read_fd, "r")
else:
logging.error("[query_controller]: fork() unsuccessful")
eprint("[query_controller]: fork() unsuccessful")
assert False, "********** fork() unsuccessful; something went very wrong *********"
return read_fo

Expand Down
51 changes: 28 additions & 23 deletions code/UI/OpenAPI/python-flask-server/openapi_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import connexion
import flask_cors
import logging
import json
import openapi_server.encoder
import os
import sys
import signal
import atexit
import traceback
import setproctitle


def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as the built-in
    ``print``; the output stream is always ``sys.stderr`` (looked up at
    call time).
    """
    print(*args, file=sys.stderr, **kwargs)

sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
Expand All @@ -22,8 +24,6 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
"/../../../..")
from RTXConfiguration import RTXConfiguration

# can change this to logging.DEBUG for debugging
logging.basicConfig(level=logging.INFO)

child_pid = None

Expand All @@ -34,16 +34,15 @@ def receive_sigterm(signal_number, frame):
try:
os.kill(child_pid, signal.SIGKILL)
except ProcessLookupError:
logging.debug(f"child process {child_pid} is already gone; "
"exiting now")
eprint(f"child process {child_pid} is already gone; "
"exiting now")
sys.exit(0)
else:
assert False, "should not ever have child_pid be None here"


# Exit hook (registered through the atexit decorator below): sets the SIGCHLD
# disposition to SIG_IGN, after emitting a debug message saying so.
# NOTE(review): this text looks like a rendered diff with indentation and +/-
# markers stripped; the logging.debug() line is presumably the line the commit
# removed -- confirm against the real file.
@atexit.register
def ignore_sigchld():
logging.debug("Setting SIGCHLD to SIG_IGN before exiting")
signal.signal(signal.SIGCHLD, signal.SIG_IGN)


Expand All @@ -52,19 +51,19 @@ def receive_sigchld(signal_number, frame):
while True:
try:
pid, _ = os.waitpid(-1, os.WNOHANG)
logging.debug(f"PID returned from call to os.waitpid: {pid}")
eprint(f"PID returned from call to os.waitpid: {pid}")
if pid == 0:
break
except ChildProcessError as e:
logging.debug(repr(e) +
"; this is expected if there are "
"no more child processes to reap")
eprint(repr(e) +
"; this is expected if there are "
"no more child processes to reap")
break


# Signal handler for SIGPIPE: when called with SIGPIPE it reports "pipe error"
# and does nothing else; any other signal number is ignored.
#   signal_number: number of the signal being delivered
#   frame: stack frame passed in by the signal machinery (not used here)
# NOTE(review): both a logging.error() and an eprint() call appear below; this
# text looks like a rendered diff, so the logging call is presumably the
# removed line and eprint its replacement -- confirm against the real file.
def receive_sigpipe(signal_number, frame):
if signal_number == signal.SIGPIPE:
logging.error("pipe error")
eprint("pipe error")


def main():
Expand All @@ -74,8 +73,6 @@ def main():
arguments={'title': 'ARAX Translator Reasoner'},
pythonic_params=True)
flask_cors.CORS(app.app)
signal.signal(signal.SIGCHLD, receive_sigchld)
signal.signal(signal.SIGPIPE, receive_sigpipe)

# Read any load configuration details for this instance
try:
Expand All @@ -88,34 +85,42 @@ def main():

dbmanager = ARAXDatabaseManager(allow_downloads=True)
try:
logging.info("Checking for complete databases")
eprint("Checking for complete databases")
if dbmanager.check_versions():
logging.warning("Databases incomplete; running update_databases")
eprint("Databases incomplete; running update_databases")
dbmanager.update_databases()
else:
logging.info("Databases seem to be complete")
eprint("Databases seem to be complete")
except Exception as e:
logging.error(traceback.format_exc())
eprint(traceback.format_exc())
raise e
del dbmanager

pid = os.fork()
if pid == 0: # I am the child process
sys.stdout = open('/dev/null', 'w')
sys.stdin = open('/dev/null', 'r')

logging.info("Starting background tasker in a child process")
ARAXBackgroundTasker().run_tasks(local_config)
setproctitle.setproctitle("python3 ARAX_background_tasker::run_tasks")
eprint("Starting background tasker in a child process")
try:
ARAXBackgroundTasker().run_tasks(local_config)
except Exception as e:
eprint("Error in ARAXBackgroundTasker.run_tasks()")
eprint(traceback.format_exc())
raise e
eprint("Background tasker child process ended unexpectedly")
elif pid > 0: # I am the parent process
# Start the service
logging.info(f"Background tasker is running in child process {pid}")
eprint(f"Background tasker is running in child process {pid}")
global child_pid
child_pid = pid
signal.signal(signal.SIGCHLD, receive_sigchld)
signal.signal(signal.SIGPIPE, receive_sigpipe)
signal.signal(signal.SIGTERM, receive_sigterm)
logging.info("Starting flask application in the parent process")
eprint("Starting flask application in the parent process")
app.run(port=local_config['port'], threaded=True)
else:
logging.error("[__main__]: fork() unsuccessful")
eprint("[__main__]: fork() unsuccessful")
assert False, "****** fork() unsuccessful in __main__"


Expand Down
Loading

0 comments on commit 938a14a

Please sign in to comment.