From 008a95d4ade2a71acd4021933e2d70d9915089b1 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Mon, 29 Apr 2024 11:38:30 +0100
Subject: [PATCH 1/5] feat(scheduler): add csv support for get_file_metadata grpc method

---
 .../scheduler/src/scheduler_server/grpc.rs | 84 +++++++++++++++++--
 .../scheduler/src/state/execution_graph.rs | 25 +++---
 2 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 2d759fb7b..c636896b7 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -36,6 +36,7 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 
+use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -297,13 +298,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         // TODO shouldn't this take a ListingOption object as input?
 
         let GetFileMetadataParams { path, file_type } = request.into_inner();
-        let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
+        let file_format: Result<Arc<dyn FileFormat>, Status> = match file_type.as_str() {
             "parquet" => Ok(Arc::new(ParquetFormat::default())),
-            // TODO implement for CSV
+            "csv" => {
+                let format = CsvFormat::default()
+                    .with_delimiter(b',')
+                    .with_has_header(true);
+                Ok(Arc::new(format))
+            }
+            "tbl" => {
+                let format = CsvFormat::default()
+                    .with_delimiter(b'|')
+                    .with_has_header(false);
+                Ok(Arc::new(format))
+            }
             _ => Err(tonic::Status::unimplemented(
                 "get_file_metadata unsupported file type",
             )),
-        }?;
+        };
 
         let path = Path::from(path.as_str());
         let file_metas: Vec<_> = obj_store
@@ -316,7 +328,7 @@
                 tonic::Status::internal(msg)
             })?;
 
-        let schema = file_format
+        let schema = file_format?
             .infer_schema(&state, &obj_store, &file_metas)
             .await
             .map_err(|e| {
@@ -641,15 +653,15 @@ mod test {
     use datafusion_proto::protobuf::LogicalPlanNode;
     use datafusion_proto::protobuf::PhysicalPlanNode;
 
-    use tonic::Request;
+    use tonic::{Code, Request};
 
     use crate::config::SchedulerConfig;
     use crate::metrics::default_metrics_collector;
     use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
         executor_registration::OptionalHost, executor_status, ExecutorRegistration,
-        ExecutorStatus, ExecutorStoppedParams, HeartBeatParams, PollWorkParams,
-        RegisterExecutorParams,
+        ExecutorStatus, ExecutorStoppedParams, GetFileMetadataParams,
+        GetFileMetadataResult, HeartBeatParams, PollWorkParams, RegisterExecutorParams,
     };
     use ballista_core::serde::scheduler::ExecutorSpecification;
     use ballista_core::serde::BallistaCodec;
@@ -832,6 +844,64 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_get_file_metadata() -> Result<(), BallistaError> {
+        let cluster = test_cluster_context();
+        let config = SchedulerConfig::default();
+        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerServer::new(
+                "localhost:50050".to_owned(),
+                cluster,
+                BallistaCodec::default(),
+                Arc::new(config),
+                default_metrics_collector().unwrap(),
+            );
+        scheduler.init().await?;
+
+        let requests = vec![
+            Request::new(GetFileMetadataParams {
+                path: "/tmp/file.parquet".to_string(),
+                file_type: "parquet".to_string(),
+            }),
+            Request::new(GetFileMetadataParams {
+                path: "/tmp/file.csv".to_string(),
+                file_type: "csv".to_string(),
+            }),
+            Request::new(GetFileMetadataParams {
+                path: "/tmp/file.tbl".to_string(),
+                file_type: "tbl".to_string(),
+            }),
+        ];
+        for request in requests {
+            let response = scheduler
+                .get_file_metadata(request)
+                .await
+                .expect("Received error response")
+                .into_inner();
+            assert_eq!(
+                response,
+                GetFileMetadataResult {
+                    schema: Some(datafusion_proto::protobuf::Schema {
+                        columns: vec![],
+                        metadata: Default::default(),
+                    }),
+                }
+            );
+        }
+
+        let request: Request<GetFileMetadataParams> =
+            Request::new(GetFileMetadataParams {
+                path: "/tmp/file.avro".to_string(),
+                file_type: "avro".to_string(),
+            });
+        let avro_response = scheduler
+            .get_file_metadata(request)
+            .await
+            .expect_err("get_file_metadata unsupported file type");
+        assert_eq!(avro_response.code(), Code::Unimplemented);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_register_executor_in_heartbeat_service() -> Result<(), BallistaError> {
         let cluster = test_cluster_context();
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index 70b2659f9..be7c5c4eb 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -340,7 +340,7 @@ impl ExecutionGraph {
                     match failed_reason {
                         Some(FailedReason::FetchPartitionError(
-                            fetch_partiton_error,
+                            fetch_partition_error,
                         )) => {
                             let failed_attempts = failed_stage_attempts
                                 .entry(stage_id)
@@ -348,12 +348,12 @@
                             failed_attempts.insert(task_stage_attempt_num);
                             if failed_attempts.len() < max_stage_failures {
                                 let map_stage_id =
-                                    fetch_partiton_error.map_stage_id as usize;
-                                let map_partition_id = fetch_partiton_error
+                                    fetch_partition_error.map_stage_id as usize;
+                                let map_partition_id = fetch_partition_error
                                     .map_partition_id as usize;
 
                                 let executor_id =
-                                    fetch_partiton_error.executor_id;
+                                    fetch_partition_error.executor_id;
 
                                 if !failed_stages.is_empty() {
                                     let error_msg = format!(
@@ -436,7 +436,7 @@
                         Some(FailedReason::SuccessfulTask(
                             successful_task,
                         )) = task_status.status
                         {
-                            // update task metrics for successfu task
+                            // update task metrics for successful task
                             running_stage
                                 .update_task_metrics(partition_id, operator_metrics)?;
@@ -510,30 +510,31 @@
                         failed_stages.insert(stage_id, failed_task.error);
                     }
                     Some(FailedReason::FetchPartitionError(
-                        fetch_partiton_error,
+                        fetch_partition_error,
                    )) if failed_stages.is_empty()
                        && current_running_stages.contains(
-                            &(fetch_partiton_error.map_stage_id as usize),
+                            &(fetch_partition_error.map_stage_id
+                                as usize),
                        )
                        && !unsolved_stage
                            .last_attempt_failure_reasons
                            .contains(
-                                &fetch_partiton_error.executor_id,
+                                &fetch_partition_error.executor_id,
                            ) =>
                    {
                        should_ignore = false;
                        unsolved_stage
                            .last_attempt_failure_reasons
                            .insert(
-                                fetch_partiton_error.executor_id.clone(),
+                                fetch_partition_error.executor_id.clone(),
                            );
                        let map_stage_id =
-                            fetch_partiton_error.map_stage_id as usize;
-                        let map_partition_id = fetch_partiton_error
+                            fetch_partition_error.map_stage_id as usize;
+                        let map_partition_id = fetch_partition_error
                            .map_partition_id as usize;
                        let executor_id =
-                            fetch_partiton_error.executor_id;
+                            fetch_partition_error.executor_id;
                        let removed_map_partitions = unsolved_stage
                            .remove_input_partitions(
                                map_stage_id,

From b5a6e5e6b576926b835223a0b12b8c7e761d18b7 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Thu, 2 May 2024 14:20:25 +0100
Subject: [PATCH 2/5] fix(scheduler): adjust failing tests

---
 ballista/scheduler/Cargo.toml | 1 +
 ballista/scheduler/src/scheduler_server/grpc.rs | 14 ++++++++++----
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index b11b091f5..a165e554f 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -84,6 +84,7 @@ warp = "0.3"
 
 [dev-dependencies]
 ballista-core = { path = "../core", version = "0.12.0" }
+tempfile = "3"
 
 [build-dependencies]
 configure_me_codegen = { workspace = true }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index c636896b7..a68920352 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -653,6 +653,8 @@ mod test {
     use datafusion_proto::protobuf::LogicalPlanNode;
     use datafusion_proto::protobuf::PhysicalPlanNode;
 
+    use object_store::path::Path;
+    use tempfile::NamedTempFile;
     use tonic::{Code, Request};
 
     use crate::config::SchedulerConfig;
@@ -858,17 +860,21 @@ mod test {
         );
         scheduler.init().await?;
 
+        let mut test_file = NamedTempFile::new()?;
+        let source_location_str =
+            Path::from(test_file.as_ref().to_str().unwrap()).to_string();
+
         let requests = vec![
             Request::new(GetFileMetadataParams {
-                path: "/tmp/file.parquet".to_string(),
+                path: source_location_str.clone(),
                 file_type: "parquet".to_string(),
             }),
             Request::new(GetFileMetadataParams {
-                path: "/tmp/file.csv".to_string(),
+                path: source_location_str.clone(),
                 file_type: "csv".to_string(),
             }),
             Request::new(GetFileMetadataParams {
-                path: "/tmp/file.tbl".to_string(),
+                path: source_location_str.clone(),
                 file_type: "tbl".to_string(),
             }),
         ];
@@ -891,7 +897,7 @@ mod test {
 
         let request: Request<GetFileMetadataParams> =
             Request::new(GetFileMetadataParams {
-                path: "/tmp/file.avro".to_string(),
+                path: source_location_str,
                 file_type: "avro".to_string(),
             });
         let avro_response = scheduler

From abe5118345fa6df8df100a0e397e8aa37fa54fa8 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Thu, 2 May 2024 15:43:42 +0100
Subject: [PATCH 3/5] chore(scheduler): adjust per clippy

---
 ballista/scheduler/src/scheduler_server/grpc.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index a68920352..4b21d2195 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -860,7 +860,7 @@ mod test {
         );
         scheduler.init().await?;
 
-        let mut test_file = NamedTempFile::new()?;
+        let test_file = NamedTempFile::new()?;
         let source_location_str =
             Path::from(test_file.as_ref().to_str().unwrap()).to_string();
 

From 455714320c1b03820d33a2f3b4b10befb3808e09 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Thu, 2 May 2024 16:12:30 +0100
Subject: [PATCH 4/5] chore(scheduler): more clippy

---
 ballista/scheduler/src/api/handlers.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs
index 0645147c7..f10fe4793 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -33,6 +33,7 @@ struct SchedulerStateResponse {
     version: &'static str,
 }
 
+#[allow(dead_code)]
 #[derive(Debug, serde::Serialize)]
 struct ExecutorsResponse {
     executors: Vec<ExecutorMetaResponse>,

From f6fcfda37dfa3639d7fe6b171ca683f2cdea14e2 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Thu, 2 May 2024 19:07:43 +0100
Subject: [PATCH 5/5] chore(scheduler): apply another clippy suggestion

---
 ballista-cli/src/exec.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index bd075b41c..c449c8d68 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -49,11 +49,11 @@ pub async fn exec_from_lines(
         let line = line.trim_end();
         query.push_str(line);
         if line.ends_with(';') {
-            match exec_and_print(ctx, print_options, query).await {
+            match exec_and_print(ctx, print_options, query.clone()).await {
                 Ok(_) => {}
                 Err(err) => println!("{err:?}"),
             }
-            query = "".to_owned();
+            "".clone_into(&mut query);
        } else {
            query.push('\n');
        }
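
For context only (not part of any patch above): the substance of PATCH 1/5 is the dispatch from the `file_type` string carried by `GetFileMetadataParams` to a DataFusion `FileFormat`, which the scheduler then uses to infer a schema over the listed files. The sketch below restates that dispatch as a standalone function; the helper name `format_for_file_type` and its `String` error type are hypothetical, and only the `ParquetFormat`/`CsvFormat` builder calls that already appear in the diff are assumed to exist.

    use std::sync::Arc;

    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::file_format::FileFormat;

    // Map a file_type string to the FileFormat used for schema inference,
    // mirroring the match arms added in scheduler_server/grpc.rs.
    fn format_for_file_type(file_type: &str) -> Result<Arc<dyn FileFormat>, String> {
        match file_type {
            // Parquet files embed their schema, so the default format needs no options.
            "parquet" => Ok(Arc::new(ParquetFormat::default())),
            // Plain CSV: comma-delimited with a header row.
            "csv" => Ok(Arc::new(
                CsvFormat::default().with_delimiter(b',').with_has_header(true),
            )),
            // TPC-H style .tbl files: pipe-delimited, no header row.
            "tbl" => Ok(Arc::new(
                CsvFormat::default().with_delimiter(b'|').with_has_header(false),
            )),
            other => Err(format!("get_file_metadata unsupported file type: {other}")),
        }
    }

Because the scheduler calls `infer_schema` on whatever format this mapping returns, the delimiter and header assumptions above are what determine the column names and types reported for csv and tbl inputs; the test added in PATCH 1/5 and adjusted in PATCH 2/5 only exercises an empty temporary file, where an empty schema is expected for all three supported types and "avro" is rejected with `Code::Unimplemented`.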