Skip to content

Commit

Permalink
[fix](vault) Fix bugs about altering storage vault name
Browse files Browse the repository at this point in the history
* fix altering storage name but not writing disk in meta-service
* check vault if existed when altering stoarge vault name
  • Loading branch information
SWJTU-ZhangLei committed Dec 25, 2024
1 parent b5644fd commit 8db1a5a
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 75 deletions.
57 changes: 37 additions & 20 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,18 @@ static void set_default_vault_log_helper(const InstanceInfoPB& instance,
LOG(INFO) << vault_msg;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) {
for (auto& name : instance.storage_vault_names()) {
if (new_vault_name == name) {
return true;
}
}
return false;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_hdfs_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -591,6 +600,13 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -623,19 +639,16 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
response->set_storage_vault_name(new_vault.name());
return 0;
}

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_obj_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -708,6 +721,13 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -747,13 +767,10 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
response->set_storage_vault_name(new_vault.name());
return 0;
}

Expand Down Expand Up @@ -1100,12 +1117,12 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
Expand Down
5 changes: 4 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3725,7 +3725,10 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
}

// Storage Vault
if (!olapTable.getStorageVaultName().isEmpty()) {
if (!Strings.isNullOrEmpty(olapTable.getStorageVaultId())) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_ID).append("\" = \"");
sb.append(olapTable.getStorageVaultId()).append("\"");
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_NAME).append("\" = \"");
sb.append(olapTable.getStorageVaultName()).append("\"");
Expand Down
20 changes: 8 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,18 @@ public void setIsBeingSynced(boolean isBeingSynced) {
String.valueOf(isBeingSynced));
}

public void setStorageVaultName(String storageVaultName) throws DdlException {
if (storageVaultName == null || storageVaultName.isEmpty()) {
return;
}
getOrCreatTableProperty().setStorageVaultName(storageVaultName);
}

public String getStorageVaultName() {
return getOrCreatTableProperty().getStorageVaultName();
if (Strings.isNullOrEmpty(getStorageVaultId())) {
return "";
}
return Env.getCurrentEnv().getStorageVaultMgr().getVaultNameById(getStorageVaultId());
}

public void setStorageVaultId(String setStorageVaultId) throws DdlException {
if (setStorageVaultId == null || setStorageVaultId.isEmpty()) {
throw new DdlException("Invalid Storage Vault, please set one useful storage vault");
public void setStorageVaultId(String storageVaultId) throws DdlException {
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Invalid storage vault id, please set an available storage vault");
}
getOrCreatTableProperty().setStorageVaultId(setStorageVaultId);
getOrCreatTableProperty().setStorageVaultId(storageVaultId);
}

public String getStorageVaultId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class StorageVault {
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";

public static final String VAULT_NAME = "VAULT_NAME";

public enum StorageVaultType {
UNKNOWN,
S3,
Expand All @@ -60,7 +62,6 @@ public static StorageVaultType fromString(String storageVaultTypeType) {
}
}

protected static final String VAULT_NAME = "VAULT_NAME";
protected String name;
protected StorageVaultType type;
protected String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -80,12 +82,42 @@ public void refreshVaultMap(Map<String, String> vaultMap) {
rwLock.writeLock().unlock();
}

public String getVaultIdByName(String name) {
String vaultId;
rwLock.readLock().lock();
vaultId = vaultNameToVaultId.getOrDefault(name, "");
rwLock.readLock().unlock();
return vaultId;
public String getVaultIdByName(String vaultName) {
try {
rwLock.readLock().lock();
return vaultNameToVaultId.getOrDefault(vaultName, "");
} finally {
rwLock.readLock().unlock();
}
}

public String getVaultNameById(String vaultId) {
try {
rwLock.readLock().lock();
for (Map.Entry<String, String> entry : vaultNameToVaultId.entrySet()) {
if (entry.getValue().equals(vaultId)) {
return entry.getKey();
}
}
return "";
} finally {
rwLock.readLock().unlock();
}
}

private void updateVaultNameToIdCache(String oldVaultName, String newVaultName, String vaultId) {
try {
rwLock.writeLock().lock();
String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
vaultNameToVaultId.remove(oldVaultName);
Preconditions.checkArgument(!Strings.isNullOrEmpty(cachedVaultId), cachedVaultId,
"Cached vault id is null or empty");
Preconditions.checkArgument(cachedVaultId.equals(vaultId),
"Cached vault id not equal to remote storage." + cachedVaultId + " - " + vaultId);
vaultNameToVaultId.put(newVaultName, vaultId);
} finally {
rwLock.writeLock().unlock();
}
}

private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
Expand Down Expand Up @@ -166,8 +198,10 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
LOG.warn("failed to alter storage vault response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

updateVaultNameToIdCache(name, response.getStorageVaultName(), response.getStorageVaultId());
LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
response.getStorageVaultName(), response.getStorageVaultId());

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,6 @@ public String getStorageVaultName() {
return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, "");
}

public void setStorageVaultName(String storageVaultName) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName);
}

