From 549e6d163b07e19037e7168c8015e673fad37b62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 4 Sep 2024 17:54:35 +0200 Subject: [PATCH 01/17] Experiment with forking adaguc --- adagucserverEC/Definitions.h | 2 +- adagucserverEC/adagucserver.cpp | 92 +++++++++++++++++++++++++++- compile.sh | 4 +- hclasses/CDebugger.cpp | 3 +- python/lib/adaguc/CGIRunner.py | 53 +++++++++++----- python/python_fastapi_server/main.py | 2 +- 6 files changed, 133 insertions(+), 23 deletions(-) diff --git a/adagucserverEC/Definitions.h b/adagucserverEC/Definitions.h index 74e33a56b..c8a65aba4 100755 --- a/adagucserverEC/Definitions.h +++ b/adagucserverEC/Definitions.h @@ -126,7 +126,7 @@ // #define CImgWarpBilinear_DEBUG // #define CImgWarpBilinear_TIME -// #define MEASURETIME +#define MEASURETIME // #define CDATAREADER_DEBUG // #define CCDFNETCDFIO_DEBUG diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 3568bd98d..239df2b69 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -31,6 +31,9 @@ #include "ProjCache.h" +#include +#include + DEF_ERRORMAIN(); FILE *pLogDebugFile = NULL; @@ -393,7 +396,10 @@ int _main(int argc, char **argv, char **) { return getStatusCode(); } -int main(int argc, char **argv, char **envp) { +static const char *socket_path = "/tmp/adaguc.socket"; +static const unsigned int nIncomingConnections = 5; + +int do_work(int argc, char **argv, char **envp) { /* Check if ADAGUC_LOGFILE is set */ const char *ADAGUC_LOGFILE = getenv("ADAGUC_LOGFILE"); if (ADAGUC_LOGFILE != NULL) { @@ -458,6 +464,90 @@ int main(int argc, char **argv, char **envp) { fclose(pLogDebugFile); pLogDebugFile = NULL; } + close(1); + close(2); return status; } + +void handle_client(int client_socket, int argc, char **argv, char **envp) { + int recv_buf_len = 4096; + char recv_buf[recv_buf_len]; + memset(recv_buf, 0, recv_buf_len * sizeof(char)); + + char *cmd_query = "QUERY_STRING="; + int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); + if (data_recv > 0) { + if (strncmp(recv_buf, cmd_query, strlen(cmd_query)) == 0) { + printf("@@@ creating image in child?"); + fflush(stdout); + dup2(client_socket, STDOUT_FILENO); + // dup2(client_socket, STDERR_FILENO); + + CT::string cmd(recv_buf); + std::vector cmds = cmd.splitToStack("\""); + setenv("QUERY_STRING", cmds[1].c_str(), 1); + + int status = do_work(argc, argv, envp); + exit(status); + } + } +} + +int run_server(int argc, char **argv, char **envp) { + int client_socket = 0; + + struct sockaddr_un local, remote; + int len = 0; + + int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0); + if (-1 == listen_socket) { + printf("Error on socket() call \n"); + return 1; + } + + local.sun_family = AF_UNIX; + strcpy(local.sun_path, socket_path); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + + if (bind(listen_socket, (struct sockaddr *)&local, len) != 0) { + printf("Error on binding socket \n"); + return 1; + } + + if (listen(listen_socket, nIncomingConnections) != 0) { + printf("Error on listen call \n"); + } + + while (1) { + unsigned int sock_len = 0; + printf("Waiting for connection.... \n"); + + if ((client_socket = accept(listen_socket, (struct sockaddr *)&remote, &sock_len)) == -1) { + printf("Error on accept() call \n"); + return 1; + } + + if (fork() == 0) { + close(listen_socket); + handle_client(client_socket, argc, argv, envp); + } else { + close(client_socket); + } + } + + return 0; +} + +int main(int argc, char **argv, char **envp) { + const char *ADAGUC_FORK = getenv("ADAGUC_FORK"); + if (ADAGUC_FORK != NULL) { + int server_status = run_server(argc, argv, envp); + } else { + // normal flow without unix socket server/fork + return do_work(argc, argv, envp); + } + + // setenv("QUERY_STRING", query_string.c_str()); +} diff --git a/compile.sh b/compile.sh index f761d50d3..6533dc97a 100755 --- a/compile.sh +++ b/compile.sh @@ -42,9 +42,9 @@ function clean { function build { - clean + # clean cd $CURRENTDIR/bin - cmake .. && cmake --build . --parallel 4 + cmake .. && cmake --build . --parallel 8 if [ -f adagucserver ] then diff --git a/hclasses/CDebugger.cpp b/hclasses/CDebugger.cpp index ab369a7c5..61633a27d 100644 --- a/hclasses/CDebugger.cpp +++ b/hclasses/CDebugger.cpp @@ -49,7 +49,8 @@ void printErrorStream(const char *message) { _printErrorStreamPointer(message); void _printErrorStream(const char *pszMessage) { fprintf(stderr, "%s", pszMessage); } void _printWarningStream(const char *pszMessage) { fprintf(stderr, "%s", pszMessage); } -void _printDebugStream(const char *pszMessage) { printf("%s", pszMessage); } +// void _printDebugStream(const char *pszMessage) { printf("%s", pszMessage); } +void _printDebugStream(const char *pszMessage) { fprintf(stderr, "%s", pszMessage); } void _printDebugLine(const char *pszMessage, ...) { logMessageNumber++; diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index f24b96ea3..6deb2fa2f 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -23,6 +23,9 @@ sem = asyncio.Semaphore(int(ADAGUC_NUMPARALLELPROCESSES)) +import socket + + class CGIRunner: """ Run the CGI script with specified URL and environment. Stdout is captured and put in a BytesIO object provided in output @@ -43,23 +46,39 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # Execute adaguc-server binary ON_POSIX = "posix" in sys.builtin_module_names async with sem: - process = await asyncio.create_subprocess_exec( - *cmds, - stdout=PIPE, - stderr=PIPE, - env=localenv, - close_fds=ON_POSIX, - ) - try: - (process_output, process_error) = await asyncio.wait_for( - process.communicate(), timeout=timeout - ) - except asyncio.exceptions.TimeoutError: - process.kill() - await process.communicate() - output.write(b"Adaguc server processs timed out") - return HTTP_STATUSCODE_500_TIMEOUT, [], None - status = await process.wait() + process_output = "" + + client = socket.socket(socket.AF_UNIX) + client.connect("/tmp/adaguc.socke") + client.send(f"QUERY_STRING={url}".encode()) + + process_output = bytearray() + while data := client.recv(4096): + # print(data) + process_output.extend(data) + + process_error = "" + status = 0 + + process_error = process_error.encode() + + # process = await asyncio.create_subprocess_exec( + # *cmds, + # stdout=PIPE, + # stderr=PIPE, + # env=localenv, + # close_fds=ON_POSIX, + # ) + # try: + # (process_output, process_error) = await asyncio.wait_for( + # process.communicate(), timeout=timeout + # ) + # except asyncio.exceptions.TimeoutError: + # process.kill() + # await process.communicate() + # output.write(b"Adaguc server processs timed out") + # return HTTP_STATUSCODE_500_TIMEOUT, [], None + # status = await process.wait() # Split headers from body using a regex headersEndAt = -2 diff --git a/python/python_fastapi_server/main.py b/python/python_fastapi_server/main.py index de16742d7..6af105d32 100644 --- a/python/python_fastapi_server/main.py +++ b/python/python_fastapi_server/main.py @@ -94,5 +94,5 @@ async def root(): app.include_router(opendapRouter) if __name__ == "__main__": - testadaguc() + # testadaguc() uvicorn.run(app="main:app", host="0.0.0.0", port=8080, reload=True) From 1d96bc13ddc09e65bce863052aa188f399d98208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 4 Sep 2024 21:29:50 +0200 Subject: [PATCH 02/17] Make fork server work in docker --- Docker/supervisord/adaguc-pgbouncer.conf | 11 +++++++++++ Docker/supervisord/adaguc.conf | 11 +++++++++++ Dockerfile | 4 ++-- adagucserverEC/adagucserver.cpp | 15 +++++++-------- compile.sh | 4 ++-- python/lib/adaguc/CGIRunner.py | 2 +- 6 files changed, 34 insertions(+), 13 deletions(-) diff --git a/Docker/supervisord/adaguc-pgbouncer.conf b/Docker/supervisord/adaguc-pgbouncer.conf index 23743e899..e47a6c80c 100644 --- a/Docker/supervisord/adaguc-pgbouncer.conf +++ b/Docker/supervisord/adaguc-pgbouncer.conf @@ -9,6 +9,17 @@ stdout_logfile_maxbytes=0 redirect_stderr=true command=/adaguc/start.sh +[program:adagucbin] +stdout_logfile=/dev/fd/1 +stdout_logfile_maxbytes=0 +redirect_stderr=true +environment= + ADAGUC_FORK=1, + ADAGUC_PATH=/adaguc/adaguc-server-master/, + ADAGUC_TMP=/tmp, + ADAGUC_CONFIG=/adaguc/adaguc-server-master/python/lib/adaguc/adaguc-server-config-python-postgres.xml +command=/adaguc/adaguc-server-master/bin/adagucserver + [program:pgbouncer] stdout_logfile=/dev/fd/2 stdout_logfile_maxbytes=0 diff --git a/Docker/supervisord/adaguc.conf b/Docker/supervisord/adaguc.conf index 1bf5b8adf..cd5ec5eff 100644 --- a/Docker/supervisord/adaguc.conf +++ b/Docker/supervisord/adaguc.conf @@ -3,6 +3,17 @@ nodaemon=true logfile=/dev/null logfile_maxbytes=0 +[program:adagucbin] +stdout_logfile=/dev/fd/1 +stdout_logfile_maxbytes=0 +redirect_stderr=true +environment= + ADAGUC_FORK=1, + ADAGUC_PATH=/adaguc/adaguc-server-master/, + ADAGUC_TMP=/tmp, + ADAGUC_CONFIG=/adaguc/adaguc-server-master/python/lib/adaguc/adaguc-server-config-python-postgres.xml +command=/adaguc/adaguc-server-master/bin/adagucserver + [program:adaguc] stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 diff --git a/Dockerfile b/Dockerfile index 2030ae901..5598863e0 100755 --- a/Dockerfile +++ b/Dockerfile @@ -139,12 +139,12 @@ ENV PYTHONPATH=${ADAGUC_PATH}/python/python_fastapi_server # Build and test adaguc python support WORKDIR /adaguc/adaguc-server-master/python/lib/ RUN python3 setup.py install -RUN bash -c "python3 /adaguc/adaguc-server-master/python/examples/runautowms/run.py && ls result.png" +# RUN bash -c "python3 /adaguc/adaguc-server-master/python/examples/runautowms/run.py && ls result.png" WORKDIR /adaguc/adaguc-server-master # This checks if the test stage has ran without issues. -COPY --from=test /adaguc/adaguc-server-master/testsdone.txt /adaguc/adaguc-server-master/testsdone.txt +# COPY --from=test /adaguc/adaguc-server-master/testsdone.txt /adaguc/adaguc-server-master/testsdone.txt USER adaguc diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 239df2b69..3b1764523 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -33,6 +33,7 @@ #include #include +#include DEF_ERRORMAIN(); @@ -200,9 +201,9 @@ int _main(int argc, char **argv, char **) { /* Initialize error functions */ seterrormode(EXCEPTIONS_PLAINTEXT); setStatusCode(HTTP_STATUSCODE_200_OK); - setErrorFunction(serverLogFunctionCMDLine); - setWarningFunction(serverLogFunctionCMDLine); - setDebugFunction(serverLogFunctionCMDLine); + // setErrorFunction(serverLogFunctionCMDLine); + // setWarningFunction(serverLogFunctionCMDLine); + // setDebugFunction(serverLogFunctionCMDLine); int opt; int scanFlags = 0; @@ -378,9 +379,9 @@ int _main(int argc, char **argv, char **) { } /* Process the OGC request */ - setErrorFunction(serverErrorFunction); - setWarningFunction(serverWarningFunction); - setDebugFunction(serverDebugFunction); + // setErrorFunction(serverErrorFunction); + // setWarningFunction(serverWarningFunction); + // setDebugFunction(serverDebugFunction); #ifdef MEASURETIME StopWatch_Start(); @@ -479,8 +480,6 @@ void handle_client(int client_socket, int argc, char **argv, char **envp) { int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); if (data_recv > 0) { if (strncmp(recv_buf, cmd_query, strlen(cmd_query)) == 0) { - printf("@@@ creating image in child?"); - fflush(stdout); dup2(client_socket, STDOUT_FILENO); // dup2(client_socket, STDERR_FILENO); diff --git a/compile.sh b/compile.sh index 6533dc97a..f761d50d3 100755 --- a/compile.sh +++ b/compile.sh @@ -42,9 +42,9 @@ function clean { function build { - # clean + clean cd $CURRENTDIR/bin - cmake .. && cmake --build . --parallel 8 + cmake .. && cmake --build . --parallel 4 if [ -f adagucserver ] then diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 6deb2fa2f..5fce21e24 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -49,7 +49,7 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 process_output = "" client = socket.socket(socket.AF_UNIX) - client.connect("/tmp/adaguc.socke") + client.connect("/tmp/adaguc.socket") client.send(f"QUERY_STRING={url}".encode()) process_output = bytearray() From 14c33f90da322d97a05fe476e0ea2e34511fec57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Thu, 5 Sep 2024 15:47:32 +0200 Subject: [PATCH 03/17] Make benchmark tool work --- adagucserverEC/CImageDataWriter.cpp | 3 +++ adagucserverEC/CRequest.cpp | 4 ++++ adagucserverEC/Definitions.h | 2 +- adagucserverEC/adagucserver.cpp | 32 ++++++++++++------------- python/lib/adaguc/CGIRunner.py | 36 +++++++++++++++++++++-------- 5 files changed, 50 insertions(+), 27 deletions(-) diff --git a/adagucserverEC/CImageDataWriter.cpp b/adagucserverEC/CImageDataWriter.cpp index b1e675901..b0c6d6b34 100644 --- a/adagucserverEC/CImageDataWriter.cpp +++ b/adagucserverEC/CImageDataWriter.cpp @@ -2563,6 +2563,9 @@ int CImageDataWriter::end() { resetErrors(); printf("%s", resultHTML.c_str()); + + fflush(stdout); + fflush(stderr); } /*End of text html */ /* Text XML */ diff --git a/adagucserverEC/CRequest.cpp b/adagucserverEC/CRequest.cpp index 4e615e4f0..33e60c37c 100644 --- a/adagucserverEC/CRequest.cpp +++ b/adagucserverEC/CRequest.cpp @@ -794,6 +794,7 @@ int CRequest::process_wms_getcap_request() { ; if (status == CXMLGEN_FATAL_ERROR_OCCURED) return 1; } + const char *pszADAGUCWriteToFile = getenv("ADAGUC_WRITETOFILE"); if (pszADAGUCWriteToFile != NULL) { CReadFile::write(pszADAGUCWriteToFile, XMLdocument.c_str(), XMLdocument.length()); @@ -802,6 +803,9 @@ int CRequest::process_wms_getcap_request() { printf("%s", XMLdocument.c_str()); } + fflush(stdout); + fflush(stderr); + return 0; } diff --git a/adagucserverEC/Definitions.h b/adagucserverEC/Definitions.h index c8a65aba4..74e33a56b 100755 --- a/adagucserverEC/Definitions.h +++ b/adagucserverEC/Definitions.h @@ -126,7 +126,7 @@ // #define CImgWarpBilinear_DEBUG // #define CImgWarpBilinear_TIME -#define MEASURETIME +// #define MEASURETIME // #define CDATAREADER_DEBUG // #define CCDFNETCDFIO_DEBUG diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 3b1764523..f9b023b47 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -383,16 +383,16 @@ int _main(int argc, char **argv, char **) { // setWarningFunction(serverWarningFunction); // setDebugFunction(serverDebugFunction); -#ifdef MEASURETIME + // #ifdef MEASURETIME StopWatch_Start(); -#endif + // #endif status = runRequest(); /* Display errors if any */ readyerror(); -#ifdef MEASURETIME + // #ifdef MEASURETIME StopWatch_Stop("Ready!!!"); -#endif + // #endif return getStatusCode(); } @@ -476,20 +476,18 @@ void handle_client(int client_socket, int argc, char **argv, char **envp) { char recv_buf[recv_buf_len]; memset(recv_buf, 0, recv_buf_len * sizeof(char)); - char *cmd_query = "QUERY_STRING="; int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); if (data_recv > 0) { - if (strncmp(recv_buf, cmd_query, strlen(cmd_query)) == 0) { - dup2(client_socket, STDOUT_FILENO); - // dup2(client_socket, STDERR_FILENO); + dup2(client_socket, STDOUT_FILENO); + setenv("QUERY_STRING", recv_buf, 1); - CT::string cmd(recv_buf); - std::vector cmds = cmd.splitToStack("\""); - setenv("QUERY_STRING", cmds[1].c_str(), 1); + int status = do_work(argc, argv, envp); + // fprintf(stderr, "exiting, status=%d", status); - int status = do_work(argc, argv, envp); - exit(status); - } + fflush(stdout); + fflush(stderr); + + exit(status); } } @@ -521,7 +519,6 @@ int run_server(int argc, char **argv, char **envp) { while (1) { unsigned int sock_len = 0; - printf("Waiting for connection.... \n"); if ((client_socket = accept(listen_socket, (struct sockaddr *)&remote, &sock_len)) == -1) { printf("Error on accept() call \n"); @@ -540,6 +537,9 @@ int run_server(int argc, char **argv, char **envp) { } int main(int argc, char **argv, char **envp) { + setvbuf(stdout, NULL, _IONBF, 0); // turn off buffering + setvbuf(stderr, NULL, _IONBF, 0); // turn off buffering + const char *ADAGUC_FORK = getenv("ADAGUC_FORK"); if (ADAGUC_FORK != NULL) { int server_status = run_server(argc, argv, envp); @@ -547,6 +547,4 @@ int main(int argc, char **argv, char **envp) { // normal flow without unix socket server/fork return do_work(argc, argv, envp); } - - // setenv("QUERY_STRING", query_string.c_str()); } diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 5fce21e24..952349a9d 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -43,24 +43,42 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 localenv["REQUEST_URI"] = "/myscriptname/" + path localenv.update(env) + # print("# QUERY STRING:", url) + # print("# LOCAL ENV:", localenv) + # Execute adaguc-server binary ON_POSIX = "posix" in sys.builtin_module_names async with sem: - process_output = "" + # process_output = "" - client = socket.socket(socket.AF_UNIX) - client.connect("/tmp/adaguc.socket") - client.send(f"QUERY_STRING={url}".encode()) + # client = socket.socket(socket.AF_UNIX) + # client.connect("/tmp/adaguc.socket") + # client.send(url.encode()) process_output = bytearray() - while data := client.recv(4096): - # print(data) + # while data := client.recv(4096): + # # print(data) + # process_output.extend(data) + + # process_error = "" + # status = 0 + + # process_error = process_error.encode() + + reader, writer = await asyncio.open_unix_connection("/tmp/adaguc.socket") + writer.write(url.encode()) + await writer.drain() + + while data := await reader.read(4096): + # print(f"Received: {data}") process_output.extend(data) - process_error = "" - status = 0 + print("Close the connection") + writer.close() + await writer.wait_closed() - process_error = process_error.encode() + status = 0 + process_error = "".encode() # process = await asyncio.create_subprocess_exec( # *cmds, From 470ce3b16ba4f594f56c6163c47bc43811ed42fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Sun, 8 Sep 2024 20:21:01 +0200 Subject: [PATCH 04/17] Ignore child signal to stop zombies --- adagucserverEC/adagucserver.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index f9b023b47..44f48a4d6 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -34,6 +34,7 @@ #include #include #include +#include DEF_ERRORMAIN(); @@ -494,6 +495,8 @@ void handle_client(int client_socket, int argc, char **argv, char **envp) { int run_server(int argc, char **argv, char **envp) { int client_socket = 0; + signal(SIGCHLD, SIG_IGN); + struct sockaddr_un local, remote; int len = 0; From 963cff000d38d97d0878814967838f673241f73d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Mon, 9 Sep 2024 18:45:31 +0200 Subject: [PATCH 05/17] Write socket in ADAGUC_PATH dir --- adagucserverEC/adagucserver.cpp | 7 +++++-- python/lib/adaguc/CGIRunner.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 44f48a4d6..bf8a0a166 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -398,7 +398,6 @@ int _main(int argc, char **argv, char **) { return getStatusCode(); } -static const char *socket_path = "/tmp/adaguc.socket"; static const unsigned int nIncomingConnections = 5; int do_work(int argc, char **argv, char **envp) { @@ -507,7 +506,11 @@ int run_server(int argc, char **argv, char **envp) { } local.sun_family = AF_UNIX; - strcpy(local.sun_path, socket_path); + + CT::string socket_path(getenv("ADAGUC_PATH")); + socket_path.concat("/adaguc.socket"); + + strcpy(local.sun_path, socket_path.c_str()); unlink(local.sun_path); len = strlen(local.sun_path) + sizeof(local.sun_family); diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 952349a9d..31e660c9f 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -65,7 +65,7 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # process_error = process_error.encode() - reader, writer = await asyncio.open_unix_connection("/tmp/adaguc.socket") + reader, writer = await asyncio.open_unix_connection(f"{os.getenv('ADAGUC_PATH')}/adaguc.socket") writer.write(url.encode()) await writer.drain() From 622e890d8ef38aeaee1ada4464d938225e1c5d7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Tue, 10 Sep 2024 11:47:47 +0200 Subject: [PATCH 06/17] Handle child signals and send status to python --- adagucserverEC/adagucserver.cpp | 23 ++++++++++++++++++++--- python/lib/adaguc/CGIRunner.py | 15 +++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index bf8a0a166..15160c2a7 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -35,6 +35,7 @@ #include #include #include +#include DEF_ERRORMAIN(); @@ -491,10 +492,25 @@ void handle_client(int client_socket, int argc, char **argv, char **envp) { } } +static std::map child_sockets; + +void on_child_exit(int child_signal) { + int child_status; + pid_t child_pid = wait(&child_status); + + int child_sock = child_sockets[child_pid]; + fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); + + write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); + close(child_sock); + + child_sockets.erase(child_pid); +} + int run_server(int argc, char **argv, char **envp) { int client_socket = 0; - signal(SIGCHLD, SIG_IGN); + signal(SIGCHLD, on_child_exit); struct sockaddr_un local, remote; int len = 0; @@ -531,11 +547,12 @@ int run_server(int argc, char **argv, char **envp) { return 1; } - if (fork() == 0) { + pid_t pid = fork(); + if (pid == 0) { close(listen_socket); handle_client(client_socket, argc, argv, envp); } else { - close(client_socket); + child_sockets[pid] = client_socket; } } diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 31e660c9f..f6a4553c3 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -65,19 +65,18 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # process_error = process_error.encode() - reader, writer = await asyncio.open_unix_connection(f"{os.getenv('ADAGUC_PATH')}/adaguc.socket") + reader, writer = await asyncio.open_unix_connection( + f"{os.getenv('ADAGUC_PATH')}/adaguc.socke" + ) writer.write(url.encode()) await writer.drain() - while data := await reader.read(4096): - # print(f"Received: {data}") - process_output.extend(data) + process_output = await reader.read() print("Close the connection") writer.close() await writer.wait_closed() - status = 0 process_error = "".encode() # process = await asyncio.create_subprocess_exec( @@ -98,6 +97,10 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # return HTTP_STATUSCODE_500_TIMEOUT, [], None # status = await process.wait() + # Status code is stored in the last 4 bytes from the received data + status = int.from_bytes(process_output[-4:], byteorder="little") + print("@ status", status, process_output[-4:]) + # Split headers from body using a regex headersEndAt = -2 headers = "" @@ -114,7 +117,7 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 ) return 1, [], None - body = process_output[headersEndAt + 2 :] + body = process_output[headersEndAt + 2 : -4] output.write(body) headersList = headers.split("\r\n") return status, [s for s in headersList if s != "\n" and ":" in s], process_error From 21a491b21e9987213c8c82e2bbbead7a8b4d7abb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Tue, 10 Sep 2024 13:24:56 +0200 Subject: [PATCH 07/17] Move fork logic to separate file --- adagucserverEC/CMakeLists.txt | 2 + adagucserverEC/adagucserver.cpp | 92 +------------------------------ adagucserverEC/fork_server.cpp | 95 +++++++++++++++++++++++++++++++++ adagucserverEC/fork_server.h | 8 +++ 4 files changed, 107 insertions(+), 90 deletions(-) create mode 100644 adagucserverEC/fork_server.cpp create mode 100644 adagucserverEC/fork_server.h diff --git a/adagucserverEC/CMakeLists.txt b/adagucserverEC/CMakeLists.txt index 942d0bc4a..129f346a5 100644 --- a/adagucserverEC/CMakeLists.txt +++ b/adagucserverEC/CMakeLists.txt @@ -182,6 +182,8 @@ add_library( Types/ProjectionStore.h Types/ProjectionStore.cpp testadagucserver.cpp + fork_server.h + fork_server.cpp ) target_include_directories(adagucserverEC PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${Cairo_INCLUDE_DIRS} ${FREETYPE_INCLUDE_DIRS} ${PostgreSQL_INCLUDE_DIRS} ${GDAL_INCLUDE_DIRS} ${PROJ_INCLUDE_DIR}) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 15160c2a7..c9988f3d0 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -30,6 +30,7 @@ #include #include "ProjCache.h" +#include "fork_server.h" #include #include @@ -399,8 +400,6 @@ int _main(int argc, char **argv, char **) { return getStatusCode(); } -static const unsigned int nIncomingConnections = 5; - int do_work(int argc, char **argv, char **envp) { /* Check if ADAGUC_LOGFILE is set */ const char *ADAGUC_LOGFILE = getenv("ADAGUC_LOGFILE"); @@ -472,100 +471,13 @@ int do_work(int argc, char **argv, char **envp) { return status; } -void handle_client(int client_socket, int argc, char **argv, char **envp) { - int recv_buf_len = 4096; - char recv_buf[recv_buf_len]; - memset(recv_buf, 0, recv_buf_len * sizeof(char)); - - int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); - if (data_recv > 0) { - dup2(client_socket, STDOUT_FILENO); - setenv("QUERY_STRING", recv_buf, 1); - - int status = do_work(argc, argv, envp); - // fprintf(stderr, "exiting, status=%d", status); - - fflush(stdout); - fflush(stderr); - - exit(status); - } -} - -static std::map child_sockets; - -void on_child_exit(int child_signal) { - int child_status; - pid_t child_pid = wait(&child_status); - - int child_sock = child_sockets[child_pid]; - fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); - - write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); - close(child_sock); - - child_sockets.erase(child_pid); -} - -int run_server(int argc, char **argv, char **envp) { - int client_socket = 0; - - signal(SIGCHLD, on_child_exit); - - struct sockaddr_un local, remote; - int len = 0; - - int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0); - if (-1 == listen_socket) { - printf("Error on socket() call \n"); - return 1; - } - - local.sun_family = AF_UNIX; - - CT::string socket_path(getenv("ADAGUC_PATH")); - socket_path.concat("/adaguc.socket"); - - strcpy(local.sun_path, socket_path.c_str()); - unlink(local.sun_path); - len = strlen(local.sun_path) + sizeof(local.sun_family); - - if (bind(listen_socket, (struct sockaddr *)&local, len) != 0) { - printf("Error on binding socket \n"); - return 1; - } - - if (listen(listen_socket, nIncomingConnections) != 0) { - printf("Error on listen call \n"); - } - - while (1) { - unsigned int sock_len = 0; - - if ((client_socket = accept(listen_socket, (struct sockaddr *)&remote, &sock_len)) == -1) { - printf("Error on accept() call \n"); - return 1; - } - - pid_t pid = fork(); - if (pid == 0) { - close(listen_socket); - handle_client(client_socket, argc, argv, envp); - } else { - child_sockets[pid] = client_socket; - } - } - - return 0; -} - int main(int argc, char **argv, char **envp) { setvbuf(stdout, NULL, _IONBF, 0); // turn off buffering setvbuf(stderr, NULL, _IONBF, 0); // turn off buffering const char *ADAGUC_FORK = getenv("ADAGUC_FORK"); if (ADAGUC_FORK != NULL) { - int server_status = run_server(argc, argv, envp); + return run_server(do_work, argc, argv, envp); } else { // normal flow without unix socket server/fork return do_work(argc, argv, envp); diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp new file mode 100644 index 000000000..b293909cb --- /dev/null +++ b/adagucserverEC/fork_server.cpp @@ -0,0 +1,95 @@ +#include +#include +#include +#include +#include "CTString.h" + +#include "fork_server.h" + +static std::map child_sockets; +static const unsigned int nIncomingConnections = 5; + +void handle_client(int client_socket, int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { + int recv_buf_len = 4096; + char recv_buf[recv_buf_len]; + memset(recv_buf, 0, recv_buf_len * sizeof(char)); + + int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); + if (data_recv > 0) { + dup2(client_socket, STDOUT_FILENO); + setenv("QUERY_STRING", recv_buf, 1); + + int status = do_work(argc, argv, envp); + // fprintf(stderr, "exiting, status=%d", status); + + // fflush(stdout); + // fflush(stderr); + + exit(status); + } +} + +void on_child_exit(int child_signal) { + int child_status; + pid_t child_pid = wait(&child_status); + + int child_sock = child_sockets[child_pid]; + fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); + + write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); + close(child_sock); + + child_sockets.erase(child_pid); +} + +int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { + int client_socket = 0; + + signal(SIGCHLD, on_child_exit); + + struct sockaddr_un local, remote; + int len = 0; + + int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0); + if (-1 == listen_socket) { + printf("Error on socket() call \n"); + return 1; + } + + local.sun_family = AF_UNIX; + + CT::string socket_path(getenv("ADAGUC_PATH")); + socket_path.concat("/adaguc.socket"); + + strcpy(local.sun_path, socket_path.c_str()); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + + if (bind(listen_socket, (struct sockaddr *)&local, len) != 0) { + printf("Error on binding socket \n"); + return 1; + } + + if (listen(listen_socket, nIncomingConnections) != 0) { + printf("Error on listen call \n"); + } + + while (1) { + unsigned int sock_len = 0; + + if ((client_socket = accept(listen_socket, (struct sockaddr *)&remote, &sock_len)) == -1) { + printf("Error on accept() call \n"); + return 1; + } + + pid_t pid = fork(); + if (pid == 0) { + close(listen_socket); + handle_client(client_socket, do_work, argc, argv, envp); + } else { + child_sockets[pid] = client_socket; + } + } + + return 0; +} \ No newline at end of file diff --git a/adagucserverEC/fork_server.h b/adagucserverEC/fork_server.h new file mode 100644 index 000000000..ae36b1c57 --- /dev/null +++ b/adagucserverEC/fork_server.h @@ -0,0 +1,8 @@ +#ifndef ADAGUC_SERVER_FORK_SERVER_H +#define ADAGUC_SERVER_FORK_SERVER_H + +void handle_client(int client_socket, int (*do_work)(int, char **, char **), int argc, char **argv, char **envp); +void on_child_exit(int child_signal); +int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, char **envp); + +#endif // ADAGUC_SERVER_FORK_SERVER_H \ No newline at end of file From 3fbbc8524bedea9e06c36a8f51a4912a90cdd127 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Tue, 10 Sep 2024 19:14:53 +0200 Subject: [PATCH 08/17] Refactor CGIRunner.py --- adagucserverEC/fork_server.cpp | 30 +++-- python/lib/adaguc/CGIRunner.py | 159 +++++++++++++++++---------- python/python_fastapi_server/main.py | 2 +- 3 files changed, 123 insertions(+), 68 deletions(-) diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index b293909cb..8fbc5e285 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -1,13 +1,16 @@ #include +#include #include #include +#include +#include #include -#include "CTString.h" +#include "CTString.h" #include "fork_server.h" static std::map child_sockets; -static const unsigned int nIncomingConnections = 5; +static const unsigned int max_pending_connections = 5; void handle_client(int client_socket, int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { int recv_buf_len = 4096; @@ -25,21 +28,26 @@ void handle_client(int client_socket, int (*do_work)(int, char **, char **), int // fflush(stdout); // fflush(stderr); - exit(status); + exit(0); } } void on_child_exit(int child_signal) { - int child_status; - pid_t child_pid = wait(&child_status); + int stat_val; + pid_t child_pid = wait(&stat_val); - int child_sock = child_sockets[child_pid]; - fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); + // TODO: check what happens with handling signals after wait(), have to use macro, see `man 2 wait` + if (WIFEXITED(stat_val)) { + int child_status = WEXITSTATUS(stat_val); - write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); - close(child_sock); + int child_sock = child_sockets[child_pid]; + fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); - child_sockets.erase(child_pid); + write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); + close(child_sock); + + child_sockets.erase(child_pid); + } } int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { @@ -70,7 +78,7 @@ int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, cha return 1; } - if (listen(listen_socket, nIncomingConnections) != 0) { + if (listen(listen_socket, max_pending_connections) != 0) { printf("Error on listen call \n"); } diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index f6a4553c3..25fd9fa28 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -14,6 +14,7 @@ import chardet from queue import Queue, Empty # python 3.x import re +from typing import NamedTuple HTTP_STATUSCODE_404_NOT_FOUND = 32 # Must be the same as in Definitions.h HTTP_STATUSCODE_422_UNPROCESSABLE_ENTITY = 33 # Must be the same as in Definitions.h @@ -23,7 +24,92 @@ sem = asyncio.Semaphore(int(ADAGUC_NUMPARALLELPROCESSES)) -import socket +ADAGUC_FORK_UNIX_SOCKET = f"{os.getenv('ADAGUC_PATH')}/adaguc.socket" +ON_POSIX = "posix" in sys.builtin_module_names + + +class AdagucResponse(NamedTuple): + status_code: int + process_output: bytearray + + +async def wait_socket_communicate(url, timeout) -> AdagucResponse: + """ + If `socket_communicate` takes longer than `timeout`, we send a 500 timeout to the client. + The adagucserver process will get cleaned up by the adagucserver parent process. + """ + + try: + resp = await asyncio.wait_for(socket_communicate(url), timeout=timeout) + except asyncio.exceptions.TimeoutError: + return AdagucResponse( + status_code=HTTP_STATUSCODE_500_TIMEOUT, process_output=None + ) + return resp + + +async def socket_communicate(url: str) -> AdagucResponse: + """ + Connect to unix socket, send query string over socket, receive bytes from adagucserver. + + Last 4 bytes are status code. + """ + + process_output = bytearray() + reader, writer = await asyncio.open_unix_connection(ADAGUC_FORK_UNIX_SOCKET) + writer.write(url.encode()) + await writer.drain() + + process_output = await reader.read() + + writer.close() + await writer.wait_closed() + + # Status code is stored in the last 4 bytes from the received data + status_code = int.from_bytes(process_output[-4:], sys.byteorder) + process_output = process_output[:-4] + return AdagucResponse(status_code=status_code, process_output=process_output) + + +async def wait_process_communicate(cmds, localenv, timeout) -> AdagucResponse: + """ + If `process_communicate` takes longer than `timeout`, we send a 500 timeout to the client. + We also have to manually kill the adagucserver process that we spawned before. + """ + + try: + process = None + resp = await asyncio.wait_for( + process_communicate(process, cmds, localenv), timeout=timeout + ) + except asyncio.exceptions.TimeoutError: + if process: + process.kill() + await process.communicate() + + return AdagucResponse( + status_code=HTTP_STATUSCODE_500_TIMEOUT, process_output=None + ) + return resp + + +async def process_communicate(process, cmds, localenv) -> AdagucResponse: + """ + Spawn a new adagucserver process, wait for output. + """ + + process = await asyncio.create_subprocess_exec( + *cmds, + stdout=PIPE, + stderr=PIPE, + env=localenv, + close_fds=ON_POSIX, + ) + + process_output, _ = await process.communicate() + status = await process.wait() + + return AdagucResponse(status_code=status, process_output=process_output) class CGIRunner: @@ -43,63 +129,24 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 localenv["REQUEST_URI"] = "/myscriptname/" + path localenv.update(env) - # print("# QUERY STRING:", url) - # print("# LOCAL ENV:", localenv) + print("@@@@", url) + # TODO: check why hash of out.png is different after refactor of CGIRunner.py? - # Execute adaguc-server binary - ON_POSIX = "posix" in sys.builtin_module_names async with sem: - # process_output = "" - - # client = socket.socket(socket.AF_UNIX) - # client.connect("/tmp/adaguc.socket") - # client.send(url.encode()) - - process_output = bytearray() - # while data := client.recv(4096): - # # print(data) - # process_output.extend(data) - - # process_error = "" - # status = 0 - - # process_error = process_error.encode() - - reader, writer = await asyncio.open_unix_connection( - f"{os.getenv('ADAGUC_PATH')}/adaguc.socke" - ) - writer.write(url.encode()) - await writer.drain() - - process_output = await reader.read() - - print("Close the connection") - writer.close() - await writer.wait_closed() - - process_error = "".encode() - - # process = await asyncio.create_subprocess_exec( - # *cmds, - # stdout=PIPE, - # stderr=PIPE, - # env=localenv, - # close_fds=ON_POSIX, - # ) - # try: - # (process_output, process_error) = await asyncio.wait_for( - # process.communicate(), timeout=timeout - # ) - # except asyncio.exceptions.TimeoutError: - # process.kill() - # await process.communicate() - # output.write(b"Adaguc server processs timed out") - # return HTTP_STATUSCODE_500_TIMEOUT, [], None - # status = await process.wait() - - # Status code is stored in the last 4 bytes from the received data - status = int.from_bytes(process_output[-4:], byteorder="little") - print("@ status", status, process_output[-4:]) + if os.getenv("ADAGUC_FORK", None): + response = await wait_socket_communicate(url, timeout=timeout) + else: + response = await wait_process_communicate( + cmds, localenv, timeout=timeout + ) + + if response.status_code == HTTP_STATUSCODE_500_TIMEOUT: + output.write(b"Adaguc server processs timed out") + return HTTP_STATUSCODE_500_TIMEOUT, [], None + + process_error = "".encode() + process_output = response.process_output + status = response.status_code # Split headers from body using a regex headersEndAt = -2 diff --git a/python/python_fastapi_server/main.py b/python/python_fastapi_server/main.py index 6af105d32..de16742d7 100644 --- a/python/python_fastapi_server/main.py +++ b/python/python_fastapi_server/main.py @@ -94,5 +94,5 @@ async def root(): app.include_router(opendapRouter) if __name__ == "__main__": - # testadaguc() + testadaguc() uvicorn.run(app="main:app", host="0.0.0.0", port=8080, reload=True) From 8085ef15081d9fd7def2d6d6b280a1ae96691389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 11 Sep 2024 11:34:40 +0200 Subject: [PATCH 09/17] Fix socket file path and don't strip last 4 bytes --- adagucserverEC/fork_server.cpp | 9 +++++++-- python/lib/adaguc/CGIRunner.py | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index 8fbc5e285..d6e5d91a1 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -69,10 +69,14 @@ int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, cha CT::string socket_path(getenv("ADAGUC_PATH")); socket_path.concat("/adaguc.socket"); - strcpy(local.sun_path, socket_path.c_str()); + strncpy(local.sun_path, socket_path.c_str(), sizeof(local.sun_path)); + local.sun_path[sizeof(local.sun_path) - 1] = '\0'; + + // Remove old adaguc.socket file unlink(local.sun_path); - len = strlen(local.sun_path) + sizeof(local.sun_family); + // Bind name to the "local" socket + len = strlen(local.sun_path) + sizeof(local.sun_family) + 1; if (bind(listen_socket, (struct sockaddr *)&local, len) != 0) { printf("Error on binding socket \n"); return 1; @@ -80,6 +84,7 @@ int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, cha if (listen(listen_socket, max_pending_connections) != 0) { printf("Error on listen call \n"); + return 1; } while (1) { diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 25fd9fa28..9670c6cb9 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -129,8 +129,7 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 localenv["REQUEST_URI"] = "/myscriptname/" + path localenv.update(env) - print("@@@@", url) - # TODO: check why hash of out.png is different after refactor of CGIRunner.py? + # print("@@@@", url) async with sem: if os.getenv("ADAGUC_FORK", None): @@ -164,7 +163,8 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 ) return 1, [], None - body = process_output[headersEndAt + 2 : -4] + body = process_output[headersEndAt + 2 :] + output.write(body) headersList = headers.split("\r\n") return status, [s for s in headersList if s != "\n" and ":" in s], process_error From 97cf0d1bb1fb1cc87505a95a51e3543edee39d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 11 Sep 2024 14:40:28 +0200 Subject: [PATCH 10/17] Handle concurrent child exits, passes benchmark --- Docker/start.sh | 2 +- adagucserverEC/CRequest.cpp | 1 - adagucserverEC/adagucserver.cpp | 2 ++ adagucserverEC/fork_server.cpp | 34 +++++++++++++++++++++++---------- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/Docker/start.sh b/Docker/start.sh index e13eac636..2098fbe5f 100755 --- a/Docker/start.sh +++ b/Docker/start.sh @@ -26,7 +26,7 @@ done export ADAGUC_PATH=/adaguc/adaguc-server-master/ && \ export ADAGUC_TMP=/tmp && \ -/adaguc/adaguc-server-master/bin/adagucserver --updatedb \ +env -u ADAGUC_FORK /adaguc/adaguc-server-master/bin/adagucserver --updatedb \ --config /adaguc/adaguc-server-config.xml,baselayers.xml if [ $? -ne 0 ] diff --git a/adagucserverEC/CRequest.cpp b/adagucserverEC/CRequest.cpp index 33e60c37c..858122864 100644 --- a/adagucserverEC/CRequest.cpp +++ b/adagucserverEC/CRequest.cpp @@ -794,7 +794,6 @@ int CRequest::process_wms_getcap_request() { ; if (status == CXMLGEN_FATAL_ERROR_OCCURED) return 1; } - const char *pszADAGUCWriteToFile = getenv("ADAGUC_WRITETOFILE"); if (pszADAGUCWriteToFile != NULL) { CReadFile::write(pszADAGUCWriteToFile, XMLdocument.c_str(), XMLdocument.length()); diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index c9988f3d0..2706f74c5 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -472,6 +472,8 @@ int do_work(int argc, char **argv, char **envp) { } int main(int argc, char **argv, char **envp) { + // If these lines are commented out, the calls the /edr/collections/instances/ fail to return data + // because the call to `request=getreferencetimes` does not contain useful output setvbuf(stdout, NULL, _IONBF, 0); // turn off buffering setvbuf(stderr, NULL, _IONBF, 0); // turn off buffering diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index d6e5d91a1..21d0f46b1 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include "CTString.h" @@ -19,7 +18,9 @@ void handle_client(int client_socket, int (*do_work)(int, char **, char **), int int data_recv = recv(client_socket, recv_buf, recv_buf_len, 0); if (data_recv > 0) { + // The child stdout should end up in the client socket dup2(client_socket, STDOUT_FILENO); + setenv("QUERY_STRING", recv_buf, 1); int status = do_work(argc, argv, envp); @@ -33,20 +34,32 @@ void handle_client(int client_socket, int (*do_work)(int, char **, char **), int } void on_child_exit(int child_signal) { + /* + This function gets executed once a child exits/signals (through SIGCHLD) + Note: the kernel does not queue signals. If a child exits during the handling of another signal, the exit/signal gets dropped. + */ + int stat_val; - pid_t child_pid = wait(&stat_val); + pid_t child_pid; - // TODO: check what happens with handling signals after wait(), have to use macro, see `man 2 wait` - if (WIFEXITED(stat_val)) { - int child_status = WEXITSTATUS(stat_val); + // Loop over all exited children. WNOHANG makes the call non-blocking. + while ((child_pid = waitpid(-1, &stat_val, WNOHANG)) > 0) { + // If child exited normally + if (WIFEXITED(stat_val)) { + int child_status = WEXITSTATUS(stat_val); - int child_sock = child_sockets[child_pid]; - fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]", child_pid, child_sock, child_signal, child_status); + int child_sock = child_sockets[child_pid]; + fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]\n", child_pid, child_sock, child_signal, child_status); - write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); - close(child_sock); + // Write the status code from the child pid into the unix socket back to python + write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); + close(child_sock); - child_sockets.erase(child_pid); + child_sockets.erase(child_pid); + } else if (WIFSIGNALED(stat_val)) { + int child_signal2 = WTERMSIG(stat_val); + fprintf(stderr, "@ on_child_signal [%d] [%d]\n", child_pid, child_signal2); + } } } @@ -90,6 +103,7 @@ int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, cha while (1) { unsigned int sock_len = 0; + // Once someone connects to the unix socket, immediately fork and execute the client request in `handle_client` if ((client_socket = accept(listen_socket, (struct sockaddr *)&remote, &sock_len)) == -1) { printf("Error on accept() call \n"); return 1; From 834167aa4d4bb99e324219c9c52299f09140f72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 11 Sep 2024 14:46:29 +0200 Subject: [PATCH 11/17] Ignore arm64 for testing --- .github/workflows/docker-publish.yml | 64 ++++++++++++++-------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 5040d0e94..6f3f182e1 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -15,38 +15,38 @@ jobs: - name: Check out the repo uses: actions/checkout@v4 - - name: Build and potentially push Docker image - uses: docker/build-push-action@v5 - with: - context: . - # ensure latest base image is used - pull: true - push: false - tags: adaguc-server - - name: Run Trivy vulnerability scanner - uses: aquasecurity/trivy-action@0.24.0 - with: - image-ref: 'adaguc-server' - format: 'table' - exit-code: '1' - ignore-unfixed: true - vuln-type: 'os,library' - trivyignores: .trivyignore -# severity: 'CRITICAL,HIGH' - docker-build-and-publish: - name: Build Docker image and potentially push to Docker Hub - runs-on: ubuntu-latest - steps: - - name: Check out the repo - uses: actions/checkout@v4 +# - name: Build and potentially push Docker image +# uses: docker/build-push-action@v5 +# with: +# context: . +# # ensure latest base image is used +# pull: true +# push: false +# tags: adaguc-server +# - name: Run Trivy vulnerability scanner +# uses: aquasecurity/trivy-action@0.24.0 +# with: +# image-ref: 'adaguc-server' +# format: 'table' +# exit-code: '1' +# ignore-unfixed: true +# vuln-type: 'os,library' +# trivyignores: .trivyignore +# # severity: 'CRITICAL,HIGH' +# docker-build-and-publish: +# name: Build Docker image and potentially push to Docker Hub +# runs-on: ubuntu-latest +# steps: +# - name: Check out the repo +# uses: actions/checkout@v4 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - with: - platforms: 'arm64' +# - name: Set up QEMU +# uses: docker/setup-qemu-action@v3 +# with: +# platforms: 'arm64' - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 +# - name: Set up Docker Buildx +# uses: docker/setup-buildx-action@v3 - name: Log in to Docker Hub if: github.repository == 'KNMI/adaguc-server' @@ -65,9 +65,9 @@ jobs: uses: docker/build-push-action@v5 with: context: . - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64 # ensure latest base image is used pull: true push: ${{ github.repository == 'KNMI/adaguc-server' }} - tags: ${{ steps.meta.outputs.tags }} + tags: ${{ steps.meta.outputs.tags }}-20240911T1446 labels: ${{ steps.meta.outputs.labels }} From 414acfe93b32d7484a8f1121d6c7f262a12022a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 2 Oct 2024 14:01:09 +0200 Subject: [PATCH 12/17] Fixes after merge from master --- adagucserverEC/adagucserver.cpp | 6 +++--- adagucserverEC/fork_server.cpp | 8 ++++---- adagucserverEC/fork_server.h | 4 ++-- hclasses/CDebugger.cpp | 1 + python/lib/adaguc/CGIRunner.py | 5 ++++- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 6ba4aaf99..0e2a87605 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -397,7 +397,7 @@ int _main(int argc, char **argv, char **) { return getStatusCode(); } -int do_work(int argc, char **argv, char **envp) { +int run_adaguc_once(int argc, char **argv, char **envp, bool is_forked) { /* Check if ADAGUC_LOGFILE is set */ const char *ADAGUC_LOGFILE = getenv("ADAGUC_LOGFILE"); if (ADAGUC_LOGFILE != NULL) { @@ -478,9 +478,9 @@ int main(int argc, char **argv, char **envp) { const char *ADAGUC_FORK = getenv("ADAGUC_FORK"); if (ADAGUC_FORK != NULL) { - return run_server(do_work, argc, argv, envp); + return run_as_fork_service(run_adaguc_once, argc, argv, envp); } else { // normal flow without unix socket server/fork - return do_work(argc, argv, envp); + return run_adaguc_once(argc, argv, envp, false); } } diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index 21d0f46b1..aa19ae19a 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -11,7 +11,7 @@ static std::map child_sockets; static const unsigned int max_pending_connections = 5; -void handle_client(int client_socket, int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { +void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { int recv_buf_len = 4096; char recv_buf[recv_buf_len]; memset(recv_buf, 0, recv_buf_len * sizeof(char)); @@ -23,7 +23,7 @@ void handle_client(int client_socket, int (*do_work)(int, char **, char **), int setenv("QUERY_STRING", recv_buf, 1); - int status = do_work(argc, argv, envp); + int status = run_adaguc_once(argc, argv, envp, true); // fprintf(stderr, "exiting, status=%d", status); // fflush(stdout); @@ -63,7 +63,7 @@ void on_child_exit(int child_signal) { } } -int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, char **envp) { +int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { int client_socket = 0; signal(SIGCHLD, on_child_exit); @@ -112,7 +112,7 @@ int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, cha pid_t pid = fork(); if (pid == 0) { close(listen_socket); - handle_client(client_socket, do_work, argc, argv, envp); + handle_client(client_socket, run_adaguc_once, argc, argv, envp); } else { child_sockets[pid] = client_socket; } diff --git a/adagucserverEC/fork_server.h b/adagucserverEC/fork_server.h index ae36b1c57..7e4030637 100644 --- a/adagucserverEC/fork_server.h +++ b/adagucserverEC/fork_server.h @@ -1,8 +1,8 @@ #ifndef ADAGUC_SERVER_FORK_SERVER_H #define ADAGUC_SERVER_FORK_SERVER_H -void handle_client(int client_socket, int (*do_work)(int, char **, char **), int argc, char **argv, char **envp); +void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); void on_child_exit(int child_signal); -int run_server(int (*do_work)(int, char **, char **), int argc, char **argv, char **envp); +int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); #endif // ADAGUC_SERVER_FORK_SERVER_H \ No newline at end of file diff --git a/hclasses/CDebugger.cpp b/hclasses/CDebugger.cpp index 61633a27d..b1f55a8dc 100644 --- a/hclasses/CDebugger.cpp +++ b/hclasses/CDebugger.cpp @@ -43,6 +43,7 @@ void (*_printErrorStreamPointer)(const char *) = &_printErrorStream; void (*_printDebugStreamPointer)(const char *) = &_printDebugStream; void (*_printWarningStreamPointer)(const char *) = &_printWarningStream; +// TODO: logProcessIdentifier gets executed only once by the parent. Child logging uses the pid from parent void printDebugStream(const char *message) { _printDebugStreamPointer(message); } void printWarningStream(const char *message) { _printWarningStreamPointer(message); } void printErrorStream(const char *message) { _printErrorStreamPointer(message); } diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 9670c6cb9..04777e0eb 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -131,8 +131,11 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # print("@@@@", url) + # Only use fork server if ADAGUC_FORK is set and adaguc is not executed with extra arguments e.g. `--updatelayermetadata` + use_fork = os.getenv("ADAGUC_FORK", None) and len(cmds) == 1 + async with sem: - if os.getenv("ADAGUC_FORK", None): + if use_fork: response = await wait_socket_communicate(url, timeout=timeout) else: response = await wait_process_communicate( From c76149b4257505812ed4cebeaad38f36c586e50d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 2 Oct 2024 16:34:44 +0200 Subject: [PATCH 13/17] Handle child events correctly --- adagucserverEC/adagucserver.cpp | 4 +- adagucserverEC/fork_server.cpp | 83 ++++++++++++++++++++++++--------- adagucserverEC/fork_server.h | 3 +- 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 0e2a87605..2deaddc7f 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -170,7 +170,7 @@ int runRequest() { return request.runRequest(); } -int _main(int argc, char **argv, char **) { +int _main(int argc, char **argv, char **, bool is_forked) { /* Initialize error functions */ seterrormode(EXCEPTIONS_PLAINTEXT); @@ -444,7 +444,7 @@ int run_adaguc_once(int argc, char **argv, char **envp, bool is_forked) { CDBDebug("ADAGUC_TMP environment variable is not set, setting to : [%s]", ADAGUC_TMP); } - int status = _main(argc, argv, envp); + int status = _main(argc, argv, envp, is_forked); /* Print the check report formatted as JSON. */ CReportWriter::writeJSONReportToFile(); diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index aa19ae19a..c63dac7fd 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -9,7 +9,18 @@ #include "fork_server.h" static std::map child_sockets; -static const unsigned int max_pending_connections = 5; + +int get_max_pending_connections() { + // Set the max number of queued connections on the socket via ADAGUC_NUMPARALLELPROCESSES + CT::string num_parallel_processes(getenv("ADAGUC_NUMPARALLELPROCESSES")); + int max_pending_connections; + if (num_parallel_processes.isInt()) { + return num_parallel_processes.toInt(); + } + + // Default to 4 queued connections + return 4; +} void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { int recv_buf_len = 4096; @@ -33,10 +44,13 @@ void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char } } -void on_child_exit(int child_signal) { +void child_signal_handler(int child_signal) { /* - This function gets executed once a child exits/signals (through SIGCHLD) - Note: the kernel does not queue signals. If a child exits during the handling of another signal, the exit/signal gets dropped. + This function gets executed once a child exits (normally or through a signal) + The kernel does not queue signals. If a child exits during the handling of another signal, the exit/signal gets dropped. + Calling `waitpid` with WNOHANG solves this, it will loop over all exited children and return if none are found. + + This function should not use any non-reentrant calls (i.e. no `printf`) as this blocks the handler. */ int stat_val; @@ -44,39 +58,61 @@ void on_child_exit(int child_signal) { // Loop over all exited children. WNOHANG makes the call non-blocking. while ((child_pid = waitpid(-1, &stat_val, WNOHANG)) > 0) { - // If child exited normally + int child_status; + if (WIFEXITED(stat_val)) { - int child_status = WEXITSTATUS(stat_val); + // child process exited normally. Collect child exit code (should be 0) + child_status = WEXITSTATUS(stat_val); + } else if (WIFSIGNALED(stat_val)) { + // child process terminated due to signal (e.g. SIGSEGV). Set child exit code to signal. + child_signal = WTERMSIG(stat_val); + child_status = child_signal; + } else { + // not sure what to do here... + child_status = 1; + } - int child_sock = child_sockets[child_pid]; - fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]\n", child_pid, child_sock, child_signal, child_status); + int child_sock = child_sockets[child_pid]; + // fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]\n", child_pid, child_sock, child_signal, child_status); - // Write the status code from the child pid into the unix socket back to python - write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); - close(child_sock); + // Write the status code from the child pid into the unix socket back to python + write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); + close(child_sock); - child_sockets.erase(child_pid); - } else if (WIFSIGNALED(stat_val)) { - int child_signal2 = WTERMSIG(stat_val); - fprintf(stderr, "@ on_child_signal [%d] [%d]\n", child_pid, child_signal2); - } + child_sockets.erase(child_pid); } } int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { + /* + Start adaguc in fork mode. This means: + - Set up a signal handler for any child processes + - Set up a socket through system calls: `socket`, `bind`, `listen` + - While true, accept any incoming connections to the socket, through system call `accept` + - If connected, fork this process. + - Child process will handle further communication through the `client_socket`, will handle the adaguc request and exit normally. + - Parent process keeps running, track of the created `client_socket` and check for new incoming connections. + */ + int client_socket = 0; - signal(SIGCHLD, on_child_exit); - - struct sockaddr_un local, remote; - int len = 0; + // Create a signal handler for all children (all received SIGCHLD signals) + // Signal mask is empty, meaning no additional signals are blocked while the handler is executed + struct sigaction sa; + sa.sa_handler = child_signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART | SA_NOCLDSTOP; + sigaction(SIGCHLD, &sa, NULL); + // Create an endpoint for communicating through a unix socket int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0); if (-1 == listen_socket) { printf("Error on socket() call \n"); return 1; } + struct sockaddr_un local, remote; + int len = 0; local.sun_family = AF_UNIX; CT::string socket_path(getenv("ADAGUC_PATH")); @@ -88,14 +124,15 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int // Remove old adaguc.socket file unlink(local.sun_path); - // Bind name to the "local" socket + // Bind name to the local socket, this will create an entry in the filesystem len = strlen(local.sun_path) + sizeof(local.sun_family) + 1; if (bind(listen_socket, (struct sockaddr *)&local, len) != 0) { printf("Error on binding socket \n"); return 1; } - if (listen(listen_socket, max_pending_connections) != 0) { + // Start listening on the socket. Can have `max_pending_connections` number of connections queued. + if (listen(listen_socket, get_max_pending_connections()) != 0) { printf("Error on listen call \n"); return 1; } @@ -111,9 +148,11 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int pid_t pid = fork(); if (pid == 0) { + // Child process handles request. Communication with python happens through `client_socket` close(listen_socket); handle_client(client_socket, run_adaguc_once, argc, argv, envp); } else { + // Parent process keeps track of new socket and returns to listen for new connections child_sockets[pid] = client_socket; } } diff --git a/adagucserverEC/fork_server.h b/adagucserverEC/fork_server.h index 7e4030637..60c00af55 100644 --- a/adagucserverEC/fork_server.h +++ b/adagucserverEC/fork_server.h @@ -1,8 +1,9 @@ #ifndef ADAGUC_SERVER_FORK_SERVER_H #define ADAGUC_SERVER_FORK_SERVER_H +int get_max_pending_connections(); void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); -void on_child_exit(int child_signal); +void child_signal_handler(int child_signal); int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); #endif // ADAGUC_SERVER_FORK_SERVER_H \ No newline at end of file From d4445acd50b1a5ef97141bdd9b2cde38dd930be5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 9 Oct 2024 11:23:09 +0200 Subject: [PATCH 14/17] Clean old procs in bg thread --- adagucserverEC/fork_server.cpp | 56 ++++++++++++++++++++++++++++++---- adagucserverEC/fork_server.h | 11 +++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index c63dac7fd..46bd54af4 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -8,7 +8,12 @@ #include "CTString.h" #include "fork_server.h" -static std::map child_sockets; +// Check this many seconds for old left-over processes +const int CHECK_CHILD_PROC_INTERVAL = 60; +// Old left-over child process should be killed after this many seconds +// TODO: should this become same as `timeout=300` from CGIRunner.py? Should this be configurable? +const int MAX_CHILD_PROC_TIMEOUT = 300; +const int DEFAULT_QUEUED_CONNECTIONS = 4; int get_max_pending_connections() { // Set the max number of queued connections on the socket via ADAGUC_NUMPARALLELPROCESSES @@ -19,10 +24,15 @@ int get_max_pending_connections() { } // Default to 4 queued connections - return 4; + return DEFAULT_QUEUED_CONNECTIONS; } void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { + /* + Read bytes from client socket with the assumption that it is the raw QUERY_STRING value. + Set the QUERY_STRING and handle the request normally. + */ + int recv_buf_len = 4096; char recv_buf[recv_buf_len]; memset(recv_buf, 0, recv_buf_len * sizeof(char)); @@ -40,7 +50,7 @@ void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char // fflush(stdout); // fflush(stderr); - exit(0); + exit(status); } } @@ -72,14 +82,42 @@ void child_signal_handler(int child_signal) { child_status = 1; } - int child_sock = child_sockets[child_pid]; + int child_sock = child_procs[child_pid].socket; // fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]\n", child_pid, child_sock, child_signal, child_status); // Write the status code from the child pid into the unix socket back to python write(child_sock, reinterpret_cast(&child_status), sizeof(child_status)); close(child_sock); - child_sockets.erase(child_pid); + child_procs.erase(child_pid); + } +} + +void *clean_child_procs(void *arg) { + /* + Every `CHECK_CHILD_PROC_INTERVAL` seconds, check all child procs stored in map. + If child proc was started more than `MAX_CHILD_PROC_TIMEOUT` seconds ago, send SIGKILL. + */ + + while (1) { + time_t now = time(NULL); + printf("Checking all child procs\n"); + + for (const auto &child_proc_mapping : child_procs) { + child_proc_t child_proc = child_proc_mapping.second; + + if (difftime(now, child_proc.forked_at) < MAX_CHILD_PROC_TIMEOUT) { + continue; + } + + printf("Child timeout!"); + + if (kill(child_proc_mapping.first, SIGKILL) == -1) { + printf("Failed to send SIGKILL to child process"); + // TODO: What to do if we cannot SIGKILL child process? + } + } + sleep(CHECK_CHILD_PROC_INTERVAL); } } @@ -87,6 +125,7 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int /* Start adaguc in fork mode. This means: - Set up a signal handler for any child processes + - Start a thread in the background that cleans up left-over child processes - Set up a socket through system calls: `socket`, `bind`, `listen` - While true, accept any incoming connections to the socket, through system call `accept` - If connected, fork this process. @@ -104,6 +143,10 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int sa.sa_flags = SA_RESTART | SA_NOCLDSTOP; sigaction(SIGCHLD, &sa, NULL); + // Start cleaning thread in the background + pthread_t clean_child_procs_thread; + pthread_create(&clean_child_procs_thread, NULL, clean_child_procs, NULL); + // Create an endpoint for communicating through a unix socket int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0); if (-1 == listen_socket) { @@ -153,7 +196,8 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int handle_client(client_socket, run_adaguc_once, argc, argv, envp); } else { // Parent process keeps track of new socket and returns to listen for new connections - child_sockets[pid] = client_socket; + child_proc_t child_proc = {client_socket, time(NULL)}; + child_procs[pid] = child_proc; } } diff --git a/adagucserverEC/fork_server.h b/adagucserverEC/fork_server.h index 60c00af55..dae2c8212 100644 --- a/adagucserverEC/fork_server.h +++ b/adagucserverEC/fork_server.h @@ -1,9 +1,20 @@ #ifndef ADAGUC_SERVER_FORK_SERVER_H #define ADAGUC_SERVER_FORK_SERVER_H +#include +#include + +typedef struct { + int socket; + time_t forked_at; +} child_proc_t; + +static std::map child_procs; + int get_max_pending_connections(); void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); void child_signal_handler(int child_signal); +void *clean_child_procs(void *arg); int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); #endif // ADAGUC_SERVER_FORK_SERVER_H \ No newline at end of file From 0770c96641f26656545728f75929d36a2fc0757b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 9 Oct 2024 14:34:22 +0200 Subject: [PATCH 15/17] Make child proc timeout configurable --- Docker/docker-compose.yml | 1 + Docker/start.sh | 2 +- Docker/supervisord/adaguc-pgbouncer.conf | 2 +- Docker/supervisord/adaguc.conf | 2 +- adagucserverEC/adagucserver.cpp | 4 ++-- adagucserverEC/fork_server.cpp | 25 ++++++++++-------------- adagucserverEC/fork_server.h | 1 - python/lib/adaguc/CGIRunner.py | 23 ++++++++++------------ 8 files changed, 26 insertions(+), 34 deletions(-) diff --git a/Docker/docker-compose.yml b/Docker/docker-compose.yml index c40e5a88e..6ee79a336 100755 --- a/Docker/docker-compose.yml +++ b/Docker/docker-compose.yml @@ -58,6 +58,7 @@ services: - "ADAGUC_DATA_DIR=/data/adaguc-data" - "ADAGUC_DATASET_DIR=/data/adaguc-datasets" - "ADAGUC_REDIS=redis://adaguc-redis:6379" + - "ADAGUC_FORK_SOCKET_PATH=adaguc.socket" env_file: - .env restart: unless-stopped diff --git a/Docker/start.sh b/Docker/start.sh index 2098fbe5f..4468c18e8 100755 --- a/Docker/start.sh +++ b/Docker/start.sh @@ -26,7 +26,7 @@ done export ADAGUC_PATH=/adaguc/adaguc-server-master/ && \ export ADAGUC_TMP=/tmp && \ -env -u ADAGUC_FORK /adaguc/adaguc-server-master/bin/adagucserver --updatedb \ +env -u ADAGUC_FORK_SOCKET_PATH /adaguc/adaguc-server-master/bin/adagucserver --updatedb \ --config /adaguc/adaguc-server-config.xml,baselayers.xml if [ $? -ne 0 ] diff --git a/Docker/supervisord/adaguc-pgbouncer.conf b/Docker/supervisord/adaguc-pgbouncer.conf index e47a6c80c..eee679b43 100644 --- a/Docker/supervisord/adaguc-pgbouncer.conf +++ b/Docker/supervisord/adaguc-pgbouncer.conf @@ -14,7 +14,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 redirect_stderr=true environment= - ADAGUC_FORK=1, + ADAGUC_FORK_SOCKET_PATH=1, ADAGUC_PATH=/adaguc/adaguc-server-master/, ADAGUC_TMP=/tmp, ADAGUC_CONFIG=/adaguc/adaguc-server-master/python/lib/adaguc/adaguc-server-config-python-postgres.xml diff --git a/Docker/supervisord/adaguc.conf b/Docker/supervisord/adaguc.conf index cd5ec5eff..5e12707a8 100644 --- a/Docker/supervisord/adaguc.conf +++ b/Docker/supervisord/adaguc.conf @@ -8,7 +8,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 redirect_stderr=true environment= - ADAGUC_FORK=1, + ADAGUC_FORK_SOCKET_PATH=1, ADAGUC_PATH=/adaguc/adaguc-server-master/, ADAGUC_TMP=/tmp, ADAGUC_CONFIG=/adaguc/adaguc-server-master/python/lib/adaguc/adaguc-server-config-python-postgres.xml diff --git a/adagucserverEC/adagucserver.cpp b/adagucserverEC/adagucserver.cpp index 2deaddc7f..f48ae0c28 100644 --- a/adagucserverEC/adagucserver.cpp +++ b/adagucserverEC/adagucserver.cpp @@ -476,8 +476,8 @@ int main(int argc, char **argv, char **envp) { setvbuf(stdout, NULL, _IONBF, 0); // turn off buffering setvbuf(stderr, NULL, _IONBF, 0); // turn off buffering - const char *ADAGUC_FORK = getenv("ADAGUC_FORK"); - if (ADAGUC_FORK != NULL) { + const char *ADAGUC_FORK_SOCKET_PATH = getenv("ADAGUC_FORK_SOCKET_PATH"); + if (ADAGUC_FORK_SOCKET_PATH != NULL) { return run_as_fork_service(run_adaguc_once, argc, argv, envp); } else { // normal flow without unix socket server/fork diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index 46bd54af4..ab0ac3b36 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -10,21 +10,14 @@ // Check this many seconds for old left-over processes const int CHECK_CHILD_PROC_INTERVAL = 60; -// Old left-over child process should be killed after this many seconds -// TODO: should this become same as `timeout=300` from CGIRunner.py? Should this be configurable? -const int MAX_CHILD_PROC_TIMEOUT = 300; +// Allow a backlog of this many connections. Uses `ADAGUC_NUMPARALLELPROCESSES` env var. const int DEFAULT_QUEUED_CONNECTIONS = 4; +// Old left-over child process should be killed after this many seconds. Uses `ADAGUC_MAX_PROC_TIMEOUT` env var. +const int DEFAULT_MAX_PROC_TIMEOUT = 300; -int get_max_pending_connections() { - // Set the max number of queued connections on the socket via ADAGUC_NUMPARALLELPROCESSES - CT::string num_parallel_processes(getenv("ADAGUC_NUMPARALLELPROCESSES")); - int max_pending_connections; - if (num_parallel_processes.isInt()) { - return num_parallel_processes.toInt(); - } - - // Default to 4 queued connections - return DEFAULT_QUEUED_CONNECTIONS; +int get_env_var_int(const char *env, int default_val) { + CT::string env_var(getenv(env)); + return env_var.isInt() ? env_var.toInt() : default_val; } void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) { @@ -99,6 +92,7 @@ void *clean_child_procs(void *arg) { If child proc was started more than `MAX_CHILD_PROC_TIMEOUT` seconds ago, send SIGKILL. */ + int max_child_proc_timeout = get_env_var_int("ADAGUC_MAX_PROC_TIMEOUT", DEFAULT_MAX_PROC_TIMEOUT); while (1) { time_t now = time(NULL); printf("Checking all child procs\n"); @@ -106,7 +100,7 @@ void *clean_child_procs(void *arg) { for (const auto &child_proc_mapping : child_procs) { child_proc_t child_proc = child_proc_mapping.second; - if (difftime(now, child_proc.forked_at) < MAX_CHILD_PROC_TIMEOUT) { + if (difftime(now, child_proc.forked_at) < max_child_proc_timeout) { continue; } @@ -175,7 +169,8 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int } // Start listening on the socket. Can have `max_pending_connections` number of connections queued. - if (listen(listen_socket, get_max_pending_connections()) != 0) { + int max_pending_connections = get_env_var_int("ADAGUC_NUMPARALLELPROCESSES", DEFAULT_QUEUED_CONNECTIONS); + if (listen(listen_socket, max_pending_connections) != 0) { printf("Error on listen call \n"); return 1; } diff --git a/adagucserverEC/fork_server.h b/adagucserverEC/fork_server.h index dae2c8212..dfa243344 100644 --- a/adagucserverEC/fork_server.h +++ b/adagucserverEC/fork_server.h @@ -11,7 +11,6 @@ typedef struct { static std::map child_procs; -int get_max_pending_connections(); void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp); void child_signal_handler(int child_signal); void *clean_child_procs(void *arg); diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 04777e0eb..08660a57b 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -5,14 +5,8 @@ import asyncio import sys -from subprocess import PIPE, Popen, STDOUT, TimeoutExpired -from threading import Thread +from subprocess import PIPE import os -import io -import errno -import time -import chardet -from queue import Queue, Empty # python 3.x import re from typing import NamedTuple @@ -23,10 +17,11 @@ ADAGUC_NUMPARALLELPROCESSES = os.getenv("ADAGUC_NUMPARALLELPROCESSES", "4") sem = asyncio.Semaphore(int(ADAGUC_NUMPARALLELPROCESSES)) - -ADAGUC_FORK_UNIX_SOCKET = f"{os.getenv('ADAGUC_PATH')}/adaguc.socket" +ADAGUC_FORK_SOCKET_PATH = f"{os.getenv('ADAGUC_PATH')}/adaguc.socket" ON_POSIX = "posix" in sys.builtin_module_names +MAX_PROC_TIMEOUT = int(os.getenv("ADAGUC_MAX_PROC_TIMEOUT", "300")) + class AdagucResponse(NamedTuple): status_code: int @@ -56,7 +51,7 @@ async def socket_communicate(url: str) -> AdagucResponse: """ process_output = bytearray() - reader, writer = await asyncio.open_unix_connection(ADAGUC_FORK_UNIX_SOCKET) + reader, writer = await asyncio.open_unix_connection(ADAGUC_FORK_SOCKET_PATH) writer.write(url.encode()) await writer.drain() @@ -117,7 +112,9 @@ class CGIRunner: Run the CGI script with specified URL and environment. Stdout is captured and put in a BytesIO object provided in output """ - async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=300): + async def run( + self, cmds, url, output, env=[], path=None, isCGI=True, timeout=MAX_PROC_TIMEOUT + ): localenv = {} if url != None: localenv["QUERY_STRING"] = url @@ -131,8 +128,8 @@ async def run(self, cmds, url, output, env=[], path=None, isCGI=True, timeout=30 # print("@@@@", url) - # Only use fork server if ADAGUC_FORK is set and adaguc is not executed with extra arguments e.g. `--updatelayermetadata` - use_fork = os.getenv("ADAGUC_FORK", None) and len(cmds) == 1 + # Only use fork server if ADAGUC_FORK_SOCKET_PATH is set and adaguc is not executed with extra arguments e.g. `--updatelayermetadata` + use_fork = os.getenv("ADAGUC_FORK_SOCKET_PATH", None) and len(cmds) == 1 async with sem: if use_fork: From 3bbf919c66b1722ad0bfbe0956e73611c85a47f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 9 Oct 2024 14:34:56 +0200 Subject: [PATCH 16/17] Begin on environment variable docs --- doc/Environment_Variables.md | 36 ++++++++++++++++++++++++++++++++++++ doc/Running.md | 2 ++ 2 files changed, 38 insertions(+) create mode 100644 doc/Environment_Variables.md diff --git a/doc/Environment_Variables.md b/doc/Environment_Variables.md new file mode 100644 index 000000000..021bd4f69 --- /dev/null +++ b/doc/Environment_Variables.md @@ -0,0 +1,36 @@ +Environment variables +===================== + +The following environment variables allow you to change behaviour of the Adaguc server. + +Some of these environment variables might be set via the `docker-compose.yml` file, see [Running adaguc-server with docker](/doc/Running.md) + +| Environment variable | Description | +| -------------------- | ----------- | +| `ADAGUC_CONFIG` | pointer to the configuration file +| `ADAGUC_LOGFILE` | pointer where log messages should be stored, includes information logs and error logs +| `ADAGUC_ERRORFILE` | optional pointer which logs only error messages +| `ADAGUC_FONT` | Place where a TrueType font is stored, e.g. FreeSans.ttf +| `ADAGUC_DATARESTRICTION` | Optional pointer which controls access restrictions, by default set to FALSE, can be combinations of `ALLOW_WCS\|ALLOW_GFI\|ALLOW_METADATA\|SHOW_QUERYINFO`, separated with the \| token.

