Skip to content

Commit

Permalink
remove commit
Browse files Browse the repository at this point in the history
  • Loading branch information
宋光璠 committed Dec 20, 2024
1 parent 487f53a commit d479da7
Showing 1 changed file with 0 additions and 377 deletions.
377 changes: 0 additions & 377 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,383 +816,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

//Status StreamLoadAction::_process_put(HttpRequest* http_req,
// std::shared_ptr<StreamLoadContext> ctx) {
// // Now we use stream
// ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
//
// // put request
// TStreamLoadPutRequest request;
// set_request_auth(&request, ctx->auth);
// request.db = ctx->db;
// request.tbl = ctx->table;
// request.txnId = ctx->txn_id;
// request.formatType = ctx->format;
// request.__set_compress_type(ctx->compress_type);
// request.__set_header_type(ctx->header_type);
// request.__set_loadId(ctx->id.to_thrift());
// if (ctx->use_streaming) {
// std::shared_ptr<io::StreamLoadPipe> pipe;
// if (ctx->is_chunked_transfer) {
// pipe = std::make_shared<io::StreamLoadPipe>(
// io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
// } else {
// pipe = std::make_shared<io::StreamLoadPipe>(
// io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
// MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
// }
// request.fileType = TFileType::FILE_STREAM;
// ctx->body_sink = pipe;
// ctx->pipe = pipe;
// RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
// } else {
// RETURN_IF_ERROR(_data_saved_path(http_req, &request.path));
// auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
// RETURN_IF_ERROR(file_sink->open());
// request.__isset.path = true;
// request.fileType = TFileType::FILE_LOCAL;
// request.__set_file_size(ctx->body_bytes);
// ctx->body_sink = file_sink;
// }
// if (!http_req->header(HTTP_COLUMNS).empty()) {
// request.__set_columns(http_req->header(HTTP_COLUMNS));
// }
// if (!http_req->header(HTTP_WHERE).empty()) {
// request.__set_where(http_req->header(HTTP_WHERE));
// }
// if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
// request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
// }
// if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
// request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
// }
// if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) {
// const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
// if (enclose_str.length() != 1) {
// return Status::InvalidArgument("enclose must be single-char, actually is {}",
// enclose_str);
// }
// request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
// }
// if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) {
// const auto& escape_str = http_req->header(HTTP_ESCAPE);
// if (escape_str.length() != 1) {
// return Status::InvalidArgument("escape must be single-char, actually is {}",
// escape_str);
// }
// request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
// }
// if (!http_req->header(HTTP_PARTITIONS).empty()) {
// request.__set_partitions(http_req->header(HTTP_PARTITIONS));
// request.__set_isTempPartition(false);
// if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
// return Status::InvalidArgument(
// "Can not specify both partitions and temporary partitions");
// }
// }
// if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
// request.__set_partitions(http_req->header(HTTP_TEMP_PARTITIONS));
// request.__set_isTempPartition(true);
// if (!http_req->header(HTTP_PARTITIONS).empty()) {
// return Status::InvalidArgument(
// "Can not specify both partitions and temporary partitions");
// }
// }
// if (!http_req->header(HTTP_NEGATIVE).empty() && http_req->header(HTTP_NEGATIVE) == "true") {
// request.__set_negative(true);
// } else {
// request.__set_negative(false);
// }
// if (!http_req->header(HTTP_STRICT_MODE).empty()) {
// if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
// request.__set_strictMode(false);
// } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
// request.__set_strictMode(true);
// } else {
// return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
// }
// }
// // timezone first. if not, try time_zone
// if (!http_req->header(HTTP_TIMEZONE).empty()) {
// request.__set_timezone(http_req->header(HTTP_TIMEZONE));
// } else if (!http_req->header(HTTP_TIME_ZONE).empty()) {
// request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
// }
// if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
// try {
// request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
// } catch (const std::invalid_argument& e) {
// return Status::InvalidArgument("Invalid mem limit format, {}", e.what());
// }
// }
// if (!http_req->header(HTTP_JSONPATHS).empty()) {
// request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS));
// }
// if (!http_req->header(HTTP_JSONROOT).empty()) {
// request.__set_json_root(http_req->header(HTTP_JSONROOT));
// }
// if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
// if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
// request.__set_strip_outer_array(true);
// } else {
// request.__set_strip_outer_array(false);
// }
// } else {
// request.__set_strip_outer_array(false);
// }
// if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
// if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
// request.__set_num_as_string(true);
// } else {
// request.__set_num_as_string(false);
// }
// } else {
// request.__set_num_as_string(false);
// }
// if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
// if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
// request.__set_fuzzy_parse(true);
// } else {
// request.__set_fuzzy_parse(false);
// }
// } else {
// request.__set_fuzzy_parse(false);
// }
//
// if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
// if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
// request.__set_read_json_by_line(true);
// } else {
// request.__set_read_json_by_line(false);
// }
// } else {
// request.__set_read_json_by_line(false);
// }
//
// if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
// request.__set_sequence_col(
// http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
// }
//
// if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
// try {
// request.__set_send_batch_parallelism(
// std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM)));
// } catch (const std::invalid_argument& e) {
// return Status::InvalidArgument("send_batch_parallelism must be an integer, {}",
// e.what());
// } catch (const std::out_of_range& e) {
// return Status::InvalidArgument("send_batch_parallelism out of range, {}", e.what());
// }
// }
//
// if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
// if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
// request.__set_load_to_single_tablet(true);
// } else {
// request.__set_load_to_single_tablet(false);
// }
// }
//
// if (ctx->timeout_second != -1) {
// request.__set_timeout(ctx->timeout_second);
// }
// request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
// TMergeType::type merge_type = TMergeType::APPEND;
// StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
// {"DELETE", TMergeType::DELETE},
// {"MERGE", TMergeType::MERGE}};
// if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
// std::string merge_type_str = http_req->header(HTTP_MERGE_TYPE);
// auto iter = merge_type_map.find(merge_type_str);
// if (iter != merge_type_map.end()) {
// merge_type = iter->second;
// } else {
// return Status::InvalidArgument("Invalid merge type {}", merge_type_str);
// }
// if (merge_type == TMergeType::MERGE && http_req->header(HTTP_DELETE_CONDITION).empty()) {
// return Status::InvalidArgument("Excepted DELETE ON clause when merge type is MERGE.");
// } else if (merge_type != TMergeType::MERGE &&
// !http_req->header(HTTP_DELETE_CONDITION).empty()) {
// return Status::InvalidArgument(
// "Not support DELETE ON clause when merge type is not MERGE.");
// }
// }
// request.__set_merge_type(merge_type);
// if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
// request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION));
// }
//
// if (!http_req->header(HTTP_MAX_FILTER_RATIO).empty()) {
// ctx->max_filter_ratio = strtod(http_req->header(HTTP_MAX_FILTER_RATIO).c_str(), nullptr);
// request.__set_max_filter_ratio(ctx->max_filter_ratio);
// }
//
// if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
// request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS));
// }
// if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) {
// if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) {
// request.__set_trim_double_quotes(true);
// } else {
// request.__set_trim_double_quotes(false);
// }
// }
// if (!http_req->header(HTTP_SKIP_LINES).empty()) {
// request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
// }
// if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
// if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
// request.__set_enable_profile(true);
// } else {
// request.__set_enable_profile(false);
// }
// }
//
// if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
// static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
// {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
// {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
// {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
// std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
// auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
// if (iter != unique_key_update_mode_map.end()) {
// TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
// if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
// // check constraints when flexible partial update is enabled
// if (ctx->format != TFileFormatType::FORMAT_JSON) {
// return Status::InvalidArgument(
// "flexible partial update only support json format as input file "
// "currently");
// }
// if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
// iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when 'fuzzy_parse' is enabled");
// }
// if (!http_req->header(HTTP_COLUMNS).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when 'columns' is specified");
// }
// if (!http_req->header(HTTP_JSONPATHS).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when 'jsonpaths' is specified");
// }
// if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when 'hidden_columns' is "
// "specified");
// }
// if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when "
// "'function_column.sequence_col' is specified");
// }
// if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when "
// "'merge_type' is specified");
// }
// if (!http_req->header(HTTP_WHERE).empty()) {
// return Status::InvalidArgument(
// "Don't support flexible partial update when "
// "'where' is specified");
// }
// }
// request.__set_unique_key_update_mode(unique_key_update_mode);
// } else {
// return Status::InvalidArgument(
// "Invalid unique_key_partial_mode {}, must be one of 'UPSERT', "
// "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
// unique_key_update_mode_str);
// }
// }
// if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
// !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
// // only consider `partial_columns` parameter when `unique_key_update_mode` is not set
// if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
// request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
// // for backward compatibility
// request.__set_partial_update(true);
// }
// }
//
// if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
// bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
// request.__set_memtable_on_sink_node(value);
// }
// if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) {
// int value = std::stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE));
// request.__set_stream_per_node(value);
// }
// if (ctx->group_commit) {
// if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
// request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
// } else {
// // used for wait_internal_group_commit_finish
// request.__set_group_commit_mode("sync_mode");
// }
// }
//
// if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) {
// request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
// }
//
//#ifndef BE_TEST
// // plan this load
// TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
// int64_t stream_load_put_start_time = MonotonicNanos();
// RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
// master_addr.hostname, master_addr.port,
// [&request, ctx](FrontendServiceConnection& client) {
// client->streamLoadPut(ctx->put_result, request);
// }));
// ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
//#else
// ctx->put_result = k_stream_load_put_result;
//#endif
// Status plan_status(Status::create(ctx->put_result.status));
// if (!plan_status.ok()) {
// LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
// return plan_status;
// }
// if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
// return Status::NotSupported("stream load 2pc is unsupported for mow table");
// }
// if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
// // FIXME find a way to avoid chunked stream load write large WALs
// size_t content_length = 0;
// if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
// try {
// content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
// } catch (const std::exception& e) {
// return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
// http_req->header(HttpHeaders::CONTENT_LENGTH),
// e.what());
// }
// if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
// ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
// ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
// ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
// ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
// ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
// ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
// content_length *= 3;
// }
// }
// ctx->put_result.params.__set_content_length(content_length);
// }
//
// VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
// // if we not use streaming, we must download total content before we begin
// // to process this load
// if (!ctx->use_streaming) {
// return Status::OK();
// }
//
// return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
//}

Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) {
std::string prefix;
RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &prefix));
Expand Down

0 comments on commit d479da7

Please sign in to comment.