Skip to content

Commit

Permalink
improve error messages for internal err
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 5, 2025
1 parent 8092429 commit 2af98cd
Show file tree
Hide file tree
Showing 45 changed files with 283 additions and 260 deletions.
22 changes: 11 additions & 11 deletions backend/windmill-api/src/ai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,20 @@ mod openai {
tracing::debug!("Adding user to request body");
let mut json_body: HashMap<String, Box<RawValue>> = serde_json::from_slice(&body)
.map_err(|e| {
Error::InternalErr(format!("Failed to parse request body: {}", e))
Error::internal_err(format!("Failed to parse request body: {}", e))
})?;

let user_json_string = serde_json::Value::String(user.unwrap()).to_string(); // makes sure to escape characters

json_body.insert(
"user".to_string(),
RawValue::from_string(user_json_string)
.map_err(|e| Error::InternalErr(format!("Failed to parse user: {}", e)))?,
.map_err(|e| Error::internal_err(format!("Failed to parse user: {}", e)))?,
);

body = serde_json::to_vec(&json_body)
.map_err(|e| {
Error::InternalErr(format!("Failed to reserialize request body: {}", e))
Error::internal_err(format!("Failed to reserialize request body: {}", e))
})?
.into();
}
Expand Down Expand Up @@ -204,13 +204,13 @@ mod openai {
.send()
.await
.map_err(|err| {
Error::InternalErr(format!(
Error::internal_err(format!(
"Failed to get OpenAI credentials using credentials flow: {}",
err
))
})?;
let response = response.json::<OpenaiCredentials>().await.map_err(|err| {
Error::InternalErr(format!(
Error::internal_err(format!(
"Failed to parse OpenAI credentials from credentials flow: {}",
err
))
Expand All @@ -220,7 +220,7 @@ mod openai {

pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result<KeyCache> {
let config = serde_json::from_value(resource)
.map_err(|e| Error::InternalErr(format!("validating openai resource {e:#}")))?;
.map_err(|e| Error::internal_err(format!("validating openai resource {e:#}")))?;

let mut user = None::<String>;
let mut resource = match config {
Expand Down Expand Up @@ -257,7 +257,7 @@ mod openai {
let azure_base_path = if let Some(azure_base_path) = azure_base_path {
Some(
serde_json::from_value::<String>(azure_base_path).map_err(|e| {
Error::InternalErr(format!("validating openai azure base path {e:#}"))
Error::internal_err(format!("validating openai azure base path {e:#}"))
})?,
)
} else {
Expand Down Expand Up @@ -303,7 +303,7 @@ mod anthropic {

pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result<KeyCache> {
let mut resource: AnthropicCache = serde_json::from_value(resource)
.map_err(|e| Error::InternalErr(format!("validating anthropic resource {e:#}")))?;
.map_err(|e| Error::internal_err(format!("validating anthropic resource {e:#}")))?;
resource.api_key = get_variable_or_self(resource.api_key, db, w_id).await?;
Ok(KeyCache::Anthropic(resource))
}
Expand Down Expand Up @@ -335,7 +335,7 @@ mod mistral {

pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result<KeyCache> {
let mut resource: MistralCache = serde_json::from_value(resource)
.map_err(|e| Error::InternalErr(format!("validating mistral resource {e:#}")))?;
.map_err(|e| Error::internal_err(format!("validating mistral resource {e:#}")))?;
resource.api_key = get_variable_or_self(resource.api_key, db, w_id).await?;
Ok(KeyCache::Mistral(resource))
}
Expand Down Expand Up @@ -472,7 +472,7 @@ async fn proxy(
.await?;

if ai_resource.is_none() {
return Err(Error::InternalErr("AI resource not configured".to_string()));
return Err(Error::internal_err("AI resource not configured".to_string()));
}

let ai_resource = serde_json::from_value::<AIResource>(ai_resource.unwrap())
Expand All @@ -497,7 +497,7 @@ async fn proxy(
};

if resource.is_none() {
return Err(Error::InternalErr(format!(
return Err(Error::internal_err(format!(
"{:?} resource missing value",
ai_provider
)));
Expand Down
17 changes: 10 additions & 7 deletions backend/windmill-api/src/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async fn list_apps(
.fields(&["dm.deployment_msg"]);
}

let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?;
let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?;
let mut tx = user_db.begin(&authed).await?;
let rows = sqlx::query_as::<_, ListableApp>(&sql)
.fetch_all(&mut *tx)
Expand Down Expand Up @@ -611,7 +611,7 @@ async fn get_public_app_by_secret(

let decrypted = mc
.decrypt_bytes_to_bytes(&(hex::decode(secret)?))
.map_err(|e| Error::InternalErr(e.to_string()))?;
.map_err(|e| Error::internal_err(e.to_string()))?;
let bytes = str::from_utf8(&decrypted).map_err(to_anyhow)?;

let id: i64 = bytes.parse().map_err(to_anyhow)?;
Expand Down Expand Up @@ -958,7 +958,7 @@ async fn delete_app(
.execute(&db)
.await
.map_err(|e| {
Error::InternalErr(format!(
Error::internal_err(format!(
"error deleting deployment metadata for script with path {path} in workspace {w_id}: {e:#}"
))
})?;
Expand Down Expand Up @@ -1061,7 +1061,7 @@ async fn update_app(

sqlb.returning("path");

let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?;
let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?;
let npath_o: Option<String> = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?;
not_found_if_none(npath_o, "App", path)?
} else {
Expand Down Expand Up @@ -1710,7 +1710,7 @@ async fn upload_s3_file_from_app(
}
};

let s3_resource = s3_resource_opt.ok_or(Error::InternalErr(
let s3_resource = s3_resource_opt.ok_or(Error::internal_err(
"No files storage resource defined at the workspace level".to_string(),
))?;
let s3_client = build_object_store_client(&s3_resource).await?;
Expand Down Expand Up @@ -2034,7 +2034,10 @@ async fn build_args(
safe_args.insert(
k.to_string(),
to_raw_value(&value.unwrap_or(Ok(serde_json::Value::Null)).map_err(|e| {
Error::InternalErr(format!("failed to serialize ctx variable for {}: {}", k, e))
Error::internal_err(format!(
"failed to serialize ctx variable for {}: {}",
k, e
))
})?),
);
} else if !arg_str.contains("\"$var:") && !arg_str.contains("\"$res:") {
Expand All @@ -2054,7 +2057,7 @@ async fn build_args(
),
)
.map_err(|e| {
Error::InternalErr(format!(
Error::internal_err(format!(
"failed to remove sensitive variable(s)/resource(s) with error: {}",
e
))
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ async fn get_capture_trigger_config_and_owner<T: DeserializeOwned>(

Ok((
serde_json::from_str(trigger_config.get()).map_err(|e| {
Error::InternalErr(format!(
Error::internal_err(format!(
"error parsing capture config for {} trigger: {}",
kind, e
))
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-api/src/concurrency_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sql_builder::bind::Bind;
use sql_builder::SqlBuilder;
use uuid::Uuid;
use windmill_common::db::UserDB;
use windmill_common::error::Error::{InternalErr, PermissionDenied};
use windmill_common::error::Error::PermissionDenied;
use windmill_common::error::{self, JsonResult};
use windmill_common::utils::require_admin;

Expand Down Expand Up @@ -82,7 +82,7 @@ async fn prune_concurrency_group(

if n_job_uuids > 0 {
tx.commit().await?;
return Err(InternalErr(
return Err(error::Error::internal_err(
"Concurrency group is currently in use, unable to remove it. Retry later.".to_string(),
));
}
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-api/src/embeddings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn query_hub_scripts(

Ok(Json(results))
} else {
Err(windmill_common::error::Error::InternalErr(
Err(windmill_common::error::Error::internal_err(
"Embeddings db not initialized".to_string(),
))
}
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn query_resource_types(

Ok(Json(results))
} else {
Err(windmill_common::error::Error::InternalErr(
Err(windmill_common::error::Error::internal_err(
"Embeddings db not initialized".to_string(),
))
}
Expand Down
22 changes: 11 additions & 11 deletions backend/windmill-api/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async fn list_flows(
.fields(&["dm.deployment_msg"]);
}

let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?;
let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?;
let mut tx = user_db.begin(&authed).await?;
let rows = sqlx::query_as::<_, ListableFlow>(&sql)
.fetch_all(&mut *tx)
Expand Down Expand Up @@ -704,7 +704,7 @@ async fn update_flow(
w_id,
)
.execute(&mut *tx)
.await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to flow update: {e:#}")))?;
.await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to flow update: {e:#}")))?;

if is_new_path {
// if new path, must clone flow to new path and delete old flow for flow_version foreign key constraint
Expand All @@ -721,7 +721,7 @@ async fn update_flow(
.execute(&mut *tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!("Error updating flow due to create new flow: {e:#}"))
error::Error::internal_err(format!("Error updating flow due to create new flow: {e:#}"))
})?;

sqlx::query!(
Expand All @@ -733,7 +733,7 @@ async fn update_flow(
.execute(&mut *tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!(
error::Error::internal_err(format!(
"Error updating flow due to updating flow history path: {e:#}"
))
})?;
Expand All @@ -746,7 +746,7 @@ async fn update_flow(
.execute(&mut *tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!(
error::Error::internal_err(format!(
"Error updating flow due to deleting old flow: {e:#}"
))
})?;
Expand Down Expand Up @@ -781,7 +781,7 @@ async fn update_flow(
.fetch_one(&mut *tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!(
error::Error::internal_err(format!(
"Error updating flow due to flow history insert: {e:#}"
))
})?;
Expand All @@ -805,15 +805,15 @@ async fn update_flow(
.bind(&flow_path)
.bind(&w_id)
.fetch_all(&mut *tx)
.await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to related schedules update: {e:#}")))?;
.await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to related schedules update: {e:#}")))?;

let schedule = sqlx::query_as::<_, Schedule>(
"UPDATE schedule SET path = $1, script_path = $1 WHERE path = $2 AND workspace_id = $3 AND is_flow IS true RETURNING *")
.bind(&nf.path)
.bind(&flow_path)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to related schedule update: {e:#}")))?;
.await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to related schedule update: {e:#}")))?;

if let Some(schedule) = schedule {
clear_schedule(&mut tx, &flow_path, &w_id).await?;
Expand Down Expand Up @@ -907,7 +907,7 @@ async fn update_flow(
.execute(&mut *new_tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!(
error::Error::internal_err(format!(
"Error updating flow due to updating dependency job field: {e:#}"
))
})?;
Expand All @@ -919,7 +919,7 @@ async fn update_flow(
.execute(&mut *new_tx)
.await
.map_err(|e| {
error::Error::InternalErr(format!(
error::Error::internal_err(format!(
"Error updating flow due to cancelling dependency job: {e:#}"
))
})?;
Expand Down Expand Up @@ -1204,7 +1204,7 @@ async fn delete_flow_by_path(
.execute(&db)
.await
.map_err(|e| {
Error::InternalErr(format!(
Error::internal_err(format!(
"error deleting deployment metadata for script with path {path} in workspace {w_id}: {e:#}"
))
})?;
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/folders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async fn update_folder(

let sql = sqlb
.sql()
.map_err(|e| error::Error::InternalErr(e.to_string()))?;
.map_err(|e| error::Error::internal_err(e.to_string()))?;
let nfolder = sqlx::query_as::<_, Folder>(&sql)
.fetch_optional(&mut *tx)
.await?;
Expand Down
8 changes: 4 additions & 4 deletions backend/windmill-api/src/http_triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn list_triggers(
}
let sql = sqlb
.sql()
.map_err(|e| error::Error::InternalErr(e.to_string()))?;
.map_err(|e| error::Error::internal_err(e.to_string()))?;
let rows = sqlx::query_as::<_, Trigger>(&sql)
.fetch_all(&mut *tx)
.await?;
Expand Down Expand Up @@ -590,7 +590,7 @@ async fn route_job(

#[cfg(not(feature = "parquet"))]
if trigger.static_asset_config.is_some() {
return error::Error::InternalErr(
return error::Error::internal_err(
"Static asset configuration is not supported in this build".to_string(),
)
.into_response();
Expand All @@ -608,14 +608,14 @@ async fn route_job(
config.storage,
)
.await?;
let s3_resource = s3_resource_opt.ok_or(error::Error::InternalErr(
let s3_resource = s3_resource_opt.ok_or(error::Error::internal_err(
"No files storage resource defined at the workspace level".to_string(),
))?;
let s3_client = build_object_store_client(&s3_resource).await?;
let path = object_store::path::Path::from(config.s3);
let s3_object = s3_client.get(&path).await.map_err(|err| {
tracing::warn!("Error retrieving file from S3: {:?}", err);
error::Error::InternalErr(format!("Error retrieving file: {}", err.to_string()))
error::Error::internal_err(format!("Error retrieving file: {}", err.to_string()))
})?;
let mut response_headers = http::HeaderMap::new();
if let Some(ref e_tag) = s3_object.meta.e_tag {
Expand Down
10 changes: 5 additions & 5 deletions backend/windmill-api/src/job_helpers_ee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn get_s3_resource<'c>(
_resource_type: Option<StorageResourceType>,
_job_id: Option<Uuid>,
) -> error::Result<ObjectStoreResource> {
Err(error::Error::InternalErr(
Err(error::Error::internal_err(
"Not implemented in Windmill's Open Source repository".to_string(),
))
}
Expand All @@ -81,7 +81,7 @@ pub async fn upload_file_from_req(
_req: axum::extract::Request,
_options: PutMultipartOpts,
) -> error::Result<()> {
Err(error::Error::InternalErr(
Err(error::Error::internal_err(
"Not implemented in Windmill's Open Source repository".to_string(),
))
}
Expand All @@ -93,7 +93,7 @@ pub async fn upload_file_internal(
_stream: impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
_options: PutMultipartOpts,
) -> error::Result<()> {
Err(error::Error::InternalErr(
Err(error::Error::internal_err(
"Not implemented in Windmill's Open Source repository".to_string(),
))
}
Expand All @@ -107,7 +107,7 @@ pub async fn download_s3_file_internal(
_w_id: &str,
_query: DownloadFileQuery,
) -> error::Result<Response> {
Err(error::Error::InternalErr(
Err(error::Error::internal_err(
"Not implemented in Windmill's Open Source repository".to_string(),
))
}
Expand All @@ -120,7 +120,7 @@ pub async fn load_image_preview_internal(
_w_id: &str,
_query: LoadImagePreviewQuery,
) -> error::Result<Response> {
Err(error::Error::InternalErr(
Err(error::Error::internal_err(
"Not implemented in Windmill's Open Source repository".to_string(),
))
}
Loading

0 comments on commit 2af98cd

Please sign in to comment.