From 33c7f1e491b2bee8211f5397c5ed3e6c22305102 Mon Sep 17 00:00:00 2001 From: Tao Wang <18234030530@163.com> Date: Fri, 3 Jan 2025 19:27:59 +0800 Subject: [PATCH] [feature][dingo-calcite,dingo-exec] Support select... for update --- .../src/main/codegen/templates/Parser.jj | 2 +- .../java/io/dingodb/calcite/DingoParser.java | 31 +- .../calcite/grammar/dql/SqlSelect.java | 6 +- .../dingodb/calcite/rel/DingoForUpdate.java | 56 ++++ .../dingodb/calcite/rel/LogicalForUpdate.java | 50 ++++ .../calcite/rule/DingoForUpdateRule.java | 56 ++++ .../io/dingodb/calcite/rule/DingoRules.java | 4 + .../calcite/visitor/DingoExplainVisitor.java | 11 + .../calcite/visitor/DingoJobVisitor.java | 23 +- .../calcite/visitor/DingoRelVisitor.java | 3 + .../function/DingoForUpdateVisitFun.java | 111 ++++++++ .../function/DingoTableModifyVisitFun.java | 11 +- .../io/dingodb/driver/DingoDriverParser.java | 7 +- .../java/io/dingodb/driver/DingoMeta.java | 1 + .../java/io/dingodb/exec/OperatorFactory.java | 3 + .../exec/operator/ForUpdateOperator.java | 266 ++++++++++++++++++ .../PessimisticLockDeleteOperator.java | 5 + .../operator/PessimisticLockOperator.java | 4 + .../PessimisticLockUpdateOperator.java | 7 + .../exec/operator/params/AbstractParams.java | 3 +- .../exec/operator/params/ForUpdateParam.java | 81 ++++++ .../operator/params/PessimisticLockParam.java | 5 +- .../operator/PessimisticRollBackOperator.java | 2 + .../io/dingodb/exec/utils/OpStateUtils.java | 3 +- .../dingodb/exec/utils/OperatorCodeUtils.java | 1 + 25 files changed, 730 insertions(+), 22 deletions(-) create mode 100644 dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoForUpdate.java create mode 100644 dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalForUpdate.java create mode 100644 dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoForUpdateRule.java create mode 100644 dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoForUpdateVisitFun.java create mode 100644 dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java create mode 100644 dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java diff --git a/dingo-calcite/src/main/codegen/templates/Parser.jj b/dingo-calcite/src/main/codegen/templates/Parser.jj index 00f99550a2..c46a8e9c8a 100644 --- a/dingo-calcite/src/main/codegen/templates/Parser.jj +++ b/dingo-calcite/src/main/codegen/templates/Parser.jj @@ -1434,7 +1434,7 @@ SqlNode SqlSelect() : new SqlNodeList(selectList, Span.of(selectList).pos()), fromClause, where, groupBy, having, windowDecls, null, null, null, new SqlNodeList(hints, getPos()), - exportOptions, flashbackQuery, flashBackStr, flashBackTsoStr); + exportOptions, flashbackQuery, flashBackStr, flashBackTsoStr, forUpdate); } } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java index 1379ae3d6c..e9dda2d4c8 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java @@ -51,12 +51,14 @@ import io.dingodb.calcite.grammar.dql.SqlBackUpTimePoint; import io.dingodb.calcite.grammar.dql.SqlBackUpTsoPoint; import io.dingodb.calcite.grammar.dql.SqlNextAutoIncrement; +import io.dingodb.calcite.grammar.dql.SqlSelect; import io.dingodb.calcite.grammar.dql.SqlShow; import io.dingodb.calcite.grammar.dql.SqlStartGc; import io.dingodb.calcite.meta.DingoRelMetadataProvider; import io.dingodb.calcite.program.DecorrelateProgram; import io.dingodb.calcite.rel.DingoCost; import io.dingodb.calcite.rel.LogicalExportData; +import io.dingodb.calcite.rel.LogicalForUpdate; import io.dingodb.calcite.rel.logical.LogicalDingoRoot; import io.dingodb.calcite.rule.DingoRules; import io.dingodb.calcite.runtime.DingoResource; @@ -82,6 +84,7 @@ import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.ViewExpanders; import org.apache.calcite.plan.volcano.AbstractConverter; @@ -266,7 +269,7 @@ public RelRoot convert(@NonNull SqlNode sqlNode, boolean needsValidation) { ); if (needExport(sqlNode)) { - io.dingodb.calcite.grammar.dql.SqlSelect sqlSelect = (io.dingodb.calcite.grammar.dql.SqlSelect) sqlNode; + SqlSelect sqlSelect = (SqlSelect) sqlNode; validatorExportParam(sqlSelect.getExportOptions()); relNode = new LogicalExportData( cluster, @@ -283,22 +286,32 @@ public RelRoot convert(@NonNull SqlNode sqlNode, boolean needsValidation) { context.getTimeZone() ); } + if (forUpdate(sqlNode)) { + SqlSelect sqlSelect = (SqlSelect) sqlNode; + relNode = new LogicalForUpdate( + cluster, + planner.emptyTraitSet(), + relRoot.rel, + Objects.requireNonNull( + sqlValidator.getNamespace(Objects.requireNonNull(sqlSelect.getFrom()))).getTable().unwrap(RelOptTable.class) + ); + } } // Insert a `DingoRoot` to collect the results. return relRoot.withRel(new LogicalDingoRoot(cluster, planner.emptyTraitSet(), relNode, selection)); } public static boolean needExport(@NonNull SqlNode sqlNode) { - if (sqlNode instanceof io.dingodb.calcite.grammar.dql.SqlSelect) { - io.dingodb.calcite.grammar.dql.SqlSelect sqlSelect = (io.dingodb.calcite.grammar.dql.SqlSelect) sqlNode; + if (sqlNode instanceof SqlSelect) { + SqlSelect sqlSelect = (SqlSelect) sqlNode; return sqlSelect.isExport(); } return false; } public static boolean flashBackQuery(@NonNull SqlNode sqlNode) { - if (sqlNode instanceof io.dingodb.calcite.grammar.dql.SqlSelect) { - io.dingodb.calcite.grammar.dql.SqlSelect sqlSelect = (io.dingodb.calcite.grammar.dql.SqlSelect) sqlNode; + if (sqlNode instanceof SqlSelect) { + SqlSelect sqlSelect = (SqlSelect) sqlNode; if (sqlSelect.getFrom() instanceof FlashBackSqlIdentifier) { return true; } else { @@ -308,6 +321,14 @@ public static boolean flashBackQuery(@NonNull SqlNode sqlNode) { return false; } + public static boolean forUpdate(@NonNull SqlNode sqlNode) { + if (sqlNode instanceof SqlSelect) { + SqlSelect sqlSelect = (SqlSelect) sqlNode; + return sqlSelect.isForUpdate(); + } + return false; + } + /** * Optimize a {@link RelNode} tree. * diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/grammar/dql/SqlSelect.java b/dingo-calcite/src/main/java/io/dingodb/calcite/grammar/dql/SqlSelect.java index a1b70ccb03..b159e21082 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/grammar/dql/SqlSelect.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/grammar/dql/SqlSelect.java @@ -39,6 +39,8 @@ public class SqlSelect extends org.apache.calcite.sql.SqlSelect { long flashBackTso; + boolean forUpdate; + public SqlSelect(SqlParserPos pos, @Nullable SqlNodeList keywordList, SqlNodeList selectList, @@ -69,7 +71,8 @@ public SqlSelect(SqlParserPos pos, ExportOptions exportOptions, boolean flashbackQuery, String flashBackStr, - String flashBackTsoStr) { + String flashBackTsoStr, + boolean forUpdate) { super(pos, keywordList, selectList, from, where, groupBy, having, windowDecls, orderBy, offset, fetch, hints); this.exportOptions = exportOptions; this.flashBackQuery = flashbackQuery; @@ -77,6 +80,7 @@ public SqlSelect(SqlParserPos pos, if (flashBackTsoStr != null) { this.flashBackTso = Long.parseLong(flashBackTsoStr); } + this.forUpdate = forUpdate; } public SqlSelect(SqlParserPos pos, diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoForUpdate.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoForUpdate.java new file mode 100644 index 0000000000..f42938ecce --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoForUpdate.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.visitor.DingoRelVisitor; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.List; + +public class DingoForUpdate extends SingleRel implements DingoRel { + + @Getter + private RelOptTable table; + + /** + * Creates a SingleRel. + * + * @param cluster Cluster this relational expression belongs to + * @param traits + * @param input Input relational expression + */ + public DingoForUpdate(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelOptTable table) { + super(cluster, traits, input); + this.table = table; + } + + @Override + public T accept(@NonNull DingoRelVisitor visitor) { + return visitor.visit(this); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new DingoForUpdate(getCluster(), traitSet, sole(inputs), table); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalForUpdate.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalForUpdate.java new file mode 100644 index 0000000000..7dfc3c60d6 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalForUpdate.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.DingoRelOptTable; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; + +import java.util.List; + +public class LogicalForUpdate extends SingleRel { + + @Getter + private final RelOptTable table; + + /** + * Creates a SingleRel. + * + * @param cluster Cluster this relational expression belongs to + * @param traits + * @param input Input relational expression + */ + public LogicalForUpdate(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelOptTable table) { + super(cluster, traits, input); + this.table = table; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new LogicalForUpdate(getCluster(), traitSet, sole(inputs), table); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoForUpdateRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoForUpdateRule.java new file mode 100644 index 0000000000..bc945594e4 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoForUpdateRule.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rule; + +import io.dingodb.calcite.rel.DingoForUpdate; +import io.dingodb.calcite.rel.LogicalForUpdate; +import io.dingodb.calcite.traits.DingoConvention; +import io.dingodb.calcite.traits.DingoRelStreaming; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class DingoForUpdateRule extends ConverterRule { + + public static final Config DEFAULT = Config.INSTANCE + .withConversion( + LogicalForUpdate.class, + Convention.NONE, + DingoConvention.INSTANCE, + "DingoForUpdateRule" + ) + .withRuleFactory(DingoForUpdateRule::new); + + protected DingoForUpdateRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + LogicalForUpdate logicalForUpdate = (LogicalForUpdate) rel; + RelTraitSet traits = logicalForUpdate.getTraitSet() + .replace(DingoConvention.INSTANCE) + .replace(DingoRelStreaming.ROOT); + return new DingoForUpdate( + logicalForUpdate.getCluster(), + traits, + convert(logicalForUpdate.getInput(), traits), + logicalForUpdate.getTable()); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java index 84ac4c78af..1e5b5a6934 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java @@ -110,6 +110,9 @@ public final class DingoRules { public static final DingoExportDataRule EXPORT_DATA_RULE = DingoExportDataRule.DEFAULT.toRule(DingoExportDataRule.class); + public static final DingoForUpdateRule FOR_UPDATE_RULE = + DingoForUpdateRule.DEFAULT.toRule(DingoForUpdateRule.class); + public static final DingoWithoutPriModifyRule WITHOUT_PRI_DELETE_RULE = DingoWithoutPriModifyRule.Config.DELETE.toRule(); @@ -223,6 +226,7 @@ public final class DingoRules { FILTER_SUB_QUERY_TO_CORRELATE, JOIN_SUB_QUERY_TO_CORRELATE, EXPORT_DATA_RULE, + FOR_UPDATE_RULE, WITHOUT_PRI_DELETE_RULE, WITHOUT_PRI_UPDATE_RULE, DINGO_VECTOR_PROJECT_RULE, diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java index 92c6438400..781268af8e 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java @@ -26,6 +26,7 @@ import io.dingodb.calcite.rel.DingoDocument; import io.dingodb.calcite.rel.DingoExportData; import io.dingodb.calcite.rel.DingoFilter; +import io.dingodb.calcite.rel.DingoForUpdate; import io.dingodb.calcite.rel.DingoFunctionScan; import io.dingodb.calcite.rel.DingoGetByIndex; import io.dingodb.calcite.rel.DingoGetByIndexMerge; @@ -394,6 +395,16 @@ public Explain visit(@NonNull DingoExportData rel) { return explain1; } + @Override + public Explain visit(@NonNull DingoForUpdate rel) { + String filter = ""; + String tableNames = ""; + return new Explain( + "dingoForUpdate", rel.getRowCount(), "root", + tableNames, filter + ); + } + @Override public Explain visit(@NonNull IndexFullScan rel) { StringBuilder info = new StringBuilder(); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java index c3c544a3f9..6cfcb3f5ad 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java @@ -25,6 +25,7 @@ import io.dingodb.calcite.rel.DingoDocument; import io.dingodb.calcite.rel.DingoExportData; import io.dingodb.calcite.rel.DingoFilter; +import io.dingodb.calcite.rel.DingoForUpdate; import io.dingodb.calcite.rel.DingoFunctionScan; import io.dingodb.calcite.rel.DingoGetByIndex; import io.dingodb.calcite.rel.DingoGetByIndexMerge; @@ -66,6 +67,7 @@ import io.dingodb.calcite.visitor.function.DingoDocumentVisitFun; import io.dingodb.calcite.visitor.function.DingoExportDataVisitFun; import io.dingodb.calcite.visitor.function.DingoFilterVisitFun; +import io.dingodb.calcite.visitor.function.DingoForUpdateVisitFun; import io.dingodb.calcite.visitor.function.DingoFunctionScanVisitFun; import io.dingodb.calcite.visitor.function.DingoGetByIndexMergeVisitFun; import io.dingodb.calcite.visitor.function.DingoGetByIndexVisitFun; @@ -134,8 +136,11 @@ public class DingoJobVisitor implements DingoRelVisitor> { @Getter private long pointTs; - private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocation, - ITransaction transaction, SqlKind kind, ExecuteVariables executeVariables, long pointTs) { + @Getter + private boolean forUpdate; + + private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocation, ITransaction transaction, + SqlKind kind, ExecuteVariables executeVariables, long pointTs, boolean forUpdate) { this.job = job; this.idGenerator = idGenerator; this.currentLocation = currentLocation; @@ -143,6 +148,7 @@ private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocati this.kind = kind; this.executeVariables = executeVariables; this.pointTs = pointTs; + this.forUpdate = forUpdate; } public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation) { @@ -154,16 +160,16 @@ public static void renderJob(JobManager jobManager, Job job, RelNode input, Loca boolean checkRoot, ITransaction transaction, SqlKind kind, ExecuteVariables executeVariables) { renderJob(jobManager, job, input, currentLocation, checkRoot, transaction, kind, - executeVariables, 0); + executeVariables, 0, false); } public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation, boolean checkRoot, ITransaction transaction, SqlKind kind, - ExecuteVariables executeVariables, long pointTs) { + ExecuteVariables executeVariables, long pointTs, boolean forUpdate) { try { IdGenerator idGenerator = new IdGeneratorImpl(job.getJobId().seq); DingoJobVisitor visitor = new DingoJobVisitor( - job, idGenerator, currentLocation, transaction, kind, executeVariables, pointTs + job, idGenerator, currentLocation, transaction, kind, executeVariables, pointTs, forUpdate ); Collection outputs = dingo(input).accept(visitor); if (checkRoot && !outputs.isEmpty()) { @@ -202,7 +208,7 @@ public Collection visit(@NonNull DingoHashJoin rel) { @Override public Collection visit(@NonNull DingoTableModify rel) { - return DingoTableModifyVisitFun.visit(job, idGenerator, currentLocation, transaction, this, rel); + return DingoTableModifyVisitFun.visit(job, idGenerator, currentLocation, transaction, this, rel, forUpdate); } @Override @@ -330,6 +336,11 @@ public Collection visit(@NonNull IndexRangeScan rel) { return DingoIndexRangeScanVisitFun.visit(job, idGenerator, currentLocation, this, transaction, rel); } + @Override + public Collection visit(@NonNull DingoForUpdate rel) { + return DingoForUpdateVisitFun.visit(job, idGenerator, currentLocation, this, transaction, rel, forUpdate); + } + @Override public Collection visitDingoRelOp(@NonNull DingoRelOp rel) { return DingoRelOpVisitFun.visit(job, idGenerator, currentLocation, this, rel); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java index 17ab3bb7ac..f15907aa1f 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java @@ -25,6 +25,7 @@ import io.dingodb.calcite.rel.DingoDocument; import io.dingodb.calcite.rel.DingoExportData; import io.dingodb.calcite.rel.DingoFilter; +import io.dingodb.calcite.rel.DingoForUpdate; import io.dingodb.calcite.rel.DingoFunctionScan; import io.dingodb.calcite.rel.DingoGetByIndex; import io.dingodb.calcite.rel.DingoGetByIndexMerge; @@ -135,4 +136,6 @@ public interface DingoRelVisitor { T visit(@NonNull DingoDiskAnnBuild dingoDiskAnnBuild); T visit(@NonNull DingoDiskAnnLoad dingoDiskAnnLoad); + + T visit(@NonNull DingoForUpdate dingoForUpdate); } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoForUpdateVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoForUpdateVisitFun.java new file mode 100644 index 0000000000..c9c0cf86e9 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoForUpdateVisitFun.java @@ -0,0 +1,111 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.visitor.function; + +import io.dingodb.calcite.DingoTable; +import io.dingodb.calcite.rel.DingoForUpdate; +import io.dingodb.calcite.visitor.DingoJobVisitor; +import io.dingodb.common.CommonId; +import io.dingodb.common.Location; +import io.dingodb.exec.base.IdGenerator; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.Task; +import io.dingodb.exec.dag.Edge; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.operator.params.ForUpdateParam; +import io.dingodb.exec.operator.params.PessimisticLockParam; +import io.dingodb.exec.transaction.base.ITransaction; +import io.dingodb.meta.entity.Table; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import static io.dingodb.calcite.rel.DingoRel.dingo; +import static io.dingodb.common.util.Utils.sole; +import static io.dingodb.exec.utils.OperatorCodeUtils.FOR_UPDATE; +import static io.dingodb.exec.utils.OperatorCodeUtils.PESSIMISTIC_LOCK; + +public final class DingoForUpdateVisitFun { + + private DingoForUpdateVisitFun() { + } + + public static Collection visit( + Job job, + IdGenerator idGenerator, + Location currentLocation, + DingoJobVisitor visitor, + ITransaction transaction, + @NonNull DingoForUpdate rel, + boolean forUpdate + ) { + Collection inputs = dingo(rel.getInput()).accept(visitor); + + List outputs = new LinkedList<>(); + Vertex input = sole(inputs); + Task task = input.getTask(); + if (forUpdate) { + Table table = rel.getTable().unwrap(DingoTable.class).getTable(); + CommonId tableId = table.tableId; + boolean isScan = visitor.isScan() && !task.getBachTask(); + Vertex lockVertex; + if (transaction.getPrimaryKeyLock() == null) { + PessimisticLockParam param = new PessimisticLockParam( + tableId, + table.tupleType(), + table.keyMapping(), + transaction.getIsolationLevel(), + transaction.getStartTs(), + transaction.getForUpdateTs(), + true, + transaction.getPrimaryKeyLock(), + transaction.getLockTimeOut(), + false, + isScan, + "select", + table, + false, + forUpdate + ); + lockVertex = new Vertex(PESSIMISTIC_LOCK, param); + } else { + ForUpdateParam param = new ForUpdateParam( + tableId, + table.tupleType(), + transaction.getPrimaryKeyLock(), + transaction.getStartTs(), + transaction.getForUpdateTs(), + transaction.getIsolationLevel(), + transaction.getLockTimeOut(), + isScan, + table); + lockVertex = new Vertex(FOR_UPDATE, param); + } + lockVertex.setId(idGenerator.getOperatorId(task.getId())); + Edge edge = new Edge(input, lockVertex); + input.addEdge(edge); + lockVertex.addIn(edge); + task.putVertex(lockVertex); + outputs.add(lockVertex); + return outputs; + } else { + return inputs; + } + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableModifyVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableModifyVisitFun.java index d8802ebad5..ee1e3d2c5a 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableModifyVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableModifyVisitFun.java @@ -67,7 +67,7 @@ private DingoTableModifyVisitFun() { } public static Collection visit(Job job, IdGenerator idGenerator, Location currentLocation, - ITransaction transaction, DingoJobVisitor visitor, DingoTableModify rel + ITransaction transaction, DingoJobVisitor visitor, DingoTableModify rel, boolean forUpdate ) { Collection inputs = dingo(rel.getInput()).accept(visitor); List outputs = new LinkedList<>(); @@ -114,7 +114,8 @@ public static Collection visit(Job job, IdGenerator idGenerator, Locatio isScan, "insert", td, - isUpdate + isUpdate, + forUpdate ); lockVertex = new Vertex(PESSIMISTIC_LOCK, pessimisticLockParam); } else { @@ -238,7 +239,8 @@ public static Collection visit(Job job, IdGenerator idGenerator, Locatio isScan, "update", td, - false + false, + forUpdate ); lockVertex = new Vertex(PESSIMISTIC_LOCK, pessimisticLockParam); } else { @@ -375,7 +377,8 @@ public static Collection visit(Job job, IdGenerator idGenerator, Locatio isScan, "delete", td, - false + false, + forUpdate ); lockVertex = new Vertex(PESSIMISTIC_LOCK, pessimisticLockParam); } else { diff --git a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java index 5a7cd3cbf1..4ae4ad751a 100644 --- a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java +++ b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java @@ -350,6 +350,7 @@ public Meta.Signature parseQuery( io.dingodb.calcite.grammar.dql.SqlSelect sqlSelect = (io.dingodb.calcite.grammar.dql.SqlSelect) sqlNode; pointTs = sqlSelect.getPointStartTs(); } + boolean forUpdate = forUpdate(sqlNode); long startTs; CommonId txnId; @@ -516,7 +517,8 @@ public Meta.Signature parseQuery( transaction.getType() == NONE ? null : connection.getTransaction(), sqlNode.getKind(), new ExecuteVariables(isJoinConcurrency(), getConcurrencyLevel(), isInsertCheckInplace()), - pointTs + pointTs, + forUpdate ); if (explain != null) { statementType = Meta.StatementType.CALL; @@ -780,11 +782,12 @@ private static void runPessimisticPrimaryKeyJob( ExecuteVariables executeVariables ) { Integer retry = Optional.mapOrGet(DingoConfiguration.instance().find("retry", int.class), __ -> __, () -> 30); + boolean forUpdate = forUpdate(sqlNode); while (retry-- > 0) { Job job = jobManager.createJob(transaction.getStartTs(), jobSeqId, transaction.getTxnId(), dingoType); DingoJobVisitor.renderJob( jobManager, job, relNode, currentLocation, true, - transaction, sqlNode.getKind(), executeVariables + transaction, sqlNode.getKind(), executeVariables, 0, forUpdate ); try { Iterator iterator = jobManager.createIterator(job, null); diff --git a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoMeta.java b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoMeta.java index faa2387971..b9cf64f7d3 100644 --- a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoMeta.java +++ b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoMeta.java @@ -615,6 +615,7 @@ private Frame getFrame(StatementHandle sh, long offset, int fetchMaxRowCount) th LogUtils.error(log, "run job exception:{}", e, e); if (transaction != null && transaction.isPessimistic() && transaction.getPrimaryKeyLock() != null && isDml(signature)) { + if (e instanceof LockWaitException) { return requireNonNull( resolveLockWait( diff --git a/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java b/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java index efb2ce86d5..f2212d5c2b 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java @@ -29,6 +29,7 @@ import io.dingodb.exec.operator.EmptySourceOperator; import io.dingodb.exec.operator.ExportDataOperator; import io.dingodb.exec.operator.FilterOperator; +import io.dingodb.exec.operator.ForUpdateOperator; import io.dingodb.exec.operator.GetByIndexOperator; import io.dingodb.exec.operator.GetByKeysOperator; import io.dingodb.exec.operator.GetDistributionOperator; @@ -111,6 +112,7 @@ import static io.dingodb.exec.utils.OperatorCodeUtils.EMPTY_SOURCE; import static io.dingodb.exec.utils.OperatorCodeUtils.EXPORT_DATA; import static io.dingodb.exec.utils.OperatorCodeUtils.FILTER; +import static io.dingodb.exec.utils.OperatorCodeUtils.FOR_UPDATE; import static io.dingodb.exec.utils.OperatorCodeUtils.GET_BY_INDEX; import static io.dingodb.exec.utils.OperatorCodeUtils.GET_BY_KEYS; import static io.dingodb.exec.utils.OperatorCodeUtils.GET_DISTRIBUTION; @@ -259,6 +261,7 @@ public final class OperatorFactory { OPERATORS.put(TXN_DISK_ANN_RESET, TxnDiskAnnResetOperator.INSTANCE); OPERATORS.put(TXN_DISK_ANN_BUILD, TxnDiskAnnBuildOperator.INSTANCE); OPERATORS.put(TXN_DISK_ANN_LOAD, TxnDiskAnnLoadOperator.INSTANCE); + OPERATORS.put(FOR_UPDATE, ForUpdateOperator.INSTANCE); } private OperatorFactory() { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java new file mode 100644 index 0000000000..8ed17f36b8 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java @@ -0,0 +1,266 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.codec.CodecService; +import io.dingodb.codec.KeyValueCodec; +import io.dingodb.common.CommonId; +import io.dingodb.common.codec.PrimitiveCodec; +import io.dingodb.common.log.LogUtils; +import io.dingodb.common.meta.SchemaState; +import io.dingodb.common.store.KeyValue; +import io.dingodb.common.type.DingoType; +import io.dingodb.exec.Services; +import io.dingodb.exec.base.Status; +import io.dingodb.exec.converter.ValueConverter; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.exception.TaskCancelException; +import io.dingodb.exec.fin.Fin; +import io.dingodb.exec.operator.data.Context; +import io.dingodb.exec.operator.params.ForUpdateParam; +import io.dingodb.exec.transaction.base.ITransaction; +import io.dingodb.exec.transaction.impl.TransactionManager; +import io.dingodb.exec.transaction.util.TransactionUtil; +import io.dingodb.meta.entity.Column; +import io.dingodb.meta.entity.IndexTable; +import io.dingodb.meta.entity.IndexType; +import io.dingodb.meta.entity.Table; +import io.dingodb.store.api.StoreInstance; +import io.dingodb.store.api.transaction.data.Op; +import io.dingodb.store.api.transaction.data.pessimisticlock.TxnPessimisticLock; +import io.dingodb.store.api.transaction.exception.DuplicateEntryException; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static io.dingodb.common.util.NoBreakFunctions.wrap; +import static io.dingodb.exec.utils.ByteUtils.decodePessimisticKey; +import static io.dingodb.exec.utils.ByteUtils.encode; +import static io.dingodb.exec.utils.ByteUtils.getKeyByOp; + +@Slf4j +public class ForUpdateOperator extends SoleOutOperator { + public static final ForUpdateOperator INSTANCE = new ForUpdateOperator(); + + @Override + public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { + ForUpdateParam param = vertex.getParam(); + CommonId txnId = vertex.getTask().getTxnId(); + CommonId tableId = param.getTableId(); + CommonId partId = context.getDistribution().getId(); + CommonId jobId = vertex.getTask().getJobId(); + byte[] primaryLockKey = param.getPrimaryLockKey(); + ITransaction transaction = TransactionManager.getTransaction(txnId); + if (transaction == null || (primaryLockKey == null && transaction.getPrimaryKey() != null)) { + return false; + } + DingoType schema = param.getSchema(); + StoreInstance localStore = Services.LOCAL_STORE.getInstance(tableId, partId); + KeyValueCodec codec = param.getCodec(); + if (context.getIndexId() != null) { + Table indexTable = (Table) TransactionManager.getIndex(txnId, context.getIndexId()); + if (indexTable == null) { + LogUtils.error(log, "[ddl] Pessimistic for update get index table null, indexId:{}", context.getIndexId()); + return false; + } + List columnIndices = param.getTable().getColumnIndices(indexTable.columns.stream() + .map(Column::getName) + .collect(Collectors.toList())); + Object defaultVal = null; + if (columnIndices.contains(-1)) { + Column addColumn = indexTable.getColumns().stream() + .filter(column -> column.getSchemaState() != SchemaState.SCHEMA_PUBLIC) + .findFirst().orElse(null); + if (addColumn != null) { + defaultVal = addColumn.getDefaultVal(); + } + } + tableId = context.getIndexId(); + Object[] finalTuple = tuple; + Object finalDefaultVal = defaultVal; + tuple = columnIndices.stream().map(i -> { + if (i == -1) { + return finalDefaultVal; + } + return finalTuple[i]; + }).toArray(); + schema = indexTable.tupleType(); + localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); + codec = CodecService.getDefault().createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + } + StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId); + Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE); + KeyValue keyValue = wrap(codec::encode).apply(newTuple); + CodecService.getDefault().setId(keyValue.getKey(), partId.domain); + byte[] key = keyValue.getKey(); + byte[] txnIdByte = txnId.encode(); + byte[] tableIdByte = tableId.encode(); + byte[] partIdByte = partId.encode(); + byte[] jobIdByte = jobId.encode(); + int len = txnIdByte.length + tableIdByte.length + partIdByte.length; + byte[] lockKeyBytes = encode( + CommonId.CommonType.TXN_CACHE_LOCK, + key, + Op.LOCK.getCode(), + len, + txnIdByte, + tableIdByte, + partIdByte); + KeyValue oldKeyValue = localStore.get(lockKeyBytes); + if (oldKeyValue == null) { + byte[] deadLockKeyBytes = encode( + CommonId.CommonType.TXN_CACHE_BLOCK_LOCK, + key, + Op.LOCK.getCode(), + len, + txnIdByte, + tableIdByte, + partIdByte + ); + KeyValue deadLockKeyValue = new KeyValue(deadLockKeyBytes, null); + localStore.put(deadLockKeyValue); + + byte[] primaryLockKeyBytes = decodePessimisticKey(primaryLockKey); + long forUpdateTs = jobId.seq; + byte[] forUpdateTsByte = PrimitiveCodec.encodeLong(forUpdateTs); + LogUtils.debug(log, "{}, forUpdateTs:{} txnPessimisticLock :{}", txnId, forUpdateTs, Arrays.toString(key)); + if (vertex.getTask().getStatus() == Status.STOPPED) { + LogUtils.warn(log, "Task status is stop..."); + // delete deadLockKey + localStore.delete(deadLockKeyBytes); + return false; + } else if (vertex.getTask().getStatus() == Status.CANCEL) { + LogUtils.warn(log, "Task status is cancel..."); + // delete deadLockKey + localStore.delete(deadLockKeyBytes); + throw new TaskCancelException("task is cancel"); + } + TxnPessimisticLock txnPessimisticLock = TransactionUtil.getTxnPessimisticLock( + txnId, + tableId, + partId, + primaryLockKeyBytes, + key, + param.getStartTs(), + forUpdateTs, + param.getIsolationLevel(), + true + ); + + KeyValue kvKeyValue = null; + try { + kvKeyValue = TransactionUtil.pessimisticLock( + txnPessimisticLock, + param.getLockTimeOut(), + txnId, + tableId, + partId, + key, + param.isScan() + ); + long newForUpdateTs = txnPessimisticLock.getForUpdateTs(); + if (newForUpdateTs != forUpdateTs) { + forUpdateTs = newForUpdateTs; + forUpdateTsByte = PrimitiveCodec.encodeLong(newForUpdateTs); + } + LogUtils.debug(log, "{}, forUpdateTs:{} txnPessimisticLock :{}", txnId, newForUpdateTs, Arrays.toString(key)); + if (vertex.getTask().getStatus() == Status.STOPPED) { + TransactionUtil.resolvePessimisticLock( + param.getIsolationLevel(), + txnId, + tableId, + partId, + deadLockKeyBytes, + key, + param.getStartTs(), + txnPessimisticLock.getForUpdateTs(), + false, + null + ); + return vertex.getSoleEdge().transformToNext(context, tuple); + } else if (vertex.getTask().getStatus() == Status.CANCEL) { + throw new TaskCancelException("task is cancel"); + } + } catch (Throwable throwable) { + LogUtils.error(log, throwable.getMessage(), throwable); + TransactionUtil.resolvePessimisticLock( + param.getIsolationLevel(), + txnId, + tableId, + partId, + deadLockKeyBytes, + key, + param.getStartTs(), + txnPessimisticLock.getForUpdateTs(), + true, + throwable + ); + } + // get lock success, delete deadLockKey + localStore.delete(deadLockKeyBytes); + if (kvKeyValue != null && kvKeyValue.getValue() != null) { + TransactionUtil.resolvePessimisticLock( + param.getIsolationLevel(), + txnId, + tableId, + partId, + deadLockKeyBytes, + key, + param.getStartTs(), + forUpdateTs, + true, + new DuplicateEntryException("Duplicate entry " + + TransactionUtil.duplicateEntryKey(CommonId.decode(tableIdByte), key, txnId) + " for key 'PRIMARY'") + ); + } + byte[] lockKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_LOCK, Op.LOCK, deadLockKeyBytes); + // lockKeyValue + KeyValue lockKeyValue = new KeyValue(lockKey, forUpdateTsByte); + localStore.put(lockKeyValue); + // extraKeyValue + KeyValue extraKeyValue = new KeyValue( + encode( + CommonId.CommonType.TXN_CACHE_EXTRA_DATA, + key, Op.NONE.getCode(), + len, + jobIdByte, + tableIdByte, + partIdByte), + keyValue.getValue() + ); + localStore.put(extraKeyValue); + byte[] rollBackKey = getKeyByOp( + CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.LOCK, deadLockKeyBytes + ); + localStore.put(new KeyValue(rollBackKey, null)); + @Nullable Object[] finalTuple1 = tuple; + vertex.getOutList().forEach(o -> o.transformToNext(context, finalTuple1)); + } else { + @Nullable Object[] finalTuple2 = tuple; + vertex.getOutList().forEach(o -> o.transformToNext(context, finalTuple2)); + } + return true; + } + + @Override + public void fin(int pin, @Nullable Fin fin, Vertex vertex) { + vertex.getSoleEdge().fin(fin); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java index 410d32c711..7d745b8b14 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java @@ -282,6 +282,11 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { Object[] result = codec.decode(kvKeyValue); vertex.getOutList().forEach(o -> o.transformToNext(context, result)); } else { + byte[] dataKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_DATA, Op.PUT, lockKeyBytes); + byte[] rollBackKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.LOCK, dataKey); + if (localStore.get(rollBackKey) != null) { + localStore.delete(rollBackKey); + } @Nullable Object[] finalTuple1 = tuple; vertex.getOutList().forEach(o -> o.transformToNext(context, finalTuple1)); } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java index 33a79c0ebf..1f8e7603e6 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java @@ -325,6 +325,10 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { byte[] rollBackKey = ByteUtils.getKeyByOp(CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.DELETE, deadLockKeyBytes); localStore.put(new KeyValue(rollBackKey, null)); } + if (param.isForUpdate()) { + byte[] rollBackKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.LOCK, deadLockKeyBytes); + localStore.put(new KeyValue(rollBackKey, null)); + } } byte[] lockKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_LOCK, Op.LOCK, deadLockKeyBytes); // lockKeyValue diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java index c875b93da6..fdad9d8247 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java @@ -429,6 +429,13 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { vertex.getOutList().forEach(o -> o.transformToNext(context, result)); return true; } else { + // delete for update lock + byte[] rollBackKey = ByteUtils.getKeyByOp( + CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.LOCK, dataKey + ); + if (localStore.get(rollBackKey) != null) { + localStore.delete(rollBackKey); + } if (context.getIndexId() != null) { LogUtils.debug(log, "{}, repeat primary key :{} keyValue is not null, index is not null", txnId, diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/AbstractParams.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/AbstractParams.java index 2adec43028..f341a72ec0 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/AbstractParams.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/AbstractParams.java @@ -122,7 +122,8 @@ @JsonSubTypes.Type(DocumentPreFilterParam.class), @JsonSubTypes.Type(TxnDiskAnnStatusParam.class), @JsonSubTypes.Type(TxnDiskAnnCountMemoryParam.class), - @JsonSubTypes.Type(TxnDiskAnnResetParam.class) + @JsonSubTypes.Type(TxnDiskAnnResetParam.class), + @JsonSubTypes.Type(ForUpdateParam.class) }) @JsonInclude(JsonInclude.Include.NON_NULL) public abstract class AbstractParams { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java new file mode 100644 index 0000000000..13eb9b3040 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java @@ -0,0 +1,81 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator.params; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.dingodb.codec.CodecService; +import io.dingodb.codec.KeyValueCodec; +import io.dingodb.common.CommonId; +import io.dingodb.common.type.DingoType; +import io.dingodb.meta.entity.Table; +import lombok.Getter; + +@Getter +@JsonTypeName("forUpdate") +@JsonPropertyOrder({"table", "primaryLockKey", "schema", "startTs", + "forUpdateTs", "isolationLevel", "lockTimeOut", "isScan"}) +public class ForUpdateParam extends AbstractParams { + + @JsonProperty("table") + @JsonSerialize(using = CommonId.JacksonSerializer.class) + @JsonDeserialize(using = CommonId.JacksonDeserializer.class) + private final CommonId tableId; + @JsonProperty("primaryLockKey") + private final byte[] primaryLockKey; + @JsonProperty("schema") + private DingoType schema; + @JsonProperty("startTs") + private long startTs; + @JsonProperty("forUpdateTs") + private long forUpdateTs; + @JsonProperty("isolationLevel") + private int isolationLevel; + @JsonProperty("lockTimeOut") + private long lockTimeOut; + @JsonProperty("isScan") + private boolean isScan; + private Table table; + private KeyValueCodec codec; + + public ForUpdateParam( + @JsonProperty("table") CommonId tableId, + @JsonProperty("schema") DingoType schema, + @JsonProperty("primaryLockKey") byte[] primaryLockKey, + @JsonProperty("startTs") long startTs, + @JsonProperty("forUpdateTs") long forUpdateTs, + @JsonProperty("isolationLevel") int isolationLevel, + @JsonProperty("lockTimeOut") long lockTimeOut, + @JsonProperty("isScan") boolean isScan, + Table table + ) { + super(); + this.tableId = tableId; + this.schema = schema; + this.primaryLockKey = primaryLockKey; + this.startTs = startTs; + this.forUpdateTs = forUpdateTs; + this.isolationLevel = isolationLevel; + this.lockTimeOut = lockTimeOut; + this.isScan = isScan; + this.table = table; + this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PessimisticLockParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PessimisticLockParam.java index c7eee68da6..70dd39d649 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PessimisticLockParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PessimisticLockParam.java @@ -39,6 +39,7 @@ public class PessimisticLockParam extends TxnPartModifyParam { private final String opType; @JsonProperty("isDuplicateKeyUpdate") private final boolean isDuplicateUpdate; + private boolean forUpdate; public PessimisticLockParam( @JsonProperty("table") CommonId tableId, @JsonProperty("schema") DingoType schema, @@ -53,7 +54,8 @@ public PessimisticLockParam( @JsonProperty("isScan") boolean isScan, @JsonProperty("opType") String opType, Table table, - boolean isDuplicateUpdate + boolean isDuplicateUpdate, + boolean forUpdate ) { super(tableId, schema, keyMapping, table, pessimisticTxn, isolationLevel, primaryLockKey, startTs, forUpdateTs, lockTimeOut); @@ -61,6 +63,7 @@ public PessimisticLockParam( this.isScan = isScan; this.opType = opType; this.isDuplicateUpdate = isDuplicateUpdate; + this.forUpdate = forUpdate; } public void inc() { count++; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/operator/PessimisticRollBackOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/operator/PessimisticRollBackOperator.java index 6bd315ef50..ae7277e0b6 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/operator/PessimisticRollBackOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/operator/PessimisticRollBackOperator.java @@ -114,6 +114,8 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { store.delete(lockKey); lockKey[lockKey.length - 2] = (byte) Op.PUT.getCode(); store.delete(lockKey); + lockKey[lockKey.length - 2] = (byte) Op.LOCK.getCode(); + store.delete(lockKey); LogUtils.info(log, "PessimisticRollBack key is {}, forUpdateTs:{}, jobId:{}", Arrays.toString(key), forUpdateTs, jobId); CommonId partId = param.getPartId(); if (partId == null) { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/utils/OpStateUtils.java b/dingo-exec/src/main/java/io/dingodb/exec/utils/OpStateUtils.java index 7cc9c8062c..8c223c8b00 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/utils/OpStateUtils.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/utils/OpStateUtils.java @@ -33,7 +33,8 @@ public static boolean allowDeleteOnly(SchemaState schemaState) { } public static boolean allowOpContinue(String op, SchemaState schemaState) { - if ("insert".equalsIgnoreCase(op) || "update".equalsIgnoreCase(op)) { + // select op is used in the case of 'select... for update' + if ("insert".equalsIgnoreCase(op) || "update".equalsIgnoreCase(op) || "select".equalsIgnoreCase(op)) { return allowWrite(schemaState); } else if ("delete".equalsIgnoreCase(op)) { return allowDeleteOnly(schemaState); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java b/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java index ceef11a604..0fcfafb45f 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java @@ -102,6 +102,7 @@ public final class OperatorCodeUtils { public static final CommonId TXN_INDEX_RANGE_SCAN = new CommonId(CommonId.CommonType.OP, OP, 73); public static final CommonId OPTIMISTIC_ROLL_BACK = new CommonId(CommonId.CommonType.OP, OP, 74); + public static final CommonId FOR_UPDATE = new CommonId(CommonId.CommonType.OP, OP, 75); // sink public static final CommonId ROOT = new CommonId(CommonId.CommonType.OP, SINK, 80);