Add shim layers for GpuWindowInPandas. (NVIDIA#1124)
Databricks Spark requires different output columns than Apache Spark does.

Also add the Python daemon module, since Databricks changes the API.

Signed-off-by: Firestarman <[email protected]>
firestarman authored Nov 17, 2020
1 parent cc0bfa3 commit 9148176
Showing 9 changed files with 348 additions and 41 deletions.
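For background, PySpark selects the worker daemon from the spark.python.daemon.module configuration, which is how a runtime-specific daemon such as the one added below can be substituted. A minimal sketch (illustrative only; the exact wiring the plugin uses is not shown in this commit):

from pyspark.sql import SparkSession

# Illustrative only: point PySpark at the Databricks-specific daemon added below.
# In practice the plugin configures this itself when GPU scheduling for Python
# processes is enabled.
spark = (SparkSession.builder
         .config("spark.python.daemon.module", "rapids.daemon_databricks")
         .getOrCreate())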
2 changes: 0 additions & 2 deletions integration_tests/src/main/python/udf_test.py
@@ -26,7 +26,6 @@
    pytestmark = pytest.mark.skip(reason=str(e))

from asserts import assert_gpu_and_cpu_are_equal_collect
from conftest import is_databricks_runtime
from data_gen import *
from marks import incompat, approximate_float, allow_non_gpu, ignore_order
from pyspark.sql import Window
@@ -150,7 +149,6 @@ def pandas_sum(to_process: pd.Series) -> int:
conf=arrow_udf_conf)


@pytest.mark.xfail(is_databricks_runtime(), reason='https://github.com/NVIDIA/spark-rapids/issues/1122')
@ignore_order
@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen], ids=idfn)
@pytest.mark.parametrize('window', udf_windows, ids=window_ids)
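For context, the test touched above exercises window aggregation Pandas UDFs. A minimal, self-contained sketch of that pattern (the names here are illustrative, not taken from udf_test.py):

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "v"])

@pandas_udf("double")
def pandas_sum(v: pd.Series) -> float:
    # Aggregates one window frame's worth of values at a time
    return v.sum()

w = Window.partitionBy("id").rowsBetween(Window.unboundedPreceding,
                                         Window.unboundedFollowing)
df.withColumn("v_sum", pandas_sum("v").over(w)).show()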
168 changes: 168 additions & 0 deletions python/rapids/daemon_databricks.py
@@ -0,0 +1,168 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import signal
import select
import socket
import sys
import traceback
import time
import gc
from errno import EINTR, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN

from pyspark.serializers import read_int, write_int, UTF8Deserializer
from pyspark.daemon import worker

from rapids.worker import initialize_gpu_mem
utf8_deserializer = UTF8Deserializer()


def manager():
    # Create a new process group to corral our children
    os.setpgid(0, 0)

    # Create a listening socket on the AF_INET loopback interface
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, SOMAXCONN))
    listen_host, listen_port = listen_sock.getsockname()

    # re-open stdin/stdout in 'wb' mode
    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
    write_int(listen_port, stdout_bin)
    stdout_bin.flush()

    def shutdown(code):
        signal.signal(SIGTERM, SIG_DFL)
        # Send SIGHUP to notify workers of shutdown
        os.kill(0, SIGHUP)
        sys.exit(code)

    def handle_sigterm(*args):
        shutdown(1)
    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
    signal.signal(SIGCHLD, SIG_IGN)

    reuse = os.environ.get("SPARK_REUSE_WORKER")

    # Initialization complete
    try:
        while True:
            try:
                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except select.error as ex:
                if ex[0] == EINTR:
                    continue
                else:
                    raise

            if 0 in ready_fds:
                try:
                    worker_pid = read_int(stdin_bin)
                except EOFError:
                    # Spark told us to exit by closing stdin
                    shutdown(0)
                try:
                    os.kill(worker_pid, signal.SIGKILL)
                except OSError:
                    pass  # process already died

            if listen_sock in ready_fds:
                try:
                    sock, _ = listen_sock.accept()
                except OSError as e:
                    if e.errno == EINTR:
                        continue
                    raise

                # Launch a worker process
                try:
                    pid = os.fork()
                except OSError as e:
                    if e.errno in (EAGAIN, EINTR):
                        time.sleep(1)
                        pid = os.fork()  # error here will shutdown daemon
                    else:
                        outfile = sock.makefile(mode='wb')
                        write_int(e.errno, outfile)  # Signal that the fork failed
                        outfile.flush()
                        outfile.close()
                        sock.close()
                        continue

                if pid == 0:
                    # in child process
                    listen_sock.close()

                    # It should close the standard input in the child process so that
                    # Python native function executions stay intact.
                    #
                    # Note that if we just close the standard input (file descriptor 0),
                    # the lowest file descriptor (file descriptor 0) will be allocated,
                    # later when other file descriptors should happen to open.
                    #
                    # Therefore, here we redirect it to '/dev/null' by duplicating
                    # another file descriptor for '/dev/null' to the standard input (0).
                    # See SPARK-26175.
                    devnull = open(os.devnull, 'r')
                    os.dup2(devnull.fileno(), 0)
                    devnull.close()

                    try:
                        # GPU context setup
                        initialize_gpu_mem()

                        infile = sock.makefile(mode="rb")
                        executor_username = utf8_deserializer.loads(infile)
                        # Acknowledge that the fork was successful
                        outfile = sock.makefile(mode="wb")
                        write_int(os.getpid(), outfile)
                        outfile.flush()
                        outfile.close()
                        authenticated = False
                        while True:
                            code = worker(sock, authenticated, executor_username)
                            if code == 0:
                                authenticated = True
                            if not reuse or code:
                                # wait for closing
                                try:
                                    while sock.recv(1024):
                                        pass
                                except Exception:
                                    pass
                                break
                            gc.collect()
                    except:
                        traceback.print_exc()
                        os._exit(1)
                    else:
                        os._exit(0)
                else:
                    sock.close()

    finally:
        shutdown(1)


if __name__ == '__main__':
    manager()
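The daemon above follows the standard PySpark daemon protocol, extended with the Databricks username handshake: the manager writes its listening port to stdout, forks a worker per accepted connection, and each forked child first reads the executor username from the socket and then writes back its PID. A rough client-side sketch of that handshake (illustrative only; the real client is Spark's PythonWorkerFactory on the JVM side):

import socket
import struct

def write_utf8(s, stream):
    # Length-prefixed UTF-8, matching what UTF8Deserializer.loads() expects
    data = s.encode("utf-8")
    stream.write(struct.pack("!i", len(data)))
    stream.write(data)
    stream.flush()

def read_int(stream):
    # 4-byte big-endian signed int, matching pyspark.serializers.read_int
    return struct.unpack("!i", stream.read(4))[0]

port = 12345  # illustrative; really read from the daemon's stdout with read_int
sock = socket.create_connection(("127.0.0.1", port))
outfile = sock.makefile("wb")
infile = sock.makefile("rb")
write_utf8("some_user", outfile)   # consumed by utf8_deserializer.loads(infile)
worker_pid = read_int(infile)      # the PID written by the forked child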
@@ -0,0 +1,42 @@
/*
 * Copyright (c) 2020, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.shims.spark300

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecBase
import org.apache.spark.sql.vectorized.ColumnarBatch

/*
 * This GpuWindowInPandasExec aims at accelerating the data transfer between
 * JVM and Python, and scheduling GPU resources for Python processes.
 */
case class GpuWindowInPandasExec(
    windowExpression: Seq[Expression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan) extends GpuWindowInPandasExecBase {

  override final def pythonModuleKey: String = "spark"

  // Apache Spark expects the input columns before the result columns.
  override def output: Seq[Attribute] = child.output ++ windowExpression
    .map(_.asInstanceOf[NamedExpression].toAttribute)

  // Return the joined batch directly, per Apache Spark's expectation.
  override def projectResult(joinedBatch: ColumnarBatch): ColumnarBatch = joinedBatch
}
@@ -40,8 +40,10 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase
import org.apache.spark.sql.rapids.shims.spark300._
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -131,6 +133,20 @@ class Spark300Shims extends SparkShims {

  override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
    Seq(
      GpuOverrides.exec[WindowInPandasExec](
        "The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between" +
          " the Java process and the Python process. It also supports scheduling GPU resources" +
          " for the Python process when enabled. For now it only supports row based window frame.",
        (winPy, conf, p, r) => new GpuWindowInPandasExecMetaBase(winPy, conf, p, r) {
          override def convertToGpu(): GpuExec = {
            GpuWindowInPandasExec(
              windowExpressions.map(_.convertToGpu()),
              partitionSpec.map(_.convertToGpu()),
              orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
              childPlans.head.convertIfNeeded()
            )
          }
        }).disabledByDefault("it only supports row based frame for now"),
      GpuOverrides.exec[FileSourceScanExec](
        "Reading data from files, often from Hive tables",
        (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
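Because the exec is registered with disabledByDefault, it has to be enabled explicitly before the plugin will replace WindowInPandasExec. A sketch, assuming the plugin's usual per-exec configuration naming (spark.rapids.sql.exec.<Exec>); verify the exact key against the generated configuration docs for the release in use:

# Assumed config name based on the per-exec convention; illustrative only.
spark.conf.set("spark.rapids.sql.exec.WindowInPandasExec", "true")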
@@ -0,0 +1,68 @@
/*
 * Copyright (c) 2020, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.shims.spark300db

import com.nvidia.spark.rapids.{GpuBindReferences, GpuBoundReference, GpuProjectExec, GpuWindowExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecBase
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

/*
 * This GpuWindowInPandasExec aims at accelerating the data transfer between
 * JVM and Python, and scheduling GPU resources for Python processes.
 */
case class GpuWindowInPandasExec(
    projectList: Seq[Expression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan) extends GpuWindowInPandasExecBase {

  override final def pythonModuleKey: String = "databricks"

  // On Databricks, the projectList contains not only the window expressions but may also
  // contain the input attributes, so the window expressions need to be extracted from it.
  override def windowExpression: Seq[Expression] = projectList.filter { expr =>
    expr.find(node => node.isInstanceOf[GpuWindowExpression]).isDefined
  }

  // On Databricks, the projectList is expected to be the final output, but what it contains
  // is not fixed: it may include all, some, or none of the input attributes. So the joined
  // batch has to be projected per this projectList, while the schema can be returned directly.
  override def output: Seq[Attribute] = projectList
    .map(_.asInstanceOf[NamedExpression].toAttribute)

  override def projectResult(joinedBatch: ColumnarBatch): ColumnarBatch = {
    // Project the data
    withResource(joinedBatch) { joinBatch =>
      GpuProjectExec.project(joinBatch, outReferences)
    }
  }

  private val outReferences = {
    val references = windowExpression.zipWithIndex.map { case (e, i) =>
      // Results of window expressions will be on the right side of the child's output
      GpuBoundReference(child.output.size + i, e.dataType, e.nullable)
    }
    val unboundToRefMap = windowExpression.zip(references).toMap
    // Bind the project list for the GPU
    GpuBindReferences.bindGpuReferences(
      projectList.map(_.transform(unboundToRefMap)), child.output)
  }

}
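To make the contrast with the Apache Spark shim concrete, here is a small conceptual sketch, with plain Python lists standing in for column schemas (none of this is plugin code):

child_output = ["a", "b"]        # columns coming from the child plan
window_results = ["w0", "w1"]    # one column per window expression

# Apache Spark shim: output is the child columns followed by the window results,
# and the joined batch is returned as-is; a parent project trims it later.
apache_output = child_output + window_results   # ["a", "b", "w0", "w1"]

# Databricks shim: projectList already names the final output, which may repeat
# some, all, or none of the input columns, so the joined batch must be projected.
project_list = ["b", "w0"]
databricks_output = project_list                # ["b", "w0"]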
@@ -38,8 +38,10 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastMeta, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, GpuShuffleMeta}
import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}

@@ -86,6 +88,20 @@ class Spark300dbShims extends Spark300Shims {

  override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
    Seq(
      GpuOverrides.exec[WindowInPandasExec](
        "The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between" +
          " the Java process and the Python process. It also supports scheduling GPU resources" +
          " for the Python process when enabled. For now it only supports row based window frame.",
        (winPy, conf, p, r) => new GpuWindowInPandasExecMetaBase(winPy, conf, p, r) {
          override def convertToGpu(): GpuExec = {
            GpuWindowInPandasExec(
              windowExpressions.map(_.convertToGpu()),
              partitionSpec.map(_.convertToGpu()),
              orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
              childPlans.head.convertIfNeeded()
            )
          }
        }).disabledByDefault("it only supports row based frame for now"),
      GpuOverrides.exec[FileSourceScanExec](
        "Reading data from files, often from Hive tables",
        (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
@@ -2016,12 +2016,6 @@ object GpuOverrides {
" scheduling GPU resources for the Python process when enabled",
(flatCoPy, conf, p, r) => new GpuFlatMapCoGroupsInPandasExecMeta(flatCoPy, conf, p, r))
.disabledByDefault("Performance is not ideal now"),
exec[WindowInPandasExec](
"The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the" +
" Java process and the Python process. It also supports scheduling GPU resources" +
" for the Python process when enabled. For now it only supports row based window frame.",
(winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
.disabledByDefault("it only supports row based frame for now"),
neverReplaceExec[AlterNamespaceSetPropertiesExec]("Namespace metadata operation"),
neverReplaceExec[CreateNamespaceExec]("Namespace metadata operation"),
neverReplaceExec[DescribeNamespaceExec]("Namespace metadata operation"),