Skip to content

Commit

Permalink
feat(mongo): add mongo support to the data pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rfprod committed Oct 27, 2023
1 parent 9afa8aa commit 882e5d1
Show file tree
Hide file tree
Showing 10 changed files with 1,049 additions and 159 deletions.
921 changes: 782 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ciborium = "0.2.0"
octorust = "0.7.0"
serde_json = "1.0.107"
regex = "1.10.2"
mongodb = { version = "2.7.0", default-features = false, features = ["tokio-sync"] }

[dev-dependencies]

Expand Down
2 changes: 1 addition & 1 deletion src/calculator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Calculator {
program
}

/// Initializes the calculator.
/// Initializes the program.
fn init(&mut self) {
println!("\n{}", "Calculator initialized.".blue().bold());

Expand Down
10 changes: 5 additions & 5 deletions src/data_pipeline/artifact/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub fn main(context_arg: Option<String>) {
DataPipelineArtifact::new(context_arg);
}

/// Supported contextx.
/// Supported contexts.
type Contexts<'a> = [&'a str; 2];

struct DataPipelineArtifact;
Expand All @@ -24,7 +24,7 @@ impl DataPipelineArtifact {
program
}

/// Initializes the data pipeline.
/// Initializes the program.
fn init(&mut self, context: Option<String>) {
println!("\n{}", "DataPipelineArtifact initialized.".blue().bold());

Expand Down Expand Up @@ -87,7 +87,7 @@ impl DataPipelineArtifact {
let output_path = base_path + "/github-repos.tar.gz";

Command::new("tar")
.args(["-czf", &output_path, &source_path])
.args(["-czf", &output_path, source_path])
.output()
.expect("Failed to create the artifact");

Expand Down Expand Up @@ -138,7 +138,7 @@ impl DataPipelineArtifact {
);
let base_path = cwd.display().to_string() + "/.data/artifact/github/";
let artifact_path = base_path.to_owned() + "github-repos.tar.gz";
let encrypted_artifact_path = base_path.to_owned() + "github-repos.tar.gz.gpg";
let encrypted_artifact_path = base_path + "github-repos.tar.gz.gpg";

let gpg_passphrase_env = env::var("GPG_PASSPHRASE");
let gpg_passphrase = match gpg_passphrase_env.unwrap().trim().parse::<String>() {
Expand Down Expand Up @@ -169,7 +169,7 @@ impl DataPipelineArtifact {
let output_path = "./";

Command::new("tar")
.args(["-xzf", &artifact_path, &output_path])
.args(["-xzf", &artifact_path, output_path])
.output()
.expect("Failed to unpack the artifact");

Expand Down
101 changes: 91 additions & 10 deletions src/data_pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
};

mod artifact;
mod mongo;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

Expand All @@ -26,6 +27,7 @@ pub fn main() {
/// Input arguments of the program.
struct InuputArguments {
context: Option<String>,
collection: Option<String>,
search_term: Option<String>,
}

Expand All @@ -36,9 +38,12 @@ struct FetchResult {
retry: bool,
}

/// Supported contextx.
/// Supported contexts.
type Contexts<'a> = [&'a str; 2];

/// Supported collections.
type Collections<'a> = [&'a str; 1];

struct DataPipeline;

impl DataPipeline {
Expand All @@ -49,7 +54,7 @@ impl DataPipeline {
program
}

/// Initializes the data pipeline.
/// Initializes the program.
fn init(&mut self) {
println!("\n{}", "DataPipeline initialized.".blue().bold());

Expand All @@ -65,9 +70,20 @@ impl DataPipeline {

let context = contexts[context_index];

let collections: Collections = ["repos"];

let collection_arg = args.collection.to_owned();

let collection_index = self.choose_collection(collections, collection_arg);

let collection = collections[collection_index];

match context_index {
0 => self.execute(args.search_term, context.to_owned()),
1 => artifact::main(args.context),
0 => self.execute(args.search_term, context.to_owned(), collection.to_owned()),
1 => {
artifact::main(args.context);
mongo::main(args.collection);
}
_ => {
println!(
"\n{}",
Expand All @@ -87,7 +103,8 @@ impl DataPipeline {

InuputArguments {
context: args.nth(2),
search_term: args.nth(3),
collection: args.nth(3),
search_term: args.nth(4),
}
}

Expand Down Expand Up @@ -156,14 +173,79 @@ impl DataPipeline {
context_index
}

/// Prompts input from the user, processes it, and returns the index of the selected context.
fn choose_collection(&self, collections: Collections, collection_arg: Option<String>) -> usize {
let is_some = collection_arg.is_some();
let mut collection_arg_input = if is_some {
match collection_arg.unwrap().trim().parse::<String>() {
Ok(value) => value,
Err(_) => String::new(),
}
} else {
String::new()
};

loop {
let mut collection_input = String::new();

if collection_arg_input.is_empty() {
self.print_collection_instructions(collections);

io::stdin()
.read_line(&mut collection_input)
.expect("Failed to read line");
} else {
collection_input = collection_arg_input.to_string();
}

let collection_index = match collection_input.trim().parse::<usize>() {
Ok(num) => num,
Err(_) => continue,
};

match collection_index.cmp(&collections.len()) {
Ordering::Less => {
return self.select_collection(collections, collection_index);
}
Ordering::Greater => collection_arg_input = self.reset_input_arg(),
Ordering::Equal => collection_arg_input = self.reset_input_arg(),
}
}
}

/// Prints the collection selection instructions.
fn print_collection_instructions(&self, collections: Collections) {
println!("\n{}", "Available collections:".yellow().bold());

let max_i = collections.len() - 1;
let mut i = 0;
while i <= max_i {
println!("{}: {}", i, collections[i]);
i += 1;
}

println!(
"\n{}, [0-{}]:",
"Please select a collection".yellow().bold(),
max_i
);
}

/// Prints selected collection and returns the collection index.
fn select_collection(&self, collections: Collections, collection_index: usize) -> usize {
let collection = collections[collection_index];
println!("You selected: {}", collection);
collection_index
}

/// Resets the input argument to start over if the program does not exist.
fn reset_input_arg(&self) -> String {
println!("\n{}", "The subprogram does not exist.".red());
String::new()
}

/// The data pipeline program for the provided search_term.
fn execute(&mut self, search_term_arg: Option<String>, context: String) {
fn execute(&mut self, search_term_arg: Option<String>, context: String, collection: String) {
let is_some = search_term_arg.is_some();
let search_term_arg_input = if is_some {
match search_term_arg.unwrap().trim().parse::<String>() {
Expand Down Expand Up @@ -256,8 +338,8 @@ impl DataPipeline {
continue;
} else {
println!("\n{}", "Download complete".green().bold());
// self.create_artifact();
artifact::main(Some(context));
mongo::main(Some(collection));
break;
}
}
Expand Down Expand Up @@ -309,7 +391,7 @@ impl DataPipeline {
});
let wait_timeout = match captures.as_deref() {
Some(["Rate limited for the next", x, "seconds"]) => {
let x: i64 = x.parse().expect("can't parse number");
let x: i64 = x.parse().expect("Can't parse number");
x
}
_ => panic!("Unknown Command: {}", &err),
Expand Down Expand Up @@ -400,8 +482,7 @@ impl DataPipeline {

let env_path = cwd.display().to_string() + "/.env";
let env_path_str = env_path.as_str();
let contents =
fs::read_to_string(env_path_str).expect("Should have been able to read the file");
let contents = fs::read_to_string(env_path_str).expect("Can't read file");

println!("Text content:\n{contents}");

Expand Down
Loading

0 comments on commit 882e5d1

Please sign in to comment.