Skip to content

Commit

Permalink
branch-3.0: [fix](vault) avoid encrypt twice when altering vault #45156
Browse files Browse the repository at this point in the history
… (#45571)

Cherry-picked from #45156

Co-authored-by: Yongqiang YANG <[email protected]>
  • Loading branch information
github-actions[bot] and dataroaring authored Dec 19, 2024
1 parent e7b9826 commit 7b2cfc2
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 25 deletions.
44 changes: 28 additions & 16 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,19 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
obj_info.has_provider()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Only ak, sk can be altered";
ss << "Bucket, endpoint, prefix and provider can not be altered";
msg = ss.str();
return -1;
}

if (obj_info.has_ak() ^ obj_info.has_sk()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Accesskey and secretkey must be alter together";
msg = ss.str();
return -1;
}

const auto& name = vault.name();
// Here we try to get mutable iter since we might need to alter the vault name
auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(),
Expand Down Expand Up @@ -703,22 +712,25 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
*name_itr = vault.alter_name();
}
auto origin_vault_info = new_vault.DebugString();
AkSkPair pre {new_vault.obj_info().ak(), new_vault.obj_info().sk()};
const auto& plain_ak = obj_info.has_ak() ? obj_info.ak() : new_vault.obj_info().ak();
const auto& plain_sk = obj_info.has_ak() ? obj_info.sk() : new_vault.obj_info().sk();
AkSkPair plain_ak_sk_pair {plain_ak, plain_sk};
AkSkPair cipher_ak_sk_pair;
EncryptionInfoPB encryption_info;
auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code,
msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;

// For ak or sk is not altered.
EncryptionInfoPB encryption_info = new_vault.obj_info().encryption_info();
AkSkPair new_ak_sk_pair {new_vault.obj_info().ak(), new_vault.obj_info().sk()};

if (obj_info.has_ak()) {
// ak and sk must be altered together, there is check before.
auto ret = encrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), &encryption_info,
&new_ak_sk_pair, code, msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;
}
}
new_vault.mutable_obj_info()->set_ak(cipher_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(cipher_ak_sk_pair.second);

new_vault.mutable_obj_info()->set_ak(new_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(new_ak_sk_pair.second);
new_vault.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info);
if (obj_info.has_use_path_style()) {
new_vault.mutable_obj_info()->set_use_path_style(obj_info.use_path_style());
Expand Down
5 changes: 3 additions & 2 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
AlterObjStoreInfoResponse res;
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
ASSERT_NE(res.status().code(), MetaServiceCode::OK) << res.status().msg();
InstanceInfoPB instance;
get_test_instance(instance);

Expand All @@ -526,7 +526,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
TxnErrorCode::TXN_OK);
StorageVaultPB get_obj;
get_obj.ParseFromString(val);
ASSERT_EQ(get_obj.obj_info().ak(), "new_ak") << get_obj.obj_info().ak();
ASSERT_EQ(get_obj.obj_info().ak(), "ak") << get_obj.obj_info().ak();
}

