[Enhancement] Optimize iceberg mor performance of iceberg equality delete #51050
Conversation
@@ -578,6 +581,29 @@ void HdfsScannerContext::append_or_update_partition_column_to_chunk(ChunkPtr* ch
    ck->set_num_rows(row_count);
}

void HdfsScannerContext::append_or_update_extended_column_to_chunk(ChunkPtr* chunk, size_t row_count) {
If this is the same as the partition column, why not merge them and make it more general instead of rewriting the same logic again?
Yes, please consider merging this into that function; it will be less error-prone.
done
didn't see you change anything?
this commit b918629
@@ -0,0 +1,88 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
I think if we cannot abstract this to leverage the connector API, we won't be able to abstract the ConnectorScanNode in the future.
yes, we will do this in the future.
long limit = scanOperator.getLimit();
ColumnRefFactory columnRefFactory = context.getColumnRefFactory();
boolean hasPartitionEvolution = deleteSchemas.stream().map(x -> x.specId).distinct().count() > 1;
If the timeline of operation is as follows:
T1: insert data
T2: partition evolution
T3: delete data
then the distinct spec id count of the delete schemas is 1, but the table has still undergone partition evolution.
this only indicates whether this query needs to add spec_id to the extended columns.
So you mean the eq delete files generated in T3 can delete data in T1?
no. the eq delete files generated in T3 won't be matched by any data files.
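For context, here is a minimal Java sketch of the check under discussion (the DeleteSchema shape is an assumed stand-in, not the PR's exact class): more than one distinct spec id among the delete schemas is what triggers adding spec_id to the extended columns, nothing more.

import java.util.List;
import java.util.Set;

class SpecIdCheckSketch {
    // Hypothetical stand-in for the PR's DeleteSchema.
    record DeleteSchema(int specId, List<Integer> equalityIds) {}

    // True when the delete schemas span multiple partition specs; only then must
    // the query materialize spec_id as an extended column for the join.
    static boolean needsSpecIdColumn(Set<DeleteSchema> deleteSchemas) {
        return deleteSchemas.stream()
                .map(DeleteSchema::specId)
                .distinct()
                .count() > 1;
    }
}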
    } else {
        _materialize_slots.push_back(slots[i]);
        _materialize_index_in_chunk.push_back(i);
    }
}

if (_scan_range.__isset.delete_column_slot_ids && !_scan_range.delete_column_slot_ids.empty()) {
Can we just remove these now? I think it may cause problems when users upgrade.
I have tested it; we will throw an exception in the scanner thread:
ERROR 1064 (HY000): Unsupported iceberg file content: 2
Relatively few users use the MOR scenario.
.map(schema -> schema.equalityIds)
.flatMap(List::stream)
.distinct()
.map(fieldId -> nativeTable.schema().findColumnName(fieldId))
The native table schema may not have a column from the delete schema if the table has had a schema change, like dropping a column?
iceberg doesn't allow dropping an identifier column.
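For reference, a self-contained sketch of the resolution step quoted above, with assumed stand-ins for the PR's classes; it relies on exactly this guarantee, that Iceberg never drops identifier columns, so every equality field id still resolves against the current schema.

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class EqualityColumnSketch {
    // Hypothetical stand-in for the PR's DeleteSchema.
    record DeleteSchema(int specId, List<Integer> equalityIds) {}

    // Minimal stand-in for org.apache.iceberg.Schema#findColumnName.
    interface IcebergSchema {
        String findColumnName(int fieldId);
    }

    static List<String> identifierColumnNames(Set<DeleteSchema> deleteSchemas, IcebergSchema schema) {
        return deleteSchemas.stream()
                .map(DeleteSchema::equalityIds)
                .flatMap(List::stream)
                .distinct()
                // Safe: identifier (equality) columns cannot be dropped in Iceberg.
                .map(schema::findColumnName)
                .collect(Collectors.toList());
    }
}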
    return Utils.createCompound(CompoundPredicateOperator.CompoundType.AND, onOps);
}

private LogicalIcebergScanOperator buildNewScanOperatorWithUnselectedField(Set<DeleteSchema> deleteSchemas,
buildNewScanOperatorWithExtendedField?
Yes, it includes not only the extended columns, but also the identifier columns that are not selected in the user's query.
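A minimal sketch of the ON-condition construction this feeds into (hypothetical textual stand-ins; the real code builds ScalarOperators and combines them with Utils.createCompound as quoted above): one equality conjunct per identifier column, AND-ed together.

import java.util.List;
import java.util.stream.Collectors;

class OnConditionSketch {
    // Hypothetical textual stand-in for a column-equality ScalarOperator.
    record EqConjunct(String left, String right) {}

    // One t.<col> = d.<col> conjunct per identifier column; per the discussion
    // above, spec_id is also compared when partition evolution is involved.
    static List<EqConjunct> buildOnOps(List<String> identifierColumns) {
        return identifierColumns.stream()
                .map(c -> new EqConjunct("t." + c, "d." + c))
                .collect(Collectors.toList());
    }
}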
@@ -1420,12 +1435,15 @@ public PlanFragment visitPhysicalIcebergScan(OptExpression optExpression, ExecPl
        .add(ScalarOperatorToExpr.buildExecExpression(predicate, formatterContext));
}

icebergScanNode.preProcessIcebergPredicate(node.getPredicate());
ScalarOperator icebergPredicate = !isEqDeleteScan ? node.getPredicate() :
        ((PhysicalIcebergEqualityDeleteScanOperator) node).getOriginPredicate();
What's the difference between originPredicate and predicate?
The schema of the iceberg_equality_table is a subset of the iceberg_table. E.g., the iceberg table schema is [c1, c2, c3] and the identifier column is c1. If the query predicate is c1 > 1 and c2 < 3, it can't be used as the predicate of the equality_table, so we keep it as originPredicate on the equality_table.
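In other words, only conjuncts whose columns all exist in the equality-delete table's schema can be pushed down to it. A minimal sketch of that split, with hypothetical types:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class PredicateSplitSketch {
    // Hypothetical stand-in for one conjunct of the query predicate.
    record Conjunct(String text, Set<String> referencedColumns) {}

    // With identifier column c1: "c1 > 1" survives while "c2 < 3" is dropped,
    // because c2 does not exist in the equality-delete table's schema.
    static List<Conjunct> pushableToEqualityTable(List<Conjunct> conjuncts,
                                                  Set<String> identifierColumns) {
        return conjuncts.stream()
                .filter(c -> identifierColumns.containsAll(c.referencedColumns()))
                .collect(Collectors.toList());
    }
}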
LogicalIcebergEqualityDeleteScanOperator eqScanOp = new LogicalIcebergEqualityDeleteScanOperator(
        equalityDeleteTable, colRefToColumn.build(), columnToColRef.build(), -1, null,
        scanOperator.getTableVersionRange());
eqScanOp.setOriginPredicate(newScanOp.getPredicate());
Why does the eq scan operator need this? Isn't OnPredicateMoveAroundRule enough?
We need to get the scan range of the equality_table from the query-level cache by originPredicate.
Quality Gate passed
[Java-Extensions Incremental Coverage Report] ✅ pass: 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass: 386 / 407 (94.84%)
[BE Incremental Coverage Report] ✅ pass: 70 / 74 (94.59%)
boolean hasPartitionEvolution = deleteSchemas.stream().map(x -> x.specId).distinct().count() > 1;
if (hasPartitionEvolution && !context.getSessionVariable().enableReadIcebergEqDeleteWithPartitionEvolution()) {
    throw new StarRocksConnectorException("Equality delete files aren't supported for tables with partition evolution. " +
            "You can execute `set enable_read_iceberg_equality_delete_with_partition_evolution = true` then rerun it");
Why do we need this enable_read_iceberg_equality_delete_with_partition_evolution variable? Can we just support it by default?
because there is a semantic inconsistency.
double rowCount = 0;
Set<String> seenFiles = new HashSet<>();
for (FileScanTask fileScanTask : remoteFileDesc.getIcebergScanTasks()) {
Is it worth getting all file scan tasks just to get the row count? Maybe we can just set the row count to a small number.
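For reference, a sketch of what the quoted loop appears to compute (task/file shapes are assumed): because one delete file can be attached to many scan tasks after planning, each file's record count is added only once.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DeleteRowCountSketch {
    record DeleteFile(String path, long recordCount) {}
    record ScanTask(List<DeleteFile> deleteFiles) {}

    static double estimateDeleteRows(List<ScanTask> tasks) {
        double rowCount = 0;
        Set<String> seenFiles = new HashSet<>();
        for (ScanTask task : tasks) {
            for (DeleteFile file : task.deleteFiles()) {
                // Set#add returns false for duplicates, so each file counts once.
                if (seenFiles.add(file.path())) {
                    rowCount += file.recordCount();
                }
            }
        }
        return rowCount;
    }
}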
hdfsScanRange.setOffset(file.content() == FileContent.DATA ? task.start() : 0);
hdfsScanRange.setLength(file.content() == FileContent.DATA ? task.length() : file.fileSizeInBytes());
// For iceberg table we do not need partition id
if (!idToPartitionSlots.containsKey(partitionId)) {
Is this for an unpartitioned iceberg table? The comment looks weird.
I will remove this comment in the next patch.
Why I'm doing:
The current implementation of Iceberg MOR (reading eq-delete files) does a local left anti join in each scanner thread, in units of a scan range.
There are three problems with this:
What I'm doing:
This patch implements Iceberg equality deletes as a plan-level join, rather than reading the data file as the left table and the delete file as the right table for a local left anti join. This optimization replaces the previous solution of using a local hash joiner in each scanner thread. Compared to the previous solution, the main purpose is to reduce the overhead of repeatedly reading delete files and repeatedly building hash tables, since an Iceberg equality delete file may be matched by many data files after Iceberg planning. The rule must strictly meet its check requirements before the plan can be rewritten.
There are three conditions that must be met for the rewrite:
We'll rewrite three patterns:
The first, common case: the iceberg identifier columns (also the same as the pk) are identifier_col and p1.
The second case, with a mutable pk column: the pk column before altering the table is k1; the pk column after altering the table is k1.
The third case, with partition evolution: a partitioned table with 1 delete schema: [k1, p1] and partition column [p1]. Write some records to this table. Then alter the table's partition field (partition evolution): (p1 -> bucket(5, p1)), and write some more records to this table.
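Across all three patterns, the rewrite evaluates Iceberg MOR semantics as a left anti join. A simplified row-level sketch (hypothetical shapes; the strict sequence-number comparison is standard Iceberg equality-delete behavior, assumed here rather than quoted from this PR):

import java.util.List;

class EqDeleteSemanticsSketch {
    record Row(long dataSequenceNumber, List<Object> identifierValues) {}

    // A data row survives unless some equality-delete row with a strictly newer
    // data sequence number matches it on the identifier columns; this is what
    // LEFT ANTI JOIN ... ON t.id_cols = d.id_cols AND t.$seq < d.$seq computes.
    static boolean survives(Row dataRow, List<Row> deleteRows) {
        return deleteRows.stream().noneMatch(d ->
                d.identifierValues().equals(dataRow.identifierValues())
                        && dataRow.dataSequenceNumber() < d.dataSequenceNumber());
    }
}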
Fixes #issue
some poc tests
TODO: