Skip to content

Commit

Permalink
Merge pull request #137 from samply/feature/exporter-status
Browse files Browse the repository at this point in the history
exporter query status task type
  • Loading branch information
enola-dkfz authored May 22, 2024
2 parents 50ab353 + 9a094b9 commit f73134e
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "focus"
version = "0.5.1"
version = "0.5.2"
edition = "2021"
license = "Apache-2.0"

Expand Down
2 changes: 1 addition & 1 deletion src/blaze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub async fn run_cql_query(library: &Value, measure: &Value) -> Result<String, F
let url: String = if let Ok(value) = get_json_field(&measure.to_string(), "url") {
value.to_string().replace('"', "")
} else {
return Err(FocusError::CQLQueryError());
return Err(FocusError::CQLQueryError);
};
debug!("Evaluating the Measure with canonical URL: {}", url);

Expand Down
9 changes: 6 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum FocusError {
#[error("FHIR Measure evaluation error in Blaze: {0}")]
MeasureEvaluationErrorBlaze(String),
#[error("CQL query error")]
CQLQueryError(),
CQLQueryError,
#[error("Unable to retrieve tasks from Beam: {0}")]
UnableToRetrieveTasksHttp(beam_lib::BeamError),
#[error("Unable to answer task: {0}")]
Expand All @@ -38,15 +38,18 @@ pub enum FocusError {
UnableToPostAst(reqwest::Error),
#[error("Unable to post Exporter query: {0}")]
UnableToPostExporterQuery(reqwest::Error),
#[error("Unable to get Exporter query status: {0}")]
UnableToGetExporterQueryStatus(reqwest::Error),
#[error("Exporter query error in Reqwest: {0}")]
ExporterQueryErrorReqwest(String),
#[error("AST Posting error in Reqwest: {0}")]
AstPostingErrorReqwest(String),
#[error("Invalid Header Value: {0}")]
InvalidHeaderValue(http::header::InvalidHeaderValue),
#[error("Missing Exporter Endpoint")]
MissingExporterEndpoint(),

MissingExporterEndpoint,
#[error("Missing Exporter Task Type")]
MissingExporterTaskType,
}

impl FocusError {
Expand Down
102 changes: 90 additions & 12 deletions src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@ use http::header;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::str;
use tracing::{debug, warn};

use crate::config::CONFIG;
use crate::errors::FocusError;
use crate::util;

#[derive(Clone, PartialEq, Debug, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum TaskType {
Execute,
Create,
Status,
}

struct Params {
method: &'static str,
Expand All @@ -25,21 +38,13 @@ const EXECUTE: Params = Params {
done: "executed",
};

pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String, FocusError> {
pub async fn post_exporter_query(body: &String, task_type: TaskType) -> Result<String, FocusError> {
let Some(exporter_url) = &CONFIG.exporter_url else {
return Err(FocusError::MissingExporterEndpoint());
return Err(FocusError::MissingExporterEndpoint);
};

let exporter_params = if execute { EXECUTE } else { CREATE };
debug!("{} exporter query...", exporter_params.doing);

let mut headers = HeaderMap::new();

headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);

if let Some(auth_header_value) = CONFIG.auth_header.clone() {
headers.insert(
"x-api-key",
Expand All @@ -48,6 +53,81 @@ pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String,
);
}

if task_type == TaskType::Status {
let value: Value = serde_json::from_str(
String::from_utf8(util::base64_decode(&body)?)
.map_err(|e| {
FocusError::DeserializationError(format!(
r#"Task body is not a valid string {}"#,
e
))
})?
.as_str(),
)
.map_err(|e| {
FocusError::DeserializationError(format!(r#"Task body is not a valid JSON: {}"#, e))
})?;
let id = value["query-execution-id"].as_str();
if id.is_none() {
return Err(FocusError::ParsingError(format!(
r#"Body does not contain the id of the query to check the status of: {}"#,
value
)));
}
let id: &str = id.unwrap(); //we already made sure that it is not None

let resp = CONFIG
.client
.get(format!("{}status?query-execution-id={}", exporter_url, id))
.headers(headers)
.send()
.await
.map_err(FocusError::UnableToGetExporterQueryStatus)?;

debug!("asked for status for query id= {} ", id);

match resp.status() {
StatusCode::OK => {
let text = resp.text().await;
match text {
Ok(ok_text) => {
return Ok(ok_text);
}
Err(e) => {
warn!(
"The code was 200 OK, but can't get the body of the Exporter's response for status of the query id={}, {}", id, e);
return Err(FocusError::ExporterQueryErrorReqwest(format!(
"Error while checking the status of the query id={}, the code was 200 OK, but can't get the body of the Exporter's response: {}",
id, e
)));
}
}
}
code => {
warn!(
"Got unexpected code {code} while checking the status of the query id={}, {:?}",
id, resp
);
return Err(FocusError::ExporterQueryErrorReqwest(format!(
"Error while checking the status of the query id={}, {:?}",
id, resp
)));
}
};
}

let exporter_params = if task_type == TaskType::Execute {
EXECUTE
} else {
CREATE
};
debug!("{} exporter query...", exporter_params.doing);

headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);

let resp = CONFIG
.client
.post(format!("{}{}", exporter_url, exporter_params.method))
Expand All @@ -73,10 +153,8 @@ pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String,
"Error while {} query, the code was 200 OK, but can't get the body of the Exporter's response: {:?}",
exporter_params.doing, body
)));

}
}

}
code => {
warn!(
Expand Down
30 changes: 20 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ type BeamResult = TaskResult<beam_lib::RawString>;
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Metadata {
project: String,
#[serde(default)]
execute: bool,
task_type: Option<exporter::TaskType>,
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -149,7 +148,7 @@ async fn process_task(

let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata {
project: "default_obfuscation".to_string(),
execute: true,
task_type: None
});

if metadata.project == "focus-healthcheck" {
Expand All @@ -160,10 +159,12 @@ async fn process_task(
"healthy".into()
));
}

if metadata.project == "exporter" {
if metadata.task_type.is_none() {
return Err(FocusError::MissingExporterTaskType)
}
let body = &task.body;
return Ok(run_exporter_query(task, body, metadata.execute).await)?;
return Ok(run_exporter_query(task, body, metadata.task_type.unwrap()).await)?; //we already made sure that it is not None
}

if CONFIG.endpoint_type == EndpointType::Blaze {
Expand Down Expand Up @@ -337,7 +338,7 @@ async fn run_intermediate_rep_query(
async fn run_exporter_query(
task: &BeamTask,
body: &String,
execute: bool,
task_type: exporter::TaskType,
) -> Result<BeamResult, FocusError> {
let mut err = beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
Expand All @@ -346,7 +347,7 @@ async fn run_exporter_query(
String::new(),
);

let exporter_result = exporter::post_exporter_query(body, execute).await?;
let exporter_result = exporter::post_exporter_query(body, task_type).await?;

let result = beam_result(task.to_owned(), exporter_result).unwrap_or_else(|e| {
err.body = beam_lib::RawString(e.to_string());
Expand Down Expand Up @@ -402,19 +403,28 @@ fn beam_result(task: BeamTask, measure_report: String) -> Result<BeamResult, Foc
))
}


#[cfg(test)]
mod test {
use super::*;

const METADATA_STRING: &str = r#"{"project": "exliquid"}"#;
const METADATA_STRING_EXPORTER: &str = r#"{"project": "exporter", "task_type": "EXECUTE"}"#;

#[test]
fn test_metadata_deserialization_default() {
let metadata: Metadata = serde_json::from_str(METADATA_STRING).unwrap_or(Metadata {
project: "default_obfuscation".to_string(),
execute: true,
task_type: None
});

assert!(!metadata.execute);
assert_eq!(metadata.task_type, None);
}
}

#[test]
fn test_metadata_deserialization_exporter() {
let metadata: Metadata = serde_json::from_str(METADATA_STRING_EXPORTER).unwrap();

assert_eq!(metadata.task_type, Some(exporter::TaskType::Execute));
}
}
73 changes: 73 additions & 0 deletions src/projects/shared/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::collections::HashMap;

use indexmap::IndexSet;

use super::{CriterionRole, Project, ProjectName};

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub(crate) struct Shared;

impl Project for Shared {
fn append_code_lists(&self, map: &mut HashMap<&'static str, &'static str>) {
map.extend(
[
("icd10", "http://hl7.org/fhir/sid/icd-10"),
("icd10gm", "http://fhir.de/CodeSystem/dimdi/icd-10-gm"),
("icd10gmnew", "http://fhir.de/CodeSystem/bfarm/icd-10-gm"),
("loinc", "http://loinc.org"),
(
"SampleMaterialType",
"https://fhir.bbmri.de/CodeSystem/SampleMaterialType",
),
(
"StorageTemperature",
"https://fhir.bbmri.de/CodeSystem/StorageTemperature",
),
(
"FastingStatus",
"http://terminology.hl7.org/CodeSystem/v2-0916",
),
(
"SmokingStatus",
"http://hl7.org/fhir/uv/ips/ValueSet/current-smoking-status-uv-ips",
),
]);
}

fn append_observation_loinc_codes(&self, map: &mut HashMap<&'static str, &'static str>) {
map.extend(
[
("body_weight", "29463-7"),
("bmi", "39156-5"),
("smoking_status", "72166-2"),
]);
}

fn append_criterion_code_lists(&self, _map: &mut HashMap<&str, Vec<&str>>) {
// none
}

fn append_cql_snippets(&self, _map: &mut HashMap<(&str, CriterionRole), &str>) {
// none
}

fn append_mandatory_code_lists(&self, _set: &mut IndexSet<&str>) {
// none
}

fn append_cql_template(&self, _template: &mut String) {
// none
}

fn append_body(&self, _body: &mut String) {
// none
}

fn name(&self) -> &'static ProjectName {
&ProjectName::NotSpecified
}

fn append_sample_type_workarounds(&self, _map: &mut HashMap<&str, Vec<&str>>) {
//none
}
}

0 comments on commit f73134e

Please sign in to comment.