-
Notifications
You must be signed in to change notification settings - Fork 949
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core]Support async lookup in hash store #4423
base: master
Are you sure you want to change the base?
[Core]Support async lookup in hash store #4423
Conversation
@JingsongLi PTAL, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @neuyilan , I took a rough look and found that there are many thread safety risks in certain areas. Can we go back to this requirement? Is it necessary for us to do multi-threaded access? Is this effective? Why not increase Flink's parallelism?
} | ||
|
||
private synchronized Triple<BinaryRow, Integer, InternalRow> extractPartitionAndBucket( | ||
InternalRow key) { | ||
InternalRow adjustedKey = key; | ||
if (keyRearrange != null) { | ||
adjustedKey = keyRearrange.replaceRow(adjustedKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is reused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have encapsulated these as a separate function called extractPartartitionAndBucket. And it has been declared as synchronized, so there will be no thread safety issues.
} | ||
|
||
private synchronized Triple<BinaryRow, Integer, InternalRow> extractPartitionAndBucket( | ||
InternalRow key) { | ||
InternalRow adjustedKey = key; | ||
if (keyRearrange != null) { | ||
adjustedKey = keyRearrange.replaceRow(adjustedKey); | ||
} | ||
extractor.setRecord(adjustedKey); | ||
int bucket = extractor.bucket(); | ||
BinaryRow partition = extractor.partition(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is reused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same as above
InternalRow adjustedKey = key; | ||
if (keyRearrange != null) { | ||
adjustedKey = keyRearrange.replaceRow(adjustedKey); | ||
} | ||
extractor.setRecord(adjustedKey); | ||
int bucket = extractor.bucket(); | ||
BinaryRow partition = extractor.partition(); | ||
|
||
InternalRow trimmedKey = key; | ||
if (trimmedKeyRearrange != null) { | ||
trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is reused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same as above
Yes, there are too many thread safety issues in the current design. So essentially, access is synchronous. Even if ' I think supporting asynchronous multi-threaded access is necessary. And in our testing, it was effective. Asynchronous multi-threaded access has the following benefits compared to increasing concurrency in Flink:
|
Hi, @JingsongLi Could you please review it again. |
Currently, lookup join does not really support asynchronous, and the purpose of this PR is to support asynchronous lookup join.
This pr only support the hash store async lookup, in the next pr, I will do the rocksdb store async lookup.