-
Notifications
You must be signed in to change notification settings - Fork 163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add CometRowToColumnar operator #206
Conversation
common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #206 +/- ##
============================================
- Coverage 33.48% 33.36% -0.13%
- Complexity 776 791 +15
============================================
Files 108 111 +3
Lines 37178 37479 +301
Branches 8146 8192 +46
============================================
+ Hits 12448 12503 +55
- Misses 22107 22351 +244
- Partials 2623 2625 +2 ☔ View full report in Codecov by Sentry. |
d6858fa
to
47cb668
Compare
@@ -78,6 +83,9 @@ object Utils { | |||
case _: ArrowType.FixedSizeBinary => BinaryType | |||
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) | |||
case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType | |||
case ts: ArrowType.Timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the TimestampNTZType
case is missed.
import org.apache.spark.sql.errors.QueryExecutionErrors | ||
import org.apache.spark.sql.types._ | ||
|
||
private[arrow] object ArrowWriter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's mostly from Spark's side.
Since we are shading Arrow in Comet, we cannot use this code directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be useful to add some comments on which Spark class this is from, to help it get better maintained in future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, let me add some comments.
Sorry for the delay @advancedxy . I'll try to take a look soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this looks good, I just have some minor comments so far.
import org.apache.spark.sql.errors.QueryExecutionErrors | ||
import org.apache.spark.sql.types._ | ||
|
||
private[arrow] object ArrowWriter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be useful to add some comments on which Spark class this is from, to help it get better maintained in future.
case (BinaryType, vector: LargeVarBinaryVector) => new LargeBinaryWriter(vector) | ||
case (DateType, vector: DateDayVector) => new DateWriter(vector) | ||
case (TimestampType, vector: TimeStampMicroTZVector) => new TimestampWriter(vector) | ||
case (TimestampNTZType, vector: TimeStampMicroVector) => new TimestampNTZWriter(vector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not compatible with Spark 3.2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, although the TimestampNTZType is removed in Spark 3.2 after this PR: https://github.com/apache/spark/pull/33444
. It's still possible to access TimestampNTZType
type, which makes the code a lot cleaner for later versions of Spark: we don't have to add special directories for spark 3.2/3.3/3.4 etc.
Per my understanding, since Spark 3.2 will not produce schema with TimestampNTZType
, this pattern match case is effective a no-op case. And it could be effective for spark 3.3 and spark 3.4.
common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala
Show resolved
Hide resolved
try { | ||
if (!closed) { | ||
if (currentBatch != null) { | ||
arrowWriter.reset() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to close arrowWriter
too? for example close all the ValueVector
s in the writer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no close method for arrowWriter
. The ColumnarBatch
shall be closed to close all the ValueVector
s, which is already achieved by root.close
} | ||
|
||
override def hasNext: Boolean = rowIter.hasNext || { | ||
close(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why we need to call close
here and whether just calling close
in the TaskCompletionListener
is sufficient. Will this iterator be used again once it drains all the rows from the input iterator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why we need to call close here and whether just calling close in the TaskCompletionListener is sufficient.
It might not be sufficient to close in TaskCompletionListener
as the task might live much longer than the iterator, for example, a task contains Range -> CometRowToColumnar --> Sort --> ShuffleWrite
, the CometRowToColumnar
will be drained much earlier than Sort or ShuffleWrite. So we need to close the iterator earlier to make sure there's no memory buffering/(leaking).
However due to comments in https://github.com/apache/arrow-datafusion-comet/pull/206/files#diff-04037044481f9a656275a63ebb6a3a63badf866f19700d4a6909d2e17c8d7b72R37-R46, we cannot close the allocator after the iterator is consumed. It's already exported in the native side, it might be dropped later than the iterator consumption. So I add the allocator.close
in the task completion callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, make sense.
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
Outdated
Show resolved
Hide resolved
} | ||
new StructWriter(vector, children.toArray) | ||
case (NullType, vector: NullVector) => new NullWriter(vector) | ||
case (_: YearMonthIntervalType, vector: IntervalYearVector) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these are not supported yet for dictionary vector, see CometDictionary
on the check of minor type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I believe this ArrowWriter doesn't produce dictionary encoded ColumnVectors.
But I do believe we should match how org.apache.spark.sql.comet.util.Utils#toArrowType
matches SparkType to ArrowType. Let me comment out the following pattern match cases.
47cb668
to
fb49a88
Compare
@sunchao would you mind to take a look at this again, I should address most of your comments, please let me know if you have any other comments. And sorry for the late update, I wasn't feeling well last week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for late on the review @advancedxy . Overall LGTM. Please rebase the PR.
} | ||
|
||
override def hasNext: Boolean = rowIter.hasNext || { | ||
close(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, make sense.
"spark.comet.rowToColumnar.enabled") | ||
.internal() | ||
.doc("Whether to enable row to columnar conversion in Comet. When this is turned on, " + | ||
"Comet will convert row-based operators in spark.comet.rowToColumnar.sourceNodeList into " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Comet will convert row-based operators in spark.comet.rowToColumnar.sourceNodeList into " + | |
"Comet will convert row-based data scan operators in spark.comet.rowToColumnar.sourceNodeList into " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, actually CometRowToColumnar is general enough that it can be used to convert any row-based operator to a columnar one.
@@ -238,6 +239,11 @@ class CometSparkSessionExtensions | |||
val nativeOp = QueryPlanSerde.operator2Proto(op).get | |||
CometScanWrapper(nativeOp, op) | |||
|
|||
case op if shouldApplyRowToColumnar(conf, op) => | |||
val cometOp = CometRowToColumnarExec(op) | |||
val nativeOp = QueryPlanSerde.operator2Proto(cometOp).get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, is the source always able to be converted to Comet scan? If there are unsupported types, it will return None
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldApplyRowToColumnar
already checks the output type of op. Only supported types are allowed.
// 2. Consecutive operators of CometRowToColumnarExec and ColumnarToRowExec, which might be | ||
// possible for Comet to add a `CometRowToColumnarExec` for row-based operators first, then | ||
// Spark only requests row-based output. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you actually mean:
Comet adds `CometRowToColumnarExec` on top of row-based data scan operators, but the
downstream operator is Spark operator which takes row-based input. So Spark adds another
`ColumnarToRowExec` after `CometRowToColumnarExec`. In this case, we remove the pair of
`CometRowToColumnarExec` and `ColumnarToRowExec`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the
downstream operator is Spark operator which takes row-based input
hmm, this is another possibility, let me update the comment to include this one.
The case I described above is that Spark only requests row-based at the end of the operator, the row-based requirement might be passed down to the CometRowToColumnarExec
and then we have a pair of CometRowToColumnarExec
and ColumnarToRowExec
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refined the comments, hopefully it clarifies things. Please let me know if you have any other comments.
fb49a88
to
2c6a4f7
Compare
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
Outdated
Show resolved
Hide resolved
dc0841f
to
20cd07a
Compare
Merged, thanks @advancedxy ! If any comments from @viirya are not addressed, we can do it in a separate PR. This PR has been open for too long :) |
Which issue does this PR close?
This closes #119 and partially resolves #137
Rationale for this change
For ease testing with RangeExec operator in the short term.
In the long term, this PR introduce a general way to enable Comet with row-based source exec nodes
What changes are included in this PR?
CometRowToColumnarExec
to transform Spark's InternalRow into ColumnarBatchHow are these changes tested?