Skip to content

Commit

Permalink
perf(pipeline): implement mongo update for the data pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rfprod committed Oct 28, 2023
1 parent b89881f commit 3b291f8
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 61 deletions.
42 changes: 30 additions & 12 deletions src/data_pipeline/artifact/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ impl<'a> DataPipelineArtifact<'a> {
Err(_) => String::new(),
};

Command::new("gpg")
println!(
"\n{}:\n{:?}",
"Encrypted artifact path".green().bold(),
encrypted_artifact_path
);

match Command::new("gpg")
.args([
"--batch",
"--yes",
Expand All @@ -162,25 +168,37 @@ impl<'a> DataPipelineArtifact<'a> {
&encrypted_artifact_path,
])
.output()
.expect("Failed to decrypt the artifact");
{
Ok(output) => {
println!(
"{}\n{:?}",
"Decrypt artifact success".bold().green(),
output
);
}
Err(error) => {
println!("{}\n{:?}", "Decrypt artifact error".bold().red(), error);
}
}

println!(
"\n{}:\n{:?}",
"Decrypted the archive".green().bold(),
encrypted_artifact_path
"Unpacked the archive".green().bold(),
artifact_path
);

let output_path = "./";

Command::new("tar")
match Command::new("tar")
.args(["-xzf", &artifact_path, output_path])
.output()
.expect("Failed to unpack the artifact");

println!(
"\n{}:\n{:?}",
"Unpacked the archive".green().bold(),
artifact_path
);
{
Ok(output) => {
println!("{}\n{:?}", "Unpack artifact success".bold().green(), output);
}
Err(error) => {
println!("{}\n{:?}", "Unpack artifact error".bold().red(), error);
}
}
}
}
171 changes: 122 additions & 49 deletions src/data_pipeline/mongo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{
};

use colored::Colorize;
use mongodb::sync::Client;
use mongodb::sync::{Client, Database};
use mongodb::{bson::doc, options::FindOneAndUpdateOptions};
use octorust::types::Repository;

