Skip to content

Commit

Permalink
support drop dictonary data
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb committed Jan 15, 2025
1 parent 05898bd commit 5384f63
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 24 deletions.
12 changes: 3 additions & 9 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2188,15 +2188,9 @@ void PInternalService::delete_dictionary(google::protobuf::RpcController* contro
const PDeleteDictionaryRequest* request,
PDeleteDictionaryResponse* response,
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([response, done, request]() {
brpc::ClosureGuard closure_guard(done);
Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id(),
request->version_id());
st.to_protobuf(response->mutable_status());
});
if (!ret) {
offer_failed(response, done, _light_work_pool);
}
brpc::ClosureGuard closure_guard(done);
Status st = ExecEnv::GetInstance()->dict_factory()->delete_dict(request->dictionary_id());
st.to_protobuf(response->mutable_status());
}

} // namespace doris
10 changes: 3 additions & 7 deletions be/src/vec/functions/dictionary_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,16 @@ class DictionaryFactory : private boost::noncopyable {
return Status::OK();
}

Status delete_dict(int64_t dict_id, int64_t version_id) {
Status delete_dict(int64_t dict_id) {
std::unique_lock lc(_mutex);
if (!_dict_id_to_dict_map.contains(dict_id)) {
LOG_WARNING("DictionaryFactory Failed to delete dictionary")
.tag("dict_id", dict_id)
.tag("version_id", version_id);
LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id);
return Status::InvalidArgument(
"DictionaryFactory Failed to delete dictionary dict_id : {},version_id : {} ",
dict_id, version_id);
"DictionaryFactory Failed to delete dictionary dict_id : {}", dict_id);
}
auto dict = _dict_id_to_dict_map[dict_id];
LOG_INFO("DictionaryFactory Successfully delete dictionary")
.tag("dict_id", dict_id)
.tag("version_id", version_id)
.tag("dict name", dict->dict_name());
_dict_id_to_dict_map.erase(dict_id);
_dict_id_to_version_id_map.erase(dict_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,10 @@ public class Config extends ConfigBase {
})
public static boolean enable_feature_data_sync_job = false;

@ConfField(mutable = true, masterOnly = true, description = {"字典删除时 RPC 的超时时间",
"Timeout of dictionary deletion RPC"})
public static int dictionary_delete_rpc_timeout_ms = 5000;

