diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 6c3b2db14367..6efe8a080bde 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -35,19 +35,39 @@ import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.source.metrics.EqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.IndexedDeleteFiles; import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.spark.source.metrics.NumSplits; -import org.apache.iceberg.spark.source.metrics.ScannedDataFiles; +import org.apache.iceberg.spark.source.metrics.PositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.ResultDataFiles; +import org.apache.iceberg.spark.source.metrics.ResultDeleteFiles; import org.apache.iceberg.spark.source.metrics.ScannedDataManifests; +import org.apache.iceberg.spark.source.metrics.ScannedDeleteManifests; import org.apache.iceberg.spark.source.metrics.SkippedDataFiles; import org.apache.iceberg.spark.source.metrics.SkippedDataManifests; -import org.apache.iceberg.spark.source.metrics.TaskScannedDataFiles; +import org.apache.iceberg.spark.source.metrics.SkippedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.SkippedDeleteManifests; +import org.apache.iceberg.spark.source.metrics.TaskEqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TaskIndexedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TaskPositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TaskResultDataFiles; +import org.apache.iceberg.spark.source.metrics.TaskResultDeleteFiles; import org.apache.iceberg.spark.source.metrics.TaskScannedDataManifests; +import org.apache.iceberg.spark.source.metrics.TaskScannedDeleteManifests; import org.apache.iceberg.spark.source.metrics.TaskSkippedDataFiles; import org.apache.iceberg.spark.source.metrics.TaskSkippedDataManifests; -import org.apache.iceberg.spark.source.metrics.TaskTotalFileSize; +import org.apache.iceberg.spark.source.metrics.TaskSkippedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TaskSkippedDeleteManifests; +import org.apache.iceberg.spark.source.metrics.TaskTotalDataFileSize; +import org.apache.iceberg.spark.source.metrics.TaskTotalDataManifests; +import org.apache.iceberg.spark.source.metrics.TaskTotalDeleteFileSize; +import org.apache.iceberg.spark.source.metrics.TaskTotalDeleteManifests; import org.apache.iceberg.spark.source.metrics.TaskTotalPlanningDuration; -import org.apache.iceberg.spark.source.metrics.TotalFileSize; +import org.apache.iceberg.spark.source.metrics.TotalDataFileSize; +import org.apache.iceberg.spark.source.metrics.TotalDataManifests; +import org.apache.iceberg.spark.source.metrics.TotalDeleteFileSize; +import org.apache.iceberg.spark.source.metrics.TotalDeleteManifests; import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; @@ -200,12 +220,32 @@ public CustomTaskMetric[] reportDriverMetrics() { } List driverMetrics = Lists.newArrayList(); - driverMetrics.add(TaskTotalFileSize.from(scanReport)); + + // common driverMetrics.add(TaskTotalPlanningDuration.from(scanReport)); - driverMetrics.add(TaskSkippedDataFiles.from(scanReport)); - driverMetrics.add(TaskScannedDataFiles.from(scanReport)); - driverMetrics.add(TaskSkippedDataManifests.from(scanReport)); + + // data manifests + driverMetrics.add(TaskTotalDataManifests.from(scanReport)); driverMetrics.add(TaskScannedDataManifests.from(scanReport)); + driverMetrics.add(TaskSkippedDataManifests.from(scanReport)); + + // data files + driverMetrics.add(TaskResultDataFiles.from(scanReport)); + driverMetrics.add(TaskSkippedDataFiles.from(scanReport)); + driverMetrics.add(TaskTotalDataFileSize.from(scanReport)); + + // delete manifests + driverMetrics.add(TaskTotalDeleteManifests.from(scanReport)); + driverMetrics.add(TaskScannedDeleteManifests.from(scanReport)); + driverMetrics.add(TaskSkippedDeleteManifests.from(scanReport)); + + // delete files + driverMetrics.add(TaskTotalDeleteFileSize.from(scanReport)); + driverMetrics.add(TaskResultDeleteFiles.from(scanReport)); + driverMetrics.add(TaskEqualityDeleteFiles.from(scanReport)); + driverMetrics.add(TaskIndexedDeleteFiles.from(scanReport)); + driverMetrics.add(TaskPositionalDeleteFiles.from(scanReport)); + driverMetrics.add(TaskSkippedDeleteFiles.from(scanReport)); return driverMetrics.toArray(new CustomTaskMetric[0]); } @@ -213,14 +253,35 @@ public CustomTaskMetric[] reportDriverMetrics() { @Override public CustomMetric[] supportedCustomMetrics() { return new CustomMetric[] { + // task metrics new NumSplits(), new NumDeletes(), - new TotalFileSize(), + + // common new TotalPlanningDuration(), + + // data manifests + new TotalDataManifests(), new ScannedDataManifests(), new SkippedDataManifests(), - new ScannedDataFiles(), - new SkippedDataFiles() + + // data files + new ResultDataFiles(), + new SkippedDataFiles(), + new TotalDataFileSize(), + + // delete manifests + new TotalDeleteManifests(), + new ScannedDeleteManifests(), + new SkippedDeleteManifests(), + + // delete files + new TotalDeleteFileSize(), + new ResultDeleteFiles(), + new EqualityDeleteFiles(), + new IndexedDeleteFiles(), + new PositionalDeleteFiles(), + new SkippedDeleteFiles() }; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java new file mode 100644 index 000000000000..754145f7d252 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class EqualityDeleteFiles extends CustomSumMetric { + + static final String NAME = "equalityDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of equality delete files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java new file mode 100644 index 000000000000..7fc5b9066cdc --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class IndexedDeleteFiles extends CustomSumMetric { + + static final String NAME = "indexedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of indexed delete files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java new file mode 100644 index 000000000000..5de75776ea4f --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class PositionalDeleteFiles extends CustomSumMetric { + + static final String NAME = "positionalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of positional delete files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java similarity index 87% rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java index f453872fdc29..21959cbf6c63 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java @@ -20,9 +20,9 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric; -public class ScannedDataFiles extends CustomSumMetric { +public class ResultDataFiles extends CustomSumMetric { - static final String NAME = "scannedDataFiles"; + static final String NAME = "resultDataFiles"; @Override public String name() { @@ -31,6 +31,6 @@ public String name() { @Override public String description() { - return "number of scanned data files"; + return "number of result data files"; } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java new file mode 100644 index 000000000000..9c6ad2ca328a --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class ResultDeleteFiles extends CustomSumMetric { + + static final String NAME = "resultDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of result delete files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java new file mode 100644 index 000000000000..1fa006b7b193 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class ScannedDeleteManifests extends CustomSumMetric { + + static final String NAME = "scannedDeleteManifests"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of scanned delete manifests"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java new file mode 100644 index 000000000000..70597be67113 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class SkippedDeleteFiles extends CustomSumMetric { + + static final String NAME = "skippedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of skipped delete files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java new file mode 100644 index 000000000000..0336170b45a1 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class SkippedDeleteManifests extends CustomSumMetric { + + static final String NAME = "skippedDeleteManifests"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of skipped delete manifest"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskEqualityDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskEqualityDeleteFiles.java new file mode 100644 index 000000000000..ecd14bcca31d --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskEqualityDeleteFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskEqualityDeleteFiles implements CustomTaskMetric { + private final long value; + + private TaskEqualityDeleteFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return EqualityDeleteFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskEqualityDeleteFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().equalityDeleteFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskEqualityDeleteFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskIndexedDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskIndexedDeleteFiles.java new file mode 100644 index 000000000000..63b6767e955d --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskIndexedDeleteFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskIndexedDeleteFiles implements CustomTaskMetric { + private final long value; + + private TaskIndexedDeleteFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return IndexedDeleteFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskIndexedDeleteFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().indexedDeleteFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskIndexedDeleteFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskPositionalDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskPositionalDeleteFiles.java new file mode 100644 index 000000000000..805f22bf0d7c --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskPositionalDeleteFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskPositionalDeleteFiles implements CustomTaskMetric { + private final long value; + + private TaskPositionalDeleteFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return PositionalDeleteFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskPositionalDeleteFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().positionalDeleteFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskPositionalDeleteFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDataFiles.java similarity index 83% rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDataFiles.java index d9a527da08f6..a27142131403 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDataFiles.java @@ -22,16 +22,16 @@ import org.apache.iceberg.metrics.ScanReport; import org.apache.spark.sql.connector.metric.CustomTaskMetric; -public class TaskScannedDataFiles implements CustomTaskMetric { +public class TaskResultDataFiles implements CustomTaskMetric { private final long value; - private TaskScannedDataFiles(long value) { + private TaskResultDataFiles(long value) { this.value = value; } @Override public String name() { - return ScannedDataFiles.NAME; + return ResultDataFiles.NAME; } @Override @@ -39,9 +39,9 @@ public long value() { return value; } - public static TaskScannedDataFiles from(ScanReport scanReport) { + public static TaskResultDataFiles from(ScanReport scanReport) { CounterResult counter = scanReport.scanMetrics().resultDataFiles(); long value = counter != null ? counter.value() : 0L; - return new TaskScannedDataFiles(value); + return new TaskResultDataFiles(value); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDeleteFiles.java new file mode 100644 index 000000000000..aea8ca07dd05 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskResultDeleteFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskResultDeleteFiles implements CustomTaskMetric { + private final long value; + + private TaskResultDeleteFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return ResultDeleteFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskResultDeleteFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().resultDeleteFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskResultDeleteFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDeleteManifests.java new file mode 100644 index 000000000000..1766cf2f6835 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDeleteManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskScannedDeleteManifests implements CustomTaskMetric { + private final long value; + + private TaskScannedDeleteManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return ScannedDeleteManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskScannedDeleteManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().scannedDeleteManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskScannedDeleteManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteFiles.java new file mode 100644 index 000000000000..87579751742c --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskSkippedDeleteFiles implements CustomTaskMetric { + private final long value; + + private TaskSkippedDeleteFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return SkippedDeleteFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskSkippedDeleteFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().skippedDeleteFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskSkippedDeleteFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteManifests.java new file mode 100644 index 000000000000..4a9c71e0c1e4 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDeleteManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskSkippedDeleteManifests implements CustomTaskMetric { + private final long value; + + private TaskSkippedDeleteManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return SkippedDeleteManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskSkippedDeleteManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().skippedDeleteManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskSkippedDeleteManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataFileSize.java similarity index 83% rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataFileSize.java index c300d835e777..3f5a224425d8 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataFileSize.java @@ -22,17 +22,17 @@ import org.apache.iceberg.metrics.ScanReport; import org.apache.spark.sql.connector.metric.CustomTaskMetric; -public class TaskTotalFileSize implements CustomTaskMetric { +public class TaskTotalDataFileSize implements CustomTaskMetric { private final long value; - private TaskTotalFileSize(long value) { + private TaskTotalDataFileSize(long value) { this.value = value; } @Override public String name() { - return TotalFileSize.NAME; + return TotalDataFileSize.NAME; } @Override @@ -40,9 +40,9 @@ public long value() { return value; } - public static TaskTotalFileSize from(ScanReport scanReport) { + public static TaskTotalDataFileSize from(ScanReport scanReport) { CounterResult counter = scanReport.scanMetrics().totalFileSizeInBytes(); long value = counter != null ? counter.value() : 0L; - return new TaskTotalFileSize(value); + return new TaskTotalDataFileSize(value); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataManifests.java new file mode 100644 index 000000000000..6d8c3c24e460 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDataManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskTotalDataManifests implements CustomTaskMetric { + private final long value; + + private TaskTotalDataManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return TotalDataManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskTotalDataManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().totalDataManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskTotalDataManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteFileSize.java new file mode 100644 index 000000000000..17ecec78da3f --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteFileSize.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskTotalDeleteFileSize implements CustomTaskMetric { + + private final long value; + + private TaskTotalDeleteFileSize(long value) { + this.value = value; + } + + @Override + public String name() { + return TotalDeleteFileSize.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskTotalDeleteFileSize from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().totalDeleteFileSizeInBytes(); + long value = counter != null ? counter.value() : 0L; + return new TaskTotalDeleteFileSize(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteManifests.java new file mode 100644 index 000000000000..ff55c1be89e3 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalDeleteManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskTotalDeleteManifests implements CustomTaskMetric { + private final long value; + + private TaskTotalDeleteManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return TotalDeleteManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskTotalDeleteManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().totalDeleteManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskTotalDeleteManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java new file mode 100644 index 000000000000..b1ff8a46368c --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDataFileSize extends CustomSumMetric { + + static final String NAME = "totalDataFileSize"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total data file size (bytes)"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java similarity index 87% rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java index 994626e54f10..de8f04be7767 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java @@ -20,9 +20,9 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric; -public class TotalFileSize extends CustomSumMetric { +public class TotalDataManifests extends CustomSumMetric { - static final String NAME = "totalFileSize"; + static final String NAME = "totalDataManifest"; @Override public String name() { @@ -31,6 +31,6 @@ public String name() { @Override public String description() { - return "total file size (bytes)"; + return "total data manifests"; } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java new file mode 100644 index 000000000000..da4303325273 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDeleteFileSize extends CustomSumMetric { + + static final String NAME = "totalDeleteFileSize"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total delete file size (bytes)"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java new file mode 100644 index 000000000000..7442dfdb6ffb --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDeleteManifests extends CustomSumMetric { + + static final String NAME = "totalDeleteManifests"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total delete manifests"; + } +} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java index 7b943372d167..fea5a1d3e1c3 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java @@ -43,7 +43,7 @@ public void removeTables() { @Test public void testReadMetricsForV1Table() throws NoSuchTableException { sql( - "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='2')", + "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='1')", tableName); spark.range(10000).coalesce(1).writeTo(tableName).append(); @@ -56,12 +56,31 @@ public void testReadMetricsForV1Table() throws NoSuchTableException { seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava(); Map metricsMap = JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava(); - Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + // Common + Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + + // data manifests + Assertions.assertThat(metricsMap.get("totalDataManifest").value()).isEqualTo(2); Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2); - Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1); Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0); - Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0); - Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + + // data files + Assertions.assertThat(metricsMap.get("resultDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("totalDataFileSize").value()).isNotEqualTo(0); + + // delete manifests + Assertions.assertThat(metricsMap.get("totalDeleteManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("scannedDeleteManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("skippedDeleteManifests").value()).isEqualTo(0); + + // delete files + Assertions.assertThat(metricsMap.get("totalDeleteFileSize").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("resultDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("equalityDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("indexedDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("positionalDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("skippedDeleteFiles").value()).isEqualTo(0); } @Test @@ -80,11 +99,82 @@ public void testReadMetricsForV2Table() throws NoSuchTableException { seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava(); Map metricsMap = JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava(); - Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + + // Common + Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + + // data manifests + Assertions.assertThat(metricsMap.get("totalDataManifest").value()).isEqualTo(2); Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2); - Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1); Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0); - Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0); + + // data files + Assertions.assertThat(metricsMap.get("resultDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("totalDataFileSize").value()).isNotEqualTo(0); + + // delete manifests + Assertions.assertThat(metricsMap.get("totalDeleteManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("scannedDeleteManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("skippedDeleteManifests").value()).isEqualTo(0); + + // delete files + Assertions.assertThat(metricsMap.get("totalDeleteFileSize").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("resultDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("equalityDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("indexedDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("positionalDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("skippedDeleteFiles").value()).isEqualTo(0); + } + + @Test + public void testDeleteMetrics() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id BIGINT)" + + " USING iceberg" + + " TBLPROPERTIES (\n" + + " 'write.delete.mode'='merge-on-read',\n" + + " 'write.update.mode'='merge-on-read',\n" + + " 'write.merge.mode'='merge-on-read',\n" + + " 'format-version'='2'\n" + + " )", + tableName); + + spark.range(10000).coalesce(1).writeTo(tableName).append(); + + spark.sql(String.format("DELETE FROM %s WHERE id = 1", tableName)).collect(); + Dataset df = spark.sql(String.format("SELECT * FROM %s", tableName)); + df.collect(); + + List sparkPlans = + seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava(); + Map metricsMap = + JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava(); + + // Common Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + + // data manifests + Assertions.assertThat(metricsMap.get("totalDataManifest").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0); + + // data files + Assertions.assertThat(metricsMap.get("resultDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("totalDataFileSize").value()).isNotEqualTo(0); + + // delete manifests + Assertions.assertThat(metricsMap.get("totalDeleteManifests").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("scannedDeleteManifests").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDeleteManifests").value()).isEqualTo(0); + + // delete files + Assertions.assertThat(metricsMap.get("totalDeleteFileSize").value()).isNotEqualTo(0); + Assertions.assertThat(metricsMap.get("resultDeleteFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("equalityDeleteFiles").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("indexedDeleteFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("positionalDeleteFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDeleteFiles").value()).isEqualTo(0); } }