@@ -26,7 +26,6 @@
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -246,7 +245,7 @@ private String jobDesc() {
 
   private SerializableConsumer<Iterator<String>> bulkDeleteFiles(SupportsBulkOperations io) {
     return stringIterator -> {
-      ArrayList<String> paths = Lists.newArrayList(stringIterator);
+      List<String> paths = Lists.newArrayList(stringIterator);
       try {
         io.deleteFiles(paths);
       } catch (BulkDeletionFailureException e) {
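The only behavioral content in this hunk is the declared type: Lists.newArrayList still returns an ArrayList, but the local is now typed against the List interface, which is what allows the java.util.ArrayList import to be dropped in the first hunk. Below is a minimal, self-contained sketch of the same pattern; the two Iceberg types are stubbed as assumptions, since the diff only shows the deleteFiles call site and the BulkDeletionFailureException catch, not the real API.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BulkDeleteSketch {

  // Stand-ins for the Iceberg types named in the diff; both shapes are
  // assumptions inferred from the call sites, not Iceberg's real API.
  interface SupportsBulkOperations {
    void deleteFiles(Iterable<String> paths);
  }

  static class BulkDeletionFailureException extends RuntimeException {
    BulkDeletionFailureException(String message) {
      super(message);
    }
  }

  // Mirrors the hunk: drain the iterator once into a List-typed local, then
  // hand the whole batch to the bulk-capable IO in a single call.
  static void bulkDelete(SupportsBulkOperations io, Iterator<String> pathIterator) {
    List<String> paths = new ArrayList<>(); // concrete type appears only at the construction site
    pathIterator.forEachRemaining(paths::add);
    try {
      io.deleteFiles(paths);
    } catch (BulkDeletionFailureException e) {
      // The real action handles partial failures here; this sketch just rethrows.
      throw e;
    }
  }
}

Programming against List keeps the lambda body agnostic to the backing collection, so a later switch away from Lists.newArrayList would not ripple through the method.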
@@ -444,14 +443,15 @@ Dataset<String> findOrphanFiles(
     if (orphanFileDF != null) {
       return orphanFileDF;
     }
-    SetAccumulator<Pair<String, String>> conflicts = accumulator();
-    spark.sparkContext().register(conflicts);
+    SetAccumulator<Pair<String, String>> conflictsAccumulator = accumulator();
+    spark.sparkContext().register(conflictsAccumulator);
     Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
 
     orphanFileDF =
         actualFileIdentDS
             .joinWith(validFileIdentDS, joinCond, "leftouter")
-            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING());
+            .mapPartitions(
+                new FindOrphanFiles(prefixMismatchMode, conflictsAccumulator), Encoders.STRING());
     return orphanFileDF;
   }
 
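The substantive change in this hunk is the rename from conflicts to conflictsAccumulator; the re-wrapped mapPartitions call looks like formatting fallout from the longer name. The surrounding pattern is standard Spark AccumulatorV2 usage: register the accumulator with the SparkContext on the driver, mutate it from executors inside mapPartitions, and read it back after the job finishes. A sketch of a set-collecting accumulator is below; the real SetAccumulator class is not shown in this diff, so treat it as an assumed approximation rather than Iceberg's implementation.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.util.AccumulatorV2;

// Assumed approximation of the SetAccumulator used in the hunk: a distributed,
// merge-by-union set of values reported from executors.
public class SetAccumulatorSketch<T> extends AccumulatorV2<T, Set<T>> {

  private final Set<T> set = Collections.synchronizedSet(new HashSet<>());

  @Override
  public boolean isZero() {
    return set.isEmpty();
  }

  @Override
  public AccumulatorV2<T, Set<T>> copy() {
    SetAccumulatorSketch<T> copy = new SetAccumulatorSketch<>();
    copy.set.addAll(set);
    return copy;
  }

  @Override
  public void reset() {
    set.clear();
  }

  @Override
  public void add(T value) {
    set.add(value);
  }

  @Override
  public void merge(AccumulatorV2<T, Set<T>> other) {
    set.addAll(other.value());
  }

  @Override
  public Set<T> value() {
    return set;
  }
}

As in the hunk, an instance must be registered before the job runs (spark.sparkContext().register(conflictsAccumulator)); executors then call add from inside the FindOrphanFiles partition function, and the driver reads value() once the action completes, here to report path-prefix conflicts found during the left-outer join.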