Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](vault) avoid duplicated name and id #45158

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 89 additions & 55 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,22 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro
}
}

static bool is_vault_id_not_used(const InstanceInfoPB& instance, const std::string& vault_id) {
if (std::find_if(instance.resource_ids().begin(), instance.resource_ids().end(),
[&vault_id](const auto& id) { return id == vault_id; }) !=
instance.resource_ids().end()) {
return false;
}

if (std::find_if(instance.obj_info().begin(), instance.obj_info().end(),
[&vault_id](const auto& obj) { return obj.id() == vault_id; }) !=
instance.obj_info().end()) {
return false;
}

return true;
}

// The next available vault id would be max(max(obj info id), max(vault id)) + 1.
static std::string next_available_vault_id(const InstanceInfoPB& instance) {
int vault_id = 0;
Expand All @@ -328,6 +344,12 @@ static std::string next_available_vault_id(const InstanceInfoPB& instance) {
instance.resource_ids().begin(), instance.resource_ids().end(),
std::accumulate(instance.obj_info().begin(), instance.obj_info().end(), vault_id, max),
max);

// just add a defensive logic, we should use int64 in the future.
while (!is_vault_id_not_used(instance, std::to_string(prev + 1))) {
prev += 1;
}

return std::to_string(prev + 1);
}

Expand Down Expand Up @@ -454,14 +476,41 @@ static void create_object_info_with_encrypt(const InstanceInfoPB& instance, Obje
obj->set_sse_enabled(sse_enabled);
}

static int add_vault_into_instance(InstanceInfoPB& instance, Transaction* txn,
StorageVaultPB& vault_param, MetaServiceCode& code,
std::string& msg) {
static int check_duplicated_vault_name(const InstanceInfoPB& instance, const std::string& candidate_name,
MetaServiceCode& code, std::string& msg) {
if (std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
[&vault_param](const auto& name) { return name == vault_param.name(); }) !=
[&candidate_name](const auto& name) { return name == candidate_name; }) !=
instance.storage_vault_names().end()) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already created", vault_param.name());
msg = fmt::format("candidate_name={} already created", candidate_name);
return -1;
}

return 0;
}

static int check_new_vault_name(const InstanceInfoPB& instance, const std::string& candidate_name,
MetaServiceCode& code, std::string& msg) {
if (!is_valid_storage_vault_name(candidate_name)) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "invalid storage vault name =" << candidate_name << " the name must satisfy "
<< pattern_str;
msg = ss.str();
return -1;
}

if (check_duplicated_vault_name(instance, candidate_name, code, msg) != 0) {
return -1;
}

return 0;
}

static int add_vault_into_instance(InstanceInfoPB& instance, Transaction* txn,
StorageVaultPB& vault_param, MetaServiceCode& code,
std::string& msg) {
if (check_new_vault_name(instance, vault_param.name(), code, msg) != 0) {
return -1;
}

Expand Down Expand Up @@ -583,12 +632,7 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr

auto origin_vault_info = new_vault.DebugString();
if (vault.has_alter_name()) {
if (!is_valid_storage_vault_name(vault.alter_name())) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy "
<< pattern_str;
msg = ss.str();
if (check_new_vault_name(instance, vault.alter_name(), code, msg) != 0) {
return -1;
}
new_vault.set_name(vault.alter_name());
Expand Down Expand Up @@ -623,12 +667,6 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

return 0;
}
Expand All @@ -648,10 +686,19 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
obj_info.has_provider()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Only ak, sk can be altered";
ss << "Bucket, endpoint, prefix and provider can not be altered";
msg = ss.str();
return -1;
}

if (obj_info.has_ak() ^ obj_info.has_sk()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Accesskey and secretkey must be alter together";
msg = ss.str();
return -1;
}

const auto& name = vault.name();
// Here we try to get mutable iter since we might need to alter the vault name
auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(),
Expand Down Expand Up @@ -691,34 +738,33 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
}

if (vault.has_alter_name()) {
if (!is_valid_storage_vault_name(vault.alter_name())) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy "
<< pattern_str;
msg = ss.str();
if (check_new_vault_name(instance, vault.alter_name(), code, msg) != 0) {
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
auto origin_vault_info = new_vault.DebugString();
AkSkPair pre {new_vault.obj_info().ak(), new_vault.obj_info().sk()};
const auto& plain_ak = obj_info.has_ak() ? obj_info.ak() : new_vault.obj_info().ak();
const auto& plain_sk = obj_info.has_ak() ? obj_info.sk() : new_vault.obj_info().sk();
AkSkPair plain_ak_sk_pair {plain_ak, plain_sk};
AkSkPair cipher_ak_sk_pair;
EncryptionInfoPB encryption_info;
auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code,
msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;

// For ak or sk is not altered.
EncryptionInfoPB encryption_info = new_vault.obj_info().encryption_info();
AkSkPair new_ak_sk_pair {new_vault.obj_info().ak(), new_vault.obj_info().sk()};

if (obj_info.has_ak()) {
// ak and sk must be altered together, there is check before.
auto ret = encrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), &encryption_info,
&new_ak_sk_pair, code, msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;
}
}
new_vault.mutable_obj_info()->set_ak(cipher_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(cipher_ak_sk_pair.second);

new_vault.mutable_obj_info()->set_ak(new_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(new_ak_sk_pair.second);
new_vault.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info);
if (obj_info.has_use_path_style()) {
new_vault.mutable_obj_info()->set_use_path_style(obj_info.use_path_style());
Expand All @@ -735,12 +781,6 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

return 0;
}
Expand Down Expand Up @@ -986,16 +1026,10 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
ObjectStorageDesc {ak, sk, bucket, prefix, endpoint, external_endpoint, region};
ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
encryption_info, cipher_ak_sk_pair);
if (instance.storage_vault_names().end() !=
std::find_if(instance.storage_vault_names().begin(),
instance.storage_vault_names().end(),
[&](const std::string& candidate_name) {
return candidate_name == request->vault().name();
})) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already created", request->vault().name());
if (check_new_vault_name(instance, request->vault().name(), code, msg) != 0) {
return;
}

StorageVaultPB vault;
vault.set_id(last_item.id());
vault.set_name(request->vault().name());
Expand Down Expand Up @@ -1085,11 +1119,11 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ret = alter_s3_storage_vault()
if ret != 0 should not break => nothing should be changed to isntancePB

}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
Expand Down
4 changes: 2 additions & 2 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,8 @@ def run(self, args):
metaServiceHttpAddress = "{ms_endpoint}"
metaServiceToken = "greedisgood9999"
recycleServiceHttpAddress = "{recycle_endpoint}"
instanceId = "default_instance_id"
multiClusterInstance = "default_instance_id"
instanceId = "12345678"
multiClusterInstance = "12345678"
multiClusterBes = "{multi_cluster_bes}"
cloudUniqueId= "{fe_cloud_unique_id}"
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
} catch (RpcException e) {
LOG.warn("failed to alter storage vault due to RpcException: {}", e);
throw new DdlException(e.getMessage());
Expand Down
Loading
Loading