Skip to content

Commit

Permalink
[fix](vault) Fix bugs about altering storage vault name (#45685)
Browse files Browse the repository at this point in the history
* fix altering storage name but not writing disk in meta-service
* check vault if existed when altering stoarge vault name
  • Loading branch information
SWJTU-ZhangLei authored and Your Name committed Dec 25, 2024
1 parent 09cacab commit b9e152e
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 75 deletions.
55 changes: 35 additions & 20 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,18 @@ static void set_default_vault_log_helper(const InstanceInfoPB& instance,
LOG(INFO) << vault_msg;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) {
for (auto& name : instance.storage_vault_names()) {
if (new_vault_name == name) {
return true;
}
}
return false;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_hdfs_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -591,6 +600,13 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -623,19 +639,15 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
return 0;
}

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_obj_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -708,6 +720,13 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -747,13 +766,9 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
return 0;
}

Expand Down Expand Up @@ -1100,12 +1115,12 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
Expand Down
39 changes: 39 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,25 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
AlterObjStoreInfoRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
StorageVaultPB vault;
vault.set_alter_name(new_vault_name);
ObjectStoreInfoPB obj;
obj_info.set_ak("new_ak");
obj_info.set_sk("new_sk");
vault.mutable_obj_info()->MergeFrom(obj);
vault.set_name(new_vault_name);
req.mutable_vault()->CopyFrom(vault);
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg();
}

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down Expand Up @@ -677,6 +696,7 @@ TEST(MetaServiceTest, AlterHdfsStorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down Expand Up @@ -744,6 +764,25 @@ TEST(MetaServiceTest, AlterHdfsStorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
AlterObjStoreInfoRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_op(AlterObjStoreInfoRequest::ALTER_HDFS_VAULT);
StorageVaultPB vault;
vault.mutable_hdfs_info()->mutable_build_conf()->set_user("hadoop");
vault.set_name(new_vault_name);
vault.set_alter_name(new_vault_name);
req.mutable_vault()->CopyFrom(vault);

brpc::Controller cntl;
AlterObjStoreInfoResponse res;
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg();
}

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down
5 changes: 4 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3693,7 +3693,10 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
}

// Storage Vault
if (!olapTable.getStorageVaultName().isEmpty()) {
if (!Strings.isNullOrEmpty(olapTable.getStorageVaultId())) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_ID).append("\" = \"");
sb.append(olapTable.getStorageVaultId()).append("\"");
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_NAME).append("\" = \"");
sb.append(olapTable.getStorageVaultName()).append("\"");
Expand Down
20 changes: 8 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,18 @@ public void setIsBeingSynced(boolean isBeingSynced) {
String.valueOf(isBeingSynced));
}

public void setStorageVaultName(String storageVaultName) throws DdlException {
if (storageVaultName == null || storageVaultName.isEmpty()) {
return;
}
getOrCreatTableProperty().setStorageVaultName(storageVaultName);
}

public String getStorageVaultName() {
return getOrCreatTableProperty().getStorageVaultName();
if (Strings.isNullOrEmpty(getStorageVaultId())) {
return "";
}
return Env.getCurrentEnv().getStorageVaultMgr().getVaultNameById(getStorageVaultId());
}

