Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](backup)(cooldown) backup/restore for cooldown data #43778

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ class S3FileSystem final : public RemoteFileSystem {
return path;
} else {
// path with no schema
return _root_path / path;
return std::filesystem::path(
fmt::format("s3://{}/{}", _s3_conf.bucket, _s3_conf.prefix)) /
path;
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class RowsetMeta {
_fs = std::move(fs);
}

void clear_resource_id() { _rowset_meta_pb.clear_resource_id(); }

const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }

bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ Status SingleReplicaCompaction::_fetch_rowset(const TReplicaInfo& addr, const st
// change all rowset ids because they maybe its id same with local rowset
auto olap_st = SnapshotManager::instance()->convert_rowset_ids(
local_path, _tablet->tablet_id(), _tablet->replica_id(), _tablet->table_id(),
_tablet->partition_id(), _tablet->schema_hash());
_tablet->partition_id(), _tablet->schema_hash(), false, 0);
if (!olap_st.ok()) {
LOG(WARNING) << "fail to convert rowset ids, path=" << local_path
<< ", tablet_id=" << _tablet->tablet_id() << ", error=" << olap_st;
Expand Down
18 changes: 12 additions & 6 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -146,7 +147,8 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {

Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id,
int64_t replica_id, int64_t table_id,
int64_t partition_id, const int32_t& schema_hash) {
int64_t partition_id, const int32_t& schema_hash,
bool is_restore, int64_t storage_policy_id) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
Status res = Status::OK();
// check clone dir existed
Expand Down Expand Up @@ -181,6 +183,10 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t
new_tablet_meta_pb.set_tablet_id(tablet_id);
*new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
new_tablet_meta_pb.set_replica_id(replica_id);
if (is_restore) {
new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
new_tablet_meta_pb.clear_cooldown_meta_id();
}
if (table_id > 0) {
new_tablet_meta_pb.set_table_id(table_id);
}
Expand Down Expand Up @@ -212,6 +218,9 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t
} else {
// remote rowset
*rowset_meta = visible_rowset;
if (is_restore) {
rowset_meta->clear_resource_id();
}
}

rowset_meta->set_tablet_id(tablet_id);
Expand Down Expand Up @@ -521,11 +530,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
if (!is_single_rowset_clone && (!res.ok() || request.missing_version.empty())) {
if (!request.__isset.missing_version &&
ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) {
LOG(WARNING) << "currently not support backup tablet with cooldowned remote "
"data. tablet="
<< request.tablet_id;
return Status::NotSupported(
"currently not support backup tablet with cooldowned remote data");
LOG(INFO) << "Backup tablet with cooldowned remote data. tablet="
<< request.tablet_id;
}
/// not all missing versions are found, fall back to full snapshot.
res = Status::OK(); // reset res
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class SnapshotManager {
static SnapshotManager* instance();

Status convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id, int64_t replica_id,
int64_t table_id, int64_t partition_id, const int32_t& schema_hash);
int64_t table_id, int64_t partition_id, const int32_t& schema_hash,
bool is_restore, int64_t storage_policy_id);

private:
SnapshotManager() : _snapshot_base_id(0) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ class TabletSchema {
segment_v2::CompressionTypePB compression_type() const { return _compression_type; }

const std::vector<TabletIndex>& indexes() const { return _indexes; }
[[nodiscard]] bool has_inverted_index() const {
for (const auto& index : _indexes) {
if (index.index_type() == IndexType::INVERTED) {
return true;
}
}
return false;
}
std::vector<const TabletIndex*> get_indexes_for_column(int32_t col_unique_id) const;
bool has_inverted_index(int32_t col_unique_id) const;
bool has_inverted_index_with_index_id(int64_t index_id) const;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
// change all rowset ids because they maybe its id same with local rowset
status = SnapshotManager::instance()->convert_rowset_ids(
local_data_path, _clone_req.tablet_id, _clone_req.replica_id,
_clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash);
_clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash, false, 0);
} else {
LOG_WARNING("failed to download snapshot from remote BE")
.tag("url", _mask_token(remote_url_prefix))
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
// rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
return SnapshotManager::instance()->convert_rowset_ids(
full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(),
_tablet->partition_id(), schema_hash);
_tablet->partition_id(), schema_hash, false, 0);
}

