From c40694af453440cae5c99fb4a873c61f811c71c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=AE=8B=E5=85=89=E7=92=A0?=
Date: Thu, 19 Dec 2024 15:40:24 +0800
Subject: [PATCH] [feature](stream-load) attach a generated http_stream load_sql to the put request

Add the BE config `enable_stream_load_mysql` (default false). When it is
enabled, StreamLoadAction::_process_put() builds an
`INSERT INTO ... SELECT ... FROM http_stream(...)` statement from the
request and attaches it to the put request as `load_sql`.

The statement is assembled with single-space separators, the target table
is qualified with the database and backtick-quoted, and property values
are emitted as escaped string literals.

---
 be/src/common/config.cpp           |  2 +
 be/src/common/config.h             |  4 ++
 be/src/http/action/stream_load.cpp | 54 +++++++++++++++++++++++++++++
 3 files changed, 60 insertions(+)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 95a3e61fb5517aa..87f705d43709129 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -106,6 +106,8 @@ DEFINE_String(memory_mode, "moderate");
 
 DEFINE_mBool(enable_use_cgroup_memory_info, "true");
 
+DEFINE_mBool(enable_stream_load_mysql, "false");
+
 // process memory limit specified as number of bytes
 // ('[bB]?'), megabytes ('[mM]'), gigabytes ('[gG]'),
 // or percentage of the physical memory ('%').
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8a9c3f7480b337..8a676e6e11bf4f7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -225,6 +225,10 @@ DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
 // 2. print more memory logs.
 DECLARE_mBool(crash_in_memory_tracker_inaccurate);
 
+// Experimental. When true, stream load also generates an `INSERT INTO ... SELECT ... FROM
+// http_stream(...)` statement and attaches it to the put request as `load_sql`.
+DECLARE_mBool(enable_stream_load_mysql);
+
 // default is true. if any memory tracking in Orphan mem tracker will report error.
 // !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time.
 // allocator free memory not need to check, because when the thread memory tracker label is Orphan,
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index e8db5cb542fb4bf..7e6dceb1bdf6162 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -443,6 +443,60 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_file_size(ctx->body_bytes);
         ctx->body_sink = file_sink;
     }
+
+    // Experimental and off by default: also describe this load as an
+    // `INSERT INTO ... SELECT ... FROM http_stream(...)` statement and attach it to the
+    // put request as `load_sql`.
+    if (config::enable_stream_load_mysql) {
+        // Wrap `raw` in double quotes, backslash-escaping '"' and '\' so a header value
+        // cannot terminate the SQL string literal early.
+        const auto quote_literal = [](const std::string& raw) {
+            std::string quoted = "\"";
+            for (const char c : raw) {
+                if (c == '"' || c == '\\') {
+                    quoted += '\\';
+                }
+                quoted += c;
+            }
+            quoted += '"';
+            return quoted;
+        };
+        // Wrap `raw` in backticks, doubling any embedded backtick.
+        const auto quote_identifier = [](const std::string& raw) {
+            std::string quoted = "`";
+            for (const char c : raw) {
+                if (c == '`') {
+                    quoted += '`';
+                }
+                quoted += c;
+            }
+            quoted += '`';
+            return quoted;
+        };
+
+        const std::string& columns = http_req->header(HTTP_COLUMNS);
+        const std::string& where = http_req->header(HTTP_WHERE);
+        const std::string& separator = http_req->header(HTTP_COLUMN_SEPARATOR);
+        // Only JSON is told apart here; every other format is declared as csv.
+        const std::string format = ctx->format == TFileFormatType::FORMAT_JSON ? "json" : "csv";
+
+        // NOTE(review): `columns` and `where` are caller-supplied SQL fragments that are
+        // spliced in verbatim and cannot be escaped here; whoever executes `load_sql` must
+        // treat them as untrusted input.
+        std::string load_sql = "INSERT INTO " + quote_identifier(ctx->db) + "." +
+                               quote_identifier(ctx->table);
+        load_sql += " SELECT " + (columns.empty() ? std::string("*") : columns);
+        load_sql += " FROM http_stream(\"format\" = " + quote_literal(format) +
+                    ", \"column_separator\" = " +
+                    quote_literal(separator.empty() ? std::string(",") : separator) + ")";
+        if (!where.empty()) {
+            load_sql += " WHERE " + where;
+        }
+
+        LOG(INFO) << "Generated SQL: " << load_sql;
+        request.__set_load_sql(load_sql);
+    }
+
     if (!http_req->header(HTTP_COLUMNS).empty()) {
         request.__set_columns(http_req->header(HTTP_COLUMNS));
     }