[SPARK-49723][SQL] Add Variant metrics to the JSON File Scan node #48172
Conversation
Resolved review threads (outdated):
common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
val readFile: (PartitionedFile) => Iterator[InternalRow] = {
  val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  relation.fileFormat match {
    case f: JsonFileFormat =>
We should probably make it more general and allow FileFormat implementations to report additional metrics.
I tried doing this initially, but it required me to change every definition of this method in child classes of FileFormat unnecessarily, which would make the PR bigger. I think there were also some issues after I overrode every definition, but I don't fully remember them. Let me know if you think this suggestion is important; in my opinion, people who wish to add metrics in the future can just follow this idiom.
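One way the reviewer's suggestion could avoid touching every subclass is a default implementation on the base type, so only formats that report metrics opt in. The sketch below is hypothetical: the interface, class names, and the customMetrics method are illustrative stand-ins, not Spark's actual FileFormat API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical base type: a default method means subclasses with nothing to
// report need no changes at all.
interface FileFormat {
    default Map<String, Long> customMetrics() {
        return Collections.emptyMap();
    }
}

// Inherits the empty default; no override required.
class ParquetLikeFormat implements FileFormat {
}

// Opts in to reporting an extra metric (value hard-coded for the demo).
class JsonLikeFormat implements FileFormat {
    @Override
    public Map<String, Long> customMetrics() {
        return Map.of("topLevelVariantCount", 2L);
    }
}

public class MetricsHookSketch {
    public static void main(String[] args) {
        System.out.println(new ParquetLikeFormat().customMetrics().size());
        System.out.println(new JsonLikeFormat().customMetrics().get("topLevelVariantCount"));
    }
}
```

The trade-off is the one debated above: a default method keeps the PR small, while still letting future formats report metrics through a shared hook instead of a per-format idiom.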
@harshmotw-db Thanks for this feature! I left a few questions.
Resolved review threads:
common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java (two threads, one outdated)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala (two threads, one outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala (outdated)
@harshmotw-db Thanks! I left a few questions.
Resolved review threads (all outdated):
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/VariantConstructionMetrics.scala (two threads)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@harshmotw-db Thanks! I left a few followup comments.
Resolved review threads (all outdated):
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala (four threads)
@harshmotw-db Thanks! I left a few more minor comments.
@@ -628,18 +645,39 @@ case class FileSourceScanExec(
    }
  }

  val topLevelVariantMetrics: VariantMetrics = new VariantMetrics()
We should add some comments for these member variables.
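For illustration, a VariantMetrics-style accumulator with the kind of member-variable comments the reviewer asks for might look like this sketch. The field names and record method are assumed for the example, not the PR's actual definition.

```java
// Hypothetical sketch of a per-scan variant metrics accumulator.
// Each member variable carries a doc comment describing what it counts.
class VariantMetricsSketch {
    /** Number of variant values constructed during the scan. */
    long variantCount;

    /** Total size in bytes of the constructed variant binaries. */
    long totalVariantBytes;

    /** Total number of top-level paths (object fields / array elements) visited. */
    long topLevelPathCount;

    /** Record one constructed variant of the given encoded size and path count. */
    void record(long sizeInBytes, long paths) {
        variantCount += 1;
        totalVariantBytes += sizeInBytes;
        topLevelPathCount += paths;
    }

    public static void main(String[] args) {
        VariantMetricsSketch m = new VariantMetricsSketch();
        m.record(10, 3);
        m.record(5, 1);
        System.out.println(m.variantCount + " " + m.totalVariantBytes + " " + m.topLevelPathCount);
    }
}
```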
Resolved review threads (all outdated):
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala (two threads)
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/VariantConstructionMetrics.scala (two threads)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@harshmotw-db Thanks for these valuable metrics! I just had one minor comment.
LGTM
Resolved review thread (outdated): common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
@cloud-fan Can you go over this PR again whenever you have time? Thanks!
What changes were proposed in this pull request?
This pull request adds the following metrics to JSON file scan nodes to collect metrics related to variants being constructed as part of the scan:
Top-level and nested variant metrics are separated because they can have different usage patterns. singleVariantColumn scans, and columns in user-provided schema scans whose type is a top-level variant (not a variant nested in a struct/array/map), are considered top-level variants, while variants nested in other data types are considered nested variants.

Why are the changes needed?
This change allows users to collect metrics on variant usage to better monitor their data/workloads.
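As a toy illustration of the top-level vs nested distinction described in the proposal: a column whose declared type is variant counts as top level, while a variant appearing inside a struct, array, or map counts as nested. The type classes below are simplified stand-ins, not Spark's actual DataType hierarchy.

```java
import java.util.List;

// Simplified stand-ins for a handful of column types.
abstract class ToyType {}
class VariantType extends ToyType {}
class LongType extends ToyType {}
class ArrayType extends ToyType {
    final ToyType element;
    ArrayType(ToyType element) { this.element = element; }
}
class StructType extends ToyType {
    final List<ToyType> fields;
    StructType(List<ToyType> fields) { this.fields = fields; }
}

public class VariantClassifier {
    /** True when the column itself is a variant (counted as top level). */
    static boolean isTopLevelVariant(ToyType columnType) {
        return columnType instanceof VariantType;
    }

    /** True when a variant appears somewhere inside a non-variant column type. */
    static boolean containsNestedVariant(ToyType t) {
        if (t instanceof VariantType) return false; // top level, not nested
        return hasVariant(t);
    }

    // Recursively search for a variant anywhere inside the type.
    private static boolean hasVariant(ToyType t) {
        if (t instanceof VariantType) return true;
        if (t instanceof ArrayType) return hasVariant(((ArrayType) t).element);
        if (t instanceof StructType) {
            for (ToyType f : ((StructType) t).fields) {
                if (hasVariant(f)) return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isTopLevelVariant(new VariantType()));
        System.out.println(containsNestedVariant(new ArrayType(new VariantType())));
        System.out.println(containsNestedVariant(new LongType()));
    }
}
```

Under this toy model, the PR's two metric groups would simply be fed from the two predicates: top-level counters from columns where the first check holds, nested counters from columns where the second does.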
Does this PR introduce any user-facing change?
Users will now be able to see variant metrics in JSON scan nodes which were not available earlier.
How was this patch tested?
Comprehensive unit tests in VariantEndToEndSuite.scala
Was this patch authored or co-authored using generative AI tooling?
Yes, I got some help related to Scala syntax.
Generated by: ChatGPT 4o, GitHub Copilot.