Skip to content

Commit

Permalink
Added logic to a "reverse lookup" - given an object ID, return all fi…
Browse files Browse the repository at this point in the history
…les in that object in terms of the user path. Added logic to keep track of the user directory a given task/thread is running in.
  • Loading branch information
dsherril committed Nov 27, 2024
1 parent 38f6c22 commit 1cf7bb6
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 49 deletions.
112 changes: 100 additions & 12 deletions src/mustang/mustang_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ size_t id_cache_capacity;

/**
* This private routine encapsulates the logic used to generate the thread
* tasks from the arguments given on the command line. While the logic itself
* tasks from path arguments given on the command line. While the logic itself
* could be part of the body of main(), having a separate routine helps
* focus on the creating of tasks.
* @param the_arg : the commandline argument to process
Expand All @@ -107,7 +107,7 @@ size_t id_cache_capacity;
* passed to the tasks
* @param task_queue* queue : Queue of available threads - passed to the tasks.
*/
void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) {
void push_pathargs2queue(char *the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) {
struct stat arg_statbuf;
int statcode = stat(the_arg, &arg_statbuf);
// Need to hold a "mutable" path for config_traverse() to modify as needed
Expand Down Expand Up @@ -177,14 +177,11 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_
return;
}

MDAL_DHANDLE task_dirhandle;

MDAL task_mdal = new_task_position->ns->prepo->metascheme.mdal;

// If new depth > 0 (guaranteed by previous logic), the target is a
// directory, so "place" new task within directory.
if (new_task_depth != 0) {
task_dirhandle = task_mdal->opendir(new_task_position->ctxt, next_basepath);
MDAL task_mdal = new_task_position->ns->prepo->metascheme.mdal;
MDAL_DHANDLE task_dirhandle = task_mdal->opendir(new_task_position->ctxt, next_basepath);

if (task_dirhandle == NULL) {
LOG(LOG_ERR, "Failed to open target directory \"%s\" (%s)\n", next_basepath, strerror(errno));
Expand Down Expand Up @@ -222,17 +219,17 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_
switch (new_task_depth) {
case 0:
// Namespace case. Enqueue a new namespace traversal task.
top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_ns);
top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_ns);
task_enqueue(queue, top_task);
LOG(LOG_DEBUG, "Created top-level namespace traversal task at basepath: \"%s\"\n", next_basepath);
break;
default:
// "Regular" (directory or file) case. Enqueue a new directory traversal task.
if (next_file) {
top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_file);
top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_file);
LOG(LOG_DEBUG,"Created task to get Object ID(s) for file \"%s\"\n", next_file);
} else {
top_task = task_init(config, new_task_position, next_file, output_table, hashtable_lock, queue, &traverse_dir);
top_task = task_init(config, new_task_position, strdup(next_basepath), next_file, output_table, hashtable_lock, queue, &traverse_dir);
LOG(LOG_DEBUG, "Created top-level directory traversal task at basepath: \"%s\"\n", next_basepath);
}
task_enqueue(queue, top_task);
Expand All @@ -244,6 +241,92 @@ void push_args2queue(char *the_arg, marfs_config* config, marfs_position* start_

}

/**
 * This private routine encapsulates the logic used to generate the thread
 * tasks from object ID arguments given on the command line. While the logic
 * itself could be part of the body of main(), having a separate routine helps
 * focus on the creation of tasks.
 *
 * Like push_pathargs2queue(), this routine adds an initial task to the task
 * queue. However, the task will only run in the namespace designated by the
 * object ID. On any failure the argument is logged and skipped, and every
 * resource acquired so far is released.
 *
 * @param the_arg : the commandline argument (an object ID) to process
 * @param marfs_config* config : the MarFS configuration; its mountpoint is
 *                               prepended to the namespace path found in the
 *                               object ID, and it is passed to the task
 * @param marfs_position* start_position : the Start Position of directory in MarFS file system.
 *                                         Used to create the Task Position
 * @param hashtable* output_table : Hash Table holding the file paths of files that are in
 *                                  object(s) passed to the tasks
 * @param pthread_mutex_t* hashtable_lock : Lock to use when accessing output_table -
 *                                          passed to the tasks
 * @param task_queue* queue : Queue of available threads - passed to the tasks.
 */
