Skip to content

Commit

Permalink
feat: add main database, job model, basic api implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 6, 2024
1 parent a35215c commit 61e0f8e
Show file tree
Hide file tree
Showing 26 changed files with 526 additions and 203 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async-trait = "0.1.81"
futures = "0.3.30"
clap = {version="4.5.7", features=["derive"]}
actix-web = "4.8.0"
awc="3.5.0"

[package]
name = "jz_action"
Expand All @@ -56,6 +57,7 @@ tokio-stream = {workspace = true}
tokio-util= {workspace = true}

actix-web = {workspace = true}
awc= {workspace = true}
clap = {workspace = true}
uuid = {workspace = true}
anyhow = {workspace = true}
Expand Down
11 changes: 2 additions & 9 deletions crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use compute_unit_runner::{
state_controller::StateController,
};
use jz_action::{
dbrepo::{
MongoConfig,
MongoRunDbRepo,
},
dbrepo::MongoRunDbRepo,
utils::StdIntoAnyhowResult,
};

Expand Down Expand Up @@ -62,9 +59,6 @@ struct Args {
#[arg(short, long)]
mongo_url: String,

#[arg(short, long)]
database: String,

#[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")]
unix_socket_addr: String,
}
Expand All @@ -84,8 +78,7 @@ async fn main() -> Result<()> {
args.tmp_path.expect("compute node only support disk cache"),
));

let db_repo =
MongoRunDbRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?;
let db_repo = MongoRunDbRepo::new(&args.mongo_url).await?;

let program = MediaDataTracker::new(db_repo.clone(), &args.node_name, fs_cache, args.buf_size);

Expand Down
11 changes: 2 additions & 9 deletions crates/dp_runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ mod state_controller;
mod stream;