{
Expand Down Expand Up @@ -578,6 +578,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
vault.set_alter_name(new_vault_name);
ObjectStoreInfoPB obj;
obj_info.set_ak("new_ak");
obj_info.set_sk("new_sk");
vault.mutable_obj_info()->MergeFrom(obj);
vault.set_name(vault_name);
req.mutable_vault()->CopyFrom(vault);
Expand Down
4 changes: 2 additions & 2 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,8 @@ def run(self, args):
metaServiceHttpAddress = "{ms_endpoint}"
metaServiceToken = "greedisgood9999"
recycleServiceHttpAddress = "{recycle_endpoint}"
instanceId = "default_instance_id"
multiClusterInstance = "default_instance_id"
instanceId = "12345678"
multiClusterInstance = "12345678"
multiClusterBes = "{multi_cluster_bes}"
cloudUniqueId= "{fe_cloud_unique_id}"
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
} catch (RpcException e) {
LOG.warn("failed to alter storage vault due to RpcException: {}", e);
throw new DdlException(e.getMessage());
Expand Down
157 changes: 152 additions & 5 deletions regression-test/suites/vault_p0/alter/test_alter_s3_vault.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,43 @@ suite("test_alter_s3_vault", "nonConcurrent") {
);
"""

def dupVaultName = "${suiteName}" + "_dup"
sql """
CREATE STORAGE VAULT IF NOT EXISTS ${dupVaultName}
PROPERTIES (
"type"="S3",
"s3.endpoint"="${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}",
"s3.root.path" = "${suiteName}",
"s3.bucket" = "${getS3BucketName()}",
"s3.external_endpoint" = "",
"provider" = "${getS3Provider()}"
);
"""

sql """
DROP TABLE IF EXISTS alter_s3_vault_tbl
"""

sql """
CREATE TABLE IF NOT EXISTS alter_s3_vault_tbl
(
`k1` INT NULL,
`v1` INT NULL
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"storage_vault_name" = "${suiteName}"
);
"""

sql """insert into alter_s3_vault_tbl values(2, 2); """

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
Expand All @@ -62,45 +99,155 @@ suite("test_alter_s3_vault", "nonConcurrent") {
"""
}, "Alter property")

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
PROPERTIES (
"type"="S3",
"s3.access_key" = "new_ak"
);
"""
}, "Accesskey and secretkey must be alter together")

def vaultName = suiteName
String properties;
def String properties;

def vaultInfos = try_sql """show storage vault"""
def vaultInfos = try_sql """show storage vaults"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
properties = vaultInfos[i][2]
}
}

def newVaultName = suiteName + "_new";
// alter ak sk
sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
}

sql """insert into alter_s3_vault_tbl values("2", "2"); """


// rename
newVaultName = vaultName + "_new";

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""
for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(newVaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
if (name.equals(vaultName)) {
assertTrue(false);
}
}

sql """insert into alter_s3_vault_tbl values("2", "2"); """

// rename + aksk
vaultName = newVaultName
newVaultName = vaultName + "_new";

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}",
"s3.access_key" = "new_ak"
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""
for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(newVaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
if (name.equals(vaultName)) {
assertTrue(false);
}
}
sql """insert into alter_s3_vault_tbl values("2", "2"); """


vaultName = newVaultName;

newVaultName = vaultName + "_new";

vaultInfos = sql """SHOW STORAGE VAULT;"""
boolean exist = false

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}",
"s3.access_key" = "new_ak_ak",
"s3.secret_key" = "sk"
);
"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
assertTrue(false);
}
if (name.equals(newVaultName)) {
assertTrue(vaultInfos[i][2].contains("new_ak"))
assertTrue(vaultInfos[i][2].contains("new_ak_ak"))
exist = true
}
}
assertTrue(exist)

vaultName = newVaultName;

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${dupVaultName}",
"s3.access_key" = "new_ak_ak",
"s3.secret_key" = "sk"
);
"""
}, "already exists")

def count = sql """ select count() from alter_s3_vault_tbl; """
assertTrue(res[0][0] == 4)

// failed to insert due to the wrong ak
expectExceptionLike({ sql """insert into alter_s3_vault_tbl values("2", "2");""" }, "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """
CREATE TABLE IF NOT EXISTS alter_use_path_style_tbl
(
`k1` INT NULL,
`v1` INT NULL
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"storage_vault_name" = "${suiteName}"
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

sql """
ALTER STORAGE VAULT ${suiteName}
PROPERTIES (
Expand All @@ -51,6 +68,8 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

def vaultInfos = sql """ SHOW STORAGE VAULT; """
boolean exist = false

Expand All @@ -73,6 +92,8 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

vaultInfos = sql """ SHOW STORAGE VAULT; """
exist = false

Expand Down Expand Up @@ -105,4 +126,7 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""
}, "Invalid use_path_style value")

def count = sql """ select count() from alter_use_path_style_tbl; """
assertTrue(res[0][0] == 3)
}

0 comments on commit 7b2cfc2

Please sign in to comment.