diff --git a/src/mustang/mustang_engine.c b/src/mustang/mustang_engine.c index ed522629..1f583a2a 100644 --- a/src/mustang/mustang_engine.c +++ b/src/mustang/mustang_engine.c @@ -95,7 +95,7 @@ size_t id_cache_capacity; /** * This private routine encapulates the logic used to generete the thread - * tasks from the arguments given on the command line. While the logic itself + * tasks from path arguments given on the command line. While the logic itself * could be part of the body of main(), having a separate routine helps * focus on the creating of tasks. * @param the_arg : the commandline argument to process @@ -107,7 +107,7 @@ size_t id_cache_capacity; * passed to the tasks * @param task_queue* queue : Queue of available threads - passed to the tasks. */ -void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) { +void push_pathargs2queue(char *the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) { struct stat arg_statbuf; int statcode = stat(the_arg, &arg_statbuf); // Need to hold a "mutable" path for config_traverse() to modify as needed @@ -177,14 +177,11 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_ return; } - MDAL_DHANDLE task_dirhandle; - - MDAL task_mdal = new_task_position->ns->prepo->metascheme.mdal; - // If new depth > 0 (guaranteed by previous logic), the target is a // directory, so "place" new task within directory. if (new_task_depth != 0) { - task_dirhandle = task_mdal->opendir(new_task_position->ctxt, next_basepath); + MDAL task_mdal = new_task_position->ns->prepo->metascheme.mdal; + MDAL_DHANDLE task_dirhandle = task_mdal->opendir(new_task_position->ctxt, next_basepath); if (task_dirhandle == NULL) { LOG(LOG_ERR, "Failed to open target directory \"%s\" (%s)\n", next_basepath, strerror(errno)); @@ -222,17 +219,17 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_ switch (new_task_depth) { case 0: // Namespace case. Enqueue a new namespace traversal task. - top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_ns); + top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_ns); task_enqueue(queue, top_task); LOG(LOG_DEBUG, "Created top-level namespace traversal task at basepath: \"%s\"\n", next_basepath); break; default: // "Regular" (directory or file) case. Enqueue a new directory traversal task. if (next_file) { - top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_file); + top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_file); LOG(LOG_DEBUG,"Created task to get Object ID(s) for file \"%s\"\n", next_file); } else { - top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_dir); + top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_dir); LOG(LOG_DEBUG, "Created top-level directory traversal task at basepath: \"%s\"\n", next_basepath); } task_enqueue(queue, top_task); @@ -244,6 +241,92 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_ } +/** + * This private routine encapulates the logic used to generete the thread + * tasks from object ID arguments given on the command line. While the logic itself + * could be part of the body of main(), having a separate routine helps + * focus on the creating of tasks. + * + * Like push_pathargs2queue(), this routine adds initial tasks to the task queue. + * However these tasks will only run in the namespaces designated by the object IDs. + * @param the_arg : the commandline argument to process + * @param marfs_position* start_position : the Start Position of directory in MarFS file system. + * Used to create the Task Position + * @param hashtable* output_table : Hash Table holding the file paths of files that are in + * object(s) passed to the tasks + * @param pthread_mutex_t* hashtable_lock : Lock to use when accessing output_table - + * passed to the tasks + * @param task_queue* queue : Queue of available threads - passed to the tasks. + */ +void push_objargs2queue(char* the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) { + char* the_objid = strdup(the_arg);//holds the object ID of the files to look for in the namespace + char nsrootbuf[PATH_MAX]; //buffer to hold the namespace root path found in the object ID + char nsmntbuf[PATH_MAX]; //buffer to hold the actual MarFS user mount point + size_t nsrootlen = ftag_nspath(the_objid, nsrootbuf, PATH_MAX);//the length of the namespace root path string + + // If namespace string is bigger than maxpath, then something wierd is happening. + // If it is zero then there was a problem parsing it out of the object ID + if (!nsrootlen || (nsrootlen >= PATH_MAX)) { + LOG(LOG_ERR, "Failed to extract namespace path from object ID arg \"%s\" (path length = %d)--skipping to next\n", the_arg, nsrootlen); + if (the_objid) free(the_objid); + return; + } + snprintf(nsmntbuf, PATH_MAX, "%s%s", config->mountpoint, nsrootbuf); + + // Once we have the namespace root, need to postion ourselves in the user metadata tree + marfs_position* new_task_position = calloc(1, sizeof(marfs_position)); + + if (config_duplicateposition(start_position, new_task_position)) { + LOG(LOG_ERR, "Failed to duplicate parent position to new task--skipping to next\n"); + if (the_objid) free(the_objid); + free(new_task_position); + return; + } + + char* nsroot = strdup(nsmntbuf);//needs to be a maluable pointer to use later... + int new_task_depth = config_traverse(config, new_task_position, &nsroot, 0); + + if (new_task_depth < 0) { + LOG(LOG_ERR, "Failed to traverse (got depth: %d)--skipping to next\n", new_task_depth); + if (the_objid) free(the_objid); + free(nsroot); + config_abandonposition(new_task_position); + free(new_task_position); + return; + } + // If new depth != 0, then the namespace root must not designate a valid namespace + if (new_task_depth > 0) { + LOG(LOG_ERR, "\"%s\" is NOT a valid namespace root (got depth: %d)--skipping to next\n", nsroot, new_task_depth); + if (the_objid) free(the_objid); + free(nsroot); + config_abandonposition(new_task_position); + free(new_task_position); + return; + } + // ... fortify the new position + if (config_fortifyposition(new_task_position)) { + LOG(LOG_ERR, "Failed to fortify new_task position after new_task traverse!\n"); + if (the_objid) free(the_objid); + free(nsroot); + config_abandonposition(new_task_position); + free(new_task_position); + return; + } + + // Tell the new task where it is (at the top of the namespace) by recording its + // depth in its state. + new_task_position->depth = new_task_depth; + + // Now add the task to traverse the namespace, looking for files in the object to + // the task queue + mustang_task* top_task = task_init(config, new_task_position, strdup(nsmntbuf), the_objid, output_table, hashtable_lock, queue, &traverse_objns); + task_enqueue(queue, top_task); + LOG(LOG_DEBUG, "Created top-level namespace traversal task at namespace path: \"%s\" looking for files in object \"%s\"\n", nsmntbuf, the_objid); + + + return; +} + // Argument definition string for getopt(). The command line arguments are as // follows: // -t Maximum number of threads used by this process. This @@ -488,7 +571,12 @@ int main(int argc, char** argv) { // Parse each path argument, check them for validity, and pass along initial tasks for (; patharg_idx < argc; patharg_idx++){ LOG(LOG_INFO, "Processing arg \"%s\"\n", argv[patharg_idx]); - push_args2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue); + // If the arg has a "|" in it, then it is an object ID, not a path. + // Need to initilize the queue/task differently + if (strchr(argv[patharg_idx],'|')) + push_objargs2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue); + else + push_pathargs2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue); } pthread_mutex_lock(queue->lock); @@ -506,7 +594,7 @@ int main(int argc, char** argv) { // Once there are no tasks left in the queue to do, send workers sentinel (all-NULL) tasks so that they know to exit. for (size_t i = 0; i < max_threads; i += 1) { - mustang_task* sentinel = task_init(NULL, NULL, NULL, NULL, NULL, NULL, NULL); + mustang_task* sentinel = task_init(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); task_enqueue(queue, sentinel); } diff --git a/src/mustang/mustang_threading.c b/src/mustang/mustang_threading.c index 4ff87a72..aed8d25c 100644 --- a/src/mustang/mustang_threading.c +++ b/src/mustang/mustang_threading.c @@ -237,6 +237,7 @@ void process_file(marfs_position* task_position, char * current_file, hashtable* * the object id(s) of the file, and places them in the output_table. * @param marfs_config* base_config : Configuration for MarFS file system * @param marfs_position* task_position : Location or position of directory in MarFS file system + * @param char* usrpath : the path of the directory the task in currently in * @param char* file_name : a "short file name". No path/parent included. Used only by this routine! * @param hashtable* output_table : Hash Table holding Object Ids of scanned files * @param pthread_mutex_t* table_lock : Lock to use when accessing output_table @@ -246,8 +247,10 @@ void process_file(marfs_position* task_position, char * current_file, hashtable* * always logged to the logfile passed as a program argument since all * build settings at least log errors. */ -void traverse_file(marfs_config* base_config, marfs_position* task_position, char* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { - if (file_name == NULL) { +void traverse_file(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { + char* fname = (char*)file_name; + + if (fname == NULL) { LOG(LOG_ERR, "Attempted to traverse a blank file name!\n"); return; } @@ -266,11 +269,12 @@ void traverse_file(marfs_config* base_config, marfs_position* task_position, cha // Do the actual scanning of the file. Since this is always a single file, no // ID Cache is needed. - process_file(task_position, file_name, output_table, table_lock, NULL); + process_file(task_position, fname, output_table, table_lock, NULL); // The task is done with file_name. So we can clean it up! free(file_name); file_name = NULL; + free(usrpath); return; } @@ -284,7 +288,8 @@ void traverse_file(marfs_config* base_config, marfs_position* task_position, cha * worker thread to complete. * @param marfs_config* base_config : Configuration for MarFS file system * @param marfs_position* task_position : Location or position of directory in MarFS file system - * @param char* file_name : a "short file name". No path/parent included. Not used by this routine. + * @param char* usrpath : the path of the directory the task in currently in + * @param void* file_name : a "short file name". No path/parent included. Not used by this routine. * @param hashtable* output_table : Hash Table holding Object Ids of scanned files * @param pthread_mutex_t* table_lock : Lock to use when accessing output_table * @param task_queue* pool_queue : Queue of available threads @@ -293,7 +298,7 @@ void traverse_file(marfs_config* base_config, marfs_position* task_position, cha * always logged to the logfile passed as a program argument since all * build settings at least log errors. */ -void traverse_dir(marfs_config* base_config, marfs_position* task_position, char* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { +void traverse_dir(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { id_cache* this_id_cache = id_cache_init(id_cache_capacity); if (this_id_cache == NULL) { @@ -402,20 +407,23 @@ void traverse_dir(marfs_config* base_config, marfs_position* task_position, char continue; } - new_dir_position->depth = new_depth; + char dnamebuf[PATH_MAX]; mustang_task* new_task; + snprintf(dnamebuf, PATH_MAX, "%s/%s", usrpath, current_entry->d_name); + new_dir_position->depth = new_depth; + // depth == -1 case (config_traverse() error) has already been handled, so presuming that 0 and > 0 are exhaustive is safe. switch (new_depth) { case 0: // Namespace case. Enqueue a new namespace traversal task. - new_task = task_init(base_config, new_dir_position, file_name, output_table, table_lock, pool_queue, &traverse_ns); + new_task = task_init(base_config, new_dir_position, strdup(dnamebuf), file_name, output_table, table_lock, pool_queue, &traverse_ns); task_enqueue(pool_queue, new_task); LOG(LOG_DEBUG, "Created new task to traverse namespace \"%s\"\n", new_basepath); break; default: // "Regular" (directory) case. Enqueue a new directory traversal task. - new_task = task_init(base_config, new_dir_position, file_name, output_table, table_lock, pool_queue, &traverse_dir); + new_task = task_init(base_config, new_dir_position, strdup(dnamebuf), file_name, output_table, table_lock, pool_queue, &traverse_dir); task_enqueue(pool_queue, new_task); LOG(LOG_DEBUG, "Created new task to traverse directory \"%s\"\n", new_basepath); break; @@ -445,6 +453,222 @@ void traverse_dir(marfs_config* base_config, marfs_position* task_position, char } free(task_position); + free(usrpath); +} + +/** + * Using a task's parameters, traverse the directory that the current setting + * of `task_position` corresponds to, reading all directory entries and acting + * accordingly. If an entry corresponds to a regular file, get its FTAG and its + * object ID(s), and compares against the object ID specified in objid. If there + * is a matchi then the pathname is storedin the hashtable. If an entry corresponds + * to a regular directory, create a new task for that directory bundled with + * appropriate state for a worker thread to complete. If namespace is encountered, + * it is ignored. + * @param marfs_config* base_config : Configuration for MarFS file system + * @param marfs_position* task_position : Location or position of directory in MarFS file system + * @param char* usrpath : the path of the directory the task in currently in + * @param void* objid : the object ID to look for + * @param hashtable* output_table : Hash Table holding Object Ids of scanned files + * @param pthread_mutex_t* table_lock : Lock to use when accessing output_table + * @param task_queue* pool_queue : Queue of available threads + * + * NOTE: This function always returns, including on failure. Failures are + * always logged to the logfile passed as a program argument since all + * build settings at least log errors. + */ +void traverse_objdir(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* objid, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { + char* dataobj_id = (char*)objid; + + // Attempt to fortify the thread's position (if not already fortified) and check for errors + if ((task_position->ctxt == NULL) && config_fortifyposition(task_position)) { + LOG(LOG_ERR, "Failed to fortify MarFS position while traversing a directory!\n"); + return; + } + + // Define a convenient alias for this thread's relevant MDAL, from which + // all metadata ops will be launched + MDAL thread_mdal = task_position->ns->prepo->metascheme.mdal; + + // Recover a directory handle for the cwd to enable later readdir() + MDAL_DHANDLE cwd_handle = thread_mdal->opendir(task_position->ctxt, "."); + + // Cannot proceed with "main" traversal logic if handle is NULL (i.e., + // if an error occurred on opendir) + if (cwd_handle == NULL) { + LOG(LOG_ERR, "Failed to open current directory for reading! (%s)\n", strerror(errno)); + config_abandonposition(task_position); + free(task_position); + return; + } + + // "Regular" readdir logic + struct dirent* current_entry = thread_mdal->readdir(cwd_handle); + + // Unlike a standard collection, directories are not strictly "ordered". + // So, simply retrieve all entries until readdir() returns NULL to + // indicate "no more entries" + while (current_entry != NULL) { + // Ignore dirents corresponding to "invalid" paths (reference tree, + // etc.) + if (thread_mdal->pathfilter(current_entry->d_name) != 0) { + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + if (current_entry->d_type == DT_DIR) { + // Skip current directory "." and parent directory ".." to avoid infinite loop in directory traversal + if ( (strncmp(current_entry->d_name, ".", strlen(current_entry->d_name)) == 0) || (strncmp(current_entry->d_name, "..", strlen(current_entry->d_name)) == 0) ) { + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + marfs_position* new_dir_position = (marfs_position*) calloc(1, sizeof(marfs_position)); + if (new_dir_position == NULL) { + LOG(LOG_ERR, "Failed to allocate memory for new new_task position (current entry: %s)\n", current_entry->d_name); + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + if (config_duplicateposition(task_position, new_dir_position)) { + LOG(LOG_ERR, "Failed to duplicate parent position to new_task (current entry: %s)\n", current_entry->d_name); + config_abandonposition(new_dir_position); + free(new_dir_position); + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + char* new_basepath = strdup(current_entry->d_name); + int new_depth = config_traverse(base_config, new_dir_position, &new_basepath, 0); + + if (new_depth <= 0) { + // Skipping any namespace entries + if (!new_depth) { + LOG(LOG_DEBUG, "Skipping namespace: \"%s\"\n", current_entry->d_name); + } else { + LOG(LOG_ERR, "Failed to traverse to target: \"%s\"\n", current_entry->d_name); + } + + free(new_basepath); + config_abandonposition(new_dir_position); + free(new_dir_position); + + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + // Open a directory handle for the "child" task that is being + // created to enable chdir() for that task and starting "directly" + // at the new position + MDAL_DHANDLE next_cwd_handle = thread_mdal->opendir(new_dir_position->ctxt, current_entry->d_name); + + if (next_cwd_handle == NULL) { + LOG(LOG_ERR, "Failed to open directory handle for new_task (%s) (directory: \"%s\")\n", strerror(errno), current_entry->d_name); + + free(new_basepath); + config_abandonposition(new_dir_position); + free(new_dir_position); + + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + if (thread_mdal->chdir(new_dir_position->ctxt, next_cwd_handle)) { + LOG(LOG_ERR, "Failed to chdir to target directory \"%s\" (%s).\n", current_entry->d_name, strerror(errno)); + + free(new_basepath); + config_abandonposition(new_dir_position); + free(new_dir_position); + thread_mdal->closedir(next_cwd_handle); + + current_entry = thread_mdal->readdir(cwd_handle); + continue; + } + + char dnamebuf[PATH_MAX]; + + snprintf(dnamebuf, PATH_MAX, "%s/%s", usrpath, current_entry->d_name); + new_dir_position->depth = new_depth; + mustang_task* new_task = task_init(base_config, new_dir_position, strdup(dnamebuf), objid, output_table, table_lock, pool_queue, &traverse_objdir); + + // Put new task on queue + task_enqueue(pool_queue, new_task); + LOG(LOG_DEBUG, "Created new task to traverse directory \"%s\"\n", new_basepath); + + // Basepath no longer needed after logging since new_task reopens + // dirhandle not with basepath, but with post-chdir "." reference. + free(new_basepath); + } else if (current_entry->d_type == DT_REG) { + // Get FTAG of the file + char* file_ftagstr = get_ftag(task_position, thread_mdal, current_entry->d_name); + FTAG retrieved_tag = {0}; + char* retrieved_id = NULL; + + // Initialize FTAG struct from string representation + if (ftag_initstr(&retrieved_tag, file_ftagstr)) { + LOG(LOG_ERR, "Failed to initialize FTAG for file: \"%s\"\n", current_entry->d_name); + free(file_ftagstr); + free(usrpath); + return; + } + + size_t objno_min = retrieved_tag.objno; + size_t objno_max = datastream_filebounds(&retrieved_tag); + ne_erasure placeholder_erasure; + ne_location placeholder_location; + + // Sufficiently large files may be "chunked" (i.e., logically + // separated) into multiple backend MarFS objects depending on file + // size and the MarFS config. Make sure that object IDs for *all* chunks + // of the file are retrieved and compared, not just the ID for the + // first chunk. + for (size_t i = objno_min; i <= objno_max; i += 1) { + int found = 0; // a flag to set if the object ID is found in the file's objects + retrieved_tag.objno = i; + if (datastream_objtarget(&retrieved_tag, &(task_position->ns->prepo->datascheme), &retrieved_id, &placeholder_erasure, &placeholder_location)) { + LOG(LOG_ERR, "Failed to get object ID for chunk %zu of current object \"%s\"\n", i, current_entry->d_name); + continue; + } + + // Compare the retrieved object ID with the object ID we are looking for + if (!strcmp(retrieved_id,dataobj_id)) { + char fnamebuf[PATH_MAX]; + // We have a winner! + snprintf(fnamebuf, PATH_MAX, "%s/%s", usrpath, current_entry->d_name); + pthread_mutex_lock(table_lock); + put(output_table, fnamebuf); // put() dupes string into new heap space + pthread_mutex_unlock(table_lock); + LOG(LOG_DEBUG, "Recorded file \"%s\" in output table for object \"%s\".\n", fnamebuf, retrieved_id); + found = 1; + } + + free(retrieved_id); + retrieved_id = NULL; // make ptr NULL to better discard stale reference + if (found) break; + } + +// Paul defined ftag_cleanup(). But currently not availible in current INSTALLED version of MarFS library +// ftag_cleanup(&retrieved_tag); // free internal allocated memory for FTAG's ctag and streamid fields + if (retrieved_tag.ctag) free(retrieved_tag.ctag); + if (retrieved_tag.streamid) free(retrieved_tag.streamid); + free(file_ftagstr); + file_ftagstr = NULL; // discard stale reference to FTAG to prevent double-free + } + + current_entry = thread_mdal->readdir(cwd_handle); + } // end readdir loop + + // Clean up other per-task state + if (thread_mdal->closedir(cwd_handle)) { + LOG(LOG_WARNING, "Failed to close handle for current working directory!\n"); + } + + if (config_abandonposition(task_position)) { + LOG(LOG_WARNING, "Failed to abandon base position!\n"); + } + + free(task_position); + free(usrpath); } /** @@ -456,7 +680,8 @@ void traverse_dir(marfs_config* base_config, marfs_position* task_position, char * traverse_dir(). * @param marfs_config* base_config : Configuration for MarFS file system * @param marfs_position* task_position : Location or position of directory in MarFS file system - * @param char* file_name : a "short file name". No path/parent included. Not used by this routine. + * @param char* usrpath : the path of the current directory the task is in + * @param void* file_name : a "short file name". No path/parent included. Not used by this routine. * @param hashtable* output_table : Hash Table holding Object Ids of scanned files * @param pthread_mutex_t* table_lock : Lock to use when accessing output_table * @param task_queue* pool_queue : Queue of available threads @@ -465,10 +690,11 @@ void traverse_dir(marfs_config* base_config, marfs_position* task_position, char * failure. Failures are always logged to the relevant logfile since all build * settings for MUSTANG log errors. */ -void traverse_ns(marfs_config* base_config, marfs_position* task_position, char * file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { +void traverse_ns(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { // Attempt to fortify the thread's position (if not already fortified) and check for errors if ((task_position->ctxt == NULL) && config_fortifyposition(task_position)) { LOG(LOG_ERR, "Failed to fortify MarFS position!\n"); + free(usrpath); return; } @@ -508,7 +734,7 @@ void traverse_ns(marfs_config* base_config, marfs_position* task_position, char } // subspaces of this namespace are namespaces "prima facie" --- automatically create traverse_ns task - mustang_task* new_ns_task = task_init(base_config, new_ns_position, file_name, output_table, table_lock, pool_queue, &traverse_ns); + mustang_task* new_ns_task = task_init(base_config, new_ns_position, strdup(current_subnode.name), (void*)file_name, output_table, table_lock, pool_queue, &traverse_ns); task_enqueue(pool_queue, new_ns_task); LOG(LOG_DEBUG, "Created new namespace traversal task at basepath: \"%s\"\n", new_ns_path); free(new_ns_path); @@ -518,7 +744,40 @@ void traverse_ns(marfs_config* base_config, marfs_position* task_position, char // Namespaces may also contain subdirectories and files. Proceed to the // directory traversal routine (the "regular" common case) and examine any // other namespace contents. - traverse_dir(base_config, task_position, file_name, output_table, table_lock, pool_queue); + traverse_dir(base_config, task_position, strdup(usrpath), file_name, output_table, table_lock, pool_queue); + + free(usrpath); + return; +} + +/** + * This basically wrapper for traverse_objdir(). Since we are only looking + * for files in a specified namespace, no need to follow subspaces, like + * traverse_ns(). + * @param marfs_config* base_config : Configuration for MarFS file system + * @param marfs_position* task_position : Location or position of directory in MarFS file system + * @param char* usrpath : the path of the current directory the task is in + * @param void* objid : the object ID to look for + * @param hashtable* output_table : Hash Table holding scanned files who are apart of objid + * @param pthread_mutex_t* table_lock : Lock to use when accessing output_table + * @param task_queue* pool_queue : Queue of available threads + * + * NOTE: Like traverse_dir(), this function always returns, including on + * failure. Failures are always logged to the relevant logfile since all build + * settings for MUSTANG log errors. + */ +void traverse_objns(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* objid, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue) { + // Attempt to fortify the thread's position (if not already fortified) and check for errors + if ((task_position->ctxt == NULL) && config_fortifyposition(task_position)) { + LOG(LOG_ERR, "Failed to fortify MarFS position!\n"); + free(usrpath); + return; + } + + traverse_objdir(base_config, task_position, strdup(usrpath), objid, output_table, table_lock, pool_queue); + + free(usrpath); + return; } /** @@ -546,7 +805,7 @@ void* thread_launcher(void* args) { // In all other circumstances, tasks will be initialized with a // function pointer indicating what work (traversing a namespace or // traversing a directory) needs to be performed, so jump to that. - next_task->task_func(next_task->config, next_task->position, next_task->fname, next_task->ht, next_task->ht_lock, queue); + next_task->task_func(next_task->config, next_task->position, next_task->usrpath, next_task->taskarg, next_task->ht, next_task->ht_lock, queue); pthread_mutex_lock(queue->lock); diff --git a/src/mustang/mustang_threading.h b/src/mustang/mustang_threading.h index f61c895a..ed9cdcec 100644 --- a/src/mustang/mustang_threading.h +++ b/src/mustang/mustang_threading.h @@ -105,7 +105,7 @@ char* get_ftag(marfs_position* current_position, MDAL current_mdal, char* path); * always logged to the logfile passed as a program argument since all * build settings at least log errors. */ -void traverse_file(marfs_config* base_config, marfs_position* task_position, char* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); +void traverse_file(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); /** * Using a task's parameters, traverse the directory that the current setting @@ -119,7 +119,22 @@ void traverse_file(marfs_config* base_config, marfs_position* task_position, cha * always logged to the logfile passed as a program argument since all * build settings at least log errors. */ -void traverse_dir(marfs_config* base_config, marfs_position* task_position, char* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); +void traverse_dir(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); + +/** + * Using a task's parameters, traverse the directory that the current setting + * of `task_position` corresponds to, reading all directory entries and acting + * accordingly. If an entry corresponds to a regular file, get its FTAG and its + * object ID(s), and compares against the object ID specified in objid. If there + * is a matchi then the pathname is storedin the hashtable. If an entry corresponds + * to a regular directory, create a new task for that directory bundled with + * appropriate state for a worker thread to complete. If namespace is encountered, + * + * NOTE: This function always returns, including on failure. Failures are + * always logged to the logfile passed as a program argument since all + * build settings at least log errors. + */ +void traverse_objdir(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* objid, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); /** * Using a task's parameters, check the namespace that the current setting of @@ -133,6 +148,17 @@ void traverse_dir(marfs_config* base_config, marfs_position* task_position, char * failure. Failures are always logged to the relevant logfile since all build * settings for MUSTANG log errors. */ -void traverse_ns(marfs_config* base_config, marfs_position* task_position, char* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); +void traverse_ns(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* file_name, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); + +/** + * This basically wrapper for traverse_objdir(). Since we are only looking + * for files in a specified namespace, no need to follow subspaces, like + * traverse_ns(). + * + * NOTE: Like traverse_dir(), this function always returns, including on + * failure. Failures are always logged to the relevant logfile since all build + * settings for MUSTANG log errors. + */ +void traverse_objns(marfs_config* base_config, marfs_position* task_position, char* usrpath, void* objid, hashtable* output_table, pthread_mutex_t* table_lock, task_queue* pool_queue); #endif diff --git a/src/mustang/task_queue.c b/src/mustang/task_queue.c index d7a09ca8..18944d73 100644 --- a/src/mustang/task_queue.c +++ b/src/mustang/task_queue.c @@ -69,8 +69,8 @@ GNU licenses can be found at http://www.gnu.org/licenses/. * Returns: valid pointer to mustang_task struct on success, or NULL on * failure. */ -mustang_task* task_init(marfs_config* task_config, marfs_position* task_position, char * task_fname, hashtable* task_ht, pthread_mutex_t* task_ht_lock, task_queue* queue_ref, - void (*traversal_routine)(marfs_config*, marfs_position*, char*, hashtable*, pthread_mutex_t*, task_queue*)) { +mustang_task* task_init(marfs_config* task_config, marfs_position* task_position, char * task_path, void* task_arg, hashtable* task_ht, pthread_mutex_t* task_ht_lock, task_queue* queue_ref, + void (*traversal_routine)(marfs_config*, marfs_position*, char*, void*, hashtable*, pthread_mutex_t*, task_queue*)) { mustang_task* new_task = (mustang_task*) calloc(1, sizeof(mustang_task)); if (new_task == NULL) { @@ -80,7 +80,8 @@ mustang_task* task_init(marfs_config* task_config, marfs_position* task_position // Initialize new task state like in a constructor based on arguments new_task->config = task_config; new_task->position = task_position; - new_task->fname = task_fname; + new_task->usrpath = task_path; + new_task->taskarg = task_arg; new_task->ht = task_ht; new_task->ht_lock = task_ht_lock; new_task->queue_ptr = queue_ref; diff --git a/src/mustang/task_queue.h b/src/mustang/task_queue.h index 14d62c33..74b579fe 100644 --- a/src/mustang/task_queue.h +++ b/src/mustang/task_queue.h @@ -71,13 +71,14 @@ typedef struct mustang_task_queue_struct task_queue; typedef struct mustang_task_struct { marfs_config* config; marfs_position* position; - char* fname; + char* usrpath; // holds the user path of the directory the task is operating in + void* taskarg; // holds an optional task argument hashtable* ht; pthread_mutex_t* ht_lock; task_queue* queue_ptr; // Tasks are retrieved from the queue, but during task execution other tasks may need to be enqueued. // The routine to execute. For the current version of Mustang (1.2.x), either `traverse_ns()` for a namespace or `traverse_dir()` for a regular directory. // If NULL, workers will detect this, clean up their state, and exit. - void (*task_func)(marfs_config*, marfs_position*, char *, hashtable*, pthread_mutex_t*, task_queue*); + void (*task_func)(marfs_config*, marfs_position*, char *, void*, hashtable*, pthread_mutex_t*, task_queue*); mustang_task* prev; // Queue implemented as doubly-linked list of tasks mustang_task* next; } mustang_task; @@ -103,7 +104,7 @@ typedef struct mustang_task_queue_struct { * Returns: valid pointer to mustang_task struct on success, or NULL on * failure. */ -mustang_task* task_init(marfs_config* task_config, marfs_position* task_position, char *task_fname, hashtable* task_ht, pthread_mutex_t* task_ht_lock, task_queue* task_queue_ref, void (*traversal_routine)(marfs_config*, marfs_position*, char *, hashtable*, pthread_mutex_t*, task_queue*)); +mustang_task* task_init(marfs_config* task_config, marfs_position* task_position, char* task_path, void* task_arg, hashtable* task_ht, pthread_mutex_t* task_ht_lock, task_queue* task_queue_ref, void (*traversal_routine)(marfs_config*, marfs_position*, char *, void*, hashtable*, pthread_mutex_t*, task_queue*)); /** * Allocate space for, and return a pointer to, a new task_queue struct on the diff --git a/src/tagging/tagging.c b/src/tagging/tagging.c index 4480759f..439cc1b4 100644 --- a/src/tagging/tagging.c +++ b/src/tagging/tagging.c @@ -811,6 +811,65 @@ size_t ftag_datatgt( const FTAG* ftag, char* tgtstr, size_t len ) { } +/** + * Parses a given object ID string, and populates the given string buffer with the + * namespace associated with the object ID in a path format (i.e. /namespace). + * @param const char *objid : String containing the object ID + * @param char* tgtstr : String buffer to be populated with the namespace path + * @param size_t len : Byte length of the target buffer + * @return size_t : Length of the produced string ( excluding NULL-terminator ), or zero if + * an error occurred. + * NOTE -- if this value is >= the length of the provided buffer, this + * indicates that insufficint buffer space was provided and the resulting + * output string was truncated. + */ +size_t ftag_nspath( const char* objid, char* tgtstr, size_t len ) { + // check for NULL object ID string + if ( objid == NULL ) { + LOG( LOG_ERR, "Received a NULL Object ID\n" ); + return 0; + } + // check for NULL string target + if ( len && tgtstr == NULL ) { + LOG( LOG_ERR, "Receieved a NULL tgtstr value w/ non-zero len\n" ); + return 0; + } + // find the namespace path in the objstr + char* objstr = strdup( objid); + char* parse = objstr; + char* nspath = NULL; + while ( *parse != '\0' ) { + // 2 #'s next to each other means the namespace has been found in the object ID + if ( *parse == '#' && *(parse+1) == '#' ) { nspath = parse+1; } + if ( *parse == '#' ) { *parse = '/'; } + // if '|' is found after nspath has been assigned -> end of streamid + if ( *parse == '|' && nspath ) { + *parse = '\0'; + break; + } + parse++; + } + // verify that a namespace path was found + if ( nspath == NULL ) { + LOG( LOG_ERR, "Failed to find a namespace path for object ID %s\n", objid ); + free( objstr ); + return 0; + } + // need to set the "real" end of the namespace path + char* nspathend = strrchr( nspath, '/'); + if ( nspathend == NULL ) { + LOG( LOG_ERR, "Failed to find the end of the namespace path for stream ID %s\n", nspath ); + free( objstr ); + return 0; + } + *nspathend = '\0'; + // assign nspath to the buffer + size_t retval = snprintf( tgtstr, len, "%s", nspath ); + free( objstr ); + return retval; +} + + /** * Free allocated memory for internal ctag and streamid fields within an ftag. * NOTE: this function does not free the ftag itself since this function cannot @@ -818,21 +877,21 @@ size_t ftag_datatgt( const FTAG* ftag, char* tgtstr, size_t len ) { * @param FTAG* ftag: the ftag whose ctag and streamid fields will be freed. */ void ftag_cleanup(FTAG* ftag) { - if (ftag == NULL) { - LOG( LOG_ERR, "Received a NULL FTAG reference\n" ); - } - - if (ftag->ctag == NULL) { - LOG( LOG_ERR, "FTAG ctag field is NULL--skipping free\n" ); - } else { - free(ftag->ctag); - } - - if (ftag->streamid == NULL) { - LOG( LOG_ERR, "FTAG streamid field is NULL--skipping free\n" ); - } else { - free(ftag->streamid); - } + if (ftag == NULL) { + LOG( LOG_ERR, "Received a NULL FTAG reference\n" ); + } + + if (ftag->ctag == NULL) { + LOG( LOG_ERR, "FTAG ctag field is NULL--skipping free\n" ); + } else { + free(ftag->ctag); + } + // check for NULL stream ID + if (ftag->streamid == NULL) { + LOG( LOG_ERR, "FTAG streamid field is NULL--skipping free\n" ); + } else { + free(ftag->streamid); + } } diff --git a/src/tagging/tagging.h b/src/tagging/tagging.h index 0255130b..81290e98 100644 --- a/src/tagging/tagging.h +++ b/src/tagging/tagging.h @@ -212,6 +212,26 @@ ssize_t ftag_metainfo( const char* fileid, char* entrytype ); */ size_t ftag_datatgt( const FTAG* ftag, char* tgtstr, size_t len ); +/** + * Parses a given object ID string, and populates the given string buffer with the + * namespace associated with the object ID in a path format (i.e. /namespace). + * @param const char *objid : String containing the object ID + * @param char* tgtstr : String buffer to be populated with the namespace path + * @param size_t len : Byte length of the target buffer + * @return size_t : Length of the produced string ( excluding NULL-terminator ), or zero if + * an error occurred. + * NOTE -- if this value is >= the length of the provided buffer, this + * indicates that insufficint buffer space was provided and the resulting + * output string was truncated. + */ +size_t ftag_nspath( const char* objid, char* tgtstr, size_t len ); + +/** + * Free allocated memory for internal ctag and streamid fields within an ftag. + * NOTE: this function does not free the ftag itself since this function cannot + * make assumptions about whether an ftag is heap-allocated or on the stack. + * @param FTAG* ftag: the ftag whose ctag and streamid fields will be freed. + */ void ftag_cleanup(FTAG* ftag); // MARFS REBUILD TAG -- attached to rebuild marker files, providing rebuild info