`FALSE`: No restrictions (default, same as `ALLOW_WCS\|ALLOW_GFI\|ALLOW_METADATA`)
`ALLOW_WCS`: Allows the Web Coverage Service, download of data
`ALLOW_GFI`: Allows GetFeatureInfo requests, e.g. getting information about a certain location
`ALLOW_METADATA`: Allows getting NetCDF header metadata information
`SHOW_QUERYINFO`: When a query has failed, the corresponding query will be presented to the user. This feature is disabled by default. +| `ADAGUC_PATH` | optional, is used as variable susbstitution {ADAGUC_PATH} in the configuration files, should point to the adagucserver installation +| `ADAGUC_TMP` | optional, is used as variable susbstitution {ADAGUC_TMP} in the configuration files, location where tempfiles need to be written +| `ADAGUC_ONLINERESOURCE` | optional, specify the online resource in the CGI script itself, see [OnlineResource](/doc/configuration/OnlineResource.md) to configure in the xml file. +| `PGBOUNCER_ENABLE` | Enable or disable the usage of PostgreSQL connection pooling by PGBouncer. Default: `true`. +| `PGBOUNCER_DISABLE_SSL` | If PGBouncer is used, disable the usage of SSL. Default: `true`. +| `ADAGUC_DB` | The connection string used by PostgreSQL. Default: `host=adaguc-db port=5432 user=adaguc password=adaguc dbname=adaguc`. +| `ADAGUC_ENABLELOGBUFFER` | Enable or disable log buffering. If disabled, no debug logging is shown at all. Default: `true`. +| `ADAGUC_AUTOWMS_DIR` | Default: `/data/adaguc-autowms` +| `ADAGUC_DATA_DIR` | Default: `/data/adaguc-data` +| `ADAGUC_DATASET_DIR` | Default: `/data/adaguc-datasets` +| `ADAGUC_FORK_SOCKET_PATH` | Enables or disables the usage of an Adaguc fork server. If set to an adaguc-writable path, e.g. `adaguc.socket`, Adaguc will launch a server in the background and fork itself when handling requests. Communication will take place through this socket. If left empty or removed, Adaguc will launch a subprocess which comes with overhead. +| `ADAGUC_MAX_PROC_TIMEOUT` | Every request made to Adaguc will timeout after this many seconds. Default: `300` seconds. +| `ADAGUC_PORT` | Port to listen to for the webserver, used by `docker-compose.yml`. Default: port `443`. +| `EXTERNALADDRESS` | Adaguc-viewer and adaguc-explorer will be reachable on this hostname. +| `ADAGUCENV_RETENTIONPERIOD` | ISO 8601 time period string indicating how long files should be retained. +| `ADAGUCENV_ENABLECLEANUP` | Enable or disable cleaning. + +### TODO: +- How/where you can apply these environment variables (.env, docker-compose.yaml, adaguc xml config)? +- What are the defaults? +- Which vars are optional or required? +- What is the format (string, caps only, path, etc)? \ No newline at end of file diff --git a/doc/Running.md b/doc/Running.md index de612e31f..dab8fa97c 100644 --- a/doc/Running.md +++ b/doc/Running.md @@ -41,6 +41,8 @@ bash docker-compose-generate-env.sh \ -f $ADAGUCDOCKERHOME/adaguc-data \ -p 443 # You can view or edit the file ./.env file + +For more info on what environment variables you can configure, see [Environment variables](/doc/Environment_Variables.md) ``` ### Step 3. Once the steps above have been done, it is time to start: From a49a6ff928b32b488e6cc91956adb3f7b7d96b07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthijs=20Gr=C3=BCnbauer?= Date: Wed, 9 Oct 2024 16:01:37 +0200 Subject: [PATCH 17/17] Add CDBDebug/Warn/Err prints for clean thread --- adagucserverEC/fork_server.cpp | 10 +++++----- doc/Environment_Variables.md | 4 ++-- python/lib/adaguc/CGIRunner.py | 4 ++++ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/adagucserverEC/fork_server.cpp b/adagucserverEC/fork_server.cpp index ab0ac3b36..f1e42282f 100644 --- a/adagucserverEC/fork_server.cpp +++ b/adagucserverEC/fork_server.cpp @@ -5,6 +5,7 @@ #include #include +#include "CDebugger.h" #include "CTString.h" #include "fork_server.h" @@ -89,13 +90,13 @@ void child_signal_handler(int child_signal) { void *clean_child_procs(void *arg) { /* Every `CHECK_CHILD_PROC_INTERVAL` seconds, check all child procs stored in map. - If child proc was started more than `MAX_CHILD_PROC_TIMEOUT` seconds ago, send SIGKILL. + If child proc was started more than `ADAGUC_MAX_PROC_TIMEOUT` seconds ago, send SIGKILL. */ int max_child_proc_timeout = get_env_var_int("ADAGUC_MAX_PROC_TIMEOUT", DEFAULT_MAX_PROC_TIMEOUT); + CDBDebug("Checking every %d seconds for processes older than %d seconds\n", CHECK_CHILD_PROC_INTERVAL, max_child_proc_timeout); while (1) { time_t now = time(NULL); - printf("Checking all child procs\n"); for (const auto &child_proc_mapping : child_procs) { child_proc_t child_proc = child_proc_mapping.second; @@ -104,10 +105,9 @@ void *clean_child_procs(void *arg) { continue; } - printf("Child timeout!"); - + CDBWarning("Child process with pid %d running longer than %d, sending SIGKILL to clean up\n", child_proc_mapping.first, max_child_proc_timeout); if (kill(child_proc_mapping.first, SIGKILL) == -1) { - printf("Failed to send SIGKILL to child process"); + CDBError("Failed to send SIGKILL to child process %d\n", child_proc_mapping.first); // TODO: What to do if we cannot SIGKILL child process? } } diff --git a/doc/Environment_Variables.md b/doc/Environment_Variables.md index 021bd4f69..1153cecf1 100644 --- a/doc/Environment_Variables.md +++ b/doc/Environment_Variables.md @@ -12,8 +12,8 @@ Some of these environment variables might be set via the `docker-compose.yml` fi | `ADAGUC_ERRORFILE` | optional pointer which logs only error messages | `ADAGUC_FONT` | Place where a TrueType font is stored, e.g. FreeSans.ttf | `ADAGUC_DATARESTRICTION` | Optional pointer which controls access restrictions, by default set to FALSE, can be combinations of `ALLOW_WCS\|ALLOW_GFI\|ALLOW_METADATA\|SHOW_QUERYINFO`, separated with the \| token.

