Skip to content

Commit

Permalink
plan
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb committed Jan 12, 2025
1 parent f269a7f commit 0f82df0
Show file tree
Hide file tree
Showing 44 changed files with 1,424 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ supportedRefreshStatement
: REFRESH CATALOG name=identifier propertyClause? #refreshCatalog
| REFRESH DATABASE name=multipartIdentifier propertyClause? #refreshDatabase
| REFRESH TABLE name=multipartIdentifier #refreshTable
| REFRESH DICTIONARY name=multipartIdentifier #refreshDictionary
;

supportedCleanStatement
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso
// check read lock leaky
private Map<Long, String> readLockThreads = null;

// Gson deserialization calls this first, via the derived classes' no-arg constructors.
public Table(TableType type) {
this.type = type;
this.fullSchema = Lists.newArrayList();
Expand All @@ -157,7 +158,7 @@ public Table(long id, String tableName, TableType type, List<Column> fullSchema)
}
} else {
// Only view in with-clause have null base
Preconditions.checkArgument(type == TableType.VIEW || type == TableType.DICTIONARY, "Table has no columns");
Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns");
}
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.createTime = Instant.now().getEpochSecond();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,52 @@

package org.apache.doris.dictionary;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.io.Text;
import org.apache.doris.nereids.trees.plans.commands.info.CreateDictionaryInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DictionaryColumnDefinition;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TDictionaryTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Dictionary metadata, including its structure and data source information.
*/
public class Dictionary extends Table {

// TODO: dictionary should also be able to be created in external catalog.
@SerializedName(value = "dbName")
private final String dbName;

// dict name use base class's name

// dict name uses the base class's name. these 3 are expected to be non-null
@SerializedName(value = "sourceCtlName")
private final String sourceCtlName;

@SerializedName(value = "sourceDbName")
private final String sourceDbName;

@SerializedName(value = "sourceTableName")
private final String sourceTableName;

@SerializedName(value = "columns")
private final List<DictionaryColumnDefinition> columns;

@SerializedName(value = "properties")
private final Map<String, String> properties;

// createTime saved in base class

@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;

Expand All @@ -68,18 +72,32 @@ public enum DictionaryStatus {
OUT_OF_DATE, // wait load task be scheduled
REMOVING; // wait unload task be scheduled and finish
}

@SerializedName(value = "status")
private DictionaryStatus status;

@SerializedName(value = "layout")
private final LayoutType layout;

@SerializedName(value = "version")
private long version; // every time dictionary is updated, version will increase by 1

/**
 * No-arg constructor intended for Gson only.
 *
 * It exists so that deserialization goes through {@code Table(TableType)}, which sets up
 * base-class state (such as the rwLock). Gson overwrites the serialized fields right after
 * construction, so they are only given placeholder values here.
 */
public Dictionary() {
    super(TableType.DICTIONARY);
    // identity and source location
    this.dbName = null;
    this.sourceCtlName = null;
    this.sourceDbName = null;
    this.sourceTableName = null;
    // structure and configuration
    this.columns = null;
    this.properties = null;
    this.layout = null;
    // runtime state
    this.status = null;
    this.version = 0;
    this.lastUpdateTime = 0;
}

public Dictionary(CreateDictionaryInfo info, long uniqueId) {
super(uniqueId, info.getDictName(), TableType.DICTIONARY, null);
super(uniqueId, info.getDictName(), TableType.DICTIONARY, /* source table's schema as dict's FullSchema */info
.getColumns().stream().map(DictionaryColumnDefinition::getOriginColumn).collect(Collectors.toList()));
this.dbName = info.getDbName();
this.sourceCtlName = info.getSourceCtlName();
this.sourceDbName = info.getSourceDbName();
Expand Down Expand Up @@ -112,6 +130,32 @@ public List<DictionaryColumnDefinition> getDicColumns() {
return columns;
}

/**
 * Returns the source table's qualified name as {@code [catalog, database, table]}.
 *
 * @return a newly created list holding the three name parts, in that order
 * @throws IllegalArgumentException if any of the three parts is null or empty
 */
public List<String> getSourceQualifiedName() {
    boolean complete = !Strings.isNullOrEmpty(sourceCtlName)
            && !Strings.isNullOrEmpty(sourceDbName)
            && !Strings.isNullOrEmpty(sourceTableName);
    if (!complete) {
        throw new IllegalArgumentException("dictionary's source name is not completed");
    }
    return Lists.newArrayList(sourceCtlName, sourceDbName, sourceTableName);
}

/**
 * Looks up this dictionary's database in the internal catalog by {@code dbName}.
 *
 * @return whatever {@code getDbNullable} yields for {@code dbName}
 *         (presumably null when the database does not exist; confirm against the catalog API)
 */
@Override
public Database getDatabase() {
    Database owningDb = Env.getCurrentInternalCatalog().getDbNullable(dbName);
    return owningDb;
}

/**
 * Returns this dictionary's fully qualified name parts.
 *
 * @return an immutable list of internal catalog name, database name and dictionary name
 */
@Override
public List<String> getFullQualifiers() {
    String catalogName = Env.getCurrentEnv().getInternalCatalog().getName();
    String dictName = getName();
    return ImmutableList.of(catalogName, dbName, dictName);
}

/**
 * Collects the names of all dictionary columns, in definition order.
 *
 * @return a new list with one name per entry of {@code columns}
 */
public List<String> getColumnNames() {
    List<String> names = Lists.newArrayList();
    for (DictionaryColumnDefinition definition : columns) {
        names.add(definition.getName());
    }
    return names;
}

public DataType getColumnType(String columnName) {
for (DictionaryColumnDefinition column : columns) {
if (column.getName().equalsIgnoreCase(columnName)) {
Expand All @@ -125,6 +169,10 @@ public Map<String, String> getProperties() {
return properties;
}

/** Bumps the dictionary version by one. */
public void increaseVersion() {
    version += 1;
}

/**
 * @return the current dictionary version
 */
public long getVersion() {
    return this.version;
}
Expand All @@ -133,8 +181,8 @@ public long getLastUpdateTime() {
return lastUpdateTime;
}

public void setLastUpdateTime(long lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
/** Stamps {@code lastUpdateTime} with the current wall-clock time in milliseconds. */
public void updateLastUpdateTime() {
    final long nowMillis = System.currentTimeMillis();
    lastUpdateTime = nowMillis;
}

public DictionaryStatus getStatus() {
Expand All @@ -149,6 +197,16 @@ public LayoutType getLayout() {
return layout;
}

/**
 * Builds the thrift table descriptor for this dictionary.
 *
 * The descriptor is typed as {@code DICTIONARY_TABLE}, carries the size of {@code fullSchema}
 * as its column count, and has a {@code TDictionaryTable} named after this dictionary attached.
 *
 * @return the populated thrift descriptor
 */
@Override
public TTableDescriptor toThrift() {
    // TODO: use this to replace TDictionarySink.name
    TDictionaryTable dictTable = new TDictionaryTable(getName());
    final int columnCount = fullSchema.size();
    TTableDescriptor descriptor = new TTableDescriptor(
            id, TTableType.DICTIONARY_TABLE, columnCount, 0, getName(), dbName);
    descriptor.setDictionaryTable(dictTable);
    return descriptor;
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
package org.apache.doris.dictionary;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.info.CreateDictionaryInfo;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoDictionaryCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.CreateDictionaryPersistInfo;
import org.apache.doris.persist.DropDictionaryPersistInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
Expand All @@ -41,6 +49,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -49,6 +58,8 @@
public class DictionaryManager extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(DictionaryManager.class);

private static final long jobId = -493209151411825L;

// Lock for protecting dictionaries map
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

Expand Down Expand Up @@ -95,10 +106,10 @@ public void unlockWrite() {

/**
* Create a new dictionary based on the provided info.
*
* @throws DdlException if the dictionary already exists and ifNotExists is false
*
* @throws Exception
*/
public Dictionary createDictionary(CreateDictionaryInfo info) throws DdlException {
public Dictionary createDictionary(ConnectContext ctx, CreateDictionaryInfo info) throws Exception {
// 1. Check if dictionary already exists
if (hasDictionary(info.getDbName(), info.getDictName())) {
if (info.isIfNotExists()) {
Expand Down Expand Up @@ -129,7 +140,7 @@ public Dictionary createDictionary(CreateDictionaryInfo info) throws DdlExceptio
}
// The data in BE is not always in sync with FE (think BE restart), so the data load doesn't need
// transaction safety. It relies on periodic check and update.
scheduleDataLoad(dictionary);
scheduleDataLoad(ctx, dictionary);

return dictionary;
}
Expand All @@ -139,7 +150,8 @@ public Dictionary createDictionary(CreateDictionaryInfo info) throws DdlExceptio
*
* @throws DdlException if the dictionary does not exist
*/
public void dropDictionary(String dbName, String dictName, boolean ifExists) throws DdlException {
public void dropDictionary(ConnectContext ctx, String dbName, String dictName, boolean ifExists)
throws DdlException {
lockWrite();
Dictionary dic = null;
try {
Expand All @@ -162,7 +174,7 @@ public void dropDictionary(String dbName, String dictName, boolean ifExists) thr
}
// The data in BE is not always in sync with FE (think FE crash). But we have periodic reports,
// so we can drop unknown dictionaries at that time.
scheduleDataUnload(dic);
scheduleDataUnload(ctx, dic);
}

/**
Expand Down Expand Up @@ -193,7 +205,7 @@ public void dropTableDictionaries(String dbName, String tableName) {
unlockWrite();
}
for (Dictionary dictionary : droppedDictionaries) {
scheduleDataUnload(dictionary);
scheduleDataUnload(null, dictionary);
}
}

Expand All @@ -219,7 +231,7 @@ public void dropDbDictionaries(String dbName) {
unlockWrite();
}
for (Dictionary dictionary : droppedDictionaries) {
scheduleDataUnload(dictionary);
scheduleDataUnload(null, dictionary);
}
}

Expand Down Expand Up @@ -265,29 +277,45 @@ public Dictionary getDictionary(String dbName, String dictName) throws DdlExcept

/**
 * Periodic consistency check for dictionaries. Not implemented yet: the body is currently empty.
 */
private void checkAndUpdateDictionaries() {
// TODO: Implement dictionary data check and update logic
// This should:
// 1. Check source tables for changes
// 2. Call scheduleDataLoad if necessary (scheduleDataLoad itself increases the dictionary version)
// 3. Handle any errors or inconsistencies
}

/// data load and unload are also used for dictionary data check and update to keep data consistency between BE
/// and FE. So they are not private.

public void scheduleDataLoad(Dictionary dictionary) {
// TODO: Implement data load scheduling logic
// This should:
// 1. Create a load task
// 2. Submit the task to a task executor
// 3. Monitor the task progress
/**
 * Loads the dictionary's data synchronously by running
 * {@code insert into <dictDb>.<dict> select * from <sourceCtl>.<sourceDb>.<sourceTable>}.
 *
 * On success the dictionary ends up {@code NORMAL} with a refreshed last-update time.
 * On failure the status is set to {@code OUT_OF_DATE} (instead of staying {@code LOADING}
 * forever) and the exception is rethrown to the caller.
 *
 * @param ctx connect context of the calling command, or null when invoked by the scheduler;
 *            in the latter case an ADMIN context on the dictionary's database is created
 * @param dictionary the dictionary to load
 * @throws IllegalArgumentException if the dictionary's source name is incomplete
 * @throws Exception if parsing or running the load command fails
 */
public void scheduleDataLoad(ConnectContext ctx, Dictionary dictionary) throws Exception {
    if (ctx == null) { // for run with scheduler, not by command.
        // priv check is done in the calling command, so using ADMIN here is ok.
        ctx = InsertTask.makeConnectContext(UserIdentity.ADMIN, dictionary.getDbName());
    }

    // do not use the refresh command's executor, to avoid potential problems.
    StmtExecutor executor = InsertTask.makeStmtExecutor(ctx);
    NereidsParser parser = new NereidsParser();
    // Read from the fully qualified source table (catalog.db.table). The source table does not
    // have to live in the dictionary's own database, so the dictionary's db name must not be used.
    String sourceName = String.join(".", dictionary.getSourceQualifiedName());
    InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) parser.parseSingle(
            "insert into " + dictionary.getDbName() + "." + dictionary.getName()
                    + " select * from " + sourceName);
    TUniqueId queryId = InsertTask.generateQueryId();
    baseCommand.setLabelName(Optional.of(jobId + "_" + queryId.toString()));
    baseCommand.setJobId(jobId);

    dictionary.setStatus(Dictionary.DictionaryStatus.LOADING);
    boolean loaded = false;
    try {
        InsertIntoDictionaryCommand command = new InsertIntoDictionaryCommand(baseCommand, dictionary);

        // run with sync. the version is increased before the run, as in the original flow.
        dictionary.increaseVersion();
        command.run(ctx, executor);
        loaded = true;
    } finally {
        if (!loaded) {
            // the load did not complete: mark the dictionary as waiting for a (re)load
            // rather than leaving it in LOADING.
            dictionary.setStatus(Dictionary.DictionaryStatus.OUT_OF_DATE);
        }
    }

    dictionary.setStatus(Dictionary.DictionaryStatus.NORMAL);
    dictionary.updateLastUpdateTime();
}

public void scheduleDataUnload(Dictionary dictionary) {
// TODO: Implement data unload scheduling logic
// This should:
// 1. Create an unload task
// 2. Submit the task to a task executor
// 3. Monitor the task progress
/**
 * Placeholder for unloading a dictionary's data. Currently it only prepares a connect context
 * and a statement executor; no unload request is issued yet.
 *
 * @param ctx connect context of the calling command, or null to use an ADMIN context
 *            on the dictionary's database
 * @param dictionary the dictionary whose data should be unloaded
 */
public void scheduleDataUnload(ConnectContext ctx, Dictionary dictionary) {
    // TODO: maybe here we dont need a query. just a special RPC is ok.
    // priv check is done in the calling command, so falling back to ADMIN is ok.
    ConnectContext effectiveCtx = (ctx != null)
            ? ctx
            : InsertTask.makeConnectContext(UserIdentity.ADMIN, dictionary.getDbName());
    StmtExecutor unloadExecutor = InsertTask.makeStmtExecutor(effectiveCtx);
}

public void replayCreateDictionary(CreateDictionaryPersistInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
/**
 * Kinds of jobs known to the job framework.
 */
public enum JobType {
// insert job
INSERT,
// presumably materialized-view job (confirm against the job extensions)
MV,
// dictionary job
DICTIONARY
}
Loading

0 comments on commit 0f82df0

Please sign in to comment.