Skip to content

Commit

Permalink
refactor(github): refactor the github module of the data pipeline (#61)
Browse files Browse the repository at this point in the history
- [x] implement modules for the github module contexts;
  • Loading branch information
rfprod authored Mar 2, 2024
1 parent 6568807 commit b341715
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 183 deletions.
188 changes: 25 additions & 163 deletions src/data_pipeline/github/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,16 @@
//! GitHub module for the data pipeline.
use colored::Colorize;
use octorust::{
auth::Credentials,
types::{Order, RepoSearchResultItem, WorkflowRun},
types::{SearchReposSort, WorkflowRunStatus},
Client,
};
use std::env::{self};
use octorust::{types::Order, types::SearchReposSort};

pub use self::repositories::ReposFetchResult;
pub use self::workflows::WorkflowRunsFetchResult;

/// Custom result type for fetch results.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

mod rate_limit_handler;

/// GitHub repos fetch result.
pub struct ReposFetchResult {
pub items: Vec<RepoSearchResultItem>,
pub total: i64,
pub retry: bool,
}

/// GitHub repo workflow runs fetch result.
pub struct WorkflowRunsFetchResult {
pub items: Vec<WorkflowRun>,
pub total: i64,
pub retry: bool,
}
mod repositories;
mod workflows;

pub struct DataPipelineGitHubFsConfiguration {
pub repos_output: String,
Expand All @@ -40,6 +24,8 @@ pub fn main() -> DataPipelineGitHub {

pub struct DataPipelineGitHub {
pub configuration: DataPipelineGitHubFsConfiguration,
repos_ctx: repositories::DataPipelineGitHubRepos,
workflows_ctx: workflows::DataPipelineGitHubWorkflows,
}

impl DataPipelineGitHub {
Expand All @@ -49,7 +35,17 @@ impl DataPipelineGitHub {
repos_output: String::from("/.data/output/github/repos"),
workflows_output: String::from("/.data/output/github/workflows"),
};
DataPipelineGitHub { configuration }
let repos_ctx = repositories::DataPipelineGitHubRepos {
rate_limit_handler: rate_limit_handler::GitHubRateLimitHandler,
};
let workflows_ctx = workflows::DataPipelineGitHubWorkflows {
rate_limit_handler: rate_limit_handler::GitHubRateLimitHandler,
};
DataPipelineGitHub {
configuration,
repos_ctx,
workflows_ctx,
}
}

/// GitHub repositories request.
Expand All @@ -61,70 +57,9 @@ impl DataPipelineGitHub {
per_page: i64,
page: i64,
) -> Result<ReposFetchResult> {
let token_env = env::var("GITHUB_TOKEN");
let token = match token_env.unwrap().trim().parse::<String>() {
Ok(value) => value,
Err(_) => String::new(),
};

let github = Client::new(String::from("user-agent-name"), Credentials::Token(token));

let mut retry: bool = false;

let client = github.unwrap();
let search = client.search();
let result = search.repos(q, sort, order, per_page, page).await;
let raw_res = match result {
Ok(res) => Ok(res),
Err(error) => {
println!(
"\n{}: {:?}",
"There was an error getting data from GitHub".red(),
error
);

let wait_timeout = rate_limit_handler::error_handler(error);
if wait_timeout > 0 {
retry = true;
rate_limit_handler::sleep_for(wait_timeout);
}

Err(())
}
};

if retry {
let result = ReposFetchResult {
items: Vec::<RepoSearchResultItem>::new(),
total: 0,
retry: true,
};
return Ok(result);
}

let res = raw_res.unwrap();
let body = res.body.to_owned();
let items = body.items;
for item in items {
let name = item.full_name;
println!("\n{}: {}", "Data".cyan(), name);
}

println!("{}: {}", "Response".green(), res.status);
println!("{}: {:#?}\n", "Headers".green(), res.headers);
// println!("{}: {:#?}\n", "Body".green(), res.body);

println!("\n\n{}", "Done!".green().bold());

let items = res.body.items.to_owned();
let total = res.body.total_count.to_owned();

let result = ReposFetchResult {
items,
total,
retry: false,
};
Ok(result)
self.repos_ctx
.repos_request(q, sort, order, per_page, page)
.await
}

    /// GitHub workflow runs request.
Expand All @@ -137,81 +72,8 @@ impl DataPipelineGitHub {
per_page: i64,
page: i64,
) -> Result<WorkflowRunsFetchResult> {
let token_env = env::var("GITHUB_TOKEN");
let token = match token_env.unwrap().trim().parse::<String>() {
Ok(value) => value,
Err(_) => String::new(),
};

let github = Client::new(String::from("user-agent-name"), Credentials::Token(token));

let mut retry: bool = false;

let client = github.unwrap();
let actions = client.actions();
let result = actions
.list_workflow_runs_for_repo(
owner,
repo,
"",
branch,
"",
WorkflowRunStatus::Noop,
per_page,
page,
created,
)
.await;
let raw_res = match result {
Ok(res) => Ok(res),
Err(error) => {
println!(
"\n{}: {:?}",
"There was an error getting data from GitHub".red(),
error
);

let wait_timeout = rate_limit_handler::error_handler(error);
if wait_timeout > 0 {
retry = true;
rate_limit_handler::sleep_for(wait_timeout);
}

Err(())
}
};

if retry {
let result = WorkflowRunsFetchResult {
items: Vec::<WorkflowRun>::new(),
total: 0,
retry: true,
};
return Ok(result);
}

let res = raw_res.unwrap();
let body = res.body.to_owned();
let items = body.workflow_runs;
for item in items {
let name = item.name;
println!("\n{}: {}", "Data".cyan(), name);
}

println!("{}: {}", "Response".green(), res.status);
println!("{}: {:#?}\n", "Headers".green(), res.headers);
// println!("{}: {:#?}\n", "Body".green(), res.body);

println!("\n\n{}", "Done!".green().bold());

let items = res.body.workflow_runs.to_owned();
let total = res.body.total_count.to_owned();

let result = WorkflowRunsFetchResult {
items,
total,
retry: false,
};
Ok(result)
self.workflows_ctx
.workflow_runs_request(owner, repo, branch, created, per_page, page)
.await
}
}
23 changes: 3 additions & 20 deletions src/data_pipeline/github/rate_limit_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,11 @@ use colored::Colorize;
use octorust::ClientError;
use std::process::Command;

/// Sleep for N seconds defined by the `wait_timeout` value.
pub fn sleep_for(wait_timeout: i64) {
let p = GitHubRateLimitHandler::new();
p.sleep_for(wait_timeout)
}

/// Process GitHub API `ClientError` and derive the value of the `wait_timeout` variable.
pub fn error_handler(error: ClientError) -> i64 {
let p = GitHubRateLimitHandler::new();
p.error_handler(error)
}

struct GitHubRateLimitHandler;
pub struct GitHubRateLimitHandler;

impl GitHubRateLimitHandler {
/// Program constructor.
fn new() -> GitHubRateLimitHandler {
GitHubRateLimitHandler
}

/// Sleep for N seconds defined by the `wait_timeout` value.
fn sleep_for(&self, wait_timeout: i64) {
pub fn sleep_for(&self, wait_timeout: i64) {
match Command::new("sleep").arg(wait_timeout.to_string()).spawn() {
Ok(mut child) => {
println!("\n{}", "Can sleep".bold().green());
Expand All @@ -45,7 +28,7 @@ impl GitHubRateLimitHandler {
}

/// Process GitHub API `ClientError` and derive the value of the `wait_timeout` variable.
fn error_handler(&self, error: ClientError) -> i64 {
pub fn error_handler(&self, error: ClientError) -> i64 {
let err = error.to_string();

let rate_limit_regx =
Expand Down
103 changes: 103 additions & 0 deletions src/data_pipeline/github/repositories/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! GitHubRepos submodule for the data pipeline.
use colored::Colorize;
use octorust::{
auth::Credentials,
types::SearchReposSort,
types::{Order, RepoSearchResultItem},
Client,
};
use std::env::{self};

use super::rate_limit_handler::GitHubRateLimitHandler;

/// Custom result type for fetch results.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// GitHub repos fetch result.
pub struct ReposFetchResult {
    /// Repository items returned by the search API for the requested page.
    pub items: Vec<RepoSearchResultItem>,
    /// Total number of matching repositories reported by the API (across all pages).
    pub total: i64,
    /// When `true` the request hit a recoverable error (e.g. rate limit) and
    /// the caller should retry the same page; `items` is empty in that case.
    pub retry: bool,
}

/// GitHub repositories context: fetches repository search results and
/// delegates rate-limit recovery to the shared handler.
pub struct DataPipelineGitHubRepos {
    /// Handler that derives a wait timeout from API client errors and sleeps.
    pub rate_limit_handler: GitHubRateLimitHandler,
}

impl DataPipelineGitHubRepos {
    /// GitHub repositories request.
    ///
    /// Searches GitHub repositories matching the query `q`, ordered by
    /// `sort`/`order`, fetching one page of `per_page` items at index `page`.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the API call fails for a reason other than rate
    /// limiting. On a rate-limit hit the handler sleeps for the derived
    /// timeout and an empty result with `retry: true` is returned so the
    /// caller can re-issue the request for the same page.
    pub async fn repos_request(
        &self,
        q: &str,
        sort: SearchReposSort,
        order: Order,
        per_page: i64,
        page: i64,
    ) -> Result<ReposFetchResult> {
        // A missing GITHUB_TOKEN falls back to an empty token instead of
        // panicking; the previous `parse::<String>()` fallback was dead code
        // since converting a &str to String cannot fail.
        let token = env::var("GITHUB_TOKEN").unwrap_or_default().trim().to_string();

        let github = Client::new(String::from("user-agent-name"), Credentials::Token(token));

        // Client construction only fails on malformed configuration; treat
        // that as a programming error.
        let client = github.expect("failed to construct the GitHub client");
        let search = client.search();
        let result = search.repos(q, sort, order, per_page, page).await;

        let res = match result {
            Ok(res) => res,
            Err(error) => {
                println!(
                    "\n{}: {:?}",
                    "There was an error getting data from GitHub".red(),
                    error
                );

                // Capture the message before the handler consumes the error.
                let error_message = format!("{:?}", error);

                // A positive wait timeout indicates a rate-limit hit: sleep,
                // then ask the caller to retry this page.
                let wait_timeout = self.rate_limit_handler.error_handler(error);
                if wait_timeout > 0 {
                    self.rate_limit_handler.sleep_for(wait_timeout);
                    return Ok(ReposFetchResult {
                        items: Vec::<RepoSearchResultItem>::new(),
                        total: 0,
                        retry: true,
                    });
                }

                // Non-rate-limit errors are propagated to the caller instead
                // of panicking on a subsequent unwrap.
                return Err(error_message.into());
            }
        };

        // Log the fetched repository names; iterate by reference to avoid
        // cloning the whole response body just for printing.
        for item in &res.body.items {
            println!("\n{}: {}", "Data".cyan(), item.full_name);
        }

        println!("{}: {}", "Response".green(), res.status);
        println!("{}: {:#?}\n", "Headers".green(), res.headers);
        // println!("{}: {:#?}\n", "Body".green(), res.body);

        println!("\n\n{}", "Done!".green().bold());

        let result = ReposFetchResult {
            items: res.body.items.to_owned(),
            total: res.body.total_count.to_owned(),
            retry: false,
        };
        Ok(result)
    }
}
Loading

0 comments on commit b341715

Please sign in to comment.