Status EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) {
Expand Down
198 changes: 170 additions & 28 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "gutil/strings/split.h"
#include "http/http_client.h"
#include "io/fs/broker_file_system.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
Expand All @@ -49,8 +50,10 @@
#include "io/fs/s3_file_system.h"
#include "io/hdfs_builder.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -100,6 +103,166 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l

SnapshotLoader::~SnapshotLoader() = default;

bool _end_with(std::string_view str, std::string_view match) {
return str.size() >= match.size() &&
str.compare(str.size() - match.size(), match.size(), match) == 0;
}

static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
const std::string& dir, const std::string& rowset,
std::vector<std::string>* remote_files) {
bool exists = true;
std::vector<io::FileInfo> files;
RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
for (auto& tmp_file : files) {
io::Path path(tmp_file.file_name);
std::string file_name = path.filename();

if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
!_end_with(file_name, ".idx")) {
continue;
}
remote_files->push_back(file_name);
}

return Status::OK();
}

static Status check_need_upload(const std::string& src_path, const std::string& local_file,
std::map<std::string, FileStat>& remote_files, std::string* md5sum,
bool* need_upload) {
// calc md5sum of localfile
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum));
VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum;

// check if this local file need upload
auto find = remote_files.find(local_file);
if (find != remote_files.end()) {
if (*md5sum != find->second.md5) {
// remote storage file exist, but with different checksum
LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
<< ", local: " << *md5sum;
// TODO(cmy): save these files and delete them later
*need_upload = true;
}
} else {
*need_upload = true;
}

return Status::OK();
}

static Status download_and_upload_one_cold_file(
io::RemoteFileSystem& dest_fs, io::RemoteFileSystem* cold_fs,
const std::string& remote_seg_path, const std::string& local_seg_path,
const std::string& dest_seg_path, const std::string& local_path,
const std::string& local_file, std::map<std::string, FileStat>& remote_files) {
RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));

bool need_upload = false;
std::string md5sum;
RETURN_IF_ERROR(check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload));

if (!need_upload) {
VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file;
return Status::OK();
}

RETURN_IF_ERROR(dest_fs.upload_with_checksum(local_seg_path, dest_seg_path, md5sum));

//delete local file
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));

return Status::OK();
}

static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
const std::string& local_path, const std::string& dest_path,
io::RemoteFileSystem* cold_fs, const std::string& rowset_id,
int segments, int have_inverted_index,
std::map<std::string, FileStat>& remote_files) {
Status res = Status::OK();

for (int i = 0; i < segments; i++) {
std::string local_file = fmt::format("{}_{}.dat", rowset_id, i);
std::string remote_seg_path =
fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, i);
std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, rowset_id, i);
std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, rowset_id, i);

RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_seg_path,
local_seg_path, dest_seg_path, local_path,
local_file, remote_files));
}

if (!have_inverted_index) {
return res;
}

std::vector<std::string> remote_index_files;
RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_tablet_path(tablet_id),
rowset_id, &remote_index_files));

for (auto& index_file : remote_index_files) {
std::string remote_index_path =
fmt::format("{}/{}", remote_tablet_path(tablet_id), index_file);
std::string local_seg_path = fmt::format("{}/{}", local_path, index_file);
std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file);

RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_index_path,
local_seg_path, dest_seg_path, local_path,
index_file, remote_files));
}
return res;
}

/*
* get the cooldown data info from the hdr file, download the cooldown data and
* upload it to remote storage.
*/
static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
const std::string& local_path, const std::string& dest_path,
std::map<std::string, FileStat>& remote_files) {
Status res = Status::OK();
std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr";

auto tablet_meta = std::make_shared<TabletMeta>();
res = tablet_meta->create_from_file(hdr_file);
if (!res.ok()) {
return Status::Error<ErrorCode::ENGINE_LOAD_INDEX_TABLE_ERROR>(
"fail to load tablet_meta. file_path={}", hdr_file);
}

if (tablet_meta->tablet_id() != tablet_id) {
return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id());
}

if (!tablet_meta->cooldown_meta_id().initialized()) {
return res;
}

string rowset_id;
int segments;
int have_inverted_index;

