From 5384f6342db7ad053e476dde26ce2d1ecfabfcdd Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Thu, 16 Jan 2025 00:38:58 +0800 Subject: [PATCH] support drop dictonary data --- be/src/service/internal_service.cpp | 12 +--- be/src/vec/functions/dictionary_factory.h | 10 +-- .../java/org/apache/doris/common/Config.java | 4 ++ .../doris/datasource/InternalCatalog.java | 12 +++- .../doris/dictionary/DictionaryManager.java | 67 +++++++++++++++++-- .../doris/rpc/BackendServiceClient.java | 5 ++ .../apache/doris/rpc/BackendServiceProxy.java | 10 +++ gensrc/proto/internal_service.proto | 1 - 8 files changed, 97 insertions(+), 24 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index aa37172d4845e1..ea7b6e7f9a2dd7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2188,15 +2188,9 @@ void PInternalService::delete_dictionary(google::protobuf::RpcController* contro const PDeleteDictionaryRequest* request, PDeleteDictionaryResponse* response, google::protobuf::Closure* done) { - bool ret = _light_work_pool.try_offer([response, done, request]() { - brpc::ClosureGuard closure_guard(done); - Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id(), - request->version_id()); - st.to_protobuf(response->mutable_status()); - }); - if (!ret) { - offer_failed(response, done, _light_work_pool); - } + brpc::ClosureGuard closure_guard(done); + Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id()); + st.to_protobuf(response->mutable_status()); } } // namespace doris diff --git a/be/src/vec/functions/dictionary_factory.h b/be/src/vec/functions/dictionary_factory.h index f464f6c86494ef..7f23f0d67679da 100644 --- a/be/src/vec/functions/dictionary_factory.h +++ b/be/src/vec/functions/dictionary_factory.h @@ -69,20 +69,16 @@ class DictionaryFactory : private boost::noncopyable { return Status::OK(); } - Status delete_dict(int64_t dict_id, int64_t version_id) { + Status delete_dict(int64_t dict_id) { std::unique_lock lc(_mutex); if (!_dict_id_to_dict_map.contains(dict_id)) { - LOG_WARNING("DictionaryFactory Failed to delete dictionary") - .tag("dict_id", dict_id) - .tag("version_id", version_id); + LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id); return Status::InvalidArgument( - "DictionaryFactory Failed to delete dictionary dict_id : {},version_id : {} ", - dict_id, version_id); + "DictionaryFactory Failed to delete dictionary dict_id : {}", dict_id); } auto dict = _dict_id_to_dict_map[dict_id]; LOG_INFO("DictionaryFactory Successfully delete dictionary") .tag("dict_id", dict_id) - .tag("version_id", version_id) .tag("dict name", dict->dict_name()); _dict_id_to_dict_map.erase(dict_id); _dict_id_to_version_id_map.erase(dict_id); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 4a6833455560b3..c8890034a4cc02 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3026,6 +3026,10 @@ public class Config extends ConfigBase { }) public static boolean enable_feature_data_sync_job = false; + @ConfField(mutable = true, masterOnly = true, description = {"字典删除时 RPC 的超时时间", + "Timeout of dictionary deletion RPC"}) + public static int dictionary_delete_rpc_timeout_ms = 5000; + //========================================================================== // begin of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a6a0935870b4ef..9bd85e1fcdce9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -567,7 +567,11 @@ public void dropDb(String dbName, boolean ifExists, boolean force) throws DdlExc fullNameToDb.remove(db.getFullName()); DropDbInfo info = new DropDbInfo(dbName, force, recycleTime); Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId()); - Env.getCurrentEnv().getDictionaryManager().dropDbDictionaries(dbName); + try { + Env.getCurrentEnv().getDictionaryManager().dropDbDictionaries(dbName); + } catch (AnalysisException e) { + // acceptable. drop failed will change its status and plan to do again later. + } Env.getCurrentEnv().getEditLog().logDropDb(info); } finally { unlock(); @@ -1041,7 +1045,11 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table); } Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId()); - Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName()); + try { + Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName()); + } catch (AnalysisException e) { + // acceptable. drop failed will change its status and plan to do again later. + } Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(), db.getId(), table.getId()); db.unregisterTable(table.getName()); StopWatch watch = StopWatch.createStarted(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java b/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java index 46212346c42be5..1518657d8e7728 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java @@ -19,7 +19,10 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Status; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; @@ -33,8 +36,12 @@ import org.apache.doris.persist.DictionaryIncreaseVersionInfo; import org.apache.doris.persist.DropDictionaryPersistInfo; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ArrayListMultimap; @@ -48,10 +55,14 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * Manager for dictionary operations, including creation, deletion, and data loading. @@ -145,9 +156,10 @@ public Dictionary createDictionary(ConnectContext ctx, CreateDictionaryInfo info * Delete a dictionary. * * @throws DdlException if the dictionary does not exist + * @throws AnalysisException */ public void dropDictionary(ConnectContext ctx, String dbName, String dictName, boolean ifExists) - throws DdlException { + throws DdlException, AnalysisException { lockWrite(); Dictionary dic = null; try { @@ -175,8 +187,11 @@ public void dropDictionary(ConnectContext ctx, String dbName, String dictName, b /** * Drop all dictionaries in a table. Used when dropping a table. So maybe no db or table records. + * + * @throws AnalysisException */ - public void dropTableDictionaries(String dbName, String tableName) { + public void dropTableDictionaries(String dbName, String tableName) + throws AnalysisException { lockWrite(); List droppedDictionaries = Lists.newArrayList(); try { @@ -207,8 +222,10 @@ public void dropTableDictionaries(String dbName, String tableName) { /** * Drop all dictionaries in a database. Used when dropping a database. + * + * @throws AnalysisException */ - public void dropDbDictionaries(String dbName) { + public void dropDbDictionaries(String dbName) throws AnalysisException { lockWrite(); List droppedDictionaries = Lists.newArrayList(); try { @@ -310,8 +327,48 @@ public void scheduleDataLoad(ConnectContext ctx, Dictionary dictionary) throws E } } - public void scheduleDataUnload(ConnectContext ctx, Dictionary dictionary) { - // TODO: maybe here we dont need a query. just a special RPC is ok. + public void scheduleDataUnload(ConnectContext ctx, Dictionary dictionary) + throws AnalysisException { + List aliveBes = new ArrayList<>(); + try { + aliveBes = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().stream() + .filter(be -> be.isAlive()).collect(Collectors.toList()); + } catch (AnalysisException e) { + // actually it wont throw. + } + // get all alive BEs and send rpc. + List> futureList = new ArrayList<>(); + for (Backend be : aliveBes) { + if (!be.isAlive()) { + continue; + } + final InternalService.PDeleteDictionaryRequest request = InternalService.PDeleteDictionaryRequest + .newBuilder().setDictionaryId(dictionary.getId()).build(); + Future response = BackendServiceProxy.getInstance() + .deleteDictionaryAsync(be.getBrpcAddress(), Config.dictionary_delete_rpc_timeout_ms, request); + futureList.add(response); + } + // wait all responses. if succeed, delete dictionary. + for (Future future : futureList) { + if (future == null) { + continue; + } + try { + InternalService.PDeleteDictionaryResponse response = future.get(Config.dictionary_delete_rpc_timeout_ms, + TimeUnit.SECONDS); + if (response.hasStatus()) { + Status status = new Status(response.getStatus()); + if (status.getErrorCode() != TStatusCode.OK) { + LOG.warn("Failed to delete dictionary " + dictionary.getName() + " on be " + + status.getErrorMsg()); + dictionary.setStatus(Dictionary.DictionaryStatus.REMOVING); + } + } + } catch (Throwable t) { + LOG.warn("Failed to delete dictionary " + dictionary.getName(), t); + throw new AnalysisException("Failed to delete dictionary " + dictionary.getName()); + } + } } public void replayCreateDictionary(CreateDictionaryPersistInfo info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 54c5e68144c57c..193ade9ca266d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -193,6 +193,11 @@ public Future getBeResource(InternalServ return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request); } + public Future deleteDictionary( + InternalService.PDeleteDictionaryRequest request, int timeoutSec) { + return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).deleteDictionary(request); + } + public void shutdown() { ConnectivityState state = channel.getState(false); LOG.warn("shut down backend service client: {}, channel state: {}", address, state); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 053a7428b524cd..13aea9ecf71b4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -569,4 +569,14 @@ public Future getBeResourceAsync(TNetwor return null; } + public Future deleteDictionaryAsync(TNetworkAddress address, + int timeoutSec, InternalService.PDeleteDictionaryRequest request) { + try { + final BackendServiceClient client = getProxy(address); + return client.deleteDictionary(request, timeoutSec); + } catch (Throwable e) { + LOG.warn("delete dictionary failed, address={}:{}", address.getHostname(), address.getPort(), e); + } + return null; + } } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ea660b2ac2ec78..9f086c34c651ce 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -991,7 +991,6 @@ message PGetBeResourceResponse { message PDeleteDictionaryRequest { optional int64 dictionary_id = 1; - optional int64 version_id = 2; } message PDeleteDictionaryResponse {