[SPARK-21266][R][PYTHON] Support schema in a DDL-formatted string in dapply/gapply/from_json

## What changes were proposed in this pull request?

This PR adds support for specifying the schema as a DDL-formatted string for `from_json` in R/Python and for `dapply` and `gapply` in R; these APIs are commonly used, and the change keeps them consistent with their Scala counterparts.

Additionally, this PR exposes `structType.character` in R, so that a DDL-formatted string can be converted to a `structType` directly as a workaround in other possible corner cases.

**Python**

`from_json`

```python
from pyspark.sql.functions import from_json

data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```
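
For reference, a quick check of the parsed result (a sketch, continuing from the snippet above and assuming a running `SparkSession` named `spark`):

```python
# Each JSON string parses into a struct with the declared field, so
# collecting is expected to yield [Row(json=Row(a=1))].
df.select(from_json(df.value, "a INT").alias("json")).collect()
```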

**R**

`from_json`

```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```
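
The same call with an explicit `structType` should be equivalent:

```R
schema <- structType(structField("name", "string"))
head(select(df, from_json(df$people_json, schema)))
```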

`structType.character`

```R
structType("a STRING, b INT")
```
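
This should produce the same schema as building the fields explicitly:

```R
structType(structField("a", "string"), structField("b", "integer"))
```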

`dapply`

```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```
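
Collecting the result should round-trip the single input row (a sketch, assuming a running SparkR session):

```R
df1 <- dapply(createDataFrame(list(list(1.0)), "a"), function(x) { x }, "a DOUBLE")
collect(df1)
#   a
# 1 1
```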

`gapply`

```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```
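
And the equivalent call with an explicit `structType`, for comparison (both forms should behave identically):

```R
gapply(createDataFrame(list(list(1.0)), "a"), "a",
       function(key, x) { x },
       structType(structField("a", "double")))
```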

## How was this patch tested?

Doc tests for `from_json` in Python, and unit tests in `test_sparkSQL.R` in R.

Author: hyukjinkwon <[email protected]>

Closes apache#18498 from HyukjinKwon/SPARK-21266.
HyukjinKwon authored and Felix Cheung committed Jul 10, 2017
1 parent 18b3b00 commit 2bfd5ac
Showing 8 changed files with 160 additions and 76 deletions.
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -429,6 +429,7 @@ export("structField",
"structField.character",
"print.structField",
"structType",
"structType.character",
"structType.jobj",
"structType.structField",
"print.structType")
@@ -465,5 +466,6 @@ S3method(print, summary.GBTRegressionModel)
S3method(print, summary.GBTClassificationModel)
S3method(structField, character)
S3method(structField, jobj)
S3method(structType, character)
S3method(structType, jobj)
S3method(structType, structField)
36 changes: 32 additions & 4 deletions R/pkg/R/DataFrame.R
@@ -1391,6 +1391,10 @@ setMethod("summarize",
})

dapplyInternal <- function(x, func, schema) {
if (is.character(schema)) {
schema <- structType(schema)
}

packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)

@@ -1408,6 +1412,8 @@ dapplyInternal <- function(x, func, schema) {
dataFrame(sdf)
}

setClassUnion("characterOrstructType", c("character", "structType"))

#' dapply
#'
#' Apply a function to each partition of a SparkDataFrame.
@@ -1418,10 +1424,11 @@ dapplyInternal <- function(x, func, schema) {
#' to each partition will be passed.
#' The output of func should be a R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' It must match the output of func.
#' It must match the output of func. Since Spark 2.3, the DDL-formatted string
#' is also supported for the schema.
#' @family SparkDataFrame functions
#' @rdname dapply
#' @aliases dapply,SparkDataFrame,function,structType-method
#' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
#' @name dapply
#' @seealso \link{dapplyCollect}
#' @export
@@ -1444,6 +1451,17 @@ dapplyInternal <- function(x, func, schema) {
#' y <- cbind(y, y[1] + 1L)
#' },
#' schema)
#'
#' # The schema also can be specified in a DDL-formatted string.
#' schema <- "a INT, d DOUBLE, c STRING, d INT"
#' df1 <- dapply(
#' df,
#' function(x) {
#' y <- x[x[1] > 1, ]
#' y <- cbind(y, y[1] + 1L)
#' },
#' schema)
#'
#' collect(df1)
#' # the result
#' # a b c d
@@ -1452,7 +1470,7 @@ dapplyInternal <- function(x, func, schema) {
#' }
#' @note dapply since 2.0.0
setMethod("dapply",
signature(x = "SparkDataFrame", func = "function", schema = "structType"),
signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
function(x, func, schema) {
dapplyInternal(x, func, schema)
})
@@ -1522,6 +1540,7 @@ setMethod("dapplyCollect",
#' @param schema the schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match to output of \code{func}. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @return A SparkDataFrame.
#' @family SparkDataFrame functions
#' @aliases gapply,SparkDataFrame-method
@@ -1541,7 +1560,7 @@ setMethod("dapplyCollect",
#'
#' Here our output contains three columns, the key which is a combination of two
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' result <- gapply(
#' df,
@@ -1550,6 +1569,15 @@ setMethod("dapplyCollect",
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#'
#' The schema also can be specified in a DDL-formatted string.
#' schema <- "a INT, c STRING, avg DOUBLE"
#' result <- gapply(
#' df,
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#'
#' We can also group the data and afterwards call gapply on GroupedData.
#' For Example:
#' gdf <- group_by(df, "a", "c")
12 changes: 9 additions & 3 deletions R/pkg/R/functions.R
@@ -2174,8 +2174,9 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#'
#' @rdname column_collection_functions
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,structType-method
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @export
#' @examples
#'
@@ -2188,10 +2189,15 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
#' head(select(df2, from_json(df2$people_json, schema)))}
#' head(select(df2, from_json(df2$people_json, schema)))
#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "structType"),
setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
function(x, schema, as.json.array = FALSE, ...) {
if (is.character(schema)) {
schema <- structType(schema)
}

if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
3 changes: 3 additions & 0 deletions R/pkg/R/group.R
@@ -233,6 +233,9 @@ setMethod("gapplyCollect",
})

gapplyInternal <- function(x, func, schema) {
if (is.character(schema)) {
schema <- structType(schema)
}
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
29 changes: 26 additions & 3 deletions R/pkg/R/schema.R
@@ -23,18 +23,24 @@
#' Create a structType object that contains the metadata for a SparkDataFrame. Intended for
#' use with createDataFrame and toDF.
#'
#' @param x a structField object (created with the field() function)
#' @param x a structField object (created with the \code{structField} method). Since Spark 2.3,
#' this can be a DDL-formatted string, which is a comma separated list of field
#' definitions, e.g., "a INT, b STRING".
#' @param ... additional structField objects
#' @return a structType object
#' @rdname structType
#' @export
#' @examples
#'\dontrun{
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(df, list("a", "c"),
#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
#' schema)
#' schema <- structType("a INT, c STRING, avg DOUBLE")
#' df1 <- gapply(df, list("a", "c"),
#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
#' schema)
#' }
#' @note structType since 1.4.0
structType <- function(x, ...) {
@@ -68,6 +74,23 @@ structType.structField <- function(x, ...) {
structType(stObj)
}

#' @rdname structType
#' @method structType character
#' @export
structType.character <- function(x, ...) {
if (!is.character(x)) {
stop("schema must be a DDL-formatted string.")
}
if (length(list(...)) > 0) {
stop("multiple DDL-formatted strings are not supported")
}

stObj <- handledCallJStatic("org.apache.spark.sql.types.StructType",
"fromDDL",
x)
structType(stObj)
}

#' Print a Spark StructType.
#'
#' This function prints the contents of a StructType returned from the
@@ -102,7 +125,7 @@ print.structType <- function(x, ...) {
#' field1 <- structField("a", "integer")
#' field2 <- structField("c", "string")
#' field3 <- structField("avg", "double")
#' schema <- structType(field1, field2, field3)
#' df1 <- gapply(df, list("a", "c"),
#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
#' schema)
136 changes: 76 additions & 60 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -146,6 +146,13 @@ test_that("structType and structField", {
expect_is(testSchema, "structType")
expect_is(testSchema$fields()[[2]], "structField")
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")

testSchema <- structType("a STRING, b INT")
expect_is(testSchema, "structType")
expect_is(testSchema$fields()[[2]], "structField")
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")

expect_error(structType("A stri"), "DataType stri is not supported.")
})

test_that("structField type strings", {
@@ -1480,13 +1487,15 @@ test_that("column functions", {
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schema <- structType(structField("age", "integer"),
structField("height", "double"))
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
expect_equal(nrow(s), 3)
expect_is(s[[1]][[1]], "struct")
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
"age INT, height DOUBLE")
for (schema in schemas) {
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
expect_equal(nrow(s), 3)
expect_is(s[[1]][[1]], "struct")
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
}

# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
@@ -1504,14 +1513,15 @@ test_that("column functions", {
# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
df <- as.DataFrame(list(list("people" = jsonArr)))
schema <- structType(structField("name", "string"))
arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
expect_equal(ncol(arr), 1)
expect_equal(nrow(arr), 1)
expect_is(arr[[1]][[1]], "list")
expect_equal(length(arr$arrcol[[1]]), 2)
expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
for (schema in list(structType(structField("name", "string")), "name STRING")) {
arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
expect_equal(ncol(arr), 1)
expect_equal(nrow(arr), 1)
expect_is(arr[[1]][[1]], "list")
expect_equal(length(arr$arrcol[[1]]), 2)
expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
}

# Test create_array() and create_map()
df <- as.DataFrame(data.frame(
@@ -2885,30 +2895,33 @@ test_that("dapply() and dapplyCollect() on a DataFrame", {
expect_identical(ldf, result)

# Filter and add a column
schema <- structType(structField("a", "integer"), structField("b", "double"),
structField("c", "string"), structField("d", "integer"))
df1 <- dapply(
df,
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
},
schema)
result <- collect(df1)
expected <- ldf[ldf$a > 1, ]
expected$d <- expected$a + 1L
rownames(expected) <- NULL
expect_identical(expected, result)

result <- dapplyCollect(
df,
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
})
expected1 <- expected
names(expected1) <- names(result)
expect_identical(expected1, result)
schemas <- list(structType(structField("a", "integer"), structField("b", "double"),
structField("c", "string"), structField("d", "integer")),
"a INT, b DOUBLE, c STRING, d INT")
for (schema in schemas) {
df1 <- dapply(
df,
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
},
schema)
result <- collect(df1)
expected <- ldf[ldf$a > 1, ]
expected$d <- expected$a + 1L
rownames(expected) <- NULL
expect_identical(expected, result)

result <- dapplyCollect(
df,
function(x) {
y <- x[x$a > 1, ]
y <- cbind(y, y$a + 1L)
})
expected1 <- expected
names(expected1) <- names(result)
expect_identical(expected1, result)
}

# Remove the added column
df2 <- dapply(
@@ -3020,29 +3033,32 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {

# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
df2 <- gapply(
df,
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
},
schema)
actual <- collect(df2)$e
expected <- c(TRUE, TRUE)
expect_identical(actual, expected)

df2Collect <- gapplyCollect(
df,
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
colnames(y) <- c("a", "e")
y
})
actual <- df2Collect$e
schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
"a INT, e BOOLEAN")
for (schema in schemas) {
df2 <- gapply(
df,
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
},
schema)
actual <- collect(df2)$e
expected <- c(TRUE, TRUE)
expect_identical(actual, expected)

df2Collect <- gapplyCollect(
df,
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
colnames(y) <- c("a", "e")
y
})
actual <- df2Collect$e
expect_identical(actual, expected)
}

# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the groupping value and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),