use jz_action::{
dbrepo::{
MongoConfig,
MongoRunDbRepo,
},
dbrepo::MongoRunDbRepo,
network::datatransfer::data_stream_server::DataStreamServer,
utils::StdIntoAnyhowResult,
};
Expand Down Expand Up @@ -58,9 +55,6 @@ struct Args {
#[arg(short, long)]
mongo_url: String,

#[arg(short, long)]
database: String,

#[arg(short, long, default_value = "30")]
buf_size: usize,

Expand All @@ -79,8 +73,7 @@ async fn main() -> Result<()> {
let mut join_set = JoinSet::new();
let token = CancellationToken::new();

let db_repo =
MongoRunDbRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?;
let db_repo = MongoRunDbRepo::new(&args.mongo_url).await?;

let fs_cache: Arc<dyn FileCache> = match args.tmp_path {
Some(path) => Arc::new(FSCache::new(path)),
Expand Down
4 changes: 2 additions & 2 deletions nodes/jz_reader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ async fn main() -> Result<()> {
{
let shutdown_tx = shutdown_tx.clone();
let _ = tokio::spawn(async move {
if let Err(e) = read_jz_fs(args).await {
let _ = shutdown_tx.send(Err(anyhow!("read jz fs {e}"))).await;
if let Err(err) = read_jz_fs(args).await {
let _ = shutdown_tx.send(Err(anyhow!("read jz fs {err}"))).await;
}
});
}
Expand Down
134 changes: 134 additions & 0 deletions src/api/client/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use crate::{
core::db::{Job, JobUpdateInfo},
utils::StdIntoAnyhowResult,
};
use anyhow::{
anyhow,
Result,
};
use awc::{
http::StatusCode,
Client,
};
use mongodb::bson::oid::ObjectId;
#[derive(Clone)]
pub struct JzFlowClient {
client: Client,
base_uri: String,
}

impl JzFlowClient {
pub fn new(base_uri: &str) -> Result<Self> {
let client = Client::builder()
.add_default_header(("Content-Type", "application/json"))
.finish();
let base_uri = base_uri.to_string() + "/api/v1";
Ok(JzFlowClient { client, base_uri })
}

fn job(&self) -> JobClient {
JobClient {
client: self.client.clone(),
base_uri: self.base_uri.clone(),
}
}
}

pub struct JobClient {
client: Client,
base_uri: String,
}

impl JobClient {
pub async fn create(&self, job: &Job) -> Result<Job> {
let mut resp = self
.client
.post(self.base_uri.clone() + "/job")
.send_json(&job)
.await
.anyhow()?;

if !resp.status().is_success() {
let err_msg = resp
.body()
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {} reason {err_msg}", resp.status()));
}

resp.body()
.await
.anyhow()
.and_then(|body| serde_json::from_slice(&body).anyhow())
.anyhow()
}

pub async fn get(&self, job_id: &ObjectId) -> Result<Option<Job>> {
let mut resp = self
.client
.get(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str())
.send()
.await
.anyhow()?;

if resp.status() == StatusCode::NOT_FOUND {
return Ok(None);
}

if !resp.status().is_success() {
let err_msg = resp
.body()
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {} reason {err_msg}", resp.status()));
}

resp.body()
.await
.anyhow()
.and_then(|body| serde_json::from_slice(&body).anyhow())
.anyhow()
}

pub async fn delete(&self, job_id: &ObjectId) -> Result<()> {
let mut resp = self
.client
.delete(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str())
.send()
.await
.anyhow()?;

if !resp.status().is_success() {
let err_msg = resp
.body()
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {} reason {err_msg}", resp.status()));
}

Ok(())
}

pub async fn update(&self, job_id: &ObjectId, update_info: &JobUpdateInfo) -> Result<()> {
let mut resp = self
.client
.post(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str())
.send_json(update_info)
.await
.anyhow()?;

if !resp.status().is_success() {
let err_msg = resp
.body()
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {} reason {err_msg}", resp.status()));
}

Ok(())
}
}
1 change: 1 addition & 0 deletions src/api/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod job;
70 changes: 54 additions & 16 deletions src/api/job_api.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,98 @@
use crate::core::db::{
DBConfig, Job, JobDbRepo, JobRepo, MainDbRepo
use crate::{
core::db::{
Job,
JobDbRepo,
JobRepo,
JobState,
JobUpdateInfo,
MainDbRepo,
},
driver::Driver,
};
use actix_web::{
web,
HttpResponse,
};
use mongodb::bson::oid::ObjectId;
use serde::Serialize;
use serde::{
Deserialize,
Serialize,
};

//TODO change to use route macro after https://github.com/actix/actix-web/issues/2866 resolved
async fn create<'reg, MAINR, JOBR, DBC>(db_repo: web::Data<MAINR>, data: web::Json<Job>) -> HttpResponse
async fn create<D, MAINR, JOBR>(db_repo: web::Data<MAINR>, data: web::Json<Job>) -> HttpResponse
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
DBC: Clone + Serialize + Send + Sync + DBConfig + 'static,
{
match db_repo.insert(&data.0).await {
Ok(_) => HttpResponse::Ok().finish(),
Ok(inserted_result) => HttpResponse::Ok().json(&inserted_result),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

async fn get<'reg, MAINR, JOBR, DBC>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
async fn get<D, MAINR, JOBR>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
DBC: Clone + Serialize + Send + Sync + DBConfig + 'static,
{
match db_repo.get(&path.into_inner()).await {
Ok(_) => HttpResponse::Ok().finish(),
Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result),
Ok(None) => HttpResponse::NotFound().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

async fn delete<'reg, MAINR, JOBR, DBC>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
async fn delete<D, MAINR, JOBR>(
db_repo: web::Data<MAINR>,
path: web::Path<ObjectId>,
) -> HttpResponse
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
DBC: Clone + Serialize + Send + Sync + DBConfig + 'static,
{
match db_repo.delete(&path.into_inner()).await {
Ok(_) => HttpResponse::Ok().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

pub(super) fn job_route_config<'reg, MAINR, JOBR, DBC>(cfg: &mut web::ServiceConfig)
async fn update<D, MAINR, JOBR>(
db_repo: web::Data<MAINR>,
path: web::Path<ObjectId>,
query: web::Query<JobUpdateInfo>,
) -> HttpResponse
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
{
match db_repo
.update(&path.into_inner(), &query.into_inner())
.await
{
Ok(_) => HttpResponse::Ok().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

pub(super) fn job_route_config<D, MAINR, JOBR>(cfg: &mut web::ServiceConfig)
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
DBC: Clone + Serialize + Send + Sync + DBConfig + 'static,
{
cfg.service(
web::resource("/job")
.route(web::post().to(create::<MAINR, JOBR, DBC>))
.route(web::delete().to(delete::<MAINR, JOBR, DBC>)),
.route(web::post().to(create::<D, MAINR, JOBR>))
.route(web::delete().to(delete::<D, MAINR, JOBR>)),
)
.service(web::resource("/job/{id}").route(web::get().to(get::<MAINR, JOBR, DBC>)));
.service(
web::resource("/job/{id}")
.route(web::get().to(get::<D, MAINR, JOBR>))
.route(web::post().to(update::<D, MAINR, JOBR>)),
);
}
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod client;
pub mod server;

mod job_api;
Loading

0 comments on commit 61e0f8e

Please sign in to comment.