diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b525721cfd8e3e..67d5c6f655bab6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -232,6 +232,8 @@ DEFINE_mInt32(max_download_speed_kbps, "50000"); DEFINE_mInt32(download_low_speed_limit_kbps, "50"); // download low speed time(seconds) DEFINE_mInt32(download_low_speed_time, "300"); +// whether to download small files in batch +DEFINE_mBool(enable_batch_download, "false"); DEFINE_String(sys_log_dir, ""); DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 3431dfa6450449..64013cdb683cff 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -282,6 +282,8 @@ DECLARE_mInt32(max_download_speed_kbps); DECLARE_mInt32(download_low_speed_limit_kbps); // download low speed time(seconds) DECLARE_mInt32(download_low_speed_time); +// whether to download small files in batch. +DECLARE_mBool(enable_batch_download); // deprecated, use env var LOG_DIR in be.conf DECLARE_String(sys_log_dir); diff --git a/be/src/gutil/strings/stringpiece.h b/be/src/gutil/strings/stringpiece.h index 38e36a27099279..7a4ebabbf098e7 100644 --- a/be/src/gutil/strings/stringpiece.h +++ b/be/src/gutil/strings/stringpiece.h @@ -149,6 +149,12 @@ class StringPiece { assert(length <= static_cast(std::numeric_limits::max())); length_ = static_cast(length); } + StringPiece(std::string_view view) // NOLINT(runtime/explicit) + : ptr_(view.data()), length_(0) { + size_t length = view.size(); + assert(length <= static_cast(std::numeric_limits::max())); + length_ = static_cast(length); + } StringPiece(const char* offset, int len) : ptr_(offset), length_(len) { assert(len >= 0); } // Substring of another StringPiece. 
diff --git a/be/src/http/action/batch_download_action.cpp b/be/src/http/action/batch_download_action.cpp new file mode 100644 index 00000000000000..d486883e90be28 --- /dev/null +++ b/be/src/http/action/batch_download_action.cpp @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "http/action/batch_download_action.h" + +#include +#include +#include +#include + +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/split.h" +#include "http/http_channel.h" +#include "http/http_method.h" +#include "http/http_request.h" +#include "http/utils.h" +#include "io/fs/local_file_system.h" +#include "runtime/exec_env.h" +#include "util/security.h" + +namespace doris { +namespace { +const std::string CHECK_PARAMETER = "check"; +const std::string LIST_PARAMETER = "list"; +const std::string DIR_PARAMETER = "dir"; +const std::string TOKEN_PARAMETER = "token"; +} // namespace + +BatchDownloadAction::BatchDownloadAction( + ExecEnv* exec_env, std::shared_ptr rate_limit_group, + const std::vector& allow_dirs) + : HttpHandlerWithAuth(exec_env), _rate_limit_group(std::move(rate_limit_group)) { + for (const auto& dir : allow_dirs) { + std::string p; + Status st = io::global_local_filesystem()->canonicalize(dir, &p); + if (!st.ok()) { + continue; + } + _allow_paths.emplace_back(std::move(p)); + } +} + +void BatchDownloadAction::handle(HttpRequest* req) { + if (VLOG_CRITICAL_IS_ON) { + VLOG_CRITICAL << "accept one batch download request " << req->debug_string(); + } + + if (req->param(CHECK_PARAMETER) == "true") { + // For API support check + HttpChannel::send_reply(req, "OK"); + return; + } + + // Get 'dir' parameter, then assembly file absolute path + const std::string& dir_path = req->param(DIR_PARAMETER); + if (dir_path.empty()) { + std::string error_msg = + std::string("parameter " + DIR_PARAMETER + " not specified in url."); + LOG(WARNING) << "handle batch download request: " << error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg); + return; + } + + if (dir_path.find("..") != std::string::npos) { + std::string error_msg = "Not allowed to read relative path: " + dir_path; + LOG(WARNING) << "handle batch download request: " << 
error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg); + return; + } + + Status status; + if (config::enable_token_check) { + status = _check_token(req); + if (!status.ok()) { + std::string error_msg = status.to_string(); + if (status.is()) { + HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); + return; + } else { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); + return; + } + } + } + + status = _check_path_is_allowed(dir_path); + if (!status.ok()) { + std::string error_msg = status.to_string(); + if (status.is() || status.is()) { + HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg); + return; + } else if (status.is()) { + HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); + return; + } else { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); + return; + } + } + + bool is_dir = false; + status = io::global_local_filesystem()->is_directory(dir_path, &is_dir); + if (!status.ok()) { + LOG(WARNING) << "handle batch download request: " << status.to_string() + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string()); + return; + } + + if (!is_dir) { + std::string error_msg = fmt::format("The requested path is not a directory: {}", dir_path); + LOG(WARNING) << "handle batch download request: " << error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg); + return; + } + + _handle(req, dir_path); + + VLOG_CRITICAL << "deal with batch download request finished! 
"; +} + +void BatchDownloadAction::_handle(HttpRequest* req, const std::string& dir_path) { + bool is_list_request = req->param(LIST_PARAMETER) == "true"; + if (is_list_request) { + // return the list of files in the specified directory + bool is_acquire_filesize = true; + do_dir_response(dir_path, req, is_acquire_filesize); + } else { + _handle_batch_download(req, dir_path); + } +} + +void BatchDownloadAction::_handle_batch_download(HttpRequest* req, const std::string& dir_path) { + std::vector files = + strings::Split(req->get_request_body(), "\n", strings::SkipWhitespace()); + if (files.empty()) { + std::string error_msg = "No file specified in request body."; + LOG(WARNING) << "handle batch download request: " << error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg); + return; + } + + if (files.size() > 64) { + std::string error_msg = + "The number of files to download in a batch should be less than 64."; + LOG(WARNING) << "handle batch download request: " << error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg); + return; + } + + for (const auto& file : files) { + if (file.find('/') != std::string::npos) { + std::string error_msg = + fmt::format("Not allowed to read relative path: {}, dir: {}", file, dir_path); + LOG(WARNING) << "handle batch download request: " << error_msg + << ", url: " << mask_token(req->uri()); + HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg); + return; + } + } + + HttpChannel::send_files(req, dir_path, std::move(files)); +} + +Status BatchDownloadAction::_check_token(HttpRequest* req) { + const std::string& token_str = req->param(TOKEN_PARAMETER); + if (token_str.empty()) { + LOG(WARNING) << "token is not specified in request. 
url: " << mask_token(req->uri()); + return Status::NotAuthorized("token is not specified."); + } + + const std::string& local_token = _exec_env->token(); + if (token_str != local_token) { + LOG(WARNING) << "invalid download token: " << mask_token(token_str) + << ", local token: " << mask_token(local_token) + << ", url: " << mask_token(req->uri()); + return Status::NotAuthorized("invalid token {}", mask_token(token_str)); + } + + return Status::OK(); +} + +Status BatchDownloadAction::_check_path_is_allowed(const std::string& file_path) { + std::string canonical_file_path; + RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path)); + for (auto& allow_path : _allow_paths) { + if (io::LocalFileSystem::contain_path(allow_path, canonical_file_path)) { + return Status::OK(); + } + } + + return Status::NotAuthorized("file path is not allowed: {}", canonical_file_path); +} + +} // end namespace doris diff --git a/be/src/http/action/batch_download_action.h b/be/src/http/action/batch_download_action.h new file mode 100644 index 00000000000000..f0b7e3576b9937 --- /dev/null +++ b/be/src/http/action/batch_download_action.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +#include "common/status.h" +#include "http/http_handler.h" +#include "http/http_handler_with_auth.h" +#include "util/threadpool.h" + +struct bufferevent_rate_limit_group; + +namespace doris { + +class ExecEnv; +class HttpRequest; + +// A simple handler that serves incoming HTTP requests of batching file-download to send their +// respective HTTP responses. +// +// We use parameter named 'dir' to specify the static resource path, it is an absolute path. +// +// In HEAD request, then this handler will return the list of files in the specified directory. +// +// In GET request, the file names to download are specified in the request body as a list of strings, +// separated by '\n'. To avoid cost resource, the maximum number of files to download in a batch is 64. +class BatchDownloadAction : public HttpHandlerWithAuth { +public: + BatchDownloadAction(ExecEnv* exec_env, + std::shared_ptr rate_limit_group, + const std::vector& allow_dirs); + + ~BatchDownloadAction() override = default; + + void handle(HttpRequest* req) override; + +private: + Status _check_token(HttpRequest* req); + Status _check_path_is_allowed(const std::string& path); + + void _handle(HttpRequest* req, const std::string& dir_path); + void _handle_batch_download(HttpRequest* req, const std::string& dir_path); + + std::vector _allow_paths; + std::shared_ptr _rate_limit_group; +}; + +} // end namespace doris diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index 54701c5e463481..372f840401c4ad 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -21,11 +21,9 @@ #include #include -#include #include #include #include -#include #include "common/config.h" #include "common/logging.h" @@ -34,7 +32,6 @@ #include "http/utils.h" #include "io/fs/local_file_system.h" #include "olap/storage_engine.h" -#include "olap/tablet.h" #include "olap/tablet_manager.h" 
#include "runtime/exec_env.h" diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 96679195316dac..312f1ab9286909 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -20,8 +20,8 @@ #include #include #include +#include -#include #include #include #include @@ -57,7 +57,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { - auto evb = evbuffer_new(); + auto* evb = evbuffer_new(); std::string compressed_content; if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING), content, &compressed_content)) { @@ -72,7 +72,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std: void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size, bufferevent_rate_limit_group* rate_limit_group) { - auto evb = evbuffer_new(); + auto* evb = evbuffer_new(); evbuffer_add_file(evb, fd, off, size); auto* evhttp_request = request->get_evhttp_request(); if (rate_limit_group) { @@ -84,6 +84,56 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz evbuffer_free(evb); } +void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir, + std::vector local_files, + bufferevent_rate_limit_group* rate_limit_group) { + if (rate_limit_group) { + auto* evhttp_request = request->get_evhttp_request(); + auto* evhttp_connection = evhttp_request_get_connection(evhttp_request); + auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection); + bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); + } + + send_files(request, root_dir, std::move(local_files)); +} + +void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir, + std::vector local_files) { + std::unique_ptr evb(evbuffer_new(), &evbuffer_free); + for (const std::string& file : local_files) { + std::string file_path = 
fmt::format("{}/{}", root_dir, file); + int fd = open(file_path.c_str(), O_RDONLY); + if (fd < 0) { + std::string error_msg = "Failed to open file: " + file_path; + LOG(WARNING) << "http channel send files: " << error_msg; + HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg); + return; + } + struct stat st; + auto res = fstat(fd, &st); + if (res < 0) { + close(fd); + std::string error_msg = "Failed to open file: " + file_path; + LOG(WARNING) << "http channel send files: " << error_msg; + HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg); + return; + } + + int64_t file_size = st.st_size; + VLOG_DEBUG << "http channel send file " << file_path << ", size: " << file_size; + + evbuffer_add_printf(evb.get(), "File-Name: %s\r\n", file.c_str()); + evbuffer_add_printf(evb.get(), "Content-Length: %ld\r\n", file_size); + evbuffer_add_printf(evb.get(), "\r\n"); + if (file_size > 0) { + evbuffer_add_file(evb.get(), fd, 0, file_size); + } + } + + evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK, + default_reason(HttpStatus::OK).c_str(), evb.get()); +} + bool HttpChannel::compress_content(const std::string& accept_encoding, const std::string& input, std::string* output) { // Don't bother compressing empty content. 
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h index ee1e6c0888f1d3..0d5e5d4260af8c 100644 --- a/be/src/http/http_channel.h +++ b/be/src/http/http_channel.h @@ -20,6 +20,7 @@ #include #include +#include #include "http/http_status.h" @@ -47,6 +48,13 @@ class HttpChannel { static void send_file(HttpRequest* request, int fd, size_t off, size_t size, bufferevent_rate_limit_group* rate_limit_group = nullptr); + static void send_files(HttpRequest* request, const std::string& root_dir, + std::vector local_files, + bufferevent_rate_limit_group* rate_limit_group); + + static void send_files(HttpRequest* request, const std::string& root_dir, + std::vector local_files); + static bool compress_content(const std::string& accept_encoding, const std::string& input, std::string* output); }; diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index fc4c997fce8397..767377cea3f365 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -24,14 +24,225 @@ #include #include "common/config.h" +#include "common/status.h" #include "http/http_headers.h" -#include "http/http_status.h" #include "runtime/exec_env.h" #include "util/security.h" #include "util/stack_util.h" namespace doris { +class MultiFileSplitter { +public: + MultiFileSplitter(std::string local_dir, std::unordered_set expected_files) + : _local_dir_path(std::move(local_dir)), _expected_files(std::move(expected_files)) {} + ~MultiFileSplitter() { + if (_fd >= 0) { + close(_fd); + } + + if (!_status.ok() && !downloaded_files.empty()) { + LOG(WARNING) << "download files to " << _local_dir_path << " failed, try remove the " + << downloaded_files.size() << " downloaded files"; + for (const auto& file : downloaded_files) { + remove(file.c_str()); + } + } + } + + bool append(const char* data, size_t length) { + // Already failed. 
+ if (!_status.ok()) { + return false; + } + + std::string buf; + if (!_buffer.empty()) { + buf.swap(_buffer); + buf.append(data, length); + data = buf.data(); + length = buf.size(); + } + return append_inner(data, length); + } + + Status finish() { + if (_status.ok()) { + _status = finish_inner(); + } + + return _status; + } + +private: + bool append_inner(const char* data, size_t length) { + while (length > 0) { + int consumed = 0; + if (_is_reading_header) { + consumed = parse_header(data, length); + } else { + consumed = append_file(data, length); + } + + if (consumed < 0) { + return false; + } + + DCHECK(consumed <= length); + data += consumed; + length -= consumed; + } + return true; + } + + int parse_header(const char* data, size_t length) { + DCHECK(_fd < 0); + + std::string_view buf(data, length); + size_t pos = buf.find("\r\n\r\n"); + if (pos == std::string::npos) { + _buffer.append(data, length); + return static_cast(length); + } + + // header already read. + _is_reading_header = false; + + bool has_file_name = false; + bool has_file_size = false; + std::string_view header = buf.substr(0, pos); + std::vector headers = + strings::Split(header, "\r\n", strings::SkipWhitespace()); + for (auto& s : headers) { + size_t header_pos = s.find(':'); + if (header_pos == std::string::npos) { + continue; + } + std::string_view header_view(s); + std::string_view key = header_view.substr(0, header_pos); + std::string_view value = header_view.substr(header_pos + 1); + if (value.starts_with(' ')) { + value.remove_prefix(std::min(value.find_first_not_of(' '), value.size())); + } + if (key == "File-Name") { + _file_name = value; + has_file_name = true; + } else if (key == "Content-Length") { + auto res = std::from_chars(value.data(), value.data() + value.size(), _file_size); + if (res.ec != std::errc()) { + std::string error_msg = fmt::format("invalid content length: {}", value); + LOG(WARNING) << "download files to " << _local_dir_path + << "failed, err=" << error_msg; + 
_status = Status::HttpError(std::move(error_msg)); + return -1; + } + has_file_size = true; + } + } + + if (!has_file_name || !has_file_size) { + std::string error_msg = + fmt::format("invalid multi part header, has file name: {}, has file size: {}", + has_file_name, has_file_size); + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; + _status = Status::HttpError(std::move(error_msg)); + return -1; + } + + if (!_expected_files.contains(_file_name)) { + std::string error_msg = fmt::format("unexpected file: {}", _file_name); + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; + _status = Status::HttpError(std::move(error_msg)); + return -1; + } + + VLOG_DEBUG << "receive file " << _file_name << ", size " << _file_size; + + _written_size = 0; + _local_file_path = fmt::format("{}/{}", _local_dir_path, _file_name); + _fd = open(_local_file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (_fd < 0) { + std::string error_msg = "fail to open file to write: " + _local_file_path; + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; + _status = Status::IOError(std::move(error_msg)); + return -1; + } + downloaded_files.push_back(_local_file_path); + + return static_cast(pos + 4); + } + + int append_file(const char* data, size_t length) { + DCHECK(_fd >= 0); + DCHECK(_file_size >= _written_size); + + size_t write_size = std::min(length, _file_size - _written_size); + if (write_size > 0 && write(_fd, data, write_size) < 0) { + auto msg = fmt::format("write file failed, file={}, error={}", _local_file_path, + strerror(errno)); + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << msg; + _status = Status::HttpError(std::move(msg)); + return -1; + } + + _written_size += write_size; + if (_written_size == _file_size) { + // This file has been downloaded, switch to the next one. 
+ switchToNextFile(); + } + + return write_size; + } + + Status finish_inner() { + if (!_is_reading_header && _written_size == _file_size) { + switchToNextFile(); + } + + if (_fd >= 0) { + // This file is not completely downloaded. + close(_fd); + _fd = -1; + auto error_msg = fmt::format("file {} is not completely downloaded", _local_file_path); + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; + return Status::HttpError(std::move(error_msg)); + } + + if (!_expected_files.empty()) { + auto error_msg = fmt::format("not all files are downloaded, {} missing files", + _expected_files.size()); + LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; + return Status::HttpError(std::move(error_msg)); + } + + downloaded_files.clear(); + return Status::OK(); + } + + void switchToNextFile() { + DCHECK(_fd >= 0); + DCHECK(_written_size == _file_size); + + close(_fd); + _fd = -1; + _expected_files.erase(_file_name); + _is_reading_header = true; + } + + const std::string _local_dir_path; + std::string _buffer; + std::unordered_set _expected_files; + Status _status; + + bool _is_reading_header = true; + int _fd = -1; + std::string _local_file_path; + std::string _file_name; + size_t _file_size = 0; + size_t _written_size = 0; + std::vector downloaded_files; +}; + static const char* header_error_msg(CURLHcode code) { switch (code) { case CURLHE_OK: @@ -174,6 +385,12 @@ void HttpClient::set_method(HttpMethod method) { } } +void HttpClient::set_speed_limit() { + curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, config::download_low_speed_limit_kbps * 1024); + curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, config::download_low_speed_time); + curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, config::max_download_speed_kbps * 1024); +} + size_t HttpClient::on_response_data(const void* data, size_t length) { if (*_callback != nullptr) { bool is_continue = (*_callback)(data, length); @@ -184,12 +401,6 @@ size_t 
HttpClient::on_response_data(const void* data, size_t length) { return length; } -// Status HttpClient::execute_post_request(const std::string& post_data, const std::function& callback = {}) { -// _callback = &callback; -// set_post_body(post_data); -// return execute(callback); -// } - Status HttpClient::execute_post_request(const std::string& payload, std::string* response) { set_method(POST); set_payload(payload); @@ -234,14 +445,8 @@ Status HttpClient::get_content_md5(std::string* md5) const { } Status HttpClient::download(const std::string& local_path) { - // set method to GET set_method(GET); - - // TODO(zc) Move this download speed limit outside to limit download speed - // at system level - curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, config::download_low_speed_limit_kbps * 1024); - curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, config::download_low_speed_time); - curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, config::max_download_speed_kbps * 1024); + set_speed_limit(); auto fp_closer = [](FILE* fp) { fclose(fp); }; std::unique_ptr fp(fopen(local_path.c_str(), "w"), fp_closer); @@ -270,6 +475,20 @@ Status HttpClient::download(const std::string& local_path) { return status; } +Status HttpClient::download_multi_files(const std::string& local_dir, + const std::unordered_set& expected_files) { + set_speed_limit(); + + MultiFileSplitter splitter(local_dir, expected_files); + auto callback = [&](const void* data, size_t length) { + return splitter.append(reinterpret_cast(data), length); + }; + if (auto s = execute(callback); !s.ok()) { + return s; + } + return splitter.finish(); +} + Status HttpClient::execute(std::string* response) { auto callback = [response](const void* data, size_t length) { response->append((char*)data, length); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index c0c8863a9b06d4..a6f2f4fdff514b 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -24,6 +24,7 @@ #include #include #include 
+#include #include "common/status.h" #include "http/http_headers.h" @@ -81,6 +82,8 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L); } + void set_speed_limit(); + // TODO(zc): support set header // void set_header(const std::string& key, const std::string& value) { // _cntl.http_request().SetHeader(key, value); @@ -141,6 +144,8 @@ class HttpClient { // helper function to download a file, you can call this function to download // a file to local_path Status download(const std::string& local_path); + Status download_multi_files(const std::string& local_dir, + const std::unordered_set& expected_files); Status execute_post_request(const std::string& payload, std::string* response); diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index f91610476b4dc9..ee7a78113e555a 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -23,6 +23,8 @@ #include #include +#include +#include #include #include "common/config.h" @@ -30,6 +32,7 @@ #include "common/status.h" #include "common/utils.h" #include "http/http_channel.h" +#include "http/http_client.h" #include "http/http_common.h" #include "http/http_headers.h" #include "http/http_method.h" @@ -41,10 +44,15 @@ #include "runtime/exec_env.h" #include "util/md5.h" #include "util/path_util.h" +#include "util/security.h" #include "util/url_coding.h" namespace doris { +const uint32_t CHECK_SUPPORT_TIMEOUT = 3; +const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; +const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15; + std::string encode_basic_auth(const std::string& user, const std::string& passwd) { std::string auth = user + ":" + passwd; std::string encoded_auth; @@ -190,20 +198,26 @@ void do_file_response(const std::string& file_path, HttpRequest* req, HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group); } -void do_dir_response(const std::string& dir_path, HttpRequest* req) { +void do_dir_response(const std::string& dir_path, HttpRequest* req, bool is_acquire_filesize) { bool exists = true; 
std::vector files; Status st = io::global_local_filesystem()->list(dir_path, true, &files, &exists); if (!st.ok()) { LOG(WARNING) << "Failed to scan dir. " << st; HttpChannel::send_error(req, HttpStatus::INTERNAL_SERVER_ERROR); + return; } + VLOG_DEBUG << "list dir: " << dir_path << ", file count: " << files.size(); + const std::string FILE_DELIMITER_IN_DIR_RESPONSE = "\n"; std::stringstream result; for (auto& file : files) { result << file.file_name << FILE_DELIMITER_IN_DIR_RESPONSE; + if (is_acquire_filesize) { + result << file.file_size << FILE_DELIMITER_IN_DIR_RESPONSE; + } } std::string result_str = result.str(); @@ -221,4 +235,118 @@ bool load_size_smaller_than_wal_limit(int64_t content_length) { return (content_length < 0.8 * max_available_size); } +Status is_support_batch_download(const std::string& endpoint) { + std::string url = fmt::format("http://{}/api/_tablet/_batch_download?check=true", endpoint); + auto check_support_cb = [&url](HttpClient* client) { + RETURN_IF_ERROR(client->init(url)); + client->set_timeout_ms(CHECK_SUPPORT_TIMEOUT * 1000); + client->set_method(HttpMethod::HEAD); + std::string response; + return client->execute(&response); + }; + return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, check_support_cb); +} + +Status list_remote_files_v2(const std::string& address, const std::string& token, + const std::string& remote_dir, + std::vector>* file_info_list) { + std::string remote_url = + fmt::format("http://{}/api/_tablet/_batch_download?token={}&dir={}&list=true", address, + token, remote_dir); + + std::string file_list_str; + auto list_files_cb = [&](HttpClient* client) { + file_list_str.clear(); + RETURN_IF_ERROR(client->init(remote_url, false)); + client->set_method(HttpMethod::GET); + client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000); + return client->execute(&file_list_str); + }; + Status status = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, list_files_cb); + if (!status.ok()) { + LOG(WARNING) << 
"failed to list remote files from " << remote_url + << ", status: " << status.to_string() << ", response: " << file_list_str; + return status; + } + + std::vector file_list = strings::Split(file_list_str, "\n", strings::SkipWhitespace()); + if (file_list.size() % 2 != 0) { + return Status::InternalError("batch download files: invalid file list, size is not even"); + } + + VLOG_DEBUG << "list remote files from " << remote_url + << ", file count: " << file_list.size() / 2; + + for (size_t i = 0; i < file_list.size(); i += 2) { + uint64_t file_size = 0; + try { + file_size = std::stoull(file_list[i + 1]); + } catch (std::exception&) { + return Status::InternalError("batch download files: invalid file size format: " + + file_list[i + 1]); + } + file_info_list->emplace_back(std::move(file_list[i]), file_size); + } + + return Status::OK(); +} + +Status download_files_v2(const std::string& address, const std::string& token, + const std::string& remote_dir, const std::string& local_dir, + const std::vector>& file_info_list) { + std::string remote_url = fmt::format("http://{}/api/_tablet/_batch_download?dir={}&token={}", + address, remote_dir, token); + + size_t batch_file_size = 0; + std::unordered_set expected_files; + std::stringstream ss; + for (const auto& file_info : file_info_list) { + ss << file_info.first << "\n"; + batch_file_size += file_info.second; + expected_files.insert(file_info.first); + } + std::string payload = ss.str(); + + uint64_t estimate_timeout = batch_file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + + LOG(INFO) << "begin to download files from " << remote_url << " to " << local_dir + << ", file count: " << file_info_list.size() << ", total size: " << batch_file_size + << ", timeout: " << estimate_timeout; + + auto callback = [&](HttpClient* client) -> Status { + RETURN_IF_ERROR(client->init(remote_url, false)); + 
client->set_method(HttpMethod::POST); + client->set_payload(payload); + client->set_timeout_ms(estimate_timeout * 1000); + RETURN_IF_ERROR(client->download_multi_files(local_dir, expected_files)); + for (auto&& [file_name, file_size] : file_info_list) { + std::string local_file_path = local_dir + "/" + file_name; + + std::error_code ec; + // Check file length + uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); + if (ec) { + LOG(WARNING) << "download file error: " << ec.message(); + return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, + ec.message()); + } + if (local_file_size != file_size) { + LOG(WARNING) << "download file length error" + << ", remote_path=" << mask_token(remote_url) + << ", file_name=" << file_name << ", file_size=" << file_size + << ", local_file_size=" << local_file_size; + return Status::InternalError("downloaded file size is not equal"); + } + RETURN_IF_ERROR(io::global_local_filesystem()->permission( + local_file_path, io::LocalFileSystem::PERMS_OWNER_RW)); + } + + return Status::OK(); + }; + return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, callback); +} + } // namespace doris diff --git a/be/src/http/utils.h b/be/src/http/utils.h index 20be6c0fcd7439..b9abb7c6208efb 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -40,9 +40,22 @@ void do_file_response(const std::string& dir_path, HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group = nullptr, bool is_acquire_md5 = false); -void do_dir_response(const std::string& dir_path, HttpRequest* req); +void do_dir_response(const std::string& dir_path, HttpRequest* req, + bool is_acquire_filesize = false); std::string get_content_type(const std::string& file_name); bool load_size_smaller_than_wal_limit(int64_t content_length); + +// Whether a backend supports batch download +Status is_support_batch_download(const std::string& address); + +Status list_remote_files_v2(const std::string& address, const 
std::string& token, + const std::string& remote_dir, + std::vector>* file_info_list); + +Status download_files_v2(const std::string& address, const std::string& token, + const std::string& remote_dir, const std::string& local_dir, + const std::vector>& file_info_list); + } // namespace doris diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index bea1d3b1a91e89..fa8d9b8248e3f4 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -44,6 +44,7 @@ #include "gutil/strings/split.h" #include "gutil/strings/strip.h" #include "http/http_client.h" +#include "http/utils.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" @@ -399,28 +400,62 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir, .error(st); } }}; - std::string remote_url_prefix; + + std::string remote_dir; { std::stringstream ss; if (snapshot_path->back() == '/') { - ss << "http://" << get_host_port(src.host, src.http_port) << HTTP_REQUEST_PREFIX - << HTTP_REQUEST_TOKEN_PARAM << token << HTTP_REQUEST_FILE_PARAM << *snapshot_path - << _clone_req.tablet_id << "/" << _clone_req.schema_hash << "/"; + ss << *snapshot_path << _clone_req.tablet_id << "/" << _clone_req.schema_hash + << "/"; } else { - ss << "http://" << get_host_port(src.host, src.http_port) << HTTP_REQUEST_PREFIX - << HTTP_REQUEST_TOKEN_PARAM << token << HTTP_REQUEST_FILE_PARAM << *snapshot_path - << "/" << _clone_req.tablet_id << "/" << _clone_req.schema_hash << "/"; + ss << *snapshot_path << "/" << _clone_req.tablet_id << "/" << _clone_req.schema_hash + << "/"; } - remote_url_prefix = ss.str(); + remote_dir = ss.str(); } - status = _download_files(&data_dir, remote_url_prefix, local_data_path); - if (!status.ok()) [[unlikely]] { - LOG_WARNING("failed to download snapshot from remote BE") - .tag("url", mask_token(remote_url_prefix)) - .error(status); - continue; // Try another BE + std::string 
address = get_host_port(src.host, src.http_port); + if (config::enable_batch_download && is_support_batch_download(address).ok()) { + // download files via batch api. + LOG_INFO("remote BE supports batch download, use batch file download") + .tag("address", address) + .tag("remote_dir", remote_dir); + status = _batch_download_files(&data_dir, address, remote_dir, local_data_path); + if (!status.ok()) [[unlikely]] { + LOG_WARNING("failed to download snapshot from remote BE in batch") + .tag("address", address) + .tag("remote_dir", remote_dir) + .error(status); + continue; // Try another BE + } + } else { + if (config::enable_batch_download) { + LOG_INFO("remote BE does not support batch download, use single file download") + .tag("address", address) + .tag("remote_dir", remote_dir); + } else { + LOG_INFO("batch download is disabled, use single file download") + .tag("address", address) + .tag("remote_dir", remote_dir); + } + + std::string remote_url_prefix; + { + std::stringstream ss; + ss << "http://" << address << HTTP_REQUEST_PREFIX << HTTP_REQUEST_TOKEN_PARAM + << token << HTTP_REQUEST_FILE_PARAM << remote_dir; + remote_url_prefix = ss.str(); + } + + status = _download_files(&data_dir, remote_url_prefix, local_data_path); + if (!status.ok()) [[unlikely]] { + LOG_WARNING("failed to download snapshot from remote BE") + .tag("url", mask_token(remote_url_prefix)) + .error(status); + continue; // Try another BE + } } + // No need to try again with another BE _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( local_data_path, _clone_req.tablet_id, _clone_req.replica_id, _clone_req.table_id, @@ -514,7 +549,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re // If the header file is not exist, the table couldn't loaded by olap engine. // Avoid of data is not complete, we copy the header file at last. // The header file's name is end of .hdr. 
- for (int i = 0; i < file_name_list.size() - 1; ++i) { + for (int i = 0; i + 1 < file_name_list.size(); ++i) { if (file_name_list[i].ends_with(".hdr")) { std::swap(file_name_list[i], file_name_list[file_name_list.size() - 1]); break; @@ -593,13 +628,91 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re } _copy_size = (int64_t)total_file_size; _copy_time_ms = (int64_t)total_time_ms; - LOG(INFO) << "succeed to copy tablet " << _signature << ", total file size: " << total_file_size - << " B" - << ", cost: " << total_time_ms << " ms" + LOG(INFO) << "succeed to copy tablet " << _signature + << ", total files: " << file_name_list.size() + << ", total file size: " << total_file_size << " B, cost: " << total_time_ms << " ms" << ", rate: " << copy_rate << " MB/s"; return Status::OK(); } +Status EngineCloneTask::_batch_download_files(DataDir* data_dir, const std::string& address, + const std::string& remote_dir, + const std::string& local_dir) { + constexpr size_t BATCH_FILE_SIZE = 64 << 20; // 64MB + constexpr size_t BATCH_FILE_NUM = 64; + + // Check local path exist, if exist, remove it, then create the dir + // local_file_full_path = tabletid/clone, for a specific tablet, there should be only one folder + // if this folder exists, then should remove it + // for example, BE clone from BE 1 to download file 1 with version (2,2), but clone from BE 1 failed + // then it will try to clone from BE 2, but it will find the file 1 already exist, but file 1 with same + // name may have different versions. + RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(local_dir)); + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(local_dir)); + + const std::string& token = _cluster_info->token; + std::vector> file_info_list; + RETURN_IF_ERROR(list_remote_files_v2(address, token, remote_dir, &file_info_list)); + + // If the header file is not exist, the table couldn't loaded by olap engine. 
+ // To avoid incomplete data, we copy the header file last. + // The header file's name ends with .hdr. + for (int i = 0; i + 1 < file_info_list.size(); ++i) { + if (file_info_list[i].first.ends_with(".hdr")) { + std::swap(file_info_list[i], file_info_list[file_info_list.size() - 1]); + break; + } + } + + MonotonicStopWatch watch; + watch.start(); + + size_t total_file_size = 0; + size_t total_files = file_info_list.size(); + std::vector> batch_files; + for (size_t i = 0; i < total_files;) { + size_t batch_file_size = 0; + for (size_t j = i; j < total_files; j++) { + // Split batches by file number and file size, + if (BATCH_FILE_NUM <= batch_files.size() || BATCH_FILE_SIZE <= batch_file_size || + // ... or separate the last .hdr file into a single batch. + (j + 1 == total_files && !batch_files.empty())) { + break; + } + batch_files.push_back(file_info_list[j]); + batch_file_size += file_info_list[j].second; + } + + // check disk capacity + if (data_dir->reach_capacity_limit(batch_file_size)) { + return Status::Error( + "reach the capacity limit of path {}, file_size={}", data_dir->path(), + batch_file_size); + } + + RETURN_IF_ERROR(download_files_v2(address, token, remote_dir, local_dir, batch_files)); + + total_file_size += batch_file_size; + i += batch_files.size(); + batch_files.clear(); + } + + uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; + total_time_ms = total_time_ms > 0 ? 
total_time_ms : 0; + double copy_rate = 0.0; + if (total_time_ms > 0) { + copy_rate = total_file_size / ((double)total_time_ms) / 1000; + } + _copy_size = (int64_t)total_file_size; + _copy_time_ms = (int64_t)total_time_ms; + LOG(INFO) << "succeed to copy tablet " << _signature + << ", total files: " << file_info_list.size() + << ", total file size: " << total_file_size << " B, cost: " << total_time_ms << " ms" + << ", rate: " << copy_rate << " MB/s"; + + return Status::OK(); +} + /// This method will only be called if tablet already exist in this BE when doing clone. /// This method will do the following things: /// 1. Link all files from CLONE dir to tablet dir if file does not exist in tablet dir diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index a11d4c742f4bcc..e2ced28f03c88d 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -79,6 +79,9 @@ class EngineCloneTask final : public EngineTask { Status _download_files(DataDir* data_dir, const std::string& remote_url_prefix, const std::string& local_path); + Status _batch_download_files(DataDir* data_dir, const std::string& endpoint, + const std::string& remote_dir, const std::string& local_dir); + Status _make_snapshot(const std::string& ip, int port, TTableId tablet_id, TSchemaHash schema_hash, int timeout_s, const std::vector& missing_versions, std::string* snapshot_path, diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index a74f00291de640..0ee484ec88607b 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -32,6 +32,7 @@ #include "common/status.h" #include "http/action/adjust_log_level.h" #include "http/action/adjust_tracing_dump.h" +#include "http/action/batch_download_action.h" #include "http/action/be_proc_thread_action.h" #include "http/action/calc_file_crc_action.h" #include "http/action/check_rpc_channel_action.h" @@ -290,6 +291,16 @@ void 
HttpService::register_local_handler(StorageEngine& engine) { tablet_download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); + + BatchDownloadAction* batch_download_action = + _pool.add(new BatchDownloadAction(_env, _rate_limit_group, allow_paths)); + _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_batch_download", + batch_download_action); + _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_batch_download", + batch_download_action); + _ev_http_server->register_handler(HttpMethod::POST, "/api/_tablet/_batch_download", + batch_download_action); + if (config::enable_single_replica_load) { DownloadAction* single_replica_download_action = _pool.add(new DownloadAction( _env, nullptr, allow_paths, config::single_replica_load_download_num_workers)); diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index c98328d7c8e37c..84e4d259ff5ccd 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -25,6 +25,7 @@ #include #include +#include #include "gtest/gtest_pred_impl.h" #include "http/ev_http_server.h" @@ -102,14 +103,32 @@ class HttpDownloadFileHandler : public HttpHandler { } }; +class HttpBatchDownloadFileHandler : public HttpHandler { +public: + void handle(HttpRequest* req) override { + if (req->param("check") == "true") { + HttpChannel::send_reply(req, "OK"); + } else if (req->param("list") == "true") { + do_dir_response(req->param("dir"), req, true); + } else { + std::vector acquire_files = + strings::Split(req->get_request_body(), "\n", strings::SkipWhitespace()); + HttpChannel::send_files(req, req->param("dir"), acquire_files); + } + } +}; + static EvHttpServer* s_server = nullptr; static int real_port = 0; static std::string hostname = ""; +static std::string address = ""; +constexpr std::string_view TMP_DIR = "./http_test_tmp"; static HttpClientTestSimpleGetHandler s_simple_get_handler; static 
HttpClientTestSimplePostHandler s_simple_post_handler; static HttpNotFoundHandler s_not_found_handler; static HttpDownloadFileHandler s_download_file_handler; +static HttpBatchDownloadFileHandler s_batch_download_file_handler; class HttpClientTest : public testing::Test { public: @@ -123,10 +142,17 @@ class HttpClientTest : public testing::Test { s_server->register_handler(POST, "/simple_post", &s_simple_post_handler); s_server->register_handler(GET, "/not_found", &s_not_found_handler); s_server->register_handler(HEAD, "/download_file", &s_download_file_handler); + s_server->register_handler(HEAD, "/api/_tablet/_batch_download", + &s_batch_download_file_handler); + s_server->register_handler(GET, "/api/_tablet/_batch_download", + &s_batch_download_file_handler); + s_server->register_handler(POST, "/api/_tablet/_batch_download", + &s_batch_download_file_handler); static_cast(s_server->start()); real_port = s_server->get_real_port(); EXPECT_NE(0, real_port); - hostname = "http://127.0.0.1:" + std::to_string(real_port); + address = "127.0.0.1:" + std::to_string(real_port); + hostname = "http://" + address; } static void TearDownTestCase() { delete s_server; } @@ -571,4 +597,74 @@ TEST_F(HttpClientTest, enable_http_auth) { } } +TEST_F(HttpClientTest, batch_download) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(TMP_DIR).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(TMP_DIR).ok()); + + std::string root_dir(TMP_DIR); + std::string remote_related_dir = root_dir + "/source"; + std::string local_dir = root_dir + "/target"; + EXPECT_TRUE(io::global_local_filesystem()->create_directory(remote_related_dir).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_dir).ok()); + + std::string remote_dir; + EXPECT_TRUE(io::global_local_filesystem()->canonicalize(remote_related_dir, &remote_dir).ok()); + + // 0. 
create dir source and prepare a large file that exceeds 1MB + { + std::string large_file = remote_dir + "/a_large_file"; + int fd = open(large_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + ASSERT_TRUE(fd >= 0); + std::string buf = "0123456789"; + for (int i = 0; i < 10; i++) { + buf += buf; + } + for (int i = 0; i < 1024; i++) { + ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0); + } + close(fd); + + // create some small files. + for (int i = 0; i < 32; i++) { + std::string small_file = remote_dir + "/small_file_" + std::to_string(i); + fd = open(small_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + ASSERT_TRUE(fd >= 0); + ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0); + close(fd); + } + + // create an empty file + std::string empty_file = remote_dir + "/empty_file"; + fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + ASSERT_TRUE(fd >= 0); + close(fd); + + empty_file = remote_dir + "/zzzz"; + fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + ASSERT_TRUE(fd >= 0); + close(fd); + } + + // 1. check that the remote supports batch download + Status st = is_support_batch_download(address); + EXPECT_TRUE(st.ok()); + + // 2. list remote files + std::vector> file_info_list; + st = list_remote_files_v2(address, "token", remote_dir, &file_info_list); + EXPECT_TRUE(st.ok()); + + // 3. download files + if (file_info_list.size() > 64) { + file_info_list.resize(64); + } + + // sort file info list by file name + std::sort(file_info_list.begin(), file_info_list.end(), + [](const auto& a, const auto& b) { return a.first < b.first; }); + + st = download_files_v2(address, "token", remote_dir, local_dir, file_info_list); + EXPECT_TRUE(st.ok()); +} + } // namespace doris