-
Notifications
You must be signed in to change notification settings - Fork 946
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[spark] Implement distributed orphan file clean for spark #4207
Conversation
@ulysses-you reopen this pr plz, we can review it continue. |
+1,nice. |
collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer) | ||
usedFileBuffer | ||
} | ||
.toDS() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we cache this DS to avoid computing twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed
} | ||
val manifestConsumer = new Consumer[ManifestFileMeta] { | ||
override def accept(t: ManifestFileMeta): Unit = { | ||
isManifestMetaFile = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks very weird, and this protocol may be break.
Can you just refactor collectWithoutDataFile
to pass only one Consumer<Pair<String, Boolean>>
, file and isManifestFile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, changed to use Consumer<Pair<String, Boolean>>
Please add close #4184 in comment too. |
@@ -104,7 +104,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { | |||
checkAnswer( | |||
spark.sql( | |||
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"), | |||
Row(orphanFile1.toUri.getPath) :: Row(orphanFile2.toUri.getPath) :: Nil | |||
Row(2) :: Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe need add a test line for each case at end with 'checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)' to comfire no orphan files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Purpose
This pr supports distributed orphan file clean for spark using
Dataset
interface.Tests
add test
API and Format
no
Documentation