From 542eb54ba1e79d454f359c26d41b198448ca49ab Mon Sep 17 00:00:00 2001 From: guojn1 Date: Fri, 3 Jan 2025 10:54:49 +0800 Subject: [PATCH 1/3] [fix][dingo-store-proxy] Add codecVersion --- .../io/dingodb/calcite/DingoDdlExecutor.java | 1 + .../calcite/executor/LoadDataExecutor.java | 8 +++-- .../calcite/executor/ShowLocksExecutor.java | 2 +- .../rel/dingo/DingoIndexScanWithRelOp.java | 3 +- .../calcite/rel/dingo/DingoScanWithRelOp.java | 2 +- .../dingodb/calcite/rule/DingoLikeRule.java | 2 +- .../calcite/rule/DingoRangeDeleteRule.java | 2 +- .../dingodb/calcite/stats/StatsOperator.java | 12 ++++--- .../calcite/stats/task/AnalyzeTask.java | 2 +- .../calcite/stats/task/CollectStatsTask.java | 3 +- .../function/DingoFunctionScanVisitFun.java | 3 +- .../DingoGetByIndexMergeVisitFun.java | 2 +- .../function/DingoGetByIndexVisitFun.java | 4 +-- .../function/DingoIndexRangeScanVisitFun.java | 2 +- .../DingoIndexScanWithRelOpVisitFun.java | 12 ++++--- .../function/DingoLikeScanVisitFun.java | 3 +- .../function/DingoRangeDeleteVisitFun.java | 6 ++-- .../function/DingoScanWithRelOpVisitFun.java | 12 ++++--- .../function/DingoTableScanVisitFun.java | 8 +++-- .../io/dingodb/client/OperationServiceV2.java | 10 +++--- .../java/io/dingodb/codec/CodecService.java | 32 ++++++++++++------- .../io/dingodb/codec/serial/CodecService.java | 21 +++++++----- .../dingodb/common/table/IndexDefinition.java | 8 +++-- .../dingodb/common/table/TableDefinition.java | 7 +++- .../exec/operator/DistributeOperator.java | 3 +- .../exec/operator/GetByIndexOperator.java | 5 --- .../PessimisticLockDeleteOperator.java | 4 ++- .../PessimisticLockInsertOperator.java | 4 ++- .../operator/PessimisticLockOperator.java | 3 +- .../PessimisticLockUpdateOperator.java | 2 +- .../operator/TxnDiskAnnStatusOperator.java | 1 - .../exec/operator/TxnGetByKeysOperator.java | 1 - .../exec/operator/TxnPartDeleteOperator.java | 2 +- .../operator/TxnPartDocumentOperator.java | 4 --- .../exec/operator/TxnPartInsertOperator.java | 2 +- .../exec/operator/TxnPartUpdateOperator.java | 2 +- .../exec/operator/TxnPartVectorOperator.java | 3 +- .../TxnScanWithRelOpOperatorBase.java | 1 - .../operator/params/DistributionParam.java | 3 +- .../params/DocumentPartitionParam.java | 3 +- .../operator/params/FilterProjectParam.java | 6 +++- .../params/FilterProjectSourceParam.java | 23 +++++++++++++ .../exec/operator/params/GetByIndexParam.java | 6 ++-- .../exec/operator/params/GetByKeysParam.java | 5 +-- .../operator/params/GetDistributionParam.java | 3 +- .../operator/params/InfoSchemaScanParam.java | 2 +- .../exec/operator/params/LikeScanParam.java | 7 ++-- .../operator/params/PartDocumentParam.java | 5 +-- .../exec/operator/params/PartModifyParam.java | 3 +- .../operator/params/PartRangeDeleteParam.java | 7 ++-- .../operator/params/PartRangeScanParam.java | 10 +++--- .../exec/operator/params/PartUpdateParam.java | 4 --- .../exec/operator/params/PartVectorParam.java | 3 +- .../exec/operator/params/ScanParam.java | 8 +++-- .../operator/params/ScanWithRelOpParam.java | 8 +++-- .../operator/params/TxnDiskAnnBuildParam.java | 6 ++-- .../params/TxnDiskAnnCountMemoryParam.java | 3 +- .../operator/params/TxnDiskAnnLoadParam.java | 3 +- .../operator/params/TxnDiskAnnResetParam.java | 6 ++-- .../params/TxnDiskAnnStatusParam.java | 6 ++-- .../operator/params/TxnGetByIndexParam.java | 8 +++-- .../operator/params/TxnGetByKeysParam.java | 5 +-- .../params/TxnIndexRangeScanParam.java | 12 ++++--- .../operator/params/TxnLikeScanParam.java | 7 ++-- .../operator/params/TxnPartDocumentParam.java | 5 +-- .../operator/params/TxnPartModifyParam.java | 1 - .../params/TxnPartRangeDeleteParam.java | 7 ++-- .../params/TxnPartRangeScanParam.java | 10 +++--- .../operator/params/TxnPartVectorParam.java | 5 +-- .../exec/operator/params/TxnScanParam.java | 5 +-- .../params/TxnScanWithRelOpParam.java | 5 +-- .../operator/params/VectorPartitionParam.java | 3 +- .../util/TransactionCacheToMutation.java | 6 ++-- .../transaction/util/TransactionUtil.java | 2 +- .../transaction/util/TwoPhaseCommitUtils.java | 2 ++ .../io/dingodb/exec/transaction/util/Txn.java | 10 +++--- .../server/executor/ddl/AddColumnFiller.java | 5 +-- .../server/executor/ddl/IndexAddFiller.java | 7 ++-- .../executor/ddl/ModifyColumnFiller.java | 5 +-- .../server/executor/prepare/PrepareMeta.java | 4 +-- .../executor/service/SequenceService.java | 2 +- .../server/executor/service/UserService.java | 8 +++-- .../java/io/dingodb/meta/entity/Table.java | 3 ++ .../dingodb/store/proxy/common/Mapping.java | 3 +- .../store/proxy/common/TableDefinition.java | 5 +++ .../store/proxy/mapper/TableMapper.java | 8 +++-- .../dingodb/store/proxy/meta/MetaCache.java | 5 +-- .../store/proxy/service/CodecService.java | 11 ++++--- .../store/proxy/service/StoreService.java | 4 +-- .../service/TransactionStoreInstance.java | 2 +- 90 files changed, 315 insertions(+), 184 deletions(-) diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java index 167ec6dfc1..3fbfdc9404 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java @@ -489,6 +489,7 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) { .name(tableName) .columns(columns) .version(1) + .codecVersion(2) .ttl(create.getTtl()) .partDefinition(create.getPartDefinition()) .engine(create.getEngine()) diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/LoadDataExecutor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/LoadDataExecutor.java index 7dab4bb830..af47528ae9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/LoadDataExecutor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/LoadDataExecutor.java @@ -159,7 +159,9 @@ public LoadDataExecutor(SqlLoadData sqlLoadData, Connection connection, DingoPar if (table == null) { throw DingoResource.DINGO_RESOURCE.unknownTable(schemaName + "." + sqlLoadData.getTableName()).ex(); } - codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() + ); distributions = metaService.getRangeDistribution(table.tableId); schema = table.tupleType(); this.isTxn = checkEngine(); @@ -420,7 +422,9 @@ public void insertWithTxn(Object[] tuples) { .collect(Collectors.toList())); Object[] tuplesTmp = columnIndices.stream().map(i -> tuples[i]).toArray(); KeyValueCodec codec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec( + indexTable.getCodecVersion(), indexTable.version, + indexTable.tupleType(), indexTable.keyMapping()); keyValue = wrap(codec::encode).apply(tuplesTmp); PartitionService ps = PartitionService.getService( diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowLocksExecutor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowLocksExecutor.java index 0f93b41381..9baba460d5 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowLocksExecutor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowLocksExecutor.java @@ -225,7 +225,7 @@ private static String lockKey(CommonId tableId, byte[] keyBytes) { return ""; } KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - table.version, table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() ); return Utils.buildKeyStr(table.keyMapping(), codec.decodeKeyPrefix(keyBytes)); } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoIndexScanWithRelOp.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoIndexScanWithRelOp.java index ca365d1903..3773ede80b 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoIndexScanWithRelOp.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoIndexScanWithRelOp.java @@ -75,7 +75,8 @@ public DingoIndexScanWithRelOp( super(cluster, traitSet, hints, table, rowType, relOp, filter, pushDown, keepOrder, indexTable, rangeScan); if (getFilter() != null) { KeyValueCodec codec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), + indexTable.keyMapping()); RangeDistribution range = RangeUtils.createRangeByFilter(indexTable, codec, filter, null); rangeDistribution = range; if (range != null && !(range.getStartKey() == null && range.getEndKey() == null)) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoScanWithRelOp.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoScanWithRelOp.java index 6173bf9bd7..23da49e125 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoScanWithRelOp.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoScanWithRelOp.java @@ -79,7 +79,7 @@ public DingoScanWithRelOp( if (getFilter() != null) { Table td = Objects.requireNonNull(table.unwrap(DingoTable.class)).getTable(); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - td.version, td.tupleType(), td.keyMapping() + td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping() ); RangeDistribution range = RangeUtils.createRangeByFilter(td, codec, filter, null); rangeDistribution = range; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java index fadbe7e669..869d9d1062 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoLikeRule.java @@ -107,7 +107,7 @@ public void onMatch(@NonNull RelOptRuleCall call) { } KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - td.version, td.tupleType(), td.keyMapping() + td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping() ); Object[] tuple = new Object[td.getColumns().size()]; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java index e64489681c..9077ae0e00 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRangeDeleteRule.java @@ -56,7 +56,7 @@ public void onMatch(@NonNull RelOptRuleCall call) { return; } KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - td.version, td.tupleType(), td.keyMapping() + td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping() ); RangeDistribution range; if (rel.getFilter() == null && (rel.getSelection().size() == rel.getTable().getRowType().getFieldCount())) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsOperator.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsOperator.java index 3d3af78489..2b8588bcb9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsOperator.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsOperator.java @@ -88,14 +88,18 @@ public abstract class StatsOperator { cmSketchTblId = cmSketchTable.tableId; analyzeTaskCodec = CodecService.getDefault() .createKeyValueCodec( - analyzeTaskTable.version, analyzeTaskTable.tupleType(), analyzeTaskTable.keyMapping() + analyzeTaskTable.getCodecVersion(), analyzeTaskTable.version, + analyzeTaskTable.tupleType(), analyzeTaskTable.keyMapping() ); bucketsCodec = CodecService.getDefault() - .createKeyValueCodec(bucketsTable.version, bucketsTable.tupleType(), bucketsTable.keyMapping()); + .createKeyValueCodec(bucketsTable.getCodecVersion(), bucketsTable.version, + bucketsTable.tupleType(), bucketsTable.keyMapping()); statsCodec = CodecService.getDefault() - .createKeyValueCodec(statsTable.version, statsTable.tupleType(), statsTable.keyMapping()); + .createKeyValueCodec(statsTable.getCodecVersion(), statsTable.version, + statsTable.tupleType(), statsTable.keyMapping()); cmSketchCodec = CodecService.getDefault() - .createKeyValueCodec(cmSketchTable.version, cmSketchTable.tupleType(), cmSketchTable.keyMapping()); + .createKeyValueCodec(cmSketchTable.getCodecVersion(), cmSketchTable.version, + cmSketchTable.tupleType(), cmSketchTable.keyMapping()); analyzeTaskStore = storeTxnService.getInstance(analyzeTaskTblId, getRegionId(analyzeTaskTblId)); bucketsStore = storeTxnService.getInstance(bucketsTblId, getRegionId(bucketsTblId)); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java index bde5e72888..1a48e6c9b3 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java @@ -342,7 +342,7 @@ private void buildHistogram(List histogramList, ); return Iterators.transform( iterator, - wrap(CodecService.getDefault().createKeyValueCodec(td.version, + wrap(CodecService.getDefault().createKeyValueCodec(td.getCodecVersion(), td.version, outputSchema, outputKeyMapping)::decode)::apply ); }).collect(Collectors.toList()); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/CollectStatsTask.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/CollectStatsTask.java index 1eb01be729..46513bd92f 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/CollectStatsTask.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/CollectStatsTask.java @@ -85,7 +85,8 @@ public CollectStatsTask(RangeDistribution region, this.startTs = scanTs; this.timeout = timeout; this.kvStore = Services.KV_STORE.getInstance(tableId, region.id()); - this.codec = CodecService.getDefault().createKeyValueCodec(td.getVersion(), td.tupleType(), td.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + td.getCodecVersion(), td.getVersion(), td.tupleType(), td.keyMapping()); this.minSketchList = minSketches.stream().map(CountMinSketch::copy) .collect(Collectors.toList()); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoFunctionScanVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoFunctionScanVisitFun.java index d8e64dcf7c..99aac7a410 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoFunctionScanVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoFunctionScanVisitFun.java @@ -92,7 +92,8 @@ public static Collection visit( null, null, td.tupleType(), - false + false, + td.codecVersion ); task = job.getOrCreate(currentLocation, idGenerator); Vertex scanVertex = new Vertex(PART_RANGE_SCAN, param); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java index 565bcb1d58..320824c8e9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java @@ -93,7 +93,7 @@ public static Collection visit( List keyTuples = TableUtils.getTuplesForKeyMapping(indexValSet.getValue(), indexTd); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - indexTd.version, indexTd.tupleType(), indexTd.keyMapping() + indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping() ); List keyList = new ArrayList<>(); for (Object[] keyTuple : keyTuples) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexVisitFun.java index 118ad2358c..e3d02dad73 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexVisitFun.java @@ -90,8 +90,8 @@ public static LinkedList visit( List keyTuples = TableUtils.getTuplesForKeyMapping(indexValSet.getValue(), indexTd); - KeyValueCodec codec = - CodecService.getDefault().createKeyValueCodec(indexTd.version, indexTd.tupleType(), indexTd.keyMapping()); + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping()); List keyList = new ArrayList<>(); for (Object[] keyTuple : keyTuples) { byte[] keys = codec.encodeKeyPrefix(keyTuple, calculatePrefixCount(keyTuple)); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java index 668ebf926d..e19430d6fd 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java @@ -114,7 +114,7 @@ private DingoIndexRangeScanVisitFun() { if (rexFilter != null) { filter = SqlExprUtils.toSqlExpr(rexFilter); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - indexTd.version, indexTd.tupleType(), indexTd.keyMapping() + indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping() ); RangeDistribution range = RangeUtils.createRangeByFilter(indexTd, codec, rexFilter, null); if (range != null) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java index e31b379163..9d76caeccd 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java @@ -174,7 +174,8 @@ private DingoIndexScanWithRelOpVisitFun() { td.tableId, td.tupleType(), td.keyMapping(), - td.version + td.version, + td.getCodecVersion() ); return new Vertex(SCAN_WITH_NO_OP, param); } else { @@ -186,7 +187,8 @@ private DingoIndexScanWithRelOpVisitFun() { DefinitionMapper.mapToDingoType(rel.getRowType()), rel.isPushDown(), td.version, - 0 + 0, + td.getCodecVersion() ); if (relOp instanceof PipeOp) { return new Vertex(SCAN_WITH_PIPE_OP, param); @@ -212,7 +214,8 @@ private DingoIndexScanWithRelOpVisitFun() { scanTs, transaction.getIsolationLevel(), transaction.getLockTimeOut(), - td.version + td.version, + td.getCodecVersion() ); return new Vertex(TXN_SCAN_WITH_NO_OP, param); } else { @@ -227,7 +230,8 @@ private DingoIndexScanWithRelOpVisitFun() { DefinitionMapper.mapToDingoType(rel.getRowType()), rel.isPushDown(), td.version, - 0 + 0, + td.getCodecVersion() ); if (relOp instanceof PipeOp) { return new Vertex(TXN_SCAN_WITH_PIPE_OP, param); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoLikeScanVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoLikeScanVisitFun.java index 4fff3bc259..104a6cb40b 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoLikeScanVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoLikeScanVisitFun.java @@ -105,7 +105,8 @@ public static Collection visit( td.keyMapping(), Optional.mapOrNull(filter, SqlExpr::copy), rel.getSelection(), - rel.getPrefix() + rel.getPrefix(), + td.getCodecVersion() ); Vertex vertex = new Vertex(LIKE_SCAN, param); OutputHint hint = new OutputHint(); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRangeDeleteVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRangeDeleteVisitFun.java index d1cf76ab0d..8c62a01f1b 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRangeDeleteVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRangeDeleteVisitFun.java @@ -96,14 +96,16 @@ public static Collection visit( tableInfo.getId(), td.version, td.tupleType(), - td.keyMapping()); + td.keyMapping(), + td.getCodecVersion()); deleteVertex = new Vertex(TXN_PART_RANGE_DELETE, param); } else { PartRangeDeleteParam param = new PartRangeDeleteParam( tableInfo.getId(), td.version, td.tupleType(), - td.keyMapping()); + td.keyMapping(), + td.codecVersion); deleteVertex = new Vertex(PART_RANGE_DELETE, param); } if (transaction != null) { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoScanWithRelOpVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoScanWithRelOpVisitFun.java index f8d69a1066..00a3f35520 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoScanWithRelOpVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoScanWithRelOpVisitFun.java @@ -198,7 +198,8 @@ private DingoScanWithRelOpVisitFun() { tableInfo.getId(), td.tupleType(), td.keyMapping(), - td.version + td.version, + td.getCodecVersion() ); return new Vertex(SCAN_WITH_NO_OP, param); } else { @@ -210,7 +211,8 @@ private DingoScanWithRelOpVisitFun() { DefinitionMapper.mapToDingoType(rel.getRowType()), rel.isPushDown(), td.version, - rel.getLimit() + rel.getLimit(), + td.getCodecVersion() ); if (relOp instanceof PipeOp) { return new Vertex(SCAN_WITH_PIPE_OP, param); @@ -237,7 +239,8 @@ private DingoScanWithRelOpVisitFun() { scanTs, transaction.getIsolationLevel(), transaction.getLockTimeOut(), - td.version + td.version, + td.getCodecVersion() ); return new Vertex(TXN_SCAN_WITH_NO_OP, param); } else { @@ -252,7 +255,8 @@ private DingoScanWithRelOpVisitFun() { DefinitionMapper.mapToDingoType(rel.getRowType()), rel.isPushDown(), td.version, - rel.getLimit() + rel.getLimit(), + td.getCodecVersion() ); if (relOp instanceof PipeOp) { return new Vertex(TXN_SCAN_WITH_PIPE_OP, param); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableScanVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableScanVisitFun.java index 18d3d111d3..6aef6d19dd 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableScanVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableScanVisitFun.java @@ -80,7 +80,7 @@ private DingoTableScanVisitFun() { if (rel.getFilter() != null) { filter = SqlExprUtils.toSqlExpr(rel.getFilter()); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - td.version, td.tupleType(), td.keyMapping() + td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping() ); RangeDistribution range = RangeUtils.createRangeByFilter(td, codec, rel.getFilter(), rel.getSelection()); if (range != null) { @@ -145,7 +145,8 @@ private DingoTableScanVisitFun() { scanTs, transaction.getIsolationLevel(), transaction.getLockTimeOut(), - false + false, + td.codecVersion ); scanVertex = new Vertex(TXN_PART_RANGE_SCAN, param); } else { @@ -162,7 +163,8 @@ private DingoTableScanVisitFun() { rel.getAggCalls() == null ? null : AggFactory.getAggList( rel.getAggCalls(), DefinitionMapper.mapToDingoType(rel.getSelectedType())), DefinitionMapper.mapToDingoType(rel.getNormalRowType()), - rel.isPushDown() + rel.isPushDown(), + td.getCodecVersion() ); scanVertex = new Vertex(PART_RANGE_SCAN, param); } diff --git a/dingo-client/src/main/java/io/dingodb/client/OperationServiceV2.java b/dingo-client/src/main/java/io/dingodb/client/OperationServiceV2.java index 3f0440a60d..6164d24d01 100644 --- a/dingo-client/src/main/java/io/dingodb/client/OperationServiceV2.java +++ b/dingo-client/src/main/java/io/dingodb/client/OperationServiceV2.java @@ -484,7 +484,7 @@ private List scan(String schema, Table td = Parameters.nonNull(metaService.getTable(tableName), "Table not found."); CommonId tableId = Parameters.nonNull(metaService.getTable(tableName).getTableId(), "Table not found."); io.dingodb.codec.KeyValueCodec codec = CodecService.INSTANCE.createKeyValueCodec( - tableId, td.tupleType(), td.keyMapping() + td.getCodecVersion(), tableId, td.tupleType(), td.keyMapping() ); Key start = keyRange.start; @@ -526,7 +526,8 @@ private List scan(String schema, null, null, td.tupleType(), - true + true, + td.getCodecVersion() ); Vertex scanVertex = new Vertex(PART_RANGE_SCAN, param); scanVertex.setId(idGenerator.getOperatorId(task.getId())); @@ -548,7 +549,7 @@ private List rangeDelete(String schema, String tableName, CommonId tableId = Parameters.nonNull(metaService.getTable(tableName).getTableId(), "Table not found."); Table td = Parameters.nonNull(metaService.getTable(tableName), "Table not found."); io.dingodb.codec.KeyValueCodec codec = CodecService.INSTANCE.createKeyValueCodec( - tableId, td.tupleType(), td.keyMapping() + td.codecVersion, tableId, td.tupleType(), td.keyMapping() ); byte[] startKey = codec.encodeKeyPrefix(mapKeyPrefix(td, begin), begin.userKey.size()); @@ -572,7 +573,8 @@ private List rangeDelete(String schema, String tableName, List outputs = new ArrayList<>(); for (int i = 0; i <= td.getPartitions().size(); i++) { - PartRangeDeleteParam param = new PartRangeDeleteParam(tableId, td.version, td.tupleType(), td.keyMapping()); + PartRangeDeleteParam param = new PartRangeDeleteParam( + tableId, td.version, td.tupleType(), td.keyMapping(), td.getCodecVersion()); Vertex deleteVertex = new Vertex(PART_RANGE_DELETE, param); task = job.getOrCreate(currentLocation, idGenerator); deleteVertex.setId(idGenerator.getOperatorId(task.getId())); diff --git a/dingo-codec-api/src/main/java/io/dingodb/codec/CodecService.java b/dingo-codec-api/src/main/java/io/dingodb/codec/CodecService.java index 618562c323..03e8386cf9 100644 --- a/dingo-codec-api/src/main/java/io/dingodb/codec/CodecService.java +++ b/dingo-codec-api/src/main/java/io/dingodb/codec/CodecService.java @@ -50,9 +50,12 @@ default KeyValue setId(KeyValue keyValue, CommonId id) { return new KeyValue(setId(keyValue.getKey(), id), keyValue.getValue()); } - KeyValueCodec createKeyValueCodec(int version, CommonId id, DingoType type, TupleMapping keyMapping); + KeyValueCodec createKeyValueCodec(int codecVersion, int version, CommonId id, + DingoType type, TupleMapping keyMapping); - default KeyValueCodec createKeyValueCodec(int version, CommonId id, List columns) { + default KeyValueCodec createKeyValueCodec( + int codecVersion, int version, CommonId id, List columns + ) { int[] mappings = new int[columns.size()]; int keyCount = 0; for (int i = 0; i < columns.size(); i++) { @@ -64,6 +67,7 @@ default KeyValueCodec createKeyValueCodec(int version, CommonId id, List columns) { - return createKeyValueCodec(1, id, columns); + default KeyValueCodec createKeyValueCodec(int codecVersion, CommonId id, List columns) { + return createKeyValueCodec(codecVersion, 1, id, columns); } default KeyValueCodec createKeyValueCodec(CommonId id, TableDefinition tableDefinition) { - return createKeyValueCodec(tableDefinition.getVersion(), id, tableDefinition.getColumns()); + return createKeyValueCodec( + tableDefinition.getCodecVersion(), tableDefinition.getVersion(), id, tableDefinition.getColumns() + ); } - default KeyValueCodec createKeyValueCodec(int version, DingoType type, TupleMapping keyMapping) { - return createKeyValueCodec(version, CommonId.EMPTY_TABLE, type, keyMapping); + default KeyValueCodec createKeyValueCodec(int codecVersion, int version, DingoType type, TupleMapping keyMapping) { + return createKeyValueCodec(codecVersion, version, CommonId.EMPTY_TABLE, type, keyMapping); } - default KeyValueCodec createKeyValueCodec(int version, List columns) { - return createKeyValueCodec(version, CommonId.EMPTY_TABLE, columns); + default KeyValueCodec createKeyValueCodec(int codecVersion, int version, List columns) { + return createKeyValueCodec(codecVersion, version, CommonId.EMPTY_TABLE, columns); } default KeyValueCodec createKeyValueCodec(TableDefinition tableDefinition) { - return createKeyValueCodec(tableDefinition.getVersion(), CommonId.EMPTY_TABLE, tableDefinition.getColumns()); + return createKeyValueCodec( + tableDefinition.getCodecVersion(), tableDefinition.getVersion(), + CommonId.EMPTY_TABLE, tableDefinition.getColumns()); } } diff --git a/dingo-codec-serial/src/main/java/io/dingodb/codec/serial/CodecService.java b/dingo-codec-serial/src/main/java/io/dingodb/codec/serial/CodecService.java index 12cb9cbede..a7dd191fe1 100644 --- a/dingo-codec-serial/src/main/java/io/dingodb/codec/serial/CodecService.java +++ b/dingo-codec-serial/src/main/java/io/dingodb/codec/serial/CodecService.java @@ -40,29 +40,34 @@ public CodecService get() { } @Override - public KeyValueCodec createKeyValueCodec(int version, CommonId id, DingoType type, TupleMapping keyMapping) { - return createKeyValueCodec(id, type, keyMapping); + public KeyValueCodec createKeyValueCodec( + int codecVersion, int version, CommonId id, DingoType type, TupleMapping keyMapping + ) { + return createKeyValueCodec(codecVersion, id, type, keyMapping); } @Override - public KeyValueCodec createKeyValueCodec(int version, CommonId id, List columns) { - return createKeyValueCodec(id, columns); + public KeyValueCodec createKeyValueCodec( + int codecVersion, int version, CommonId id, List columns + ) { + return createKeyValueCodec(codecVersion, id, columns); } @Override - public KeyValueCodec createKeyValueCodec(CommonId id, DingoType type, TupleMapping keyMapping) { + public KeyValueCodec createKeyValueCodec(int codecVersion, CommonId id, DingoType type, TupleMapping keyMapping) { return new DingoKeyValueCodec(type, keyMapping); } @Override - public KeyValueCodec createKeyValueCodec(CommonId id, List columns) { + public KeyValueCodec createKeyValueCodec(int codecVersion, CommonId id, List columns) { TableDefinition tableDefinition = new TableDefinition(""); tableDefinition.setColumns(columns); - return createKeyValueCodec(id, tableDefinition.getDingoType(), tableDefinition.getKeyMapping()); + return createKeyValueCodec(codecVersion, id, tableDefinition.getDingoType(), tableDefinition.getKeyMapping()); } @Override public KeyValueCodec createKeyValueCodec(CommonId id, TableDefinition tableDefinition) { - return createKeyValueCodec(id, tableDefinition.getDingoType(), tableDefinition.getKeyMapping()); + return createKeyValueCodec( + tableDefinition.getCodecVersion(), id, tableDefinition.getDingoType(), tableDefinition.getKeyMapping()); } } diff --git a/dingo-common/src/main/java/io/dingodb/common/table/IndexDefinition.java b/dingo-common/src/main/java/io/dingodb/common/table/IndexDefinition.java index 06b2132a66..f514606781 100644 --- a/dingo-common/src/main/java/io/dingodb/common/table/IndexDefinition.java +++ b/dingo-common/src/main/java/io/dingodb/common/table/IndexDefinition.java @@ -60,11 +60,12 @@ public IndexDefinition( List indices, long prepareTableId, boolean unique, List originKeyList, - List originWithKeyList + List originWithKeyList, + int codecVersion ) { super(name, columns, version, ttl, partDefinition, engine, properties, autoIncrement, replica, createSql, comment, charset, collate, tableType, rowFormat, createTime, updateTime, schemaState, - indices, prepareTableId, true); + indices, prepareTableId, true, codecVersion); this.unique = unique; this.originKeyList = originKeyList; this.originWithKeyList = originWithKeyList; @@ -100,7 +101,8 @@ public static IndexDefinition createIndexDefinition( definition.getPrepareTableId(), unique, originKeyList, - originWithKeyList + originWithKeyList, + 2 ); } } diff --git a/dingo-common/src/main/java/io/dingodb/common/table/TableDefinition.java b/dingo-common/src/main/java/io/dingodb/common/table/TableDefinition.java index a13f38b8df..fd84b1e4b8 100644 --- a/dingo-common/src/main/java/io/dingodb/common/table/TableDefinition.java +++ b/dingo-common/src/main/java/io/dingodb/common/table/TableDefinition.java @@ -142,6 +142,10 @@ public class TableDefinition { @Setter private boolean visible; + @Getter + @Setter + private int codecVersion; + @JsonCreator public TableDefinition(@JsonProperty("name") String name) { this.name = name; @@ -358,7 +362,8 @@ public TableDefinition copyWithName(String name) { this.schemaState, this.indices, prepareTableId, - this.visible + this.visible, + this.codecVersion ); } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/DistributeOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/DistributeOperator.java index c7ab459a87..847ab75135 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/DistributeOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/DistributeOperator.java @@ -78,7 +78,8 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { } } KeyValueCodec indexCodec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, + indexTable.tupleType(), indexTable.keyMapping()); partId = ps.calcPartId(indexTuple, wrap(indexCodec::encodeKey), param.getDistributions()); } else { partId = ps.calcPartId(newTuple, wrap(param.getCodec()::encodeKey), param.getDistributions()); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/GetByIndexOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/GetByIndexOperator.java index 4c1b684013..62a2a529d8 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/GetByIndexOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/GetByIndexOperator.java @@ -20,7 +20,6 @@ import io.dingodb.common.CommonId; import io.dingodb.common.partition.RangeDistribution; import io.dingodb.common.profile.OperatorProfile; -import io.dingodb.common.store.KeyValue; import io.dingodb.common.type.TupleMapping; import io.dingodb.common.util.ByteArrayUtils; import io.dingodb.common.util.Optional; @@ -28,9 +27,7 @@ import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.operator.data.Context; import io.dingodb.exec.operator.params.GetByIndexParam; -import io.dingodb.exec.operator.params.TxnGetByIndexParam; import io.dingodb.meta.MetaService; -import io.dingodb.meta.entity.Column; import io.dingodb.meta.entity.Table; import io.dingodb.partition.DingoPartitionServiceProvider; import io.dingodb.partition.PartitionService; @@ -39,8 +36,6 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NavigableMap; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java index 9ba1adcc93..9f82b495a0 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockDeleteOperator.java @@ -113,7 +113,9 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { isDocument = true; } localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); - codec = CodecService.getDefault().createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + ); } StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId); Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockInsertOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockInsertOperator.java index e125e83cd0..1dfda1ce93 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockInsertOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockInsertOperator.java @@ -116,7 +116,9 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { isDocument = true; } localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); - codec = CodecService.getDefault().createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + ); } StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId); Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java index 03cd2b18aa..5531ec95d0 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockOperator.java @@ -125,7 +125,8 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { isDocument = true; } localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); - codec = CodecService.getDefault().createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, + indexTable.tupleType(), indexTable.keyMapping()); } StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId); Object[] newTuple; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java index fdad9d8247..3693aeee65 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/PessimisticLockUpdateOperator.java @@ -157,7 +157,7 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { } isUnique = index.unique; codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() ); if (updated && columnIndices.stream().anyMatch(c -> mapping.contains(c))) { PartitionService ps = PartitionService.getService( diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnDiskAnnStatusOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnDiskAnnStatusOperator.java index 3f3603d7a6..fa6fd9fc47 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnDiskAnnStatusOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnDiskAnnStatusOperator.java @@ -17,7 +17,6 @@ package io.dingodb.exec.operator; import io.dingodb.common.profile.OperatorProfile; -import io.dingodb.common.util.Pair; import io.dingodb.exec.Services; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.operator.params.TxnDiskAnnStatusParam; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByKeysOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByKeysOperator.java index 0fd1d27b0a..5c6ec58b21 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByKeysOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByKeysOperator.java @@ -24,7 +24,6 @@ import io.dingodb.exec.Services; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.operator.data.Context; -import io.dingodb.exec.operator.params.GetByKeysParam; import io.dingodb.exec.operator.params.TxnGetByKeysParam; import io.dingodb.exec.transaction.base.TransactionType; import io.dingodb.exec.utils.ByteUtils; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDeleteOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDeleteOperator.java index beaf57c580..9890d00d25 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDeleteOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDeleteOperator.java @@ -107,7 +107,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) { } localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() ); } byte[] keys = wrap(codec::encodeKey).apply(tuple); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDocumentOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDocumentOperator.java index 50c15a40b2..260c6726c3 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDocumentOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartDocumentOperator.java @@ -56,10 +56,6 @@ public class TxnPartDocumentOperator extends FilterProjectSourceOperator { TxnPartDocumentParam param = vertex.getParam(); OperatorProfile profile = param.getProfile("partDocument"); long start = System.currentTimeMillis(); - KeyValueCodec tableCodec; - tableCodec = CodecService.getDefault().createKeyValueCodec( - param.getTable().version, param.getTableDataSchema(), param.tableDataKeyMapping() - ); StoreInstance instance = Services.KV_STORE.getInstance(param.getTableId(), param.getPartId()); DocumentSearchParameter documentSearchParameter = DocumentSearchParameter.builder() .topN(param.getTopN()) diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java index d6531e7d2b..2fe8e62fce 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java @@ -129,7 +129,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) { schema = indexTable.tupleType(); localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() ); } Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartUpdateOperator.java index 76344e3553..fffc384a12 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartUpdateOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartUpdateOperator.java @@ -128,7 +128,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) { tableId = context.getIndexId(); schema = indexTable.tupleType(); codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping() ); Object[] finalNewTuple = newTuple; Object finalDefaultVal = defaultVal; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartVectorOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartVectorOperator.java index d74fddfc80..a1e3678788 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartVectorOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartVectorOperator.java @@ -63,7 +63,8 @@ public class TxnPartVectorOperator extends FilterProjectSourceOperator { String distanceType = param.getDistanceType(); KeyValueCodec tableCodec; tableCodec = CodecService.getDefault().createKeyValueCodec( - param.getIndexTable().version, param.getIndexTable().tupleType(), param.getIndexTable().keyMapping() + param.getIndexTable().getCodecVersion(), param.getIndexTable().version, param.getIndexTable().tupleType(), + param.getIndexTable().keyMapping() ); StoreInstance instance = Services.KV_STORE.getInstance(param.getTableId(), param.getPartId()); List searchResponseList = instance.vectorSearch( diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanWithRelOpOperatorBase.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanWithRelOpOperatorBase.java index 312caa0fe1..23ad976985 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanWithRelOpOperatorBase.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanWithRelOpOperatorBase.java @@ -20,7 +20,6 @@ import io.dingodb.common.CommonId; import io.dingodb.common.CoprocessorV2; import io.dingodb.common.partition.RangeDistribution; -import io.dingodb.common.profile.OperatorProfile; import io.dingodb.common.profile.SourceProfile; import io.dingodb.common.store.KeyValue; import io.dingodb.exec.Services; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionParam.java index 7831e32722..e738892aaa 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionParam.java @@ -72,6 +72,7 @@ public DistributionParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DocumentPartitionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DocumentPartitionParam.java index 9fe3c3a920..782f2b3cc2 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DocumentPartitionParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DocumentPartitionParam.java @@ -67,7 +67,8 @@ public DocumentPartitionParam( DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); TupleMapping outputKeyMapping = TupleMapping.of(new int[] {0}); - this.codec = CodecService.getDefault().createKeyValueCodec(indexId, tupleType, outputKeyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec(table.codecVersion, + indexId, tupleType, outputKeyMapping); this.table = table; } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectParam.java index 42ef8c7a19..ff93688a32 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectParam.java @@ -37,6 +37,8 @@ public abstract class FilterProjectParam extends AbstractParams { protected final DingoType schema; @JsonProperty("schemaVersion") protected final int schemaVersion; + @JsonProperty("codecVersion") + protected final int codecVersion; @JsonProperty("filter") protected SqlExpr filter; @JsonProperty("selection") @@ -50,7 +52,8 @@ public FilterProjectParam( int schemaVersion, SqlExpr filter, TupleMapping selection, - TupleMapping keyMapping + TupleMapping keyMapping, + int codecVersion ) { super(); this.tableId = tableId; @@ -59,6 +62,7 @@ public FilterProjectParam( this.filter = filter; this.selection = selection; this.keyMapping = keyMapping; + this.codecVersion = codecVersion; } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectSourceParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectSourceParam.java index 4160433e18..69b787bacf 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectSourceParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/FilterProjectSourceParam.java @@ -36,6 +36,8 @@ public abstract class FilterProjectSourceParam extends SourceParam { protected final DingoType schema; @JsonProperty("schemaVersion") protected final int schemaVersion; + @JsonProperty("codecVersion") + protected final int codecVersion; @JsonProperty("filter") protected SqlExpr filter; @JsonProperty("selection") @@ -43,6 +45,26 @@ public abstract class FilterProjectSourceParam extends SourceParam { @JsonProperty("keyMapping") protected final TupleMapping keyMapping; + public FilterProjectSourceParam( + CommonId tableId, + CommonId partId, + DingoType schema, + int schemaVersion, + SqlExpr filter, + TupleMapping selection, + TupleMapping keyMapping, + int codecVersion + ) { + super(partId, null); + this.tableId = tableId; + this.schema = schema; + this.schemaVersion = schemaVersion; + this.filter = filter; + this.selection = selection; + this.keyMapping = keyMapping; + this.codecVersion = codecVersion; + } + public FilterProjectSourceParam( CommonId tableId, CommonId partId, @@ -59,6 +81,7 @@ public FilterProjectSourceParam( this.filter = filter; this.selection = selection; this.keyMapping = keyMapping; + this.codecVersion = 2; } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByIndexParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByIndexParam.java index fb9726c34a..8301c54a00 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByIndexParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByIndexParam.java @@ -65,14 +65,14 @@ public GetByIndexParam( Table table, boolean isLookup ) { - super(tableId, table.tupleType(), table.getVersion(), filter, selection, keyMapping); + super(tableId, table.tupleType(), table.getVersion(), filter, selection, keyMapping, index.getCodecVersion()); this.indexTableId = indexTableId; this.isLookup = isLookup; this.isUnique = isUnique; this.index = index; this.table = table; this.codec = CodecService.getDefault().createKeyValueCodec( - index.getVersion(), index.tupleType(), index.keyMapping() + index.getCodecVersion(), index.getVersion(), index.tupleType(), index.keyMapping() ); } @@ -81,7 +81,7 @@ public void init(Vertex vertex) { super.init(vertex); if (isLookup()) { lookupCodec = CodecService.getDefault().createKeyValueCodec( - table.getVersion(), table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.getVersion(), table.tupleType(), table.keyMapping() ); } else { mapList = mapping(selection, table, index); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByKeysParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByKeysParam.java index bac48daf74..236fc38253 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByKeysParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetByKeysParam.java @@ -44,8 +44,9 @@ public GetByKeysParam( TupleMapping selection, Table table ) { - super(tableId, schema, schemaVersion, filter, selection, keyMapping); - this.codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, table.tupleType(), table.keyMapping()); + super(tableId, schema, schemaVersion, filter, selection, keyMapping, table.codecVersion); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.codecVersion, schemaVersion, table.tupleType(), table.keyMapping()); this.table = table; } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetDistributionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetDistributionParam.java index 62ddddb8f6..14c067ea0f 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetDistributionParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/GetDistributionParam.java @@ -60,7 +60,8 @@ public GetDistributionParam( @Override public void init(Vertex vertex) { super.init(vertex); - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/InfoSchemaScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/InfoSchemaScanParam.java index 07dfb70cfe..6a327ce9e9 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/InfoSchemaScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/InfoSchemaScanParam.java @@ -45,7 +45,7 @@ public InfoSchemaScanParam(DingoType schema, SqlExpr filter, TupleMapping selection, String target) { - super(null, null, schema, schemaVersion, filter, selection, null); + super(null, null, schema, schemaVersion, filter, selection, null, 2); this.schema = schema; this.filter = filter; this.selection = selection; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/LikeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/LikeScanParam.java index 9ddbca9420..1953e22034 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/LikeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/LikeScanParam.java @@ -47,15 +47,16 @@ public LikeScanParam( @JsonProperty("keyMapping") TupleMapping keyMapping, @JsonProperty("filter") SqlExpr filter, @JsonProperty("selection") TupleMapping selection, - @JsonProperty("prefix") byte[] prefix + @JsonProperty("prefix") byte[] prefix, + @JsonProperty("codecVersion") int codecVersion ) { - super(tableId, partId, schema, schemaVersion, filter, selection, keyMapping); + super(tableId, partId, schema, schemaVersion, filter, selection, keyMapping, codecVersion); this.prefix = prefix; } @Override public void init(Vertex vertex) { super.init(vertex); - this.codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartDocumentParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartDocumentParam.java index 6c449948b3..1e3e34d237 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartDocumentParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartDocumentParam.java @@ -59,8 +59,9 @@ public PartDocumentParam( String queryString, int topN ) { - super(tableId, partId, schema, table.version, filter, selection, keyMapping); - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + super(tableId, partId, schema, table.version, filter, selection, keyMapping, table.codecVersion); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); this.table = table; this.distributions = distributions; this.indexId = indexId; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartModifyParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartModifyParam.java index 1ba539af30..70cf442bfd 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartModifyParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartModifyParam.java @@ -61,7 +61,8 @@ public PartModifyParam( this.tableId = tableId; this.schema = schema; this.keyMapping = keyMapping; - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); this.table = table; } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeDeleteParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeDeleteParam.java index 97c59e4f09..80b78108a7 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeDeleteParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeDeleteParam.java @@ -46,22 +46,25 @@ public class PartRangeDeleteParam extends AbstractParams { private KeyValueCodec codec; private int schemaVersion; + private int codecVersion; public PartRangeDeleteParam( CommonId tableId, int schemaVersion, DingoType schema, - TupleMapping keyMapping + TupleMapping keyMapping, + int codecVersion ) { super(); this.tableId = tableId; this.schema = schema; this.keyMapping = keyMapping; this.schemaVersion = schemaVersion; + this.codecVersion = codecVersion; } @Override public void init(Vertex vertex) { - this.codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeScanParam.java index 7c19633ba6..8cc9af178e 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartRangeScanParam.java @@ -68,9 +68,10 @@ public PartRangeScanParam( TupleMapping aggKeys, List aggList, DingoType outputSchema, - boolean pushDown + boolean pushDown, + int codecVersion ) { - super(tableId, schema, schemaVersion, filter, selection, keyMapping); + super(tableId, schema, schemaVersion, filter, selection, keyMapping, codecVersion); this.aggKeys = aggKeys; this.aggList = aggList; this.outputSchema = outputSchema; @@ -123,9 +124,10 @@ public void init(Vertex vertex) { outputSchema, outputKeyMapping, tableId.seq )); coprocessor = builder.build(); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, outputSchema, outputKeyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + codecVersion, schemaVersion, outputSchema, outputKeyMapping); return; } - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartUpdateParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartUpdateParam.java index 953234b37d..06fb167343 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartUpdateParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartUpdateParam.java @@ -20,18 +20,14 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.annotation.JsonTypeName; import io.dingodb.common.CommonId; -import io.dingodb.common.partition.RangeDistribution; import io.dingodb.common.type.DingoType; import io.dingodb.common.type.TupleMapping; -import io.dingodb.common.util.ByteArrayUtils; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.expr.SqlExpr; import io.dingodb.meta.entity.Table; import lombok.Getter; -import lombok.Setter; import java.util.List; -import java.util.NavigableMap; @Getter @JsonTypeName("update") diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartVectorParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartVectorParam.java index 8c87f18449..260a47e742 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartVectorParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/PartVectorParam.java @@ -62,7 +62,8 @@ public PartVectorParam( Map parameterMap ) { super(tableId, partId, schema, table.version, filter, selection, keyMapping); - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); this.table = table; this.distributions = distributions; this.indexId = indexId; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanParam.java index d2bb759598..574ab66c33 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanParam.java @@ -24,7 +24,6 @@ import io.dingodb.codec.CodecService; import io.dingodb.codec.KeyValueCodec; import io.dingodb.common.CommonId; -import io.dingodb.common.config.DingoConfiguration; import io.dingodb.common.profile.OperatorProfile; import io.dingodb.common.profile.Profile; import io.dingodb.common.profile.SourceProfile; @@ -58,22 +57,25 @@ public class ScanParam extends AbstractParams { @Getter protected List profileList; protected int schemaVersion; + protected int codecVersion; public ScanParam( CommonId tableId, @NonNull DingoType schema, TupleMapping keyMapping, - int schemaVersion + int schemaVersion, + int codecVersion ) { super(null, null); this.tableId = tableId; this.schema = schema; this.keyMapping = keyMapping; this.schemaVersion = schemaVersion; + this.codecVersion = codecVersion; } public KeyValueCodec getCodec() { - return CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + return CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } public synchronized OperatorProfile getProfile(String type) { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java index 4f7ac16149..43f4219b3c 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java @@ -122,9 +122,10 @@ public ScanWithRelOpParam( DingoType outputSchema, boolean pushDown, int schemaVersion, - int limit + int limit, + int codecVersion ) { - super(tableId, schema, keyMapping, schemaVersion); + super(tableId, schema, keyMapping, schemaVersion, codecVersion); this.relOp = relOp; this.outputSchema = outputSchema; this.pushDown = pushDown; @@ -162,7 +163,8 @@ public void init(Vertex vertex) { public KeyValueCodec getPushDownCodec() { TupleMapping outputKeyMapping = TupleMapping.of(new int[]{}); - return CodecService.getDefault().createKeyValueCodec(schemaVersion, outputSchema, outputKeyMapping); + return CodecService.getDefault().createKeyValueCodec( + codecVersion, schemaVersion, outputSchema, outputKeyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnBuildParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnBuildParam.java index 0abd98a896..9274f34e36 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnBuildParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnBuildParam.java @@ -72,7 +72,8 @@ public TxnDiskAnnBuildParam( long timeOut, long ts ) { - super(table.tableId, partId, schema, table.version, filter, selection, table.keyMapping()); + super(table.tableId, partId, schema, table.version, filter, selection, + table.keyMapping(), indexTable.codecVersion); this.table = table; this.distributions = distributions; this.indexId = indexTable.tableId; @@ -86,7 +87,8 @@ public TxnDiskAnnBuildParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.getCodecVersion(), schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnCountMemoryParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnCountMemoryParam.java index 3b10d92ab6..03a6bfba73 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnCountMemoryParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnCountMemoryParam.java @@ -82,7 +82,8 @@ public TxnDiskAnnCountMemoryParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnLoadParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnLoadParam.java index 676879d718..5886879628 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnLoadParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnLoadParam.java @@ -91,7 +91,8 @@ public TxnDiskAnnLoadParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnResetParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnResetParam.java index e1989b1f33..92dd5ee1a2 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnResetParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnResetParam.java @@ -69,7 +69,8 @@ public TxnDiskAnnResetParam( int isolationLevel, long timeOut ) { - super(table.tableId, partId, schema, table.version, filter, selection, table.keyMapping()); + super(table.tableId, partId, schema, table.version, filter, selection, + table.keyMapping(), indexTable.getCodecVersion()); this.table = table; this.distributions = distributions; this.indexId = indexTable.tableId; @@ -82,7 +83,8 @@ public TxnDiskAnnResetParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.getCodecVersion(), schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnStatusParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnStatusParam.java index c4712b30a2..370882be5b 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnStatusParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnDiskAnnStatusParam.java @@ -71,7 +71,8 @@ public TxnDiskAnnStatusParam( int isolationLevel, long timeOut ) { - super(table.tableId, partId, schema, table.version, filter, selection, table.keyMapping()); + super(table.tableId, partId, schema, table.version, filter, selection, + table.keyMapping(), indexTable.codecVersion); this.table = table; this.distributions = distributions; this.indexId = indexTable.tableId; @@ -84,7 +85,8 @@ public TxnDiskAnnStatusParam( @Override public void init(Vertex vertex) { super.init(vertex); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.getCodecVersion(), schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByIndexParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByIndexParam.java index c1993d2332..f94945c7e8 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByIndexParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByIndexParam.java @@ -71,7 +71,7 @@ public TxnGetByIndexParam( long scanTs, long timeout ) { - super(tableId, table.tupleType(), table.version, filter, selection, keyMapping); + super(tableId, table.tupleType(), table.version, filter, selection, keyMapping, table.getCodecVersion()); this.indexTableId = indexTableId; this.isLookup = isLookup; this.isUnique = isUnique; @@ -79,14 +79,16 @@ public TxnGetByIndexParam( this.table = table; this.scanTs = scanTs; this.timeout = timeout; - this.codec = CodecService.getDefault().createKeyValueCodec(index.version, index.tupleType(), index.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + index.getCodecVersion(), index.version, index.tupleType(), index.keyMapping()); } @Override public void init(Vertex vertex) { super.init(vertex); if (isLookup()) { - lookupCodec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + lookupCodec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); } else { mapList = mapping(selection, table, index); } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByKeysParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByKeysParam.java index 5866536b23..176aed3ea2 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByKeysParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnGetByKeysParam.java @@ -60,8 +60,9 @@ public TxnGetByKeysParam( long timeOut, boolean isSelect ) { - super(tableId, schema, table.version, filter, selection, keyMapping); - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + super(tableId, schema, table.version, filter, selection, keyMapping, table.getCodecVersion()); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); this.table = table; this.scanTs = scanTs; this.isolationLevel = isolationLevel; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java index 0a090dfa02..cf63136933 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java @@ -83,7 +83,8 @@ public TxnIndexRangeScanParam(CommonId indexTableId, boolean pushDown, TupleMapping selection, int limit) { - super(tableId, index.tupleType(), keyMapping, relOp, outputSchema, pushDown, index.getVersion(), limit); + super(tableId, index.tupleType(), keyMapping, relOp, outputSchema, + pushDown, index.getVersion(), limit, table.getCodecVersion()); this.indexSchema = index.tupleType(); this.indexTableId = indexTableId; this.isLookup = isLookup; @@ -93,9 +94,11 @@ public TxnIndexRangeScanParam(CommonId indexTableId, this.scanTs = scanTs; this.timeout = timeout; this.selection = selection; - this.codec = CodecService.getDefault().createKeyValueCodec(index.version, index.tupleType(), index.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + index.getCodecVersion(), index.version, index.tupleType(), index.keyMapping()); if (isLookup) { - lookupCodec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + lookupCodec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); } else { this.mapList = index.getColumns().stream().map(table.columns::indexOf).collect(Collectors.toList()); } @@ -144,7 +147,8 @@ public TupleMapping indexKeyMapping() { public KeyValueCodec getPushDownCodec() { TupleMapping outputKeyMapping = TupleMapping.of(new int[]{}); - return CodecService.getDefault().createKeyValueCodec(schemaVersion, indexSchema, outputKeyMapping); + return CodecService.getDefault().createKeyValueCodec( + index.getCodecVersion(), schemaVersion, indexSchema, outputKeyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnLikeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnLikeScanParam.java index bc5935bee2..3d841f67da 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnLikeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnLikeScanParam.java @@ -47,15 +47,16 @@ public TxnLikeScanParam( @JsonProperty("keyMapping") TupleMapping keyMapping, @JsonProperty("filter") SqlExpr filter, @JsonProperty("selection") TupleMapping selection, - @JsonProperty("prefix") byte[] prefix + @JsonProperty("prefix") byte[] prefix, + @JsonProperty("codecVersion") int codecVersion ) { - super(tableId, partId, schema, schemaVersion, filter, selection, keyMapping); + super(tableId, partId, schema, schemaVersion, filter, selection, keyMapping, codecVersion); this.prefix = prefix; } @Override public void init(Vertex vertex) { super.init(vertex); - this.codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java index 07f69b6ac5..da15fe5971 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java @@ -101,7 +101,8 @@ public TxnPartDocumentParam( long timeOut, TupleMapping resultSelection ) { - super(table.tableId, partId, schema, table.version, filter, selection, table.keyMapping()); + super(table.tableId, partId, schema, table.version, filter, selection, + table.keyMapping(), indexTable.getCodecVersion()); this.table = table; this.distributions = distributions; this.indexId = indexTable.tableId; @@ -151,7 +152,7 @@ tableDataSchema, tableDataKeyMapping(), 0 coprocessor = builder.build(); } - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartModifyParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartModifyParam.java index 92960262a2..f13de89ec7 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartModifyParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartModifyParam.java @@ -22,7 +22,6 @@ import io.dingodb.common.type.TupleMapping; import io.dingodb.meta.entity.Table; import lombok.Getter; -import lombok.Setter; @Getter public abstract class TxnPartModifyParam extends PartModifyParam { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeDeleteParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeDeleteParam.java index b5701c6ff0..c947f8b17a 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeDeleteParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeDeleteParam.java @@ -46,22 +46,25 @@ public class TxnPartRangeDeleteParam extends AbstractParams { private transient KeyValueCodec codec; private int schemaVersion; + private int codecVersion; public TxnPartRangeDeleteParam( CommonId tableId, int schemaVersion, DingoType schema, - TupleMapping keyMapping + TupleMapping keyMapping, + int codecVersion ) { super(); this.tableId = tableId; this.schema = schema; this.keyMapping = keyMapping; this.schemaVersion = schemaVersion; + this.codecVersion = codecVersion; } @Override public void init(Vertex vertex) { - this.codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeScanParam.java index 2019de9f12..477b4e9084 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartRangeScanParam.java @@ -77,9 +77,10 @@ public TxnPartRangeScanParam( long scanTs, int isolationLevel, long timeOut, - boolean pushDown + boolean pushDown, + int codecVersion ) { - super(tableId, schema, schemaVersion, filter, selection, keyMapping); + super(tableId, schema, schemaVersion, filter, selection, keyMapping, codecVersion); this.aggKeys = aggKeys; this.aggList = aggList; this.outputSchema = outputSchema; @@ -135,9 +136,10 @@ public void init(Vertex vertex) { outputSchema, outputKeyMapping, tableId.seq )); coprocessor = builder.build(); - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, outputSchema, outputKeyMapping); + codec = CodecService.getDefault().createKeyValueCodec( + codecVersion, schemaVersion, outputSchema, outputKeyMapping); } - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java index ea0d663dac..c5ba751366 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java @@ -108,7 +108,8 @@ public TxnPartVectorParam( int vectorIndex, String distanceType ) { - super(table.tableId, partId, schema, table.version, filter, selection, table.keyMapping()); + super(table.tableId, partId, schema, + table.version, filter, selection, table.keyMapping(), table.getCodecVersion()); this.table = table; this.distributions = distributions; this.indexId = indexTable.tableId; @@ -161,7 +162,7 @@ tableDataSchema, tableDataKeyMapping(), 0 coprocessor = builder.build(); } - codec = CodecService.getDefault().createKeyValueCodec(schemaVersion, schema, keyMapping); + codec = CodecService.getDefault().createKeyValueCodec(codecVersion, schemaVersion, schema, keyMapping); } @Override diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanParam.java index 6c80a92a1c..ddf6f264a2 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanParam.java @@ -50,9 +50,10 @@ public TxnScanParam( long scanTs, int isolationLevel, long timeOut, - int schemaVersion + int schemaVersion, + int codecVersion ) { - super(tableId, schema, keyMapping, schemaVersion); + super(tableId, schema, keyMapping, schemaVersion, codecVersion); this.scanTs = scanTs; this.isolationLevel = isolationLevel; this.timeOut = timeOut; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanWithRelOpParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanWithRelOpParam.java index bdd1a57c37..6529e64228 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanWithRelOpParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanWithRelOpParam.java @@ -58,9 +58,10 @@ public TxnScanWithRelOpParam( DingoType outputSchema, boolean pushDown, int schemaVersion, - int limit + int limit, + int codecVersion ) { - super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit); + super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit, codecVersion); this.scanTs = scanTs; this.isolationLevel = isolationLevel; this.timeOut = timeOut; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/VectorPartitionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/VectorPartitionParam.java index 5ee5443a71..cfe8a044f0 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/VectorPartitionParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/VectorPartitionParam.java @@ -67,7 +67,8 @@ public VectorPartitionParam( DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); TupleMapping outputKeyMapping = TupleMapping.of(new int[] {0}); - this.codec = CodecService.getDefault().createKeyValueCodec(indexId, tupleType, outputKeyMapping); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), indexId, tupleType, outputKeyMapping); this.table = table; } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionCacheToMutation.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionCacheToMutation.java index 28666470b6..155cb692f5 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionCacheToMutation.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionCacheToMutation.java @@ -74,7 +74,7 @@ public final class TransactionCacheToMutation { TupleMapping mapping = TupleMapping.of(new int[]{0}); DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); - CODEC = CodecService.getDefault().createKeyValueCodec(1, tupleType, mapping); + CODEC = CodecService.getDefault().createKeyValueCodec(2, 1, tupleType, mapping); } private TransactionCacheToMutation() { @@ -94,7 +94,7 @@ public static Mutation cacheToMutation(@Nullable int op, @NonNull byte[] key, return new Mutation(Op.forNumber(op), key, value, forUpdateTs, null, null); } KeyValueCodec keyValueCodec = CodecService.getDefault().createKeyValueCodec( - index.tableId, index.tupleType(), index.keyMapping() + index.getCodecVersion(), index.tableId, index.tupleType(), index.keyMapping() ); Table table = (Table) TransactionManager.getTable(txnId, index.primaryId); if (table == null) { @@ -111,7 +111,7 @@ public static Mutation cacheToMutation(@Nullable int op, @NonNull byte[] key, } } key = CodecService.getDefault() - .createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()) + .createKeyValueCodec(table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()) .encodeKey(tableRecord); Column column = index.getColumns().get(0); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionUtil.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionUtil.java index 80d5fcbb52..1160962b93 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionUtil.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TransactionUtil.java @@ -310,7 +310,7 @@ public static String duplicateEntryKey(CommonId tableId, byte[] key, CommonId tx throw new RuntimeException("duplicateEntryKey get table by txn is null, tableId:" + tableId); } KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - table.version, table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() ); TupleMapping keyMapping = table.keyMapping(); return joinPrimaryKey(codec.decodeKeyPrefix(key), keyMapping); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TwoPhaseCommitUtils.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TwoPhaseCommitUtils.java index 1ddff5ead3..e0ff47eab1 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TwoPhaseCommitUtils.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/TwoPhaseCommitUtils.java @@ -205,6 +205,7 @@ public static byte[] commitKey(CommonId txnId, IndexTable indexTable = (IndexTable) TransactionManager.getIndex(txnId, tableId); if (indexTable.indexType.isVector || indexTable.indexType == IndexType.DOCUMENT) { KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + indexTable.codecVersion, indexTable.version, indexTable.tupleType(), indexTable.keyMapping() @@ -214,6 +215,7 @@ public static byte[] commitKey(CommonId txnId, DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); KeyValueCodec keyValueCodec = CodecService.getDefault().createKeyValueCodec( + indexTable.codecVersion, indexTable.version, tupleType, mapping diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/Txn.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/Txn.java index 75920ddecd..796c2e4b02 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/Txn.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/util/Txn.java @@ -432,14 +432,15 @@ public void commitSecondData(List secondData) { } if (indexTable.indexType.isVector) { KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), + indexTable.keyMapping() ); Object[] decodeKey = codec.decodeKeyPrefix(key); TupleMapping mapping = TupleMapping.of(new int[]{0}); DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); KeyValueCodec vectorCodec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, tupleType, mapping + indexTable.getCodecVersion(), indexTable.version, tupleType, mapping ); key = vectorCodec.encodeKeyPrefix(new Object[]{decodeKey[0]}, 1); } @@ -585,14 +586,15 @@ public synchronized void rollback(List tupleList) { } if (indexTable.indexType.isVector) { KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + indexTable.getCodecVersion(), indexTable.version, + indexTable.tupleType(), indexTable.keyMapping() ); Object[] decodeKey = codec.decodeKeyPrefix(key); TupleMapping mapping = TupleMapping.of(new int[]{0}); DingoType dingoType = new LongType(false); TupleType tupleType = DingoTypeFactory.tuple(new DingoType[]{dingoType}); KeyValueCodec vectorCodec = CodecService.getDefault().createKeyValueCodec( - indexTable.version, tupleType, mapping + indexTable.getCodecVersion(), indexTable.version, tupleType, mapping ); key = vectorCodec.encodeKeyPrefix(new Object[]{decodeKey[0]}, 1); } diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/AddColumnFiller.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/AddColumnFiller.java index 6a017f8601..4fa1ba5b22 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/AddColumnFiller.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/AddColumnFiller.java @@ -85,14 +85,15 @@ public boolean preWritePrimary(ReorgBackFillTask task) { colLen = columnIndices.size(); } indexCodec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec(indexTable.codecVersion, indexTable.version, + indexTable.tupleType(), indexTable.keyMapping()); ps = PartitionService.getService( Optional.ofNullable(indexTable.getPartitionStrategy()) .orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME)); // reorging when region split StoreInstance kvStore = Services.KV_STORE.getInstance(task.getTableId(), task.getRegionId()); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - table.getVersion(), table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.getVersion(), table.tupleType(), table.keyMapping() ); Iterator iterator = kvStore.txnScanWithoutStream( task.getStartTs(), diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/IndexAddFiller.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/IndexAddFiller.java index f5e5080ce8..4dd14a2a50 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/IndexAddFiller.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/IndexAddFiller.java @@ -127,14 +127,15 @@ public boolean preWritePrimary(ReorgBackFillTask task) { .map(Column::getName) .collect(Collectors.toList())); indexCodec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, + indexTable.tupleType(), indexTable.keyMapping()); ps = PartitionService.getService( Optional.ofNullable(indexTable.getPartitionStrategy()) .orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME)); // reorging when region split StoreInstance kvStore = Services.KV_STORE.getInstance(task.getTableId(), task.getRegionId()); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - table.getVersion(), table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.getVersion(), table.tupleType(), table.keyMapping() ); Iterator iterator = kvStore.txnScanWithoutStream( task.getStartTs(), @@ -486,7 +487,7 @@ protected void preWritePrimaryKey(CacheToObject cacheToObject) { private Iterator getIterator(ReorgBackFillTask task, CommonId tableId, boolean check) { StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, task.getRegionId()); KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( - table.getVersion(), table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.getVersion(), table.tupleType(), table.keyMapping() ); Iterator iterator = kvStore.txnScanWithoutStream( task.getStartTs(), diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/ModifyColumnFiller.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/ModifyColumnFiller.java index 5d2dad53af..13ecd6451a 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/ModifyColumnFiller.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/ModifyColumnFiller.java @@ -86,13 +86,14 @@ public boolean preWritePrimary(ReorgBackFillTask task) { .collect(Collectors.toList())); colLen = columnIndices.size(); indexCodec = CodecService.getDefault() - .createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + .createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), + indexTable.keyMapping()); ps = PartitionService.getService( Optional.ofNullable(indexTable.getPartitionStrategy()) .orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME)); // reorging when region split StoreInstance kvStore = Services.KV_STORE.getInstance(task.getTableId(), task.getRegionId()); - KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(table.getCodecVersion(), table.getVersion(), table.tupleType(), table.keyMapping() ); Iterator iterator = kvStore.txnScanWithoutStream( diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java b/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java index 023c7e3335..b268ccf3a5 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/prepare/PrepareMeta.java @@ -35,7 +35,6 @@ import io.dingodb.common.util.ByteArrayUtils; import io.dingodb.exec.fun.mysql.VersionFun; import io.dingodb.partition.DingoPartitionServiceProvider; -import io.dingodb.sdk.service.VersionService; import io.dingodb.sdk.service.entity.meta.DingoCommonId; import io.dingodb.sdk.service.entity.meta.TableDefinitionWithId; import io.dingodb.sdk.service.entity.version.PutRequest; @@ -366,7 +365,7 @@ public static void initUserWithRetry(String tableName, CommonId tableId) { io.dingodb.meta.entity.Table table = io.dingodb.meta.InfoSchemaService.root() .getTableDef(tableId.domain, tableId.seq); KeyValueCodec codec = CodecService.getDefault() - .createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + .createKeyValueCodec(table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()); KeyValue keyValue = codec.encode(values.get(0)); CommonId regionId = rangeDistribution.firstEntry().getValue().getId(); @@ -447,6 +446,7 @@ private static io.dingodb.common.table.TableDefinition getTableDefinition(String .charset("utf8") .collate("utf8_bin") .tableType(tableType) + .codecVersion(2) .schemaState(SchemaState.SCHEMA_PUBLIC) .rowFormat(rowFormat); diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/service/SequenceService.java b/dingo-executor/src/main/java/io/dingodb/server/executor/service/SequenceService.java index 23efa7979f..b6e1fdcb39 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/service/SequenceService.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/service/SequenceService.java @@ -75,7 +75,7 @@ private SequenceService() { this.table = getTable(SEQUENCE_TABLE); CommonId tableId = table.getTableId(); this.store = new StoreKvTxn(tableId, getRegionId(tableId)); - this.codec = CodecService.getDefault().createKeyValueCodec( + this.codec = CodecService.getDefault().createKeyValueCodec(this.table.getCodecVersion(), getPartId(tableId, store.getRegionId()), table.tupleType(), table.keyMapping() ); this.queueMap = new ConcurrentHashMap<>(); diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/service/UserService.java b/dingo-executor/src/main/java/io/dingodb/server/executor/service/UserService.java index d440c925a7..9152b2babd 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/service/UserService.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/service/UserService.java @@ -105,13 +105,15 @@ private UserService() { dbPrivStore = new StoreKvTxn(tablePrivTblId, getRegionId(dbPrivTblId)); tablePrivStore = new StoreKvTxn(tablePrivTblId, getRegionId(tablePrivTblId)); userCodec = CodecService.getDefault().createKeyValueCodec( - getPartId(userTblId, userStore.getRegionId()), userTd.tupleType(), userTd.keyMapping() + userTd.codecVersion, getPartId(userTblId, userStore.getRegionId()), + userTd.tupleType(), userTd.keyMapping() ); dbPrivCodec = CodecService.getDefault().createKeyValueCodec( - getPartId(dbPrivTblId, dbPrivStore.getRegionId()), dbPrivTd.tupleType(), dbPrivTd.keyMapping() + dbPrivTd.codecVersion, getPartId(dbPrivTblId, dbPrivStore.getRegionId()), dbPrivTd.tupleType(), + dbPrivTd.keyMapping() ); tablePrivCodec = CodecService.getDefault().createKeyValueCodec( - getPartId(tablePrivTblId, tablePrivStore.getRegionId()), + tablePrivTd.codecVersion, getPartId(tablePrivTblId, tablePrivStore.getRegionId()), tablePrivTd.tupleType(), tablePrivTd.keyMapping() ); } catch (Exception e) { diff --git a/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java b/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java index 81388ea439..0ac5f13e2e 100644 --- a/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java +++ b/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java @@ -108,6 +108,9 @@ public class Table { @JsonProperty public SchemaState schemaState; + @JsonProperty + public int codecVersion; + public TupleType tupleType() { return DingoTypeFactory.tuple(columns.stream().map(Column::getType).toArray(DingoType[]::new)); } diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Mapping.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Mapping.java index 4a833e2aad..cae6a9f54f 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Mapping.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Mapping.java @@ -104,7 +104,8 @@ public static TableDefinition mapping(Table table) { null, null, 0, - true); + true, + table.getCodecVersion()); } public static io.dingodb.store.proxy.common.TableDefinition mapping(TableDefinition tableDefinition) { diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/TableDefinition.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/TableDefinition.java index ae80256ffc..e95b8f6960 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/TableDefinition.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/TableDefinition.java @@ -221,6 +221,11 @@ public long getUpdateTime() { return tableDefinition.getUpdateTime(); } + @Override + public int getCodecVersion() { + return tableDefinition.getCodecVersion(); + } + private VectorIndexParameter.MetricType getMetricType(String metricType) { switch (metricType.toUpperCase()) { case "INNER_PRODUCT": diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/mapper/TableMapper.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/mapper/TableMapper.java index 632b81bfea..9852afb25b 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/mapper/TableMapper.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/mapper/TableMapper.java @@ -196,7 +196,8 @@ default Table tableFrom( builder.partitionStrategy(fromPartitionStrategy(partitionRule.getStrategy())); } KeyValueCodec codec = CodecService.INSTANCE - .createKeyValueCodec(definition.getVersion(), columnDefinitionFrom(definition.getColumns())); + .createKeyValueCodec(definition.getCodecVersion(), definition.getVersion(), + columnDefinitionFrom(definition.getColumns())); if (definition.getTablePartition() != null) { builder.partitions(partitionFrom( @@ -222,8 +223,9 @@ default IndexTable indexTableFrom( tableFrom(definition, builder); PartitionRule partitionRule = definition.getTablePartition(); builder.partitionStrategy(fromPartitionStrategy(partitionRule.getStrategy())); - KeyValueCodec codec = CodecService.INSTANCE - .createKeyValueCodec(definition.getVersion(), columnDefinitionFrom(definition.getColumns())); + KeyValueCodec codec = CodecService.INSTANCE.createKeyValueCodec( + definition.getCodecVersion(), definition.getVersion(), columnDefinitionFrom(definition.getColumns()) + ); builder.partitions(partitionFrom( definition.getTablePartition().getPartitions(), codec, diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java index de1ce0e091..efcc0b3fa2 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java @@ -294,8 +294,9 @@ private NavigableMap loadDistribution(Co }); NavigableMap result = new TreeMap<>(); Table table = MAPPER.tableFrom(tableWithId, getIndexes(tableWithId, tableWithId.getTableId())); - KeyValueCodec codec = CodecService.getDefault() - .createKeyValueCodec(tableDefinition.getVersion(), table.tupleType(), table.keyMapping()); + KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec( + tableDefinition.getCodecVersion(), tableDefinition.getVersion(), + table.tupleType(), table.keyMapping()); boolean isOriginalKey = tableDefinition.getTablePartition().getStrategy().number() == 1; rangeDistributionList.forEach(scanRegionWithPartId -> { RangeDistribution distribution = mapping(scanRegionWithPartId, codec, isOriginalKey); diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/CodecService.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/CodecService.java index aa16210382..5bffecbcee 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/CodecService.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/CodecService.java @@ -18,6 +18,7 @@ import com.google.auto.service.AutoService; import io.dingodb.codec.CodecServiceProvider; +import io.dingodb.codec.KeyValueCodec; import io.dingodb.common.CommonId; import io.dingodb.common.store.KeyValue; import io.dingodb.common.table.ColumnDefinition; @@ -129,21 +130,23 @@ public byte[] setId(byte[] key, long id) { @Override public KeyValueCodec createKeyValueCodec( - int schemaVersion, CommonId id, List columns) { + int codecVersion, int schemaVersion, CommonId id, List columns) { return new KeyValueCodec( id, DingoKeyValueCodec.of( - schemaVersion, id.seq, columns.stream().map(Mapping::mapping).collect(Collectors.toList()) + codecVersion, schemaVersion, id.seq, columns.stream().map(Mapping::mapping).collect(Collectors.toList()) ), tuple(columns.stream().map(ColumnDefinition::getType).toArray(DingoType[]::new)) ); } @Override - public KeyValueCodec createKeyValueCodec(int schemaVersion, CommonId id, DingoType type, TupleMapping keyMapping) { + public io.dingodb.codec.KeyValueCodec createKeyValueCodec( + int codecVersion, int version, CommonId id, DingoType type, TupleMapping keyMapping + ) { return new KeyValueCodec( id, - new DingoKeyValueCodec(schemaVersion, id.seq, createSchemasForType(type, keyMapping)), + new DingoKeyValueCodec(codecVersion, version, id.seq, createSchemasForType(type, keyMapping)), type); } diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/StoreService.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/StoreService.java index 9ecda5ed9b..f9ceaadaf8 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/StoreService.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/StoreService.java @@ -248,7 +248,7 @@ public StoreInstance(CommonId tableId, CommonId regionId) { this.tableMap = new HashMap<>(); } this.tableCodec = (KeyValueCodec) CodecService.getDefault().createKeyValueCodec( - table.version, table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() ); this.storeService = storeService(tableId, regionId); if (tableId.type == CommonId.CommonType.INDEX && table instanceof IndexTable @@ -275,7 +275,7 @@ public StoreInstance(CommonId tableId, CommonId regionId, CommonId indexId) { .orElseThrow(() -> new RuntimeException("Not found index " + indexId)); this.tableMap = table.getIndexes().stream().collect(Collectors.toMap(Table::getTableId, identity())); this.tableCodec = (KeyValueCodec) CodecService.getDefault().createKeyValueCodec( - table.version, table.tupleType(), table.keyMapping() + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() ); this.storeService = storeService(tableId, regionId); this.transactionStoreInstance = new TransactionStoreInstance(storeService, indexService, partitionId); diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java index fd2cbd8865..7cc690e015 100644 --- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java +++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/service/TransactionStoreInstance.java @@ -285,7 +285,7 @@ public static void getJoinedPrimaryKey(TxnPreWrite txnPreWrite, List joinedKey = new AtomicReference<>(""); TupleMapping keyMapping = table.keyMapping(); keysAlreadyExist.forEach( From 0732ce83487c3af871a5f4e02dc0e3bf7e3fc697 Mon Sep 17 00:00:00 2001 From: guojn1 Date: Tue, 7 Jan 2025 09:47:50 +0800 Subject: [PATCH 2/3] [fix][dingo-meta-api] Keep the codecVersion when making incremental changes to infoschema --- .../client/operation/impl/ScanCoprocessorOperation.java | 3 ++- dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dingo-client/src/main/java/io/dingodb/client/operation/impl/ScanCoprocessorOperation.java b/dingo-client/src/main/java/io/dingodb/client/operation/impl/ScanCoprocessorOperation.java index ddf9e8e14a..db2443e7b7 100644 --- a/dingo-client/src/main/java/io/dingodb/client/operation/impl/ScanCoprocessorOperation.java +++ b/dingo-client/src/main/java/io/dingodb/client/operation/impl/ScanCoprocessorOperation.java @@ -174,7 +174,8 @@ public void exec(OperationContext context) { List columnDefinitions = coprocessor.getResultSchema().getSchemas(); KeyValueCodec codec = new KeyValueCodec( - DingoKeyValueCodec.of(context.getTableId().entityId(), columnDefinitions), columnDefinitions + DingoKeyValueCodec.of(context.getTable().getCodecVersion(), + context.getTableId().entityId(), columnDefinitions), columnDefinitions ); context.[]>result()[context.getSeq()] = new CoprocessorIterator( columnDefinitions, codec, scanResult, context.getTableId().entityId() diff --git a/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java b/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java index 0ac5f13e2e..34ac0b1682 100644 --- a/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java +++ b/dingo-meta-api/src/main/java/io/dingodb/meta/entity/Table.java @@ -193,6 +193,7 @@ public Table copyWithColumns(List columns) { .tableType(this.tableType) .updateTime(this.updateTime) .version(this.version) + .codecVersion(this.codecVersion) .build(); } From b000267a5eab3ad3fc9e3eab2b9205231c59a7f5 Mon Sep 17 00:00:00 2001 From: guojn1 Date: Wed, 8 Jan 2025 15:26:09 +0800 Subject: [PATCH 3/3] [fix][dingo-exec] Add codecVersion to Coprocessor --- .../main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java | 1 + .../src/main/java/io/dingodb/common/Coprocessor.java | 4 ++++ .../src/main/java/io/dingodb/common/CoprocessorV2.java | 3 +++ .../java/io/dingodb/exec/operator/ForUpdateOperator.java | 5 ++++- .../java/io/dingodb/exec/operator/params/ForUpdateParam.java | 4 +++- .../io/dingodb/exec/operator/params/ScanWithRelOpParam.java | 1 + .../dingodb/exec/operator/params/TxnIndexRangeScanParam.java | 1 + .../dingodb/exec/operator/params/TxnPartDocumentParam.java | 1 + .../io/dingodb/exec/operator/params/TxnPartVectorParam.java | 1 + .../src/main/java/io/dingodb/proxy/mapper/EntityMapper.java | 1 + 10 files changed, 20 insertions(+), 2 deletions(-) diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java index 1a48e6c9b3..b3a5784735 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java @@ -470,6 +470,7 @@ public static CoprocessorV2 getCoprocessor(Table table, List histogra .selection(selection) .relExpr(os.toByteArray()) .schemaVersion(table.getVersion()) + .codecVersion(table.getCodecVersion()) .build(); return coprocessor; } diff --git a/dingo-common/src/main/java/io/dingodb/common/Coprocessor.java b/dingo-common/src/main/java/io/dingodb/common/Coprocessor.java index 36e06bdb0e..945bb28365 100644 --- a/dingo-common/src/main/java/io/dingodb/common/Coprocessor.java +++ b/dingo-common/src/main/java/io/dingodb/common/Coprocessor.java @@ -20,6 +20,7 @@ import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Setter; import java.util.Collections; import java.util.List; @@ -50,4 +51,7 @@ public class Coprocessor { @Builder.Default private List aggregations = Collections.emptyList(); + @Setter + private int codecVersion; + } diff --git a/dingo-common/src/main/java/io/dingodb/common/CoprocessorV2.java b/dingo-common/src/main/java/io/dingodb/common/CoprocessorV2.java index 1d08691b36..2be4e9b824 100644 --- a/dingo-common/src/main/java/io/dingodb/common/CoprocessorV2.java +++ b/dingo-common/src/main/java/io/dingodb/common/CoprocessorV2.java @@ -46,4 +46,7 @@ public class CoprocessorV2 { @Setter private int limit; + + @Setter + private int codecVersion; } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java index 9a7cac4298..91a5bd505d 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/ForUpdateOperator.java @@ -101,7 +101,10 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { }).toArray(); schema = indexTable.tupleType(); localStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), partId); - codec = CodecService.getDefault().createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping()); + codec = CodecService.getDefault().createKeyValueCodec( + indexTable.codecVersion, + indexTable.version, indexTable.tupleType(), indexTable.keyMapping() + ); } Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE); KeyValue keyValue = wrap(codec::encode).apply(newTuple); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java index 13eb9b3040..771ea0a437 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ForUpdateParam.java @@ -76,6 +76,8 @@ public ForUpdateParam( this.lockTimeOut = lockTimeOut; this.isScan = isScan; this.table = table; - this.codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping()); + this.codec = CodecService.getDefault().createKeyValueCodec( + table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping() + ); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java index 43f4219b3c..2b7269d68c 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java @@ -153,6 +153,7 @@ public void init(Vertex vertex) { .resultSchema(SchemaWrapperUtils.buildSchemaWrapper(outputSchema, outputKeyMapping, tableId.seq)) .selection(selection) .relExpr(os.toByteArray()) + .codecVersion(codecVersion) .build(); if (limit > 0) { coprocessor.setLimit(limit); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java index cf63136933..034741f7fa 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java @@ -126,6 +126,7 @@ public void init(Vertex vertex) { .resultSchema(SchemaWrapperUtils.buildSchemaWrapper(indexSchema, outputKeyMapping, indexTableId.seq)) .selection(selection) .relExpr(os.toByteArray()) + .codecVersion(this.codecVersion) .build(); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java index da15fe5971..9a6543f43d 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartDocumentParam.java @@ -144,6 +144,7 @@ public void init(Vertex vertex) { filter = null; } builder.schemaVersion(table.getVersion()); + builder.codecVersion(table.getCodecVersion()); builder.originalSchema( SchemaWrapperUtils.buildSchemaWrapper( tableDataSchema, tableDataKeyMapping(), 0 diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java index c5ba751366..35e360b284 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnPartVectorParam.java @@ -154,6 +154,7 @@ public void init(Vertex vertex) { filter = null; } builder.schemaVersion(table.getVersion()); + builder.codecVersion(table.getCodecVersion()); builder.originalSchema( SchemaWrapperUtils.buildSchemaWrapper( tableDataSchema, tableDataKeyMapping(), 0 diff --git a/dingo-proxy/src/main/java/io/dingodb/proxy/mapper/EntityMapper.java b/dingo-proxy/src/main/java/io/dingodb/proxy/mapper/EntityMapper.java index 38365791f6..2d86251a57 100644 --- a/dingo-proxy/src/main/java/io/dingodb/proxy/mapper/EntityMapper.java +++ b/dingo-proxy/src/main/java/io/dingodb/proxy/mapper/EntityMapper.java @@ -152,6 +152,7 @@ default CoprocessorV2 mapping(Expr expr) { .originalSchema(SchemaWrapper.builder().schema(attrSchemas).build()) .selectionColumns(attrSchemas.stream().map(Schema::getIndex).collect(Collectors.toList())) .relExpr(outputStream.toByteArray()) + .codecVersion(2) .build(); }