`FALSE`: No restrictions (default, same as `ALLOW_WCS\|ALLOW_GFI\|ALLOW_METADATA`)
`ALLOW_WCS`: Allows the Web Coverage Service, download of data
`ALLOW_GFI`: Allows GetFeatureInfo requests, e.g. getting information about a certain location
`ALLOW_METADATA`: Allows getting NetCDF header metadata information
`SHOW_QUERYINFO`: When a query has failed, the corresponding query will be presented to the user. This feature is disabled by default. -| `ADAGUC_PATH` | optional, is used as variable susbstitution {ADAGUC_PATH} in the configuration files, should point to the adagucserver installation -| `ADAGUC_TMP` | optional, is used as variable susbstitution {ADAGUC_TMP} in the configuration files, location where tempfiles need to be written +| `ADAGUC_PATH` | optional, is used as variable substitution {ADAGUC_PATH} in the configuration files, should point to the adagucserver installation +| `ADAGUC_TMP` | optional, is used as variable substitution {ADAGUC_TMP} in the configuration files, location where tempfiles need to be written | `ADAGUC_ONLINERESOURCE` | optional, specify the online resource in the CGI script itself, see [OnlineResource](/doc/configuration/OnlineResource.md) to configure in the xml file. | `PGBOUNCER_ENABLE` | Enable or disable the usage of PostgreSQL connection pooling by PGBouncer. Default: `true`. | `PGBOUNCER_DISABLE_SSL` | If PGBouncer is used, disable the usage of SSL. Default: `true`. diff --git a/python/lib/adaguc/CGIRunner.py b/python/lib/adaguc/CGIRunner.py index 08660a57b..e0c8872d9 100644 --- a/python/lib/adaguc/CGIRunner.py +++ b/python/lib/adaguc/CGIRunner.py @@ -40,6 +40,10 @@ async def wait_socket_communicate(url, timeout) -> AdagucResponse: return AdagucResponse( status_code=HTTP_STATUSCODE_500_TIMEOUT, process_output=None ) + except ConnectionRefusedError: + # TODO: If for whatever reason socket communication to the fork server fails, should we fall back to regular process spawning? + raise + return resp