Skip to content

Commit

Permalink
support set group commit backend
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 12, 2024
1 parent fe3a313 commit 1f84b76
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.sparkproject.guava.base.Throwables;
Expand All @@ -66,6 +68,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -204,10 +207,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec
// because Nereids's DistributePlan are not gernerated, so we return factory and after the
// DistributePlan have been generated, we can create InsertExecutor
private ExecutorFactory selectInsertExecutorFactory(
NereidsPlanner planner, ConnectContext ctx, StmtExecutor executor, TableIf targetTableIf) {
NereidsPlanner planner, ConnectContext ctx, StmtExecutor stmtExecutor, TableIf targetTableIf) {
try {
executor.setPlanner(planner);
executor.checkBlockRules();
stmtExecutor.setPlanner(planner);
stmtExecutor.checkBlockRules();
if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Expand All @@ -224,9 +227,11 @@ private ExecutorFactory selectInsertExecutorFactory(
if (physicalSink instanceof PhysicalOlapTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
OlapTable olapTable = (OlapTable) targetTableIf;

ExecutorFactory executorFactory;
// the insertCtx contains some variables to adjust SinkNode
if (ctx.isTxnModel()) {
return ExecutorFactory.from(
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
Expand All @@ -238,7 +243,7 @@ private ExecutorFactory selectInsertExecutorFactory(
.selectBackendForGroupCommit(targetTableIf.getId(), ctx);
// set groupCommitBackend for Nereids's DistributePlanner
planner.getCascadesContext().getStatementContext().setGroupCommitMergeBackend(groupCommitBackend);
return ExecutorFactory.from(
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
Expand All @@ -247,13 +252,20 @@ private ExecutorFactory selectInsertExecutorFactory(
)
);
} else {
return ExecutorFactory.from(
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
() -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
);
}

return executorFactory.onCreate(executor -> {
Coordinator coordinator = executor.getCoordinator();
boolean isEnableMemtableOnSinkNode = olapTable.getTableProperty().getUseSchemaLightChange()
&& coordinator.getQueryOptions().isEnableMemtableOnSinkNode();
coordinator.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
});
} else if (physicalSink instanceof PhysicalHiveTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
Expand Down Expand Up @@ -313,6 +325,13 @@ private ExecutorFactory selectInsertExecutorFactory(
private BuildInsertExecutorResult planInsertExecutor(
ConnectContext ctx, StmtExecutor stmtExecutor,
LogicalPlanAdapter logicalPlanAdapter, TableIf targetTableIf) throws Throwable {
// the key logical when use new coordinator:
// 1. use NereidsPlanner to generate PhysicalPlan
// 2. use PhysicalPlan to select InsertExecutorFactory, some InsertExecutors want to control
// which backend should be used
// 3. NereidsPlanner use PhysicalPlan and the provided backend to generate DistributePlan
// 4. ExecutorFactory use the DistributePlan to generate the NereidsSqlCoordinator and InsertExecutor

// we should compute group commit backend first,
// then we can do distribute and assign backend to the instance in Nereids's DistributePlan
StatementContext statementContext = ctx.getStatementContext();
Expand Down Expand Up @@ -373,22 +392,20 @@ public StmtType stmtType() {
return StmtType.INSERT;
}

private interface InsertExecutorBuilder {
BuildInsertExecutorResult build(NereidsPlanner planner) throws Throwable;
}

private static class ExecutorFactory {
public final NereidsPlanner planner;
public final DataSink dataSink;
public final PhysicalSink<?> physicalSink;
public final Supplier<AbstractInsertExecutor> executorSupplier;
private List<Consumer<AbstractInsertExecutor>> createCallback;

private ExecutorFactory(NereidsPlanner planner, DataSink dataSink, PhysicalSink<?> physicalSink,
Supplier<AbstractInsertExecutor> executorSupplier) {
this.planner = planner;
this.dataSink = dataSink;
this.physicalSink = physicalSink;
this.executorSupplier = executorSupplier;
this.createCallback = Lists.newArrayList();
}

public static ExecutorFactory from(
Expand All @@ -397,8 +414,17 @@ public static ExecutorFactory from(
return new ExecutorFactory(planner, dataSink, physicalSink, executorSupplier);
}

public ExecutorFactory onCreate(Consumer<AbstractInsertExecutor> onCreate) {
this.createCallback.add(onCreate);
return this;
}

public BuildInsertExecutorResult build() {
return new BuildInsertExecutorResult(planner, executorSupplier.get(), dataSink, physicalSink);
AbstractInsertExecutor executor = executorSupplier.get();
for (Consumer<AbstractInsertExecutor> callback : createCallback) {
callback.accept(executor);
}
return new BuildInsertExecutorResult(planner, executor, dataSink, physicalSink);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Logi
}

@Override
protected void doBeforeExec() {
protected void beforeExec() {
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
String msg = "insert table " + this.table.getId() + GroupCommitPlanner.SCHEMA_CHANGE;
LOG.info(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,7 @@ protected void addTableIndexes(TransactionState state) {
}

@Override
protected final void beforeExec() {
boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
&& getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode();
getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);

doBeforeExec();
}

protected void doBeforeExec() {
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void beginTransaction() {
}

@Override
protected void doBeforeExec() {
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}, txn_model=true", labelName, queryId, txnId);
}
Expand Down

0 comments on commit 1f84b76

Please sign in to comment.