From 69213c6c5b79d33bada9a1ed1d1e3171fce3f289 Mon Sep 17 00:00:00 2001 From: Jason Lee Date: Fri, 23 Aug 2019 10:37:20 -0600 Subject: [PATCH] buffered output and fine grained debug macros --- contrib/CMakeLists.txt | 6 - include/OutputBuffers.h | 95 ++++++ scripts/gufi_find | 6 +- src/CMakeLists.txt | 15 +- src/OutputBuffers.c | 126 ++++++++ src/gufi_query.c | 624 ++++++++++++++++++++++++---------------- 6 files changed, 613 insertions(+), 259 deletions(-) create mode 100644 include/OutputBuffers.h create mode 100644 src/OutputBuffers.c diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 74a3d09cc..ce1a34a74 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -80,12 +80,6 @@ add_executable(gendir gendir.c) target_link_libraries(gendir ${COMMON_LIBRARIES}) add_dependencies(gendir GUFI) - -add_executable(fill_queues fill_queues.c) -target_link_libraries(fill_queues ${COMMON_LIBRARIES}) -add_dependencies(fill_queues GUFI) - - # potentially useful C++ executables if (CMAKE_CXX_COMPILER) # a more complex index generator diff --git a/include/OutputBuffers.h b/include/OutputBuffers.h new file mode 100644 index 000000000..30a26479f --- /dev/null +++ b/include/OutputBuffers.h @@ -0,0 +1,95 @@ +/* +This file is part of GUFI, which is part of MarFS, which is released +under the BSD license. + + +Copyright (c) 2017, Los Alamos National Security (LANS), LLC +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- + +GUFI uses the C-Thread-Pool library. The original version, written by +Johan Hanssen Seferidis, is found at +https://github.com/Pithikos/C-Thread-Pool/blob/master/LICENSE, and is +released under the MIT License. LANS, LLC added functionality to the +original work. The original work, plus LANS, LLC added functionality is +found at https://github.com/jti-lanl/C-Thread-Pool, also under the MIT +License. The MIT License can be found at +https://opensource.org/licenses/MIT. + + +From Los Alamos National Security, LLC: +LA-CC-15-039 + +Copyright (c) 2017, Los Alamos National Security, LLC All rights reserved. +Copyright 2017. Los Alamos National Security, LLC. This software was produced +under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National +Laboratory (LANL), which is operated by Los Alamos National Security, LLC for +the U.S. Department of Energy. The U.S. Government has rights to use, +reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS +ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR +ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is +modified to produce derivative works, such modified software should be +clearly marked, so as not to confuse it with the version available from +LANL. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY +OF SUCH DAMAGE. +*/ + + + +#include +#include + +/* Single Buffer */ +struct OutputBuffer { + char * buf; + size_t capacity; + size_t filled; +}; + +/* Buffers for all threads */ +struct OutputBuffers { + struct OutputBuffer * buffers; + pthread_mutex_t mutex; +}; + +struct OutputBuffers * OutputBuffers_init(struct OutputBuffers * obufs, const size_t count, const size_t capacity); +void OutputBuffers_destroy(struct OutputBuffers * obufs, const size_t count); diff --git a/scripts/gufi_find b/scripts/gufi_find index f70f2e425..d32a2bfcc 100755 --- a/scripts/gufi_find +++ b/scripts/gufi_find @@ -403,10 +403,9 @@ if __name__=='__main__': parser.add_argument('--smallest', dest='smallest', action='store_true', help='top n smallest files') parser.add_argument('--largest', dest='largest', action='store_true', help='top n largest files') parser.add_argument('--threads', '-t', metavar='n', dest='threads', type=qb.get_positive, default=qb.cpus(), help='number of threads') - parser.add_argument('--intermediate_dbs', metavar='n', dest='intermediates', type=qb.get_non_negative, help='number of intermediate databases') - parser.add_argument('--intermediate_skip', metavar='n', dest='skip', type=qb.get_non_negative, help='number of intermediate databases to skip when selecting the next database') parser.add_argument('--aggregate', dest='aggregate', action='store_true', help='aggregate results before printing') parser.add_argument('--exec', dest='query_exec', default=EXEC, help='Location of gufi_query executable') + parser.add_argument('--output', dest='output', type=str, help='Output file prefix (Creates file .tid)') args = parser.parse_args(); @@ -489,6 +488,9 @@ if __name__=='__main__': if args.mindepth: query_cmd += ['-y', str(args.mindepth)] + if args.output: + query_cmd += ['-o', args.output] + query = subprocess.Popen(query_cmd + find_out.split()) # positional arguments must appear after flags query.communicate() # block until query finishes diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1383dc583..a53f317d9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -106,7 +106,20 @@ endif() # ########################################## # create the GUFI library, which contains all of the common source files -set(GUFI_SOURCES bf.c dbutils.c debug.c opendb.c outfiles.c outdbs.c QueuePerThreadPool.c SinglyLinkedList.c structq.c template_db.c trace.c utils.c) +set(GUFI_SOURCES + bf.c + dbutils.c + debug.c + opendb.c + outfiles.c + outdbs.c + OutputBuffers.c + QueuePerThreadPool.c + SinglyLinkedList.c + structq.c + template_db.c + trace.c + utils.c) add_library(GUFI STATIC ${GUFI_SOURCES}) add_dependencies(GUFI install_dependencies) install(TARGETS GUFI DESTINATION lib COMPONENT Server) diff --git a/src/OutputBuffers.c b/src/OutputBuffers.c new file mode 100644 index 000000000..45266d649 --- /dev/null +++ b/src/OutputBuffers.c @@ -0,0 +1,126 @@ +/* +This file is part of GUFI, which is part of MarFS, which is released +under the BSD license. + + +Copyright (c) 2017, Los Alamos National Security (LANS), LLC +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- + +GUFI uses the C-Thread-Pool library. The original version, written by +Johan Hanssen Seferidis, is found at +https://github.com/Pithikos/C-Thread-Pool/blob/master/LICENSE, and is +released under the MIT License. LANS, LLC added functionality to the +original work. The original work, plus LANS, LLC added functionality is +found at https://github.com/jti-lanl/C-Thread-Pool, also under the MIT +License. The MIT License can be found at +https://opensource.org/licenses/MIT. + + +From Los Alamos National Security, LLC: +LA-CC-15-039 + +Copyright (c) 2017, Los Alamos National Security, LLC All rights reserved. +Copyright 2017. Los Alamos National Security, LLC. This software was produced +under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National +Laboratory (LANL), which is operated by Los Alamos National Security, LLC for +the U.S. Department of Energy. The U.S. Government has rights to use, +reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS +ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR +ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is +modified to produce derivative works, such modified software should be +clearly marked, so as not to confuse it with the version available from +LANL. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY +OF SUCH DAMAGE. +*/ + +#include "OutputBuffers.h" + +#include + +static struct OutputBuffer * OutputBuffer_init(struct OutputBuffer * obuf, const size_t capacity) { + if (obuf) { + if (!(obuf->buf = malloc(capacity))) { + return NULL; + } + obuf->capacity = capacity; + obuf->filled = 0; + } + + return obuf; +} + +static void OutputBuffer_destroy(struct OutputBuffer * obuf) { + if (obuf) { + free(obuf->buf); + } +} + +struct OutputBuffers * OutputBuffers_init(struct OutputBuffers * obufs, const size_t count, const size_t capacity) { + if (!obufs) { + return NULL; + } + + if (pthread_mutex_init(&obufs->mutex, NULL)) { + return NULL; + } + + obufs->buffers = malloc(count * sizeof(struct OutputBuffer)); + for(size_t i = 0; i < count; i++) { + if (!OutputBuffer_init(&obufs->buffers[i], capacity)) { + OutputBuffers_destroy(obufs, i); + return NULL; + } + } + + return obufs; +} + +void OutputBuffers_destroy(struct OutputBuffers * obufs, const size_t count) { + for(size_t i = 0; i < count; i++) { + OutputBuffer_destroy(&obufs->buffers[i]); + } + free(obufs->buffers); + obufs->buffers = NULL; + + pthread_mutex_destroy(&obufs->mutex); +} diff --git a/src/gufi_query.c b/src/gufi_query.c index c7dfcba31..a45517312 100644 --- a/src/gufi_query.c +++ b/src/gufi_query.c @@ -102,6 +102,7 @@ OF SUCH DAMAGE. #endif #include "outdbs.h" #include "outfiles.h" +#include "OutputBuffers.h" #include "pcre.h" #include "SinglyLinkedList.h" #include "utils.h" @@ -127,8 +128,6 @@ static int total_files_callback(void * unused, int count, char ** data, char ** #endif #ifdef DEBUG -static pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER; - #ifdef CUMULATIVE_TIMES long double total_opendir_time = 0; long double total_open_time = 0; @@ -136,7 +135,9 @@ long double total_sqlite3_open_time = 0; long double total_create_tables_time = 0; long double total_set_pragmas_time = 0; long double total_attach_time = 0; +long double total_addqueryfuncs_time = 0; long double total_descend_time = 0; +long double total_check_args_time = 0; long double total_level_time = 0; long double total_readdir_time = 0; long double total_strncmp_time = 0; @@ -193,13 +194,15 @@ int create_tables(const char *name, sqlite3 *db); int set_pragmas(sqlite3 * db); static sqlite3 * opendb2(const char * name, const int rdonly, const int createtables, const int setpragmas - , struct timespec * sqlite3_open_start - , struct timespec * sqlite3_open_end - , struct timespec * create_tables_start - , struct timespec * create_tables_end - , struct timespec * set_pragmas_start - , struct timespec * set_pragmas_end -) { + #ifdef CUMULATIVE_TIMES + , struct timespec * sqlite3_open_start + , struct timespec * sqlite3_open_end + , struct timespec * create_tables_start + , struct timespec * create_tables_end + , struct timespec * set_pragmas_start + , struct timespec * set_pragmas_end + #endif + ) { sqlite3 * db = NULL; if (rdonly && createtables) { @@ -207,7 +210,7 @@ static sqlite3 * opendb2(const char * name, const int rdonly, const int createta return NULL; } - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, sqlite3_open_start); #endif @@ -221,18 +224,18 @@ static sqlite3 * opendb2(const char * name, const int rdonly, const int createta // no need to create because the file should already exist if (sqlite3_open_v2(name, &db, flags, GUFI_SQLITE_VFS) != SQLITE_OK) { - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, sqlite3_open_end); #endif /* fprintf(stderr, "Cannot open database: %s %s rc %d\n", name, sqlite3_errmsg(db), sqlite3_errcode(db)); */ sqlite3_close(db); // close db even if it didn't open to avoid memory leaks return NULL; } - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, sqlite3_open_end); #endif - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, create_tables_start); #endif if (createtables) { @@ -242,18 +245,18 @@ static sqlite3 * opendb2(const char * name, const int rdonly, const int createta return NULL; } } - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, create_tables_end); #endif - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, set_pragmas_start); #endif if (setpragmas) { // ignore errors set_pragmas(db); } - #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, set_pragmas_end); #endif @@ -267,7 +270,9 @@ static size_t descend2(struct QPTPool *ctx, struct work *passmywork, DIR *dir, const size_t max_level - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + , struct sll *check_args_starts + , struct sll *check_args_ends , struct sll *level_starts , struct sll *level_ends , struct sll *readdir_starts @@ -290,32 +295,51 @@ static size_t descend2(struct QPTPool *ctx, , struct sll *pushdir_ends #endif ) { + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * check_args_start = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, check_args_start); + sll_push(check_args_starts, check_args_start); + #endif if (!passmywork) { fprintf(stderr, "Got NULL work\n"); + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * check_args_end = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, check_args_end); + sll_push(check_args_ends, check_args_end); + #endif return 0; } if (!dir) { fprintf(stderr, "Could not open directory %s: %d %s\n", passmywork->name, errno, strerror(errno)); + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * check_args_end = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, check_args_end); + sll_push(check_args_ends, check_args_end); + #endif return 0; } + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * check_args_end = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, check_args_end); + sll_push(check_args_ends, check_args_end); + #endif - size_t pushed = 0; - - - #ifdef DEBUG - struct timespec * readdir_start = malloc(sizeof(struct timespec)); - clock_gettime(CLOCK_MONOTONIC, readdir_start); - sll_push(readdir_starts, readdir_start); + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * level_start = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, level_start); + sll_push(level_starts, level_start); #endif + size_t pushed = 0; + const size_t next_level = passmywork->level + 1; const int level_check = (next_level <= max_level); - #ifdef DEBUG - struct timespec * readdir_end = malloc(sizeof(struct timespec)); - clock_gettime(CLOCK_MONOTONIC, readdir_end); - sll_push(readdir_ends, readdir_end); + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec * level_end = malloc(sizeof(struct timespec)); + clock_gettime(CLOCK_MONOTONIC, level_end); + sll_push(level_ends, level_end); #endif if (level_check) { @@ -325,13 +349,13 @@ static size_t descend2(struct QPTPool *ctx, // each struct dirent *entry = NULL; while (1) { - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * readdir_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, readdir_start); sll_push(readdir_starts, readdir_start); #endif entry = readdir(dir); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * readdir_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, readdir_end); sll_push(readdir_ends, readdir_end); @@ -340,7 +364,7 @@ static size_t descend2(struct QPTPool *ctx, break; } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * strncmp_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, strncmp_start); sll_push(strncmp_starts, strncmp_start); @@ -348,7 +372,7 @@ static size_t descend2(struct QPTPool *ctx, const size_t len = strlen(entry->d_name); const int skip = (((len == 1) && (strncmp(entry->d_name, ".", 1) == 0)) || ((len == 2) && (strncmp(entry->d_name, "..", 2) == 0))); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * strncmp_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, strncmp_end); sll_push(strncmp_ends, strncmp_end); @@ -357,73 +381,74 @@ static size_t descend2(struct QPTPool *ctx, continue; } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * snprintf_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, snprintf_start); sll_push(snprintf_starts, snprintf_start); #endif struct work qwork; SNPRINTF(qwork.name, MAXPATH, "%s/%s", passmywork->name, entry->d_name); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * snprintf_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, snprintf_end); sll_push(snprintf_ends, snprintf_end); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * lstat_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, lstat_start); sll_push(lstat_starts, lstat_start); #endif lstat(qwork.name, &qwork.statuso); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * lstat_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, lstat_end); sll_push(lstat_ends, lstat_end); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * isdir_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, isdir_start); sll_push(isdir_starts, isdir_start); #endif const int isdir = S_ISDIR(qwork.statuso.st_mode); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * isdir_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, isdir_end); sll_push(isdir_ends, isdir_end); #endif if (isdir) { - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * access_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, access_start); sll_push(access_starts, access_start); #endif const int accessible = !access(qwork.name, R_OK | X_OK); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * access_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, access_end); sll_push(access_ends, access_end); #endif if (accessible) { - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * set_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, set_start); sll_push(set_starts, set_start); #endif qwork.level = next_level; qwork.type[0] = 'd'; + // this is how the parent gets passed on qwork.pinode = passmywork->statuso.st_ino; - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * set_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, set_end); sll_push(set_ends, set_end); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * clone_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, clone_start); sll_push(clone_starts, clone_start); @@ -434,20 +459,20 @@ static size_t descend2(struct QPTPool *ctx, struct work * clone = (struct work *) calloc(1, sizeof(struct work)); memcpy(clone, &qwork, sizeof(struct work)); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * clone_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, clone_end); sll_push(clone_ends, clone_end); #endif // this pushes the dir onto queue - pushdir does locking around queue update - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * pushdir_start = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, pushdir_start); sll_push(pushdir_starts, pushdir_start); #endif QPTPool_enqueue(ctx, id, clone); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec * pushdir_end = malloc(sizeof(struct timespec)); clock_gettime(CLOCK_MONOTONIC, pushdir_end); sll_push(pushdir_ends, pushdir_end); @@ -485,36 +510,74 @@ int addqueryfuncs2(sqlite3 *db, struct QPTPool * ctx) { } // ////////////////////////////////////////////////////// -// print the results of the query static size_t rows = 0; -static int print_callback(void *out, int count, char **data, char **columns) { - static pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER; - static char ffielddelim[2]; - switch (in.dodelim) { - case 0: - SNPRINTF(ffielddelim,2,"|"); - break; - case 1: - SNPRINTF(ffielddelim,2,"%s",fielddelim); - break; - case 2: - SNPRINTF(ffielddelim,2,"%s",in.delim); - break; - } - rows++; +/* sqlite3_exec callback argument data */ +struct CallbackArgs { + struct OutputBuffers * output_buffers; + int id; +}; - if (out) { - pthread_mutex_lock(&print_mutex); +void flush_buffer(pthread_mutex_t * print_mutex, struct OutputBuffer * output_buffer, FILE * out) { + /* skip argument checking */ + + output_buffer->buf[output_buffer->filled] = '\0'; + + pthread_mutex_lock(print_mutex); + fwrite(output_buffer->buf, sizeof(char), output_buffer->filled, out); + pthread_mutex_unlock(print_mutex); + + output_buffer->filled = 0; +} + +static int print_callback(void * args, int count, char **data, char **columns) { + /* skip argument checking */ + + struct CallbackArgs * ca = (struct CallbackArgs *) args; + const int id = ca->id; + /* if (gts.outfd[id]) { */ + size_t * lens = malloc(count * sizeof(size_t)); + size_t row_len = count + 1; // one delimiter per column + newline for(int i = 0; i < count; i++) { - fprintf((FILE *) out, "%s%s", data[i], ffielddelim); + lens[i] = strlen(data[i]); + row_len += lens[i]; } - fprintf((FILE *) out, "\n"); - pthread_mutex_unlock(&print_mutex); - } + + // if theres not enough space in the buffer to fit the whole row, flush it first + if ((ca->output_buffers->buffers[id].filled + row_len + 1) >= ca->output_buffers->buffers[id].capacity) { + flush_buffer(&ca->output_buffers->mutex, &ca->output_buffers->buffers[id], gts.outfd[id]); + } + + char * buf = ca->output_buffers->buffers[id].buf; + size_t filled = ca->output_buffers->buffers[id].filled; + for(int i = 0; i < count; i++) { + memcpy(&buf[filled], data[i], lens[i]); + filled += lens[i]; + + buf[filled] = in.delim[0]; + filled++; + } + + buf[filled] = '\n'; + filled++; + + ca->output_buffers->buffers[id].filled = filled; + + // if the new data filled up the buffer, flush it + if ((ca->output_buffers->buffers[id].filled + 1) >= ca->output_buffers->buffers[id].capacity) { + flush_buffer(&ca->output_buffers->mutex, &ca->output_buffers->buffers[id], gts.outfd[id]); + } + + free(lens); + /* } */ return 0; } +struct ThreadArgs { + struct OutputBuffers output_buffers; + int (*print_callback_func)(void*,int,char**,char**); +}; + int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) { sqlite3 *db = NULL; int recs; @@ -523,28 +586,23 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) char endname[MAXPATH]; DIR * dir = NULL; - if (!data) { - return 1; - } + /* /\* Can probably skip this *\/ */ + /* if (!data) { */ + /* return 1; */ + /* } */ - if (!ctx || (id >= ctx->size)) { - free(data); - return 1; - } + /* /\* Can probably skip this *\/ */ + /* if (!ctx || (id >= ctx->size) || !args) { */ + /* free(data); */ + /* return 1; */ + /* } */ struct work * work = (struct work *) data; - // only print if not aggregating and user wants to print - // where to output to when printing - FILE *out = stdout; - if (in.outfile > 0) { - out = gts.outfd[id]; - } - char dbname[MAXSQL]; SNPRINTF(dbname, MAXSQL, "%s/" DBNAME, work->name); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec opendir_start; struct timespec opendir_end; struct timespec open_start; @@ -557,8 +615,12 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) struct timespec set_pragmas_end; struct timespec attach_start; struct timespec attach_end; + struct timespec addqueryfuncs_start; + struct timespec addqueryfuncs_end; struct timespec descend_start; struct timespec descend_end; + struct sll check_args_starts; + struct sll check_args_ends; struct sll level_starts; struct sll level_ends; struct sll readdir_starts; @@ -588,6 +650,8 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) struct timespec closedir_start; struct timespec closedir_end; + sll_init(&check_args_starts); + sll_init(&check_args_ends); sll_init(&level_starts); sll_init(&level_ends); sll_init(&readdir_starts); @@ -622,17 +686,17 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) } // open directory - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &opendir_start); #endif // keep opendir near opendb to help speed up sqlite3_open_v2 dir = opendir(work->name); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &opendir_end); #endif // if we have out db then we have that db open so we just attach the gufi db - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &open_start); #endif if (in.outdb > 0) { @@ -640,7 +704,7 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) attachdb(dbname, db, "tree"); } else { db = opendb2(dbname, 1, 0, 0 - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) , &sqlite3_open_start , &sqlite3_open_end , &create_tables_start @@ -650,11 +714,11 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) #endif ); } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &open_end); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &attach_start); #endif if (in.aggregate_or_print == AGGREGATE) { @@ -667,14 +731,20 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) goto out_dir; } } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &attach_end); #endif + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + clock_gettime(CLOCK_MONOTONIC, &addqueryfuncs_start); + #endif // this is needed to add some query functions like path() uidtouser() gidtogroup() if (db) { addqueryfuncs2(db, ctx); } + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + clock_gettime(CLOCK_MONOTONIC, &addqueryfuncs_end); + #endif recs=1; /* set this to one record - if the sql succeeds it will set to 0 or 1 */ /* if it fails then this will be set to 1 and will go on */ @@ -701,14 +771,18 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) // so we have to go on and query summary and entries possibly if (recs > 0) { #ifdef DEBUG + #ifdef CUMULATIVE_TIMES clock_gettime(CLOCK_MONOTONIC, &descend_start); + #endif #ifdef SUBDIRECTORY_COUNTS const size_t pushed = #endif #endif // push subdirectories into the queue descend2(ctx, id, work, dir, in.max_level - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + , &check_args_starts + , &check_args_ends , &level_starts , &level_ends , &readdir_starts @@ -732,12 +806,17 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) #endif ); #ifdef DEBUG + #ifdef CUMULATIVE_TIMES + clock_gettime(CLOCK_MONOTONIC, &descend_end); + #endif #ifdef SUBDIRECTORY_COUNTS - pthread_mutex_lock(&print_mutex); - fprintf(stderr, "%s %zu\n", work->name, pushed); - pthread_mutex_unlock(&print_mutex); + { + static pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&print_mutex); + fprintf(stderr, "%s %zu\n", work->name, pushed); + pthread_mutex_unlock(&print_mutex); + } #endif - clock_gettime(CLOCK_MONOTONIC, &descend_end); #endif if (db) { @@ -772,15 +851,24 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) SNPRINTF(gps[id].gpath,MAXPATH,"%s",work->name); realpath(work->name,gps[id].gfpath); - #ifdef DEBUG + struct ThreadArgs * ta = (struct ThreadArgs *) args; + struct CallbackArgs ca; + ca.output_buffers = &ta->output_buffers; + ca.id = id; + + /* // make a copy of the print arguments to allow for changing of output file */ + /* struct print_args pa = * (struct print_args *) args; */ + /* pa.out = out; */ + + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &exec_start); #endif char *err = NULL; - if (sqlite3_exec(db, in.sqlent, (int (*)(void*,int,char**,char**)) args, out, &err) != SQLITE_OK) { + if (sqlite3_exec(db, in.sqlent, ta->print_callback_func, &ca, &err) != SQLITE_OK) { fprintf(stderr, "Error: %s: %s\n", err, dbname); sqlite3_free(err); } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &exec_end); #endif @@ -799,7 +887,7 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) } } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &detach_start); #endif if (in.aggregate_or_print == AGGREGATE) { @@ -809,12 +897,12 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) goto out_dir; } } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &detach_end); #endif // if we have an out db we just detach gufi db - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &close_start); #endif if (in.outdb > 0) { @@ -822,7 +910,7 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) } else { closedb(db); } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &close_end); #endif @@ -830,11 +918,11 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) ; // close dir - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &closedir_start); #endif closedir(dir); - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) clock_gettime(CLOCK_MONOTONIC, &closedir_end); #endif @@ -847,81 +935,92 @@ int processdir(struct QPTPool * ctx, void * data , const size_t id, void * args) ; #ifdef DEBUG #ifdef CUMULATIVE_TIMES - pthread_mutex_lock(&print_mutex); - total_opendir_time += elapsed(&opendir_start, &opendir_end); - total_open_time += elapsed(&open_start, &open_end); - total_sqlite3_open_time += elapsed(&sqlite3_open_start, &sqlite3_open_end); - total_create_tables_time += elapsed(&create_tables_start, &create_tables_end); - total_set_pragmas_time += elapsed(&set_pragmas_start, &set_pragmas_end); - total_attach_time += elapsed(&attach_start, &attach_end); - total_descend_time += elapsed(&descend_start, &descend_end); - total_level_time += sll_loop_sum(&level_starts, &level_ends); - total_readdir_time += sll_loop_sum(&readdir_starts, &readdir_ends); - total_strncmp_time += sll_loop_sum(&strncmp_starts, &strncmp_ends); - total_snprintf_time += sll_loop_sum(&snprintf_starts, &snprintf_ends); - total_lstat_time += sll_loop_sum(&lstat_starts, &lstat_ends); - total_isdir_time += sll_loop_sum(&isdir_starts, &isdir_ends); - total_access_time += sll_loop_sum(&access_starts, &access_ends); - total_set_time += sll_loop_sum(&set_starts, &set_ends); - total_clone_time += sll_loop_sum(&clone_starts, &clone_ends); - total_pushdir_time += sll_loop_sum(&pushdir_starts, &pushdir_ends); - total_closedir_time += elapsed(&closedir_start, &closedir_end); - total_exec_time += elapsed(&exec_start, &exec_end); - total_detach_time += elapsed(&detach_start, &detach_end); - total_close_time += elapsed(&close_start, &close_end); - pthread_mutex_unlock(&print_mutex); - - sll_destroy(&level_starts); - sll_destroy(&level_ends); - sll_destroy(&readdir_starts); - sll_destroy(&readdir_ends); - sll_destroy(&strncmp_starts); - sll_destroy(&strncmp_ends); - sll_destroy(&snprintf_starts); - sll_destroy(&snprintf_ends); - sll_destroy(&lstat_starts); - sll_destroy(&lstat_ends); - sll_destroy(&isdir_starts); - sll_destroy(&isdir_ends); - sll_destroy(&access_starts); - sll_destroy(&access_ends); - sll_destroy(&set_starts); - sll_destroy(&set_ends); - sll_destroy(&clone_starts); - sll_destroy(&clone_ends); - sll_destroy(&pushdir_starts); - sll_destroy(&pushdir_ends); + { + static pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&print_mutex); + total_opendir_time += elapsed(&opendir_start, &opendir_end); + total_open_time += elapsed(&open_start, &open_end); + total_sqlite3_open_time += elapsed(&sqlite3_open_start, &sqlite3_open_end); + total_create_tables_time += elapsed(&create_tables_start, &create_tables_end); + total_set_pragmas_time += elapsed(&set_pragmas_start, &set_pragmas_end); + total_attach_time += elapsed(&attach_start, &attach_end); + total_addqueryfuncs_time += elapsed(&addqueryfuncs_start, &addqueryfuncs_end); + total_descend_time += elapsed(&descend_start, &descend_end); + total_check_args_time += sll_loop_sum(&check_args_starts, &check_args_ends); + total_level_time += sll_loop_sum(&level_starts, &level_ends); + total_readdir_time += sll_loop_sum(&readdir_starts, &readdir_ends); + total_strncmp_time += sll_loop_sum(&strncmp_starts, &strncmp_ends); + total_snprintf_time += sll_loop_sum(&snprintf_starts, &snprintf_ends); + total_lstat_time += sll_loop_sum(&lstat_starts, &lstat_ends); + total_isdir_time += sll_loop_sum(&isdir_starts, &isdir_ends); + total_access_time += sll_loop_sum(&access_starts, &access_ends); + total_set_time += sll_loop_sum(&set_starts, &set_ends); + total_clone_time += sll_loop_sum(&clone_starts, &clone_ends); + total_pushdir_time += sll_loop_sum(&pushdir_starts, &pushdir_ends); + total_closedir_time += elapsed(&closedir_start, &closedir_end); + total_exec_time += elapsed(&exec_start, &exec_end); + total_detach_time += elapsed(&detach_start, &detach_end); + total_close_time += elapsed(&close_start, &close_end); + pthread_mutex_unlock(&print_mutex); + sll_destroy(&check_args_starts); + sll_destroy(&check_args_ends); + sll_destroy(&level_starts); + sll_destroy(&level_ends); + sll_destroy(&readdir_starts); + sll_destroy(&readdir_ends); + sll_destroy(&strncmp_starts); + sll_destroy(&strncmp_ends); + sll_destroy(&snprintf_starts); + sll_destroy(&snprintf_ends); + sll_destroy(&lstat_starts); + sll_destroy(&lstat_ends); + sll_destroy(&isdir_starts); + sll_destroy(&isdir_ends); + sll_destroy(&access_starts); + sll_destroy(&access_ends); + sll_destroy(&set_starts); + sll_destroy(&set_ends); + sll_destroy(&clone_starts); + sll_destroy(&clone_ends); + sll_destroy(&pushdir_starts); + sll_destroy(&pushdir_ends); + } #endif #ifdef PER_THREAD_STATS - pthread_mutex_lock(&print_mutex); - fprintf(stderr, "%zu ", id); - fprintf(stderr, "%s ", work->name); - fprintf(stderr, "%" PRIu64 " ", timestamp(&opendir_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&opendir_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&open_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&open_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&sqlite3_open_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&sqlite3_open_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&create_tables_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&create_tables_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&set_pragmas_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&set_pragmas_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&attach_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&attach_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&descend_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&descend_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&exec_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&exec_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&detach_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&detach_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&close_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&close_end) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&closedir_start) - epoch); - fprintf(stderr, "%" PRIu64 " ", timestamp(&closedir_end) - epoch); - fprintf(stderr, "\n"); - pthread_mutex_unlock(&print_mutex); + { + static pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&print_mutex); + fprintf(stderr, "%zu ", id); + fprintf(stderr, "%s ", work->name); + fprintf(stderr, "%" PRIu64 " ", timestamp(&opendir_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&opendir_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&open_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&open_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&sqlite3_open_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&sqlite3_open_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&create_tables_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&create_tables_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&set_pragmas_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&set_pragmas_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&attach_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&attach_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&addqueryfuncs_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&addqueryfuncs_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&descend_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&descend_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&exec_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&exec_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&detach_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&detach_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&close_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&close_end) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&closedir_start) - epoch); + fprintf(stderr, "%" PRIu64 " ", timestamp(&closedir_end) - epoch); + fprintf(stderr, "\n"); + pthread_mutex_unlock(&print_mutex); + } #endif #endif @@ -944,12 +1043,12 @@ static void cleanup_intermediates(sqlite3 **intermediates, const size_t count) { int main(int argc, char *argv[]) { - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec start; clock_gettime(CLOCK_MONOTONIC, &start); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) epoch = timestamp(&start); #endif @@ -970,18 +1069,21 @@ int main(int argc, char *argv[]) return -1; } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec setup_globals_start; clock_gettime(CLOCK_MONOTONIC, &setup_globals_start); #endif + struct ThreadArgs args; + // initialize globals if (!outfiles_init(gts.outfd, in.outfile, in.outfilen, in.maxthreads) || - !outdbs_init (gts.outdbd, in.outdb, in.outdbn, in.maxthreads, in.sqlinit)) { + !outdbs_init (gts.outdbd, in.outdb, in.outdbn, in.maxthreads, in.sqlinit) || + !OutputBuffers_init(&args.output_buffers, in.maxthreads, 65536)) { return -1; } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec setup_globals_end; clock_gettime(CLOCK_MONOTONIC, &setup_globals_end); const long double setup_globals_time = elapsed(&setup_globals_start, &setup_globals_end); @@ -995,7 +1097,7 @@ int main(int argc, char *argv[]) fprintf(stderr, " with %d threads\n", in.maxthreads); #endif - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec setup_aggregate_start; clock_gettime(CLOCK_MONOTONIC, &setup_aggregate_start); #endif @@ -1028,15 +1130,15 @@ int main(int argc, char *argv[]) } } - #ifdef DEBUG + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) struct timespec setup_aggregate_end; clock_gettime(CLOCK_MONOTONIC, &setup_aggregate_end); const long double setup_aggregate_time = elapsed(&setup_aggregate_start, &setup_aggregate_end); #endif + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK long double total_time = 0; - #if defined(DEBUG) || BENCHMARK struct timespec work_start; clock_gettime(CLOCK_MONOTONIC, &work_start); #endif @@ -1071,35 +1173,47 @@ int main(int argc, char *argv[]) } // provide a function to print if PRINT is set - int (*print_callback_func)(void*,int,char**,char**) = (((in.aggregate_or_print == PRINT) && in.printdir)?print_callback:NULL); - if (QPTPool_start(pool, processdir, print_callback_func) != (size_t) in.maxthreads) { + args.print_callback_func = (((in.aggregate_or_print == PRINT) && in.printdir)?print_callback:NULL); + if (QPTPool_start(pool, processdir, &args) != (size_t) in.maxthreads) { fprintf(stderr, "Failed to start all threads\n"); return -1; } QPTPool_wait(pool); - const size_t thread_count = QPTPool_threads_started(pool); + + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK + const size_t thread_count = + #endif + QPTPool_threads_started(pool); QPTPool_destroy(pool); - #if defined(DEBUG) || BENCHMARK + // clear out buffered data + for(int i = 0; i < in.maxthreads; i++) { + flush_buffer(&args.output_buffers.mutex, &args.output_buffers.buffers[i], gts.outfd[i]); + } + + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec work_end; clock_gettime(CLOCK_MONOTONIC, &work_end); const long double work_time = elapsed(&work_start, &work_end); total_time += work_time; #endif + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK long double aggregate_time = 0; long double cleanup_time = 0; long double output_time = 0; + #endif + /* size_t rows = 0; */ if (in.aggregate_or_print == AGGREGATE) { // prepend the intermediate database query with "INSERT INTO" to move // the data from the databases into the final aggregation database char intermediate[MAXSQL]; sqlite3_snprintf(MAXSQL, intermediate, "INSERT INTO %s.entries %s", AGGREGATE_ATTACH_NAME, in.intermediate); - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec aggregate_start; clock_gettime(CLOCK_MONOTONIC, &aggregate_start); #endif @@ -1112,14 +1226,14 @@ int main(int argc, char *argv[]) } } - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec aggregate_end; clock_gettime(CLOCK_MONOTONIC, &aggregate_end); aggregate_time = elapsed(&aggregate_start, &aggregate_end); total_time += aggregate_time; #endif - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec cleanup_start; clock_gettime(CLOCK_MONOTONIC, &cleanup_start); #endif @@ -1127,20 +1241,19 @@ int main(int argc, char *argv[]) // cleanup the intermediate databases outside of the timing (no need to detach) cleanup_intermediates(intermediates, in.maxthreads); - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec cleanup_end; clock_gettime(CLOCK_MONOTONIC, &cleanup_end); cleanup_time = elapsed(&cleanup_start, &cleanup_end); total_time += cleanup_time; #endif - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec output_start; clock_gettime(CLOCK_MONOTONIC, &output_start); #endif // run the aggregate query on the aggregated results - rows = 0; sqlite3_stmt *res = NULL; if (sqlite3_prepare_v2(aggregate, in.aggregate, MAXSQL, &res, NULL) == SQLITE_OK) { rows = print_results(res, stdout, 1, 0, in.printing, in.delim); @@ -1150,7 +1263,7 @@ int main(int argc, char *argv[]) } sqlite3_finalize(res); - #if defined(DEBUG) || BENCHMARK + #if (defined(DEBUG) && defined(CUMULATIVE_TIMES)) || BENCHMARK struct timespec output_end; clock_gettime(CLOCK_MONOTONIC, &output_end); output_time = elapsed(&output_start, &output_end); @@ -1158,60 +1271,71 @@ int main(int argc, char *argv[]) #endif closedb(aggregate); - } - - // clean up globals - outdbs_fin (gts.outdbd, in.maxthreads, in.sqlfin); - outfiles_fin(gts.outfd, in.maxthreads); - - #ifdef DEBUG - #ifdef CUMULATIVE_TIMES - fprintf(stderr, "set up globals: %.2Lfs\n", setup_globals_time); - fprintf(stderr, "set up intermediate databases: %.2Lfs\n", setup_aggregate_time); - fprintf(stderr, "thread pool: %.2Lfs\n", work_time); - fprintf(stderr, " open directories: %.2Lfs\n", total_opendir_time); - fprintf(stderr, " open databases: %.2Lfs\n", total_open_time); - fprintf(stderr, " sqlite3_open_v2: %.2Lfs\n", total_sqlite3_open_time); - fprintf(stderr, " create tables: %.2Lfs\n", total_create_tables_time); - fprintf(stderr, " set pragmas: %.2Lfs\n", total_set_pragmas_time); - fprintf(stderr, " attach intermediate databases: %.2Lfs\n", total_attach_time); - fprintf(stderr, " descend: %.2Lfs\n", total_descend_time); - fprintf(stderr, " check level: %.2Lfs\n", total_level_time); - fprintf(stderr, " readdir: %.2Lfs\n", total_readdir_time); - fprintf(stderr, " strncmp: %.2Lfs\n", total_strncmp_time); - fprintf(stderr, " snprintf: %.2Lfs\n", total_snprintf_time); - fprintf(stderr, " lstat: %.2Lfs\n", total_lstat_time); - fprintf(stderr, " isdir: %.2Lfs\n", total_isdir_time); - fprintf(stderr, " access: %.2Lfs\n", total_access_time); - fprintf(stderr, " set: %.2Lfs\n", total_set_time); - fprintf(stderr, " clone: %.2Lfs\n", total_clone_time); - fprintf(stderr, " pushdir: %.2Lfs\n", total_pushdir_time); - fprintf(stderr, " sqlite3_exec %.2Lfs\n", total_exec_time); - fprintf(stderr, " detach intermediate databases: %.2Lfs\n", total_detach_time); - fprintf(stderr, " close databases: %.2Lfs\n", total_close_time); - fprintf(stderr, " close directories: %.2Lfs\n", total_closedir_time); - fprintf(stderr, "aggregate into final databases: %.2Lfs\n", aggregate_time); - fprintf(stderr, "clean up intermediate databases: %.2Lfs\n", cleanup_time); - fprintf(stderr, "print aggregated results: %.2Lfs\n", output_time); - fprintf(stderr, "\n"); - fprintf(stderr, "Rows returned: %zu\n", rows); - fprintf(stderr, "Queries performed: %zu\n", (size_t) (thread_count + in.maxthreads + 1)); - fprintf(stderr, "Real time: %.2Lfs\n", total_time); - #endif - #endif - - #if BENCHMARK - /* struct timespec end; */ - /* clock_gettime(CLOCK_MONOTONIC, &end); */ - - /* const long double main_time = elapsed(&start, &end); */ - - fprintf(stderr, "Total Dirs: %zu\n", thread_count); - fprintf(stderr, "Total Files: %zu\n", total_files); - fprintf(stderr, "Time Spent Querying: %.2Lfs\n", total_time); - fprintf(stderr, "Dirs/Sec: %.2Lf\n", thread_count / total_time); - fprintf(stderr, "Files/Sec: %.2Lf\n", total_files / total_time); - #endif - - return 0; + } + + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec cleanup_globals_start; + clock_gettime(CLOCK_MONOTONIC, &cleanup_globals_start); + #endif + + // clean up globals + OutputBuffers_destroy(&args.output_buffers, in.maxthreads); + outdbs_fin (gts.outdbd, in.maxthreads, in.sqlfin); + outfiles_fin(gts.outfd, in.maxthreads); + + #if defined(DEBUG) && defined(CUMULATIVE_TIMES) + struct timespec cleanup_globals_end; + clock_gettime(CLOCK_MONOTONIC, &cleanup_globals_end); + const long double cleanup_globals_time = elapsed(&cleanup_globals_start, &cleanup_globals_end); + + fprintf(stderr, "set up globals: %.2Lfs\n", setup_globals_time); + fprintf(stderr, "set up intermediate databases: %.2Lfs\n", setup_aggregate_time); + fprintf(stderr, "thread pool: %.2Lfs\n", work_time); + fprintf(stderr, " open directories: %.2Lfs\n", total_opendir_time); + fprintf(stderr, " open databases: %.2Lfs\n", total_open_time); + fprintf(stderr, " sqlite3_open_v2: %.2Lfs\n", total_sqlite3_open_time); + fprintf(stderr, " create tables: %.2Lfs\n", total_create_tables_time); + fprintf(stderr, " set pragmas: %.2Lfs\n", total_set_pragmas_time); + fprintf(stderr, " attach intermediate databases: %.2Lfs\n", total_attach_time); + fprintf(stderr, " addqueryfuncs: %.2Lfs\n", total_addqueryfuncs_time); + fprintf(stderr, " descend: %.2Lfs\n", total_descend_time); + fprintf(stderr, " check args: %.2Lfs\n", total_check_args_time); + fprintf(stderr, " check level: %.2Lfs\n", total_level_time); + fprintf(stderr, " readdir: %.2Lfs\n", total_readdir_time); + fprintf(stderr, " strncmp: %.2Lfs\n", total_strncmp_time); + fprintf(stderr, " snprintf: %.2Lfs\n", total_snprintf_time); + fprintf(stderr, " lstat: %.2Lfs\n", total_lstat_time); + fprintf(stderr, " isdir: %.2Lfs\n", total_isdir_time); + fprintf(stderr, " access: %.2Lfs\n", total_access_time); + fprintf(stderr, " set: %.2Lfs\n", total_set_time); + fprintf(stderr, " clone: %.2Lfs\n", total_clone_time); + fprintf(stderr, " pushdir: %.2Lfs\n", total_pushdir_time); + fprintf(stderr, " sqlite3_exec %.2Lfs\n", total_exec_time); + fprintf(stderr, " detach intermediate databases: %.2Lfs\n", total_detach_time); + fprintf(stderr, " close databases: %.2Lfs\n", total_close_time); + fprintf(stderr, " close directories: %.2Lfs\n", total_closedir_time); + fprintf(stderr, "aggregate into final databases: %.2Lfs\n", aggregate_time); + fprintf(stderr, "clean up intermediate databases: %.2Lfs\n", cleanup_time); + fprintf(stderr, "print aggregated results: %.2Lfs\n", output_time); + fprintf(stderr, "clean up globals: %.2Lfs\n", cleanup_globals_time); + fprintf(stderr, "\n"); + fprintf(stderr, "Rows returned: %zu\n", rows); + fprintf(stderr, "Queries performed: %zu\n", (size_t) (thread_count + in.maxthreads + 1)); + fprintf(stderr, "Real time: %.2Lfs\n", total_time); + #endif + + #if BENCHMARK + /* struct timespec end; */ + /* clock_gettime(CLOCK_MONOTONIC, &end); */ + + /* const long double main_time = elapsed(&start, &end); */ + + fprintf(stderr, "Total Dirs: %zu\n", thread_count); + fprintf(stderr, "Total Files: %zu\n", total_files); + fprintf(stderr, "Time Spent Querying: %.2Lfs\n", total_time); + fprintf(stderr, "Dirs/Sec: %.2Lf\n", thread_count / total_time); + fprintf(stderr, "Files/Sec: %.2Lf\n", total_files / total_time); + #endif + + return 0; }