void push_objargs2queue(char* the_arg, marfs_config* config, marfs_position* start_position, hashtable* output_table, pthread_mutex_t* hashtable_lock, task_queue* queue) {
    char nsrootbuf[PATH_MAX];                  // namespace root path found in the object ID
    char nsmntbuf[PATH_MAX];                   // namespace root path under the MarFS user mount point
    char* the_objid = NULL;                    // private copy of the object ID, handed to the task
    char* nsroot = NULL;                       // mutable copy of nsmntbuf for config_traverse()
    char* task_basepath = NULL;                // copy of nsmntbuf handed to the task
    marfs_position* new_task_position = NULL;  // position handed to the task
    int position_active = 0;                   // set once the position must be abandoned on failure
    size_t nsrootlen = 0;
    int nsmntlen = 0;
    int new_task_depth = 0;
    mustang_task* top_task = NULL;

    the_objid = strdup(the_arg);
    if (the_objid == NULL) {
        LOG(LOG_ERR, "Failed to duplicate object ID arg \"%s\" (%s)--skipping to next\n", the_arg, strerror(errno));
        return;
    }

    // If the namespace string is bigger than PATH_MAX, then something weird is happening.
    // If it is zero then there was a problem parsing it out of the object ID.
    nsrootlen = ftag_nspath(the_objid, nsrootbuf, PATH_MAX);
    if (!nsrootlen || (nsrootlen >= PATH_MAX)) {
        LOG(LOG_ERR, "Failed to extract namespace path from object ID arg \"%s\" (path length = %zu)--skipping to next\n", the_arg, nsrootlen);
        goto cleanup;
    }

    // Build the user-visible namespace path; refuse to continue with a truncated path.
    nsmntlen = snprintf(nsmntbuf, PATH_MAX, "%s%s", config->mountpoint, nsrootbuf);
    if (nsmntlen < 0 || nsmntlen >= PATH_MAX) {
        LOG(LOG_ERR, "Namespace mount path for object ID arg \"%s\" does not fit in PATH_MAX--skipping to next\n", the_arg);
        goto cleanup;
    }

    // Once we have the namespace root, need to position ourselves in the user metadata tree
    new_task_position = calloc(1, sizeof(*new_task_position));
    if (new_task_position == NULL) {
        LOG(LOG_ERR, "Failed to allocate new task position (%s)--skipping to next\n", strerror(errno));
        goto cleanup;
    }

    if (config_duplicateposition(start_position, new_task_position)) {
        LOG(LOG_ERR, "Failed to duplicate parent position to new task--skipping to next\n");
        goto cleanup;
    }
    position_active = 1;

    // config_traverse() takes a char**, so it needs a mutable heap copy of the path
    nsroot = strdup(nsmntbuf);
    if (nsroot == NULL) {
        LOG(LOG_ERR, "Failed to duplicate namespace path \"%s\" (%s)--skipping to next\n", nsmntbuf, strerror(errno));
        goto cleanup;
    }

    new_task_depth = config_traverse(config, new_task_position, &nsroot, 0);
    if (new_task_depth < 0) {
        LOG(LOG_ERR, "Failed to traverse (got depth: %d)--skipping to next\n", new_task_depth);
        goto cleanup;
    }
    // If new depth != 0, then the namespace root must not designate a valid namespace
    if (new_task_depth > 0) {
        LOG(LOG_ERR, "\"%s\" is NOT a valid namespace root (got depth: %d)--skipping to next\n", nsroot, new_task_depth);
        goto cleanup;
    }
    // ... fortify the new position
    if (config_fortifyposition(new_task_position)) {
        LOG(LOG_ERR, "Failed to fortify new_task position after new_task traverse!\n");
        goto cleanup;
    }

    // The traversal copy is only needed for positioning; release it now so it
    // is not leaked on the success path.
    free(nsroot);
    nsroot = NULL;

    // Tell the new task where it is (at the top of the namespace) by recording its
    // depth in its state.
    new_task_position->depth = new_task_depth;

    task_basepath = strdup(nsmntbuf);
    if (task_basepath == NULL) {
        LOG(LOG_ERR, "Failed to duplicate namespace path \"%s\" (%s)--skipping to next\n", nsmntbuf, strerror(errno));
        goto cleanup;
    }

    // Now add the task to traverse the namespace, looking for files in the object, to
    // the task queue.
    // NOTE(review): assumes task_init() returns NULL on failure without taking
    // ownership of its arguments -- confirm against task_init().
    top_task = task_init(config, new_task_position, task_basepath, the_objid, output_table, hashtable_lock, queue, &traverse_objns);
    if (top_task == NULL) {
        LOG(LOG_ERR, "Failed to create namespace traversal task for object \"%s\"--skipping to next\n", the_arg);
        goto cleanup;
    }
    task_enqueue(queue, top_task);

    // The position, base path and object ID copy were handed to the task above.
    // Log from the_arg and the local buffer instead of the handed-off copy, which
    // a worker may presumably release once the task has been queued.
    LOG(LOG_DEBUG, "Created top-level namespace traversal task at namespace path: \"%s\" looking for files in object \"%s\"\n", nsmntbuf, the_arg);
    return;

cleanup:
    free(task_basepath);
    free(nsroot);
    free(the_objid);
    if (new_task_position != NULL) {
        if (position_active) {
            config_abandonposition(new_task_position);
        }
        free(new_task_position);
    }
    return;
}

// Argument definition string for getopt(). The command line arguments are as
// follows:
// -t <max_threads> Maximum number of threads used by this process. This
Expand Down Expand Up @@ -488,7 +571,12 @@ int main(int argc, char** argv) {
// Parse each path argument, check them for validity, and pass along initial tasks
for (; patharg_idx < argc; patharg_idx++){
LOG(LOG_INFO, "Processing arg \"%s\"\n", argv[patharg_idx]);
push_args2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue);
// If the arg has a "|" in it, then it is an object ID, not a path.
// Need to initialize the queue/task differently
if (strchr(argv[patharg_idx],'|'))
push_objargs2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue);
else
push_pathargs2queue(argv[patharg_idx], parent_config, &parent_position, output_table, &ht_lock, queue);
}

pthread_mutex_lock(queue->lock);
Expand All @@ -506,7 +594,7 @@ int main(int argc, char** argv) {

// Once there are no tasks left in the queue to do, send workers sentinel (all-NULL) tasks so that they know to exit.
for (size_t i = 0; i < max_threads; i += 1) {
mustang_task* sentinel = task_init(NULL, NULL, NULL, NULL, NULL, NULL, NULL);
mustang_task* sentinel = task_init(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
task_enqueue(queue, sentinel);
}

Expand Down
Loading

0 comments on commit 1cf7bb6

Please sign in to comment.