//==========================================================================
// begin of cloud config
//==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,11 @@ public void dropDb(String dbName, boolean ifExists, boolean force) throws DdlExc
fullNameToDb.remove(db.getFullName());
DropDbInfo info = new DropDbInfo(dbName, force, recycleTime);
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId());
Env.getCurrentEnv().getDictionaryManager().dropDbDictionaries(dbName);
try {
Env.getCurrentEnv().getDictionaryManager().dropDbDictionaries(dbName);
} catch (AnalysisException e) {
// acceptable. drop failed will change its status and plan to do again later.
}
Env.getCurrentEnv().getEditLog().logDropDb(info);
} finally {
unlock();
Expand Down Expand Up @@ -1041,7 +1045,11 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop,
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table);
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName());
try {
Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName());
} catch (AnalysisException e) {
// acceptable. drop failed will change its status and plan to do again later.
}
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(), db.getId(), table.getId());
db.unregisterTable(table.getName());
StopWatch watch = StopWatch.createStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Status;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
Expand All @@ -33,8 +36,12 @@
import org.apache.doris.persist.DictionaryIncreaseVersionInfo;
import org.apache.doris.persist.DropDictionaryPersistInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ArrayListMultimap;
Expand All @@ -48,10 +55,14 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
* Manager for dictionary operations, including creation, deletion, and data loading.
Expand Down Expand Up @@ -145,9 +156,10 @@ public Dictionary createDictionary(ConnectContext ctx, CreateDictionaryInfo info
* Delete a dictionary.
*
* @throws DdlException if the dictionary does not exist
* @throws AnalysisException
*/
public void dropDictionary(ConnectContext ctx, String dbName, String dictName, boolean ifExists)
throws DdlException {
throws DdlException, AnalysisException {
lockWrite();
Dictionary dic = null;
try {
Expand Down Expand Up @@ -175,8 +187,11 @@ public void dropDictionary(ConnectContext ctx, String dbName, String dictName, b

/**
* Drop all dictionaries in a table. Used when dropping a table. So maybe no db or table records.
*
* @throws AnalysisException
*/
public void dropTableDictionaries(String dbName, String tableName) {
public void dropTableDictionaries(String dbName, String tableName)
throws AnalysisException {
lockWrite();
List<Dictionary> droppedDictionaries = Lists.newArrayList();
try {
Expand Down Expand Up @@ -207,8 +222,10 @@ public void dropTableDictionaries(String dbName, String tableName) {

/**
* Drop all dictionaries in a database. Used when dropping a database.
*
* @throws AnalysisException
*/
public void dropDbDictionaries(String dbName) {
public void dropDbDictionaries(String dbName) throws AnalysisException {
lockWrite();
List<Dictionary> droppedDictionaries = Lists.newArrayList();
try {
Expand Down Expand Up @@ -310,8 +327,48 @@ public void scheduleDataLoad(ConnectContext ctx, Dictionary dictionary) throws E
}
}

public void scheduleDataUnload(ConnectContext ctx, Dictionary dictionary) {
// TODO: maybe here we dont need a query. just a special RPC is ok.
public void scheduleDataUnload(ConnectContext ctx, Dictionary dictionary)
throws AnalysisException {
List<Backend> aliveBes = new ArrayList<>();
try {
aliveBes = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().stream()
.filter(be -> be.isAlive()).collect(Collectors.toList());
} catch (AnalysisException e) {
// actually it wont throw.
}
// get all alive BEs and send rpc.
List<Future<InternalService.PDeleteDictionaryResponse>> futureList = new ArrayList<>();
for (Backend be : aliveBes) {
if (!be.isAlive()) {
continue;
}
final InternalService.PDeleteDictionaryRequest request = InternalService.PDeleteDictionaryRequest
.newBuilder().setDictionaryId(dictionary.getId()).build();
Future<InternalService.PDeleteDictionaryResponse> response = BackendServiceProxy.getInstance()
.deleteDictionaryAsync(be.getBrpcAddress(), Config.dictionary_delete_rpc_timeout_ms, request);
futureList.add(response);
}
// wait all responses. if succeed, delete dictionary.
for (Future<InternalService.PDeleteDictionaryResponse> future : futureList) {
if (future == null) {
continue;
}
try {
InternalService.PDeleteDictionaryResponse response = future.get(Config.dictionary_delete_rpc_timeout_ms,
TimeUnit.SECONDS);
if (response.hasStatus()) {
Status status = new Status(response.getStatus());
if (status.getErrorCode() != TStatusCode.OK) {
LOG.warn("Failed to delete dictionary " + dictionary.getName() + " on be "
+ status.getErrorMsg());
dictionary.setStatus(Dictionary.DictionaryStatus.REMOVING);
}
}
} catch (Throwable t) {
LOG.warn("Failed to delete dictionary " + dictionary.getName(), t);
throw new AnalysisException("Failed to delete dictionary " + dictionary.getName());
}
}
}

public void replayCreateDictionary(CreateDictionaryPersistInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public Future<InternalService.PGetBeResourceResponse> getBeResource(InternalServ
return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request);
}

public Future<InternalService.PDeleteDictionaryResponse> deleteDictionary(
InternalService.PDeleteDictionaryRequest request, int timeoutSec) {
return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).deleteDictionary(request);
}

public void shutdown() {
ConnectivityState state = channel.getState(false);
LOG.warn("shut down backend service client: {}, channel state: {}", address, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,14 @@ public Future<InternalService.PGetBeResourceResponse> getBeResourceAsync(TNetwor
return null;
}

public Future<InternalService.PDeleteDictionaryResponse> deleteDictionaryAsync(TNetworkAddress address,
int timeoutSec, InternalService.PDeleteDictionaryRequest request) {
try {
final BackendServiceClient client = getProxy(address);
return client.deleteDictionary(request, timeoutSec);
} catch (Throwable e) {
LOG.warn("delete dictionary failed, address={}:{}", address.getHostname(), address.getPort(), e);
}
return null;
}
}
1 change: 0 additions & 1 deletion gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,6 @@ message PGetBeResourceResponse {

message PDeleteDictionaryRequest {
optional int64 dictionary_id = 1;
optional int64 version_id = 2;
}

message PDeleteDictionaryResponse {
Expand Down

0 comments on commit 5384f63

Please sign in to comment.