std::shared_ptr<io::RemoteFileSystem> colddata_fs;
RETURN_IF_ERROR(get_remote_file_system(tablet_meta->storage_policy_id(), &colddata_fs));

for (auto rowset_meta : tablet_meta->all_rs_metas()) {
rowset_id = rowset_meta->rowset_id().to_string();
segments = rowset_meta->num_segments();
have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index();

if (segments > 0 && !rowset_meta->is_local()) {
RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path,
colddata_fs.get(), rowset_id, segments,
have_inverted_index, remote_files));
}
}

return res;
}

Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
std::map<int64_t, std::vector<std::string>>* tablet_files) {
if (!_remote_fs) {
Expand Down Expand Up @@ -150,28 +313,11 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
TTaskType::type::UPLOAD));

const std::string& local_file = *it;
// calc md5sum of localfile
bool need_upload = false;
std::string md5sum;
RETURN_IF_ERROR(
io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum));
VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum;
check_need_upload(src_path, local_file, remote_files, &md5sum, &need_upload));
local_files_with_checksum.push_back(local_file + "." + md5sum);

// check if this local file need upload
bool need_upload = false;
auto find = remote_files.find(local_file);
if (find != remote_files.end()) {
if (md5sum != find->second.md5) {
// remote storage file exist, but with different checksum
LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
<< ", local: " << md5sum;
// TODO(cmy): save these files and delete them later
need_upload = true;
}
} else {
need_upload = true;
}

if (!need_upload) {
VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file;
continue;
Expand All @@ -184,6 +330,10 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
_remote_fs->upload_with_checksum(full_local_file, full_remote_file, md5sum));
} // end for each tablet's local files

// 2.4. upload cooldown data files
RETURN_IF_ERROR(
upload_remote_cold_file(*_remote_fs, tablet_id, src_path, dest_path, remote_files));

tablet_files->emplace(tablet_id, local_files_with_checksum);
finished_num++;
LOG(INFO) << "finished to write tablet to remote. local path: " << src_path
Expand Down Expand Up @@ -734,7 +884,7 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
// rename the rowset ids and tabletid info in rowset meta
Status convert_status = SnapshotManager::instance()->convert_rowset_ids(
snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(),
tablet->partition_id(), schema_hash);
tablet->partition_id(), schema_hash, true, tablet->storage_policy_id());
if (!convert_status.ok()) {
std::stringstream ss;
ss << "failed to convert rowsetids in snapshot: " << snapshot_path
Expand Down Expand Up @@ -804,14 +954,6 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
return status;
}

bool SnapshotLoader::_end_with(const std::string& str, const std::string& match) {
if (str.size() >= match.size() &&
str.compare(str.size() - match.size(), match.size(), match) == 0) {
return true;
}
return false;
}

Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path,
int64_t* tablet_id,
int32_t* schema_hash) {
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/snapshot_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class SnapshotLoader {
Status _get_existing_files_from_local(const std::string& local_path,
std::vector<std::string>* local_files);

bool _end_with(const std::string& str, const std::string& match);

Status _replace_tablet_id(const std::string& file_name, int64_t tablet_id,
std::string* new_file_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ PROPERTIES ("key"="value", ...);
- <version since="1.2" type="inline"> "reserve_dynamic_partition_enable" = "true":默认为 false。当该属性为 true 时,恢复的表会保留该表备份之前的'dynamic_partition_enable'属性值。该值不为true时,则恢复出来的表的'dynamic_partition_enable'属性值会设置为false。</version>
- "timeout" = "3600":任务超时时间,默认为一天。单位秒。
- "meta_version" = 40:使用指定的 meta_version 来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。
- "reserve_storage_policy" = "true":指定的恢复的表是否保留冷热分层属性。默认为true,备份集中保存的storage policy和对应的resource信息将在新集群中重建。恢复时数据都会下载到本地,再由降冷策略上传到远程。reserve_storage_policy设置为false,恢复后的表去除了冷热属性, 变为普通表。
- "storage_resource" = "resource_name":指定恢复后表的冷数据使用的resource。建议在跨集群恢复时指定此属性。注意恢复后的storage policy中的storage_resource属性也会更新为指定的storage_resource。若指定了"reserve_storage_policy"="false",则忽略storage_resource属性。

### Example

Expand Down
Loading
Loading