public void setStorageVaultId(String setStorageVaultId) throws DdlException {
if (setStorageVaultId == null || setStorageVaultId.isEmpty()) {
throw new DdlException("Invalid Storage Vault, please set one useful storage vault");
public void setStorageVaultId(String storageVaultId) throws DdlException {
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Invalid storage vault id, please set an available storage vault");
}
getOrCreatTableProperty().setStorageVaultId(setStorageVaultId);
getOrCreatTableProperty().setStorageVaultId(storageVaultId);
}

public String getStorageVaultId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class StorageVault {
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";

public static final String VAULT_NAME = "VAULT_NAME";

public enum StorageVaultType {
UNKNOWN,
S3,
Expand All @@ -60,7 +62,6 @@ public static StorageVaultType fromString(String storageVaultTypeType) {
}
}

protected static final String VAULT_NAME = "VAULT_NAME";
protected String name;
protected StorageVaultType type;
protected String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -82,12 +84,42 @@ public void refreshVaultMap(Map<String, String> vaultMap) {
rwLock.writeLock().unlock();
}

public String getVaultIdByName(String name) {
String vaultId;
rwLock.readLock().lock();
vaultId = vaultNameToVaultId.getOrDefault(name, "");
rwLock.readLock().unlock();
return vaultId;
public String getVaultIdByName(String vaultName) {
try {
rwLock.readLock().lock();
return vaultNameToVaultId.getOrDefault(vaultName, "");
} finally {
rwLock.readLock().unlock();
}
}

public String getVaultNameById(String vaultId) {
try {
rwLock.readLock().lock();
for (Map.Entry<String, String> entry : vaultNameToVaultId.entrySet()) {
if (entry.getValue().equals(vaultId)) {
return entry.getKey();
}
}
return "";
} finally {
rwLock.readLock().unlock();
}
}

private void updateVaultNameToIdCache(String oldVaultName, String newVaultName, String vaultId) {
try {
rwLock.writeLock().lock();
String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
vaultNameToVaultId.remove(oldVaultName);
Preconditions.checkArgument(!Strings.isNullOrEmpty(cachedVaultId), cachedVaultId,
"Cached vault id is null or empty");
Preconditions.checkArgument(cachedVaultId.equals(vaultId),
"Cached vault id not equal to remote storage." + cachedVaultId + " - " + vaultId);
vaultNameToVaultId.put(newVaultName, vaultId);
} finally {
rwLock.writeLock().unlock();
}
}

private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
Expand Down Expand Up @@ -168,8 +200,12 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
LOG.warn("failed to alter storage vault response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

if (request.hasVault() && request.getVault().hasAlterName()) {
updateVaultNameToIdCache(name, request.getVault().getAlterName(), response.getStorageVaultId());
LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
request.getVault().getAlterName(), response.getStorageVaultId());
}

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,6 @@ public String getStorageVaultName() {
return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, "");
}

public void setStorageVaultName(String storageVaultName) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName);
}

public String getPropertiesString() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.proto.OlapCommon;
import org.apache.doris.proto.OlapFile;
Expand Down Expand Up @@ -105,6 +104,12 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);

if (((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(tbl.getStorageVaultId()),
"Storage vault id is null or empty");
}

MaterializedIndex baseIndex = new MaterializedIndex(tbl.getBaseIndexId(), IndexState.NORMAL);

LOG.info("begin create cloud partition");
Expand All @@ -128,9 +133,6 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa

long version = partition.getVisibleVersion();

final String storageVaultName = tbl.getStorageVaultName();
boolean storageVaultIdSet = tbl.getStorageVaultId().isEmpty();

// short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
Expand Down Expand Up @@ -177,29 +179,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.variantEnableFlattenNested());
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
requestBuilder.setStorageVaultName(storageVaultName);
}
requestBuilder.setDbId(dbId);

LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, partitionId: {}, partitionName: {}, "
+ "indexId: {}, vault name: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, storageVaultName);
Cloud.CreateTabletsResponse resp = sendCreateTabletsRpc(requestBuilder);
// If the resp has no vault id set, it means the MS is running with enable_storage_vault false
if (resp.hasStorageVaultId() && !storageVaultIdSet) {
tbl.setStorageVaultId(resp.getStorageVaultId());
storageVaultIdSet = true;
if (storageVaultName.isEmpty()) {
// If user doesn't specify the vault name for this table, we should set it
// to make the show create table stmt return correct stmt
// TODO(ByteYue): setDefaultStorageVault for vaultMgr might override user's
// defualt vault, maybe we should set it using show default storage vault stmt
tbl.setStorageVaultName(resp.getStorageVaultName());
Env.getCurrentEnv().getStorageVaultMgr().setDefaultStorageVault(
Pair.of(resp.getStorageVaultName(), resp.getStorageVaultId()));
}
}
LOG.info("create tablets dbId: {} tableId: {} tableName: {} partitionId: {} partitionName: {} "
+ "indexId: {} vaultId: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, tbl.getStorageVaultId());
sendCreateTabletsRpc(requestBuilder);
if (index.getId() != tbl.getBaseIndexId()) {
// add rollup index to partition
partition.createRollupIndex(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2746,7 +2746,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
+ "' for storage vault '" + storageVaultName + "'");
}

olapTable.setStorageVaultName(storageVaultName);
storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName);
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Storage vault '" + storageVaultName + "' does not exist. "
Expand Down
Loading

0 comments on commit b9e152e

Please sign in to comment.