feat(scheduler): add csv support for get_file_metadata grpc method #1011

Closed
wants to merge 5 commits
4 changes: 2 additions & 2 deletions ballista-cli/src/exec.rs
@@ -49,11 +49,11 @@ pub async fn exec_from_lines(
let line = line.trim_end();
query.push_str(line);
if line.ends_with(';') {
- match exec_and_print(ctx, print_options, query).await {
+ match exec_and_print(ctx, print_options, query.clone()).await {
Ok(_) => {}
Err(err) => println!("{err:?}"),
}
query = "".to_owned();
"".clone_into(&mut query);
} else {
query.push('\n');
}
1 change: 1 addition & 0 deletions ballista/scheduler/Cargo.toml
@@ -84,6 +84,7 @@ warp = "0.3"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.12.0" }
tempfile = "3"

[build-dependencies]
configure_me_codegen = { workspace = true }
1 change: 1 addition & 0 deletions ballista/scheduler/src/api/handlers.rs
@@ -33,6 +33,7 @@ struct SchedulerStateResponse {
version: &'static str,
}

+ #[allow(dead_code)]
#[derive(Debug, serde::Serialize)]
struct ExecutorsResponse {
executors: Vec<ExecutorMetaResponse>,
90 changes: 83 additions & 7 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -36,6 +36,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::ExecutorMetadata;

+ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -297,13 +298,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
// TODO shouldn't this take a ListingOption object as input?

let GetFileMetadataParams { path, file_type } = request.into_inner();
- let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
+ let file_format: Result<Arc<dyn FileFormat>, Status> = match file_type.as_str() {
"parquet" => Ok(Arc::new(ParquetFormat::default())),
Comment from the PR author:
I've noticed that in the benchmark there's a slightly different default for parquet:

ParquetFormat::default().with_enable_pruning(Some(true))

I wonder if we need consistency here, or if it's ok to leave it as is?

Reply from a contributor:

I think pruning is enabled by default already, so in practice it will be the same.
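As a quick side-by-side sketch (an editor's illustration, not part of the thread): assuming the DataFusion version used here, where with_enable_pruning takes an Option<bool> as quoted above, the two constructions differ only in whether pruning is set explicitly, so if pruning is already enabled by default they behave identically. The helper names below are made up for illustration.

use datafusion::datasource::file_format::parquet::ParquetFormat;

// What this PR uses for "parquet": rely on ParquetFormat's defaults.
fn scheduler_parquet_format() -> ParquetFormat {
    ParquetFormat::default()
}

// What the benchmark reportedly uses: pruning switched on explicitly.
// If pruning already defaults to enabled, both formats are equivalent.
fn benchmark_parquet_format() -> ParquetFormat {
    ParquetFormat::default().with_enable_pruning(Some(true))
}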

- // TODO implement for CSV
+ "csv" => {
+ let format = CsvFormat::default()
+ .with_delimiter(b',')
+ .with_has_header(true);
+ Ok(Arc::new(format))
+ }
+ "tbl" => {
+ let format = CsvFormat::default()
+ .with_delimiter(b'|')
+ .with_has_header(false);
+ Ok(Arc::new(format))
+ }
_ => Err(tonic::Status::unimplemented(
"get_file_metadata unsupported file type",
)),
- }?;
Comment from the PR author (@etolbakov, Apr 29, 2024):
I had to drop the "short circuit" from here to resolve the Rust compilation error:

Type mismatch [E0308] expected `Result<Arc<CsvFormat>, Status>`, but found `Result<Arc<ParquetFormat>, Status>`

Please let me know if there's a better way of doing it.
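One possible alternative (an untested editor's sketch against this diff, not part of the PR) is to keep the trailing `?` and cast each Ok value to Arc<dyn FileFormat>, so all arms share one type and E0308 does not arise. It assumes the surrounding bindings and imports from this file (file_type, CsvFormat, ParquetFormat, FileFormat, tonic::Status):

// Sketch only: each Ok arm is coerced to Arc<dyn FileFormat> up front,
// which lets the match keep the short-circuiting `}?;` from the old code.
let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
    "parquet" => Ok(Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>),
    "csv" => Ok(Arc::new(
        CsvFormat::default().with_delimiter(b',').with_has_header(true),
    ) as Arc<dyn FileFormat>),
    "tbl" => Ok(Arc::new(
        CsvFormat::default().with_delimiter(b'|').with_has_header(false),
    ) as Arc<dyn FileFormat>),
    _ => Err(tonic::Status::unimplemented(
        "get_file_metadata unsupported file type",
    )),
}?;

Whether that reads better than binding the Result and applying `?` at the infer_schema call below is largely a style choice.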

+ };

let path = Path::from(path.as_str());
let file_metas: Vec<_> = obj_store
@@ -316,7 +328,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
tonic::Status::internal(msg)
})?;

- let schema = file_format
+ let schema = file_format?
.infer_schema(&state, &obj_store, &file_metas)
.await
.map_err(|e| {
@@ -641,15 +653,17 @@ mod test {

use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion_proto::protobuf::PhysicalPlanNode;
- use tonic::Request;
+ use object_store::path::Path;
+ use tempfile::NamedTempFile;
+ use tonic::{Code, Request};

use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, executor_status, ExecutorRegistration,
- ExecutorStatus, ExecutorStoppedParams, HeartBeatParams, PollWorkParams,
- RegisterExecutorParams,
+ ExecutorStatus, ExecutorStoppedParams, GetFileMetadataParams,
+ GetFileMetadataResult, HeartBeatParams, PollWorkParams, RegisterExecutorParams,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
@@ -832,6 +846,68 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_get_file_metadata() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
let config = SchedulerConfig::default();
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
cluster,
BallistaCodec::default(),
Arc::new(config),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;

let test_file = NamedTempFile::new()?;
let source_location_str =
Path::from(test_file.as_ref().to_str().unwrap()).to_string();

let requests = vec![
Request::new(GetFileMetadataParams {
path: source_location_str.clone(),
file_type: "parquet".to_string(),
}),
Request::new(GetFileMetadataParams {
path: source_location_str.clone(),
file_type: "csv".to_string(),
}),
Request::new(GetFileMetadataParams {
path: source_location_str.clone(),
file_type: "tbl".to_string(),
}),
];
for request in requests {
let response = scheduler
.get_file_metadata(request)
.await
.expect("Received error response")
.into_inner();
assert_eq!(
response,
GetFileMetadataResult {
schema: Some(datafusion_proto::protobuf::Schema {
columns: vec![],
metadata: Default::default(),
}),
}
);
}

let request: Request<GetFileMetadataParams> =
Request::new(GetFileMetadataParams {
path: source_location_str,
file_type: "avro".to_string(),
});
let avro_response = scheduler
.get_file_metadata(request)
.await
.expect_err("get_file_metadata unsupported file type");
assert_eq!(avro_response.code(), Code::Unimplemented);
Ok(())
}

#[tokio::test]
async fn test_register_executor_in_heartbeat_service() -> Result<(), BallistaError> {
let cluster = test_cluster_context();
25 changes: 13 additions & 12 deletions ballista/scheduler/src/state/execution_graph.rs
@@ -340,20 +340,20 @@ impl ExecutionGraph {

match failed_reason {
Some(FailedReason::FetchPartitionError(
- fetch_partiton_error,
+ fetch_partition_error,
)) => {
let failed_attempts = failed_stage_attempts
.entry(stage_id)
.or_default();
failed_attempts.insert(task_stage_attempt_num);
if failed_attempts.len() < max_stage_failures {
let map_stage_id =
- fetch_partiton_error.map_stage_id as usize;
- let map_partition_id = fetch_partiton_error
+ fetch_partition_error.map_stage_id as usize;
+ let map_partition_id = fetch_partition_error
.map_partition_id
as usize;
let executor_id =
- fetch_partiton_error.executor_id;
+ fetch_partition_error.executor_id;

if !failed_stages.is_empty() {
let error_msg = format!(
@@ -436,7 +436,7 @@ impl ExecutionGraph {
successful_task,
)) = task_status.status
{
- // update task metrics for successfu task
+ // update task metrics for successful task
running_stage
.update_task_metrics(partition_id, operator_metrics)?;

@@ -510,30 +510,31 @@ impl ExecutionGraph {
failed_stages.insert(stage_id, failed_task.error);
}
Some(FailedReason::FetchPartitionError(
- fetch_partiton_error,
+ fetch_partition_error,
)) if failed_stages.is_empty()
&& current_running_stages.contains(
- &(fetch_partiton_error.map_stage_id as usize),
+ &(fetch_partition_error.map_stage_id
+ as usize),
)
&& !unsolved_stage
.last_attempt_failure_reasons
.contains(
- &fetch_partiton_error.executor_id,
+ &fetch_partition_error.executor_id,
) =>
{
should_ignore = false;
unsolved_stage
.last_attempt_failure_reasons
.insert(
- fetch_partiton_error.executor_id.clone(),
+ fetch_partition_error.executor_id.clone(),
);
let map_stage_id =
- fetch_partiton_error.map_stage_id as usize;
- let map_partition_id = fetch_partiton_error
+ fetch_partition_error.map_stage_id as usize;
+ let map_partition_id = fetch_partition_error
.map_partition_id
as usize;
let executor_id =
- fetch_partiton_error.executor_id;
+ fetch_partition_error.executor_id;
let removed_map_partitions = unsolved_stage
.remove_input_partitions(
map_stage_id,