public String getPropertiesString() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.proto.OlapCommon;
import org.apache.doris.proto.OlapFile;
Expand Down Expand Up @@ -106,6 +105,12 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);

if (((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(tbl.getStorageVaultId()),
"Storage vault id is null or empty");
}

MaterializedIndex baseIndex = new MaterializedIndex(tbl.getBaseIndexId(), IndexState.NORMAL);

LOG.info("begin create cloud partition");
Expand All @@ -129,9 +134,6 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa

long version = partition.getVisibleVersion();

final String storageVaultName = tbl.getStorageVaultName();
boolean storageVaultIdSet = tbl.getStorageVaultId().isEmpty();

// short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
Expand Down Expand Up @@ -184,29 +186,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.storagePageSize());
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
requestBuilder.setStorageVaultName(storageVaultName);
}
requestBuilder.setDbId(dbId);

LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, partitionId: {}, partitionName: {}, "
+ "indexId: {}, vault name: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, storageVaultName);
Cloud.CreateTabletsResponse resp = sendCreateTabletsRpc(requestBuilder);
// If the resp has no vault id set, it means the MS is running with enable_storage_vault false
if (resp.hasStorageVaultId() && !storageVaultIdSet) {
tbl.setStorageVaultId(resp.getStorageVaultId());
storageVaultIdSet = true;
if (storageVaultName.isEmpty()) {
// If user doesn't specify the vault name for this table, we should set it
// to make the show create table stmt return correct stmt
// TODO(ByteYue): setDefaultStorageVault for vaultMgr might override user's
// defualt vault, maybe we should set it using show default storage vault stmt
tbl.setStorageVaultName(resp.getStorageVaultName());
Env.getCurrentEnv().getStorageVaultMgr().setDefaultStorageVault(
Pair.of(resp.getStorageVaultName(), resp.getStorageVaultId()));
}
}
LOG.info("create tablets dbId: {} tableId: {} tableName: {} partitionId: {} partitionName: {} "
+ "indexId: {} vaultId: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, tbl.getStorageVaultId());
sendCreateTabletsRpc(requestBuilder);
if (index.getId() != tbl.getBaseIndexId()) {
// add rollup index to partition
partition.createRollupIndex(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2771,7 +2771,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
+ "' for storage vault '" + storageVaultName + "'");
}

olapTable.setStorageVaultName(storageVaultName);
storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName);
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Storage vault '" + storageVaultName + "' does not exist. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.doris.catalog.StorageVault;
import org.apache.doris.catalog.StorageVault.StorageVaultType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Preconditions;

import java.util.Map;

/**
Expand All @@ -48,6 +51,13 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (vaultType == StorageVault.StorageVaultType.UNKNOWN) {
throw new AnalysisException("Unsupported Storage Vault type: " + type);
}

FeNameFormat.checkStorageVaultName(name);
if (properties.containsKey(StorageVault.VAULT_NAME)) {
String newName = properties.get(StorageVault.VAULT_NAME);
FeNameFormat.checkStorageVaultName(newName);
Preconditions.checkArgument(!name.equalsIgnoreCase(newName), "vault name no change");
}
Env.getCurrentEnv().getStorageVaultMgr().alterStorageVault(vaultType, properties, name);
}

Expand Down
3 changes: 3 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,9 @@ message AlterObjStoreInfoResponse {
optional MetaServiceResponseStatus status = 1;
optional string storage_vault_id = 2;
optional bool default_storage_vault_replaced = 3;

// storage_vault_name maybe changed, so return new storage_vault_name
optional string storage_vault_name = 4;
}

message UpdateAkSkRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,11 +813,11 @@ class Suite implements GroovyInterceptable {
return randomBoolean ? "true" : "false"
}

void expectExceptionLike(Closure userFunction, String errorMessage = null) {
void expectExceptionLike(Closure userFunction, String errMsg = null) {
try {
userFunction()
} catch (Exception e) {
if (!e.getMessage().contains(errorMessage)) {
if (!Strings.isNullOrEmpty(errMsg) && !e.getMessage().contains(errMsg)) {
throw e
}
}
Expand Down
Loading

0 comments on commit 8db1a5a

Please sign in to comment.