From 6dce06c11a2509cfa7db20b4dd217b978379abb5 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Tue, 12 Mar 2024 21:46:40 +0800 Subject: [PATCH 1/9] [core] Remove file io from splits and Add preferFileIo for Paimon catalog. --- .../apache/paimon/catalog/CatalogContext.java | 26 ++++-- .../java/org/apache/paimon/fs/FileIO.java | 20 ++++- .../table/system/AggregationFieldsTable.java | 14 ++-- .../paimon/table/system/BranchesTable.java | 13 +-- .../paimon/table/system/ConsumersTable.java | 13 +-- .../paimon/table/system/FilesTable.java | 83 +++++++++---------- .../paimon/table/system/ManifestsTable.java | 15 ++-- .../paimon/table/system/OptionsTable.java | 12 +-- .../paimon/table/system/PartitionsTable.java | 28 ++++--- .../paimon/table/system/SchemasTable.java | 14 ++-- .../paimon/table/system/SnapshotsTable.java | 20 +++-- .../apache/paimon/table/system/TagsTable.java | 12 +-- 12 files changed, 157 insertions(+), 113 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index 8b1c450d6cf7..de1490e19a05 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -41,14 +41,17 @@ public class CatalogContext { private final Options options; private final Configuration hadoopConf; + @Nullable private final FileIOLoader preferIOLoader; @Nullable private final FileIOLoader fallbackIOLoader; private CatalogContext( Options options, @Nullable Configuration hadoopConf, + @Nullable FileIOLoader preferIOLoader, @Nullable FileIOLoader fallbackIOLoader) { this.options = checkNotNull(options); this.hadoopConf = hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf; + this.preferIOLoader = preferIOLoader; this.fallbackIOLoader = fallbackIOLoader; } @@ -59,20 +62,28 @@ public static CatalogContext create(Path warehouse) { } public static CatalogContext create(Options options) { - return new CatalogContext(options, null, null); + return new CatalogContext(options, null, null, null); } public static CatalogContext create(Options options, Configuration hadoopConf) { - return new CatalogContext(options, hadoopConf, null); + return new CatalogContext(options, hadoopConf, null, null); } public static CatalogContext create(Options options, FileIOLoader fallbackIOLoader) { - return new CatalogContext(options, null, fallbackIOLoader); + return new CatalogContext(options, null, null, fallbackIOLoader); } public static CatalogContext create( - Options options, Configuration hadoopConf, FileIOLoader fallbackIOLoader) { - return new CatalogContext(options, hadoopConf, fallbackIOLoader); + Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) { + return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader); + } + + public static CatalogContext create( + Options options, + Configuration hadoopConf, + FileIOLoader preferIOLoader, + FileIOLoader fallbackIOLoader) { + return new CatalogContext(options, hadoopConf, preferIOLoader, fallbackIOLoader); } public Options options() { @@ -84,6 +95,11 @@ public Configuration hadoopConf() { return hadoopConf; } + @Nullable + public FileIOLoader preIO() { + return preferIOLoader; + } + @Nullable public FileIOLoader fallbackIO() { return fallbackIOLoader; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 352d5726a7dc..d5751b7133cd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -316,14 +316,26 @@ static FileIO get(Path path, CatalogContext config) throws IOException { + "')"); } - Map loaders = discoverLoaders(); - FileIOLoader loader = loaders.get(uri.getScheme()); + FileIOLoader loader = null; + + List ioExceptionList = new ArrayList<>(); + + // load preIO + FileIOLoader preIO = config.preIO(); + try { + loader = checkAccess(preIO, path, config); + } catch (IOException ioException) { + ioExceptionList.add(ioException); + } + + if (loader == null) { + Map loaders = discoverLoaders(); + loader = loaders.get(uri.getScheme()); + } // load fallbackIO FileIOLoader fallbackIO = config.fallbackIO(); - List ioExceptionList = new ArrayList<>(); - if (loader != null) { Set options = config.options().keySet().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index 940ea3b00c43..29bec8502d46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -120,7 +120,11 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new AggregationSplit(fileIO, location)); + return () -> + Collections.singletonList( + new AggregationSplit( + new SchemaManager(fileIO, location).listAllIds().size(), + location)); } } @@ -129,17 +133,17 @@ private static class AggregationSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private AggregationSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private AggregationSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new SchemaManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index da568aaff06a..7ae31095cf99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -122,25 +122,26 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new BranchesSplit(fileIO, location)); + FileStoreTable table = FileStoreTableFactory.create(fileIO, location); + long rowCount = table.branchManager().branchCount(); + return () -> Collections.singletonList(new BranchesSplit(rowCount, location)); } } private static class BranchesSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private BranchesSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private BranchesSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - FileStoreTable table = FileStoreTableFactory.create(fileIO, location); - return table.branchManager().branchCount(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java index 896acfccce1c..a3ec3017eb5b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java @@ -115,7 +115,10 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { return () -> - Collections.singletonList(new ConsumersTable.ConsumersSplit(fileIO, location)); + Collections.singletonList( + new ConsumersTable.ConsumersSplit( + new ConsumerManager(fileIO, location).listAllIds().size(), + location)); } } @@ -124,17 +127,17 @@ private static class ConsumersSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private ConsumersSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private ConsumersSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new ConsumerManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index a10a8e771ca1..eb98da6b183c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -139,7 +139,8 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new FilesRead(new SchemaManager(storeTable.fileIO(), storeTable.location())); + return new FilesRead( + new SchemaManager(storeTable.fileIO(), storeTable.location()), storeTable); } @Override @@ -185,47 +186,12 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new FilesSplit( - storeTable, - partitionPredicate, - bucketPredicate, - levelPredicate)); - } - } - - private static class FilesSplit implements Split { - - private static final long serialVersionUID = 1L; - - private final FileStoreTable storeTable; - - @Nullable private final LeafPredicate partitionPredicate; - @Nullable private final LeafPredicate bucketPredicate; - @Nullable private final LeafPredicate levelPredicate; - - private FilesSplit( - FileStoreTable storeTable, - @Nullable LeafPredicate partitionPredicate, - @Nullable LeafPredicate bucketPredicate, - @Nullable LeafPredicate levelPredicate) { - this.storeTable = storeTable; - this.partitionPredicate = partitionPredicate; - this.bucketPredicate = bucketPredicate; - this.levelPredicate = levelPredicate; - } - - @Override - public long rowCount() { + // plan here, just set the result of plan to split TableScan.Plan plan = plan(); - return plan.splits().stream() - .map(s -> (DataSplit) s) - .mapToLong(s -> s.dataFiles().size()) - .sum(); + return () -> Collections.singletonList(new FilesSplit(plan)); } - private TableScan.Plan plan() { + private TableScan.Plan tablePlan() { InnerTableScan scan = storeTable.newScan(); if (partitionPredicate != null) { if (partitionPredicate.function() instanceof Equal) { @@ -263,6 +229,29 @@ private TableScan.Plan plan() { } return scan.plan(); } + } + + private static class FilesSplit implements Split { + + private static final long serialVersionUID = 1L; + + private final TableScan.Plan plan; + + private FilesSplit(TableScan.Plan plan) { + this.plan = plan; + } + + @Override + public long rowCount() { + return plan.splits().stream() + .map(s -> (DataSplit) s) + .mapToLong(s -> s.dataFiles().size()) + .sum(); + } + + public TableScan.Plan plan() { + return plan; + } @Override public boolean equals(Object o) { @@ -273,12 +262,12 @@ public boolean equals(Object o) { return false; } FilesSplit that = (FilesSplit) o; - return Objects.equals(storeTable, that.storeTable); + return Objects.equals(plan, that.plan); } @Override public int hashCode() { - return Objects.hash(storeTable); + return Objects.hash(plan); } } @@ -286,10 +275,13 @@ private static class FilesRead implements InnerTableRead { private final SchemaManager schemaManager; + private final FileStoreTable storeTable; + private int[][] projection; - private FilesRead(SchemaManager schemaManager) { + private FilesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) { this.schemaManager = schemaManager; + this.storeTable = fileStoreTable; } @Override @@ -315,7 +307,6 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } FilesSplit filesSplit = (FilesSplit) split; - FileStoreTable table = filesSplit.storeTable; TableScan.Plan plan = filesSplit.plan(); if (plan.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); @@ -326,10 +317,10 @@ public RecordReader createReader(Split split) throws IOException { // schema id directly FieldStatsConverters fieldStatsConverters = new FieldStatsConverters( - sid -> schemaManager.schema(sid).fields(), table.schema().id()); + sid -> schemaManager.schema(sid).fields(), storeTable.schema().id()); RowDataToObjectArrayConverter partitionConverter = - new RowDataToObjectArrayConverter(table.schema().logicalPartitionType()); + new RowDataToObjectArrayConverter(storeTable.schema().logicalPartitionType()); Function keyConverters = new Function() { @@ -362,7 +353,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { partitionConverter, keyConverters, file, - table.getSchemaFieldStats(file), + storeTable.getSchemaFieldStats(file), fieldStatsConverters))); } Iterator rows = Iterators.concat(iteratorList.iterator()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 8f922cc156aa..d5e9f1647b56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -124,7 +124,10 @@ public InnerTableScan withFilter(Predicate predicate) { @Override protected Plan innerPlan() { - return () -> Collections.singletonList(new ManifestsSplit(fileIO, location, dataTable)); + return () -> + Collections.singletonList( + new ManifestsSplit( + allManifests(fileIO, location, dataTable).size(), location)); } } @@ -132,19 +135,17 @@ private static class ManifestsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private final Table dataTable; - private ManifestsSplit(FileIO fileIO, Path location, Table dataTable) { - this.fileIO = fileIO; + private ManifestsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; - this.dataTable = dataTable; } @Override public long rowCount() { - return allManifests(fileIO, location, dataTable).size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index 81dfa094c2fb..b740ddec1e2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -112,7 +112,9 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new OptionsSplit(fileIO, location)); + return () -> + Collections.singletonList( + new OptionsSplit(options(fileIO, location).size(), location)); } } @@ -120,17 +122,17 @@ private static class OptionsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private OptionsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private OptionsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return options(fileIO, location).size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 877f9b7ef2ff..e8f3b45c831f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -110,7 +110,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new PartitionsRead(); + return new PartitionsRead(storeTable); } @Override @@ -134,7 +134,8 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new PartitionsSplit(storeTable)); + return () -> + Collections.singletonList(new PartitionsSplit(storeTable.newScan().plan())); } } @@ -142,15 +143,14 @@ private static class PartitionsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileStoreTable storeTable; + private final TableScan.Plan plan; - private PartitionsSplit(FileStoreTable storeTable) { - this.storeTable = storeTable; + private PartitionsSplit(TableScan.Plan plan) { + this.plan = plan; } @Override public long rowCount() { - TableScan.Plan plan = plan(); return plan.splits().stream() .map(s -> ((DataSplit) s).partition()) .collect(Collectors.toSet()) @@ -158,7 +158,7 @@ public long rowCount() { } private TableScan.Plan plan() { - return storeTable.newScan().plan(); + return plan; } @Override @@ -170,19 +170,25 @@ public boolean equals(Object o) { return false; } PartitionsSplit that = (PartitionsSplit) o; - return Objects.equals(storeTable, that.storeTable); + return Objects.equals(plan, that.plan); } @Override public int hashCode() { - return Objects.hash(storeTable); + return Objects.hash(plan); } } private static class PartitionsRead implements InnerTableRead { + private final FileStoreTable fileStoreTable; + private int[][] projection; + public PartitionsRead(FileStoreTable table) { + this.fileStoreTable = table; + } + @Override public InnerTableRead withFilter(Predicate predicate) { // TODO @@ -206,14 +212,14 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } PartitionsSplit filesSplit = (PartitionsSplit) split; - FileStoreTable table = filesSplit.storeTable; TableScan.Plan plan = filesSplit.plan(); if (plan.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } List> iteratorList = new ArrayList<>(); RowDataToObjectArrayConverter partitionConverter = - new RowDataToObjectArrayConverter(table.schema().logicalPartitionType()); + new RowDataToObjectArrayConverter( + fileStoreTable.schema().logicalPartitionType()); for (Split dataSplit : plan.splits()) { iteratorList.add( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index b56b48539463..127323b42b50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -127,7 +127,11 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new SchemasSplit(fileIO, location)); + return () -> + Collections.singletonList( + new SchemasSplit( + new SchemaManager(fileIO, location).listAllIds().size(), + location)); } } @@ -136,17 +140,17 @@ private static class SchemasSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private SchemasSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private SchemasSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new SchemaManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index ee9cff1d1d7b..8d6b545d908e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -146,7 +146,13 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new SnapshotsSplit(fileIO, location)); + long rowCount; + try { + rowCount = new SnapshotManager(fileIO, location).snapshotCount(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return () -> Collections.singletonList(new SnapshotsSplit(rowCount, location)); } } @@ -154,21 +160,17 @@ private static class SnapshotsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private SnapshotsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private SnapshotsSplit(long rowCount, Path location) { this.location = location; + this.rowCount = rowCount; } @Override public long rowCount() { - try { - return new SnapshotManager(fileIO, location).snapshotCount(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 8027da2f6d67..ebd8aa7af0db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -129,24 +129,26 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new TagsSplit(fileIO, location)); + return () -> + Collections.singletonList( + new TagsSplit(new TagManager(fileIO, location).tagCount(), location)); } } private static class TagsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private TagsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private TagsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new TagManager(fileIO, location).tagCount(); + return rowCount; } @Override From ce4fddc7a4e96bfba2f9f47f528a955afdd7ab4c Mon Sep 17 00:00:00 2001 From: yejunhao Date: Tue, 12 Mar 2024 21:51:32 +0800 Subject: [PATCH 2/9] fix minus --- .../src/main/java/org/apache/paimon/fs/FileIO.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index d5751b7133cd..e77c6f3efdea 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -386,6 +386,10 @@ static FileIO get(Path path, CatalogContext config) throws IOException { if (loader == null) { String fallbackMsg = ""; + String preMsg = ""; + if (preIO != null) { + preMsg = " " + preIO.getClass().getSimpleName() + " also cannot access this path."; + } if (fallbackIO != null) { fallbackMsg = " " @@ -396,8 +400,8 @@ static FileIO get(Path path, CatalogContext config) throws IOException { new UnsupportedSchemeException( String.format( "Could not find a file io implementation for scheme '%s' in the classpath." - + "%s Hadoop FileSystem also cannot access this path '%s'.", - uri.getScheme(), fallbackMsg, path)); + + "%s %s Hadoop FileSystem also cannot access this path '%s'.", + uri.getScheme(), preMsg, fallbackMsg, path)); for (IOException ioException : ioExceptionList) { ex.addSuppressed(ioException); } From 31c0f5b632e32360915d001380abe992d6824a88 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Tue, 12 Mar 2024 22:03:38 +0800 Subject: [PATCH 3/9] fix minus --- paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index e77c6f3efdea..6da755afe482 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -317,7 +317,6 @@ static FileIO get(Path path, CatalogContext config) throws IOException { } FileIOLoader loader = null; - List ioExceptionList = new ArrayList<>(); // load preIO From feba38ec828fa76a7916e09818f0815319b11feb Mon Sep 17 00:00:00 2001 From: yejunhao Date: Tue, 12 Mar 2024 22:23:07 +0800 Subject: [PATCH 4/9] fix minus --- .../main/java/org/apache/paimon/table/system/FilesTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index eb98da6b183c..d9f5d1ff26c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -187,7 +187,7 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { // plan here, just set the result of plan to split - TableScan.Plan plan = plan(); + TableScan.Plan plan = tablePlan(); return () -> Collections.singletonList(new FilesSplit(plan)); } From 3ba36d76bedbb57306866a68eb51cdf431e0f415 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Tue, 12 Mar 2024 23:37:31 +0800 Subject: [PATCH 5/9] fix error of plan serialize --- .../paimon/table/source/DataFilePlan.java | 3 +- .../table/source/InnerTableScanImpl.java | 21 +------ .../paimon/table/source/SimplePlan.java | 56 +++++++++++++++++++ .../snapshot/IncrementalStartingScanner.java | 20 +------ .../source/snapshot/SnapshotReaderImpl.java | 42 +------------- 5 files changed, 64 insertions(+), 78 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java index d1355f1bbec8..8c2b47c848a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java @@ -20,12 +20,13 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** Scanning plan containing snapshot ID and input splits. */ -public class DataFilePlan implements TableScan.Plan { +public class DataFilePlan implements TableScan.Plan, Serializable { private final List splits; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java index b08da568ebbe..316fa8c27ff1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java @@ -25,8 +25,6 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.List; @@ -93,24 +91,7 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) } SnapshotReader.Plan newPlan = - new SnapshotReader.Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return limitedSplits; - } - }; + new SimplePlan(plan.watermark(), plan.snapshotId(), limitedSplits); return new ScannedResult(newPlan); } else { return result; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java new file mode 100644 index 000000000000..0aa15cc92ed2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.jetbrains.annotations.Nullable; + +import java.io.Serializable; +import java.util.List; + +public class SimplePlan implements SnapshotReader.Plan, Serializable { + + private final Long watermark; + private final Long snapshotId; + private final List splits; + + public SimplePlan(Long watermark, Long snapshotId, List splits) { + this.watermark = watermark; + this.snapshotId = snapshotId; + this.splits = splits; + } + + @Nullable + @Override + public Long watermark() { + return watermark; + } + + @Nullable + @Override + public Long snapshotId() { + return snapshotId; + } + + @Override + public List splits() { + return splits; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index 17df85832de0..afde9a1aab0b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -24,7 +24,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.SimplePlan; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -78,23 +78,7 @@ public Result scan(SnapshotReader reader) { } } - return StartingScanner.fromPlan( - new SnapshotReader.Plan() { - @Override - public Long watermark() { - return null; - } - - @Override - public Long snapshotId() { - return endingSnapshotId; - } - - @Override - public List splits() { - return (List) result; - } - }); + return StartingScanner.fromPlan(new SimplePlan(null, endingSnapshotId, (List) result)); } private List readSplits(SnapshotReader reader, Snapshot s) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 241f42395628..e933eda9b8f2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -40,7 +40,7 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.SimplePlan; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -49,8 +49,6 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TypeUtils; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -252,24 +250,7 @@ public Plan read() { scanMode != ScanMode.ALL, splitGenerator, files); - return new Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return (List) splits; - } - }; + return new SimplePlan(plan.watermark(), plan.snapshotId(), (List) splits); } private List generateSplits( @@ -401,24 +382,7 @@ private Plan toChangesPlan( } } - return new Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return (List) splits; - } - }; + return new SimplePlan(plan.watermark(), plan.snapshotId(), (List) splits); } @Override From 5934e69bba7e4a34aa41d036522638134b5b3ccd Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 13 Mar 2024 09:53:59 +0800 Subject: [PATCH 6/9] fix minus --- .../src/main/java/org/apache/paimon/table/source/SimplePlan.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java index 0aa15cc92ed2..c297e3723c20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.util.List; +/** A simple implementation of {@link SnapshotReader.Plan}. */ public class SimplePlan implements SnapshotReader.Plan, Serializable { private final Long watermark; From 2c308a22f8ef1f513be28c37566a290a6703f50d Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 13 Mar 2024 10:24:24 +0800 Subject: [PATCH 7/9] fix minus --- .../apache/paimon/table/source/snapshot/SnapshotReaderImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index e933eda9b8f2..54e30dccc830 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -49,6 +49,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TypeUtils; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; From 3e0101fb30a355f27058898a9cd265643a23c45c Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 13 Mar 2024 13:47:43 +0800 Subject: [PATCH 8/9] fix comment --- .../apache/paimon/catalog/CatalogContext.java | 2 +- .../main/java/org/apache/paimon/fs/FileIO.java | 17 ++++++++++------- .../paimon/table/source/InnerTableScanImpl.java | 2 +- .../source/{SimplePlan.java => PlanImpl.java} | 6 +++--- .../snapshot/IncrementalStartingScanner.java | 4 ++-- .../source/snapshot/SnapshotReaderImpl.java | 6 +++--- 6 files changed, 20 insertions(+), 17 deletions(-) rename paimon-core/src/main/java/org/apache/paimon/table/source/{SimplePlan.java => PlanImpl.java} (87%) diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index de1490e19a05..bef6565fe708 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -96,7 +96,7 @@ public Configuration hadoopConf() { } @Nullable - public FileIOLoader preIO() { + public FileIOLoader preferIO() { return preferIOLoader; } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 6da755afe482..84c1040ea24d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -319,10 +319,10 @@ static FileIO get(Path path, CatalogContext config) throws IOException { FileIOLoader loader = null; List ioExceptionList = new ArrayList<>(); - // load preIO - FileIOLoader preIO = config.preIO(); + // load preferIO + FileIOLoader perferIOLoader = config.preferIO(); try { - loader = checkAccess(preIO, path, config); + loader = checkAccess(perferIOLoader, path, config); } catch (IOException ioException) { ioExceptionList.add(ioException); } @@ -385,9 +385,12 @@ static FileIO get(Path path, CatalogContext config) throws IOException { if (loader == null) { String fallbackMsg = ""; - String preMsg = ""; - if (preIO != null) { - preMsg = " " + preIO.getClass().getSimpleName() + " also cannot access this path."; + String preferMsg = ""; + if (perferIOLoader != null) { + preferMsg = + " " + + perferIOLoader.getClass().getSimpleName() + + " also cannot access this path."; } if (fallbackIO != null) { fallbackMsg = @@ -400,7 +403,7 @@ static FileIO get(Path path, CatalogContext config) throws IOException { String.format( "Could not find a file io implementation for scheme '%s' in the classpath." + "%s %s Hadoop FileSystem also cannot access this path '%s'.", - uri.getScheme(), preMsg, fallbackMsg, path)); + uri.getScheme(), preferMsg, fallbackMsg, path)); for (IOException ioException : ioExceptionList) { ex.addSuppressed(ioException); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java index 316fa8c27ff1..b123802dbac0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java @@ -91,7 +91,7 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) } SnapshotReader.Plan newPlan = - new SimplePlan(plan.watermark(), plan.snapshotId(), limitedSplits); + new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits); return new ScannedResult(newPlan); } else { return result; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java similarity index 87% rename from paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java rename to paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java index c297e3723c20..b5e861e02665 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/SimplePlan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java @@ -25,14 +25,14 @@ import java.io.Serializable; import java.util.List; -/** A simple implementation of {@link SnapshotReader.Plan}. */ -public class SimplePlan implements SnapshotReader.Plan, Serializable { +/** An implementation of {@link SnapshotReader.Plan}. */ +public class PlanImpl implements SnapshotReader.Plan, Serializable { private final Long watermark; private final Long snapshotId; private final List splits; - public SimplePlan(Long watermark, Long snapshotId, List splits) { + public PlanImpl(Long watermark, Long snapshotId, List splits) { this.watermark = watermark; this.snapshotId = snapshotId; this.splits = splits; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index afde9a1aab0b..49ab3a87e764 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -23,8 +23,8 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.SimplePlan; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -78,7 +78,7 @@ public Result scan(SnapshotReader reader) { } } - return StartingScanner.fromPlan(new SimplePlan(null, endingSnapshotId, (List) result)); + return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, (List) result)); } private List readSplits(SnapshotReader reader, Snapshot s) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 54e30dccc830..9c8126ffe3a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -38,9 +38,9 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.SimplePlan; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -252,7 +252,7 @@ public Plan read() { scanMode != ScanMode.ALL, splitGenerator, files); - return new SimplePlan(plan.watermark(), plan.snapshotId(), (List) splits); + return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); } private List generateSplits( @@ -384,7 +384,7 @@ private Plan toChangesPlan( } } - return new SimplePlan(plan.watermark(), plan.snapshotId(), (List) splits); + return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); } @Override From de25f97bd4bc76644307bb003eb70a9ad2764281 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 13 Mar 2024 14:52:47 +0800 Subject: [PATCH 9/9] fix comment --- .../paimon/table/source/DataFilePlan.java | 3 +-- .../apache/paimon/table/source/PlanImpl.java | 3 +-- .../table/system/AllTableOptionsTable.java | 18 +++++++------ .../paimon/table/system/FilesTable.java | 23 ++++++++--------- .../paimon/table/system/PartitionsTable.java | 25 +++++++++---------- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java index 8c2b47c848a1..d1355f1bbec8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java @@ -20,13 +20,12 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** Scanning plan containing snapshot ID and input splits. */ -public class DataFilePlan implements TableScan.Plan, Serializable { +public class DataFilePlan implements TableScan.Plan { private final List splits; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java index b5e861e02665..84c9ece9e181 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java @@ -22,11 +22,10 @@ import org.jetbrains.annotations.Nullable; -import java.io.Serializable; import java.util.List; /** An implementation of {@link SnapshotReader.Plan}. */ -public class PlanImpl implements SnapshotReader.Plan, Serializable { +public class PlanImpl implements SnapshotReader.Plan { private final Long watermark; private final Long snapshotId; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java index 5bb47e6e0d18..a7b1236bfd48 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java @@ -121,7 +121,13 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new AllTableSplit(fileIO, allTablePaths)); + return () -> + Collections.singletonList( + new AllTableSplit( + options(fileIO, allTablePaths).values().stream() + .flatMap(t -> t.values().stream()) + .reduce(0, (a, b) -> a + b.size(), Integer::sum), + allTablePaths)); } } @@ -129,19 +135,17 @@ private static class AllTableSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Map> allTablePaths; - private AllTableSplit(FileIO fileIO, Map> allTablePaths) { - this.fileIO = fileIO; + private AllTableSplit(long rowCount, Map> allTablePaths) { + this.rowCount = rowCount; this.allTablePaths = allTablePaths; } @Override public long rowCount() { - return options(fileIO, allTablePaths).values().stream() - .flatMap(t -> t.values().stream()) - .reduce(0, (a, b) -> a + b.size(), Integer::sum); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index d9f5d1ff26c4..0501dab08527 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -188,7 +188,7 @@ public InnerTableScan withFilter(Predicate pushdown) { public Plan innerPlan() { // plan here, just set the result of plan to split TableScan.Plan plan = tablePlan(); - return () -> Collections.singletonList(new FilesSplit(plan)); + return () -> Collections.singletonList(new FilesSplit(plan.splits())); } private TableScan.Plan tablePlan() { @@ -235,22 +235,22 @@ private static class FilesSplit implements Split { private static final long serialVersionUID = 1L; - private final TableScan.Plan plan; + private final List splits; - private FilesSplit(TableScan.Plan plan) { - this.plan = plan; + private FilesSplit(List splits) { + this.splits = splits; } @Override public long rowCount() { - return plan.splits().stream() + return splits.stream() .map(s -> (DataSplit) s) .mapToLong(s -> s.dataFiles().size()) .sum(); } - public TableScan.Plan plan() { - return plan; + public List splits() { + return splits; } @Override @@ -262,12 +262,12 @@ public boolean equals(Object o) { return false; } FilesSplit that = (FilesSplit) o; - return Objects.equals(plan, that.plan); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(plan); + return Objects.hash(splits); } } @@ -307,8 +307,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } FilesSplit filesSplit = (FilesSplit) split; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } @@ -343,7 +342,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { }); } }; - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index e8f3b45c831f..4f1899394b04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -36,7 +36,6 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -135,7 +134,8 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { return () -> - Collections.singletonList(new PartitionsSplit(storeTable.newScan().plan())); + Collections.singletonList( + new PartitionsSplit(storeTable.newScan().plan().splits())); } } @@ -143,22 +143,22 @@ private static class PartitionsSplit implements Split { private static final long serialVersionUID = 1L; - private final TableScan.Plan plan; + private final List splits; - private PartitionsSplit(TableScan.Plan plan) { - this.plan = plan; + private PartitionsSplit(List splits) { + this.splits = splits; } @Override public long rowCount() { - return plan.splits().stream() + return splits.stream() .map(s -> ((DataSplit) s).partition()) .collect(Collectors.toSet()) .size(); } - private TableScan.Plan plan() { - return plan; + private List splits() { + return splits; } @Override @@ -170,12 +170,12 @@ public boolean equals(Object o) { return false; } PartitionsSplit that = (PartitionsSplit) o; - return Objects.equals(plan, that.plan); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(plan); + return Objects.hash(splits); } } @@ -212,8 +212,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } PartitionsSplit filesSplit = (PartitionsSplit) split; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } List> iteratorList = new ArrayList<>(); @@ -221,7 +220,7 @@ public RecordReader createReader(Split split) throws IOException { new RowDataToObjectArrayConverter( fileStoreTable.schema().logicalPartitionType()); - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(),