/// The entry point of the program.
Expand Down Expand Up @@ -42,7 +43,7 @@ impl<'a> DataPipelineMongoDb<'a> {
match collection_index {
0 => {
let collection = self.collections[collection_index];
self.create_collection(collection);
self.execute(collection);
}
_ => {
println!(
Expand Down Expand Up @@ -78,9 +79,45 @@ impl<'a> DataPipelineMongoDb<'a> {
index
}

/// Creates a collection and inserts data.
fn create_collection(&self, collection: &str) {
println!("\n{} {:?}", "Creating collection".cyan().bold(), collection);
/// Connects to the MongoDB instance and returns the database reference.
///
/// The connection string is read from `MONGODB_CONNECTION_STRING` and the database name from
/// `MONGODB_DATABASE`; both values are trimmed before use. The names of the collections found
/// in the database are printed before the handle is returned.
///
/// # Panics
///
/// Panics if either environment variable is missing or is not valid Unicode, if the client
/// cannot be created from the connection string, or if the collection names cannot be listed.
fn connect(&self) -> Database {
    // `env::var` already yields a `String`, so trimming is the only processing required.
    let read_env = |name: &str| -> String {
        match env::var(name) {
            Ok(value) => value.trim().to_owned(),
            Err(err) => panic!("\nUnable to read environment variable {}: {:?}", name, err),
        }
    };

    let connection_url = read_env("MONGODB_CONNECTION_STRING");
    let client = match Client::with_uri_str(&connection_url) {
        Ok(value) => value,
        // The connection string may embed credentials, so it is not echoed; the driver error
        // is reported instead.
        Err(err) => panic!("\nUnable to connect\n {:?}", err),
    };

    let db_name = read_env("MONGODB_DATABASE");
    let db = client.database(db_name.as_str());

    match db.list_collection_names(None) {
        Ok(names) => {
            for name in &names {
                println!("\n{}: {:?}", "Collection".bold().cyan(), name);
            }
        }
        Err(err) => panic!("\nUnable to list collection names\n {:?}", err),
    }

    db
}

/// Collects documents for further processing.
fn collect_doccuments(&self) -> Vec<Vec<Repository>> {
let cwd = env::current_dir().unwrap();
println!(
"\n{}:\n{:?}",
Expand All @@ -94,7 +131,7 @@ impl<'a> DataPipelineMongoDb<'a> {
panic!("\n{} {:?}", "Can't read directory".red().bold(), base_path);
};

let mut docs: Vec<Repository> = vec![];
let mut docs: Vec<Vec<Repository>> = vec![];

let dir_entries = dir_content.enumerate();
for (_i, dir_entries_result) in dir_entries {
Expand All @@ -109,78 +146,114 @@ impl<'a> DataPipelineMongoDb<'a> {
};

let parse_result = serde_json::from_str::<Vec<Repository>>(&file_content);
if let Ok(mut json) = parse_result {
docs.append(&mut json);
if let Ok(json) = parse_result {
docs.push(json);
} else {
println!("Error serializing JSON file: {:?}", dir_entry.path());
}
}
docs
}

let connection_url_env = env::var("MONGODB_CONNECTION_STRING");
let connection_url = match connection_url_env.unwrap().trim().parse::<String>() {
Ok(value) => value,
Err(_) => String::new(),
};

let client_connection = Client::with_uri_str(&connection_url);
let client = match client_connection {
Ok(value) => value,
Err(_) => {
panic!("Unable to connect uring {}", connection_url);
}
};

let db_name_env = env::var("MONGODB_DATABASE");
let db_name = match db_name_env.unwrap().trim().parse::<String>() {
Ok(value) => value,
Err(_) => String::new(),
};

let db = client.database(db_name.as_str());
fn execute(&self, collection: &str) {
let db = self.connect();

let mut exists = false;
match db.list_collection_names(None) {
Ok(value) => {
for (_i, col) in value.iter().enumerate() {
println!("{}", col);
if collection.eq(col) {
exists = true;
}
}
}
Err(err) => {
panic!("Unable to connect uring {}\n {:?}", connection_url, err);
panic!("\nUnable to list collection names\n {:?}", err);
}
};

if exists {
self.update_collection(collection);
} else {
self.create_collection(collection);
}
}

/// Creates a collection and inserts data.
fn create_collection(&self, collection: &str) {
println!("\n{} {:?}", "Creating collection".cyan().bold(), collection);

let docs: Vec<Vec<Repository>> = self.collect_doccuments();

let db = self.connect();

let collection_ref = db.collection::<Repository>(collection);

match collection_ref.drop(None) {
Ok(_) => {
println!("\n{}: {:?}", "Dropped collection".green(), collection);
println!("\n{}: {:?}", "Dropped".bold().green(), collection);
}
Err(err) => {
println!(
"\n{}: {:?}\n{:?}",
"Can't drop collection".bold().red(),
"Can't drop".bold().red(),
collection,
err
);
}
};

match collection_ref.insert_many(docs, None) {
Ok(_) => {
println!(
"\n{}: {:?}",
"Inserted documents, collection".green(),
collection
);
}
Err(err) => {
println!(
"\n{}: {:?}\n{:?}",
"Can't insert documents, collection".bold().red(),
collection,
err
);
for batch in docs {
match collection_ref.insert_many(batch, None) {
Ok(_) => {
println!("\n{}: {:?}", "Inserted in".green(), collection);
}
Err(err) => {
println!(
"\n{}: {:?}\n{:?}",
"Can't insert in".bold().red(),
collection,
err
);
}
};
}
}

/// Updates documents in the collection.
fn update_collection(&self, collection: &str) {
println!("\n{} {:?}", "Updating collection".cyan().bold(), collection);

let docs: Vec<Vec<Repository>> = self.collect_doccuments();

let db = self.connect();

let collection_ref = db.collection::<Repository>(collection);

for batch in docs {
for record in batch.iter().cloned() {
let url = &record.url;
let filter = doc! { "url": url };
let record_bson = mongodb::bson::to_bson(&record).unwrap();
if let mongodb::bson::Bson::Document(document) = record_bson {
let mut options = FindOneAndUpdateOptions::default();
options.upsert = Some(true);
let update = doc! { "$set": document };
match collection_ref.find_one_and_update(filter, update, options) {
Ok(_) => {
println!("\n{}: {:?}", "Updated".bold().green(), collection);
}
Err(err) => {
println!(
"\n{}: {:?}\n{:?}",
"Can't update".bold().red(),
collection,
err
);
}
}
}
}
};
}
}
}

0 comments on commit 3b291f8

Please sign in to comment.