Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][dingo-store-proxy] Add codecVersion #1330

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {
.name(tableName)
.columns(columns)
.version(1)
.codecVersion(2)
.ttl(create.getTtl())
.partDefinition(create.getPartDefinition())
.engine(create.getEngine())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public LoadDataExecutor(SqlLoadData sqlLoadData, Connection connection, DingoPar
if (table == null) {
throw DingoResource.DINGO_RESOURCE.unknownTable(schemaName + "." + sqlLoadData.getTableName()).ex();
}
codec = CodecService.getDefault().createKeyValueCodec(table.version, table.tupleType(), table.keyMapping());
codec = CodecService.getDefault().createKeyValueCodec(
table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()
);
distributions = metaService.getRangeDistribution(table.tableId);
schema = table.tupleType();
this.isTxn = checkEngine();
Expand Down Expand Up @@ -420,7 +422,9 @@ public void insertWithTxn(Object[] tuples) {
.collect(Collectors.toList()));
Object[] tuplesTmp = columnIndices.stream().map(i -> tuples[i]).toArray();
KeyValueCodec codec = CodecService.getDefault()
.createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping());
.createKeyValueCodec(
indexTable.getCodecVersion(), indexTable.version,
indexTable.tupleType(), indexTable.keyMapping());

keyValue = wrap(codec::encode).apply(tuplesTmp);
PartitionService ps = PartitionService.getService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private static String lockKey(CommonId tableId, byte[] keyBytes) {
return "";
}
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
table.version, table.tupleType(), table.keyMapping()
table.getCodecVersion(), table.version, table.tupleType(), table.keyMapping()
);
return Utils.buildKeyStr(table.keyMapping(), codec.decodeKeyPrefix(keyBytes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public DingoIndexScanWithRelOp(
super(cluster, traitSet, hints, table, rowType, relOp, filter, pushDown, keepOrder, indexTable, rangeScan);
if (getFilter() != null) {
KeyValueCodec codec = CodecService.getDefault()
.createKeyValueCodec(indexTable.version, indexTable.tupleType(), indexTable.keyMapping());
.createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(),
indexTable.keyMapping());
RangeDistribution range = RangeUtils.createRangeByFilter(indexTable, codec, filter, null);
rangeDistribution = range;
if (range != null && !(range.getStartKey() == null && range.getEndKey() == null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DingoScanWithRelOp(
if (getFilter() != null) {
Table td = Objects.requireNonNull(table.unwrap(DingoTable.class)).getTable();
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
td.version, td.tupleType(), td.keyMapping()
td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping()
);
RangeDistribution range = RangeUtils.createRangeByFilter(td, codec, filter, null);
rangeDistribution = range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void onMatch(@NonNull RelOptRuleCall call) {
}

KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
td.version, td.tupleType(), td.keyMapping()
td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping()
);
Object[] tuple = new Object[td.getColumns().size()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void onMatch(@NonNull RelOptRuleCall call) {
return;
}
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
td.version, td.tupleType(), td.keyMapping()
td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping()
);
RangeDistribution range;
if (rel.getFilter() == null && (rel.getSelection().size() == rel.getTable().getRowType().getFieldCount())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ public abstract class StatsOperator {
cmSketchTblId = cmSketchTable.tableId;
analyzeTaskCodec = CodecService.getDefault()
.createKeyValueCodec(
analyzeTaskTable.version, analyzeTaskTable.tupleType(), analyzeTaskTable.keyMapping()
analyzeTaskTable.getCodecVersion(), analyzeTaskTable.version,
analyzeTaskTable.tupleType(), analyzeTaskTable.keyMapping()
);
bucketsCodec = CodecService.getDefault()
.createKeyValueCodec(bucketsTable.version, bucketsTable.tupleType(), bucketsTable.keyMapping());
.createKeyValueCodec(bucketsTable.getCodecVersion(), bucketsTable.version,
bucketsTable.tupleType(), bucketsTable.keyMapping());
statsCodec = CodecService.getDefault()
.createKeyValueCodec(statsTable.version, statsTable.tupleType(), statsTable.keyMapping());
.createKeyValueCodec(statsTable.getCodecVersion(), statsTable.version,
statsTable.tupleType(), statsTable.keyMapping());
cmSketchCodec = CodecService.getDefault()
.createKeyValueCodec(cmSketchTable.version, cmSketchTable.tupleType(), cmSketchTable.keyMapping());
.createKeyValueCodec(cmSketchTable.getCodecVersion(), cmSketchTable.version,
cmSketchTable.tupleType(), cmSketchTable.keyMapping());
analyzeTaskStore = storeTxnService.getInstance(analyzeTaskTblId,
getRegionId(analyzeTaskTblId));
bucketsStore = storeTxnService.getInstance(bucketsTblId, getRegionId(bucketsTblId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private void buildHistogram(List<Histogram> histogramList,
);
return Iterators.transform(
iterator,
wrap(CodecService.getDefault().createKeyValueCodec(td.version,
wrap(CodecService.getDefault().createKeyValueCodec(td.getCodecVersion(), td.version,
outputSchema, outputKeyMapping)::decode)::apply
);
}).collect(Collectors.toList());
Expand Down Expand Up @@ -470,6 +470,7 @@ public static CoprocessorV2 getCoprocessor(Table table, List<Histogram> histogra
.selection(selection)
.relExpr(os.toByteArray())
.schemaVersion(table.getVersion())
.codecVersion(table.getCodecVersion())
.build();
return coprocessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public CollectStatsTask(RangeDistribution region,
this.startTs = scanTs;
this.timeout = timeout;
this.kvStore = Services.KV_STORE.getInstance(tableId, region.id());
this.codec = CodecService.getDefault().createKeyValueCodec(td.getVersion(), td.tupleType(), td.keyMapping());
this.codec = CodecService.getDefault().createKeyValueCodec(
td.getCodecVersion(), td.getVersion(), td.tupleType(), td.keyMapping());

this.minSketchList = minSketches.stream().map(CountMinSketch::copy)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public static Collection<Vertex> visit(
null,
null,
td.tupleType(),
false
false,
td.codecVersion
);
task = job.getOrCreate(currentLocation, idGenerator);
Vertex scanVertex = new Vertex(PART_RANGE_SCAN, param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Collection<Vertex> visit(

List<Object[]> keyTuples = TableUtils.getTuplesForKeyMapping(indexValSet.getValue(), indexTd);
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
indexTd.version, indexTd.tupleType(), indexTd.keyMapping()
indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping()
);
List<ByteArrayUtils.ComparableByteArray> keyList = new ArrayList<>();
for (Object[] keyTuple : keyTuples) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public static LinkedList<Vertex> visit(

List<Object[]> keyTuples = TableUtils.getTuplesForKeyMapping(indexValSet.getValue(), indexTd);

KeyValueCodec codec =
CodecService.getDefault().createKeyValueCodec(indexTd.version, indexTd.tupleType(), indexTd.keyMapping());
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping());
List<ByteArrayUtils.ComparableByteArray> keyList = new ArrayList<>();
for (Object[] keyTuple : keyTuples) {
byte[] keys = codec.encodeKeyPrefix(keyTuple, calculatePrefixCount(keyTuple));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private DingoIndexRangeScanVisitFun() {
if (rexFilter != null) {
filter = SqlExprUtils.toSqlExpr(rexFilter);
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
indexTd.version, indexTd.tupleType(), indexTd.keyMapping()
indexTd.getCodecVersion(), indexTd.version, indexTd.tupleType(), indexTd.keyMapping()
);
RangeDistribution range = RangeUtils.createRangeByFilter(indexTd, codec, rexFilter, null);
if (range != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ private DingoIndexScanWithRelOpVisitFun() {
td.tableId,
td.tupleType(),
td.keyMapping(),
td.version
td.version,
td.getCodecVersion()
);
return new Vertex(SCAN_WITH_NO_OP, param);
} else {
Expand All @@ -186,7 +187,8 @@ private DingoIndexScanWithRelOpVisitFun() {
DefinitionMapper.mapToDingoType(rel.getRowType()),
rel.isPushDown(),
td.version,
0
0,
td.getCodecVersion()
);
if (relOp instanceof PipeOp) {
return new Vertex(SCAN_WITH_PIPE_OP, param);
Expand All @@ -212,7 +214,8 @@ private DingoIndexScanWithRelOpVisitFun() {
scanTs,
transaction.getIsolationLevel(),
transaction.getLockTimeOut(),
td.version
td.version,
td.getCodecVersion()
);
return new Vertex(TXN_SCAN_WITH_NO_OP, param);
} else {
Expand All @@ -227,7 +230,8 @@ private DingoIndexScanWithRelOpVisitFun() {
DefinitionMapper.mapToDingoType(rel.getRowType()),
rel.isPushDown(),
td.version,
0
0,
td.getCodecVersion()
);
if (relOp instanceof PipeOp) {
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public static Collection<Vertex> visit(
td.keyMapping(),
Optional.mapOrNull(filter, SqlExpr::copy),
rel.getSelection(),
rel.getPrefix()
rel.getPrefix(),
td.getCodecVersion()
);
Vertex vertex = new Vertex(LIKE_SCAN, param);
OutputHint hint = new OutputHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ public static Collection<Vertex> visit(
tableInfo.getId(),
td.version,
td.tupleType(),
td.keyMapping());
td.keyMapping(),
td.getCodecVersion());
deleteVertex = new Vertex(TXN_PART_RANGE_DELETE, param);
} else {
PartRangeDeleteParam param = new PartRangeDeleteParam(
tableInfo.getId(),
td.version,
td.tupleType(),
td.keyMapping());
td.keyMapping(),
td.codecVersion);
deleteVertex = new Vertex(PART_RANGE_DELETE, param);
}
if (transaction != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ private DingoScanWithRelOpVisitFun() {
tableInfo.getId(),
td.tupleType(),
td.keyMapping(),
td.version
td.version,
td.getCodecVersion()
);
return new Vertex(SCAN_WITH_NO_OP, param);
} else {
Expand All @@ -210,7 +211,8 @@ private DingoScanWithRelOpVisitFun() {
DefinitionMapper.mapToDingoType(rel.getRowType()),
rel.isPushDown(),
td.version,
rel.getLimit()
rel.getLimit(),
td.getCodecVersion()
);
if (relOp instanceof PipeOp) {
return new Vertex(SCAN_WITH_PIPE_OP, param);
Expand All @@ -237,7 +239,8 @@ private DingoScanWithRelOpVisitFun() {
scanTs,
transaction.getIsolationLevel(),
transaction.getLockTimeOut(),
td.version
td.version,
td.getCodecVersion()
);
return new Vertex(TXN_SCAN_WITH_NO_OP, param);
} else {
Expand All @@ -252,7 +255,8 @@ private DingoScanWithRelOpVisitFun() {
DefinitionMapper.mapToDingoType(rel.getRowType()),
rel.isPushDown(),
td.version,
rel.getLimit()
rel.getLimit(),
td.getCodecVersion()
);
if (relOp instanceof PipeOp) {
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private DingoTableScanVisitFun() {
if (rel.getFilter() != null) {
filter = SqlExprUtils.toSqlExpr(rel.getFilter());
KeyValueCodec codec = CodecService.getDefault().createKeyValueCodec(
td.version, td.tupleType(), td.keyMapping()
td.getCodecVersion(), td.version, td.tupleType(), td.keyMapping()
);
RangeDistribution range = RangeUtils.createRangeByFilter(td, codec, rel.getFilter(), rel.getSelection());
if (range != null) {
Expand Down Expand Up @@ -145,7 +145,8 @@ private DingoTableScanVisitFun() {
scanTs,
transaction.getIsolationLevel(),
transaction.getLockTimeOut(),
false
false,
td.codecVersion
);
scanVertex = new Vertex(TXN_PART_RANGE_SCAN, param);
} else {
Expand All @@ -162,7 +163,8 @@ private DingoTableScanVisitFun() {
rel.getAggCalls() == null ? null : AggFactory.getAggList(
rel.getAggCalls(), DefinitionMapper.mapToDingoType(rel.getSelectedType())),
DefinitionMapper.mapToDingoType(rel.getNormalRowType()),
rel.isPushDown()
rel.isPushDown(),
td.getCodecVersion()
);
scanVertex = new Vertex(PART_RANGE_SCAN, param);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private List<Vertex> scan(String schema,
Table td = Parameters.nonNull(metaService.getTable(tableName), "Table not found.");
CommonId tableId = Parameters.nonNull(metaService.getTable(tableName).getTableId(), "Table not found.");
io.dingodb.codec.KeyValueCodec codec = CodecService.INSTANCE.createKeyValueCodec(
tableId, td.tupleType(), td.keyMapping()
td.getCodecVersion(), tableId, td.tupleType(), td.keyMapping()
);

Key start = keyRange.start;
Expand Down Expand Up @@ -526,7 +526,8 @@ private List<Vertex> scan(String schema,
null,
null,
td.tupleType(),
true
true,
td.getCodecVersion()
);
Vertex scanVertex = new Vertex(PART_RANGE_SCAN, param);
scanVertex.setId(idGenerator.getOperatorId(task.getId()));
Expand All @@ -548,7 +549,7 @@ private List<Vertex> rangeDelete(String schema, String tableName,
CommonId tableId = Parameters.nonNull(metaService.getTable(tableName).getTableId(), "Table not found.");
Table td = Parameters.nonNull(metaService.getTable(tableName), "Table not found.");
io.dingodb.codec.KeyValueCodec codec = CodecService.INSTANCE.createKeyValueCodec(
tableId, td.tupleType(), td.keyMapping()
td.codecVersion, tableId, td.tupleType(), td.keyMapping()
);

byte[] startKey = codec.encodeKeyPrefix(mapKeyPrefix(td, begin), begin.userKey.size());
Expand All @@ -572,7 +573,8 @@ private List<Vertex> rangeDelete(String schema, String tableName,

List<Vertex> outputs = new ArrayList<>();
for (int i = 0; i <= td.getPartitions().size(); i++) {
PartRangeDeleteParam param = new PartRangeDeleteParam(tableId, td.version, td.tupleType(), td.keyMapping());
PartRangeDeleteParam param = new PartRangeDeleteParam(
tableId, td.version, td.tupleType(), td.keyMapping(), td.getCodecVersion());
Vertex deleteVertex = new Vertex(PART_RANGE_DELETE, param);
task = job.getOrCreate(currentLocation, idGenerator);
deleteVertex.setId(idGenerator.getOperatorId(task.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public void exec(OperationContext context) {

List<Column> columnDefinitions = coprocessor.getResultSchema().getSchemas();
KeyValueCodec codec = new KeyValueCodec(
DingoKeyValueCodec.of(context.getTableId().entityId(), columnDefinitions), columnDefinitions
DingoKeyValueCodec.of(context.getTable().getCodecVersion(),
context.getTableId().entityId(), columnDefinitions), columnDefinitions
);
context.<Iterator<KeyValue>[]>result()[context.getSeq()] = new CoprocessorIterator(
columnDefinitions, codec, scanResult, context.getTableId().entityId()
Expand Down
Loading
Loading