Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Spark on Kubernetes - basic scheduler backend [WIP] #492

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a28728a
[SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting Map…
goldmedal Sep 15, 2017
8866174
[SPARK-22018][SQL] Preserve top-level alias metadata when collapsing …
tdas Sep 15, 2017
22b111e
[SPARK-21902][CORE] Print root cause for BlockManager#doPut
caneGuy Sep 15, 2017
4decedf
[SPARK-22002][SQL] Read JDBC table use custom schema support specify …
wangyum Sep 15, 2017
3c6198c
[SPARK-21987][SQL] fix a compatibility issue of sql event logs
cloud-fan Sep 15, 2017
79a4dab
[SPARK-21958][ML] Word2VecModel save: transform data in the cluster
travishegner Sep 15, 2017
c7307ac
[SPARK-15689][SQL] data source v2 read path
cloud-fan Sep 15, 2017
0bad10d
[SPARK-22017] Take minimum of all watermark execs in StreamExecution.
jose-torres Sep 16, 2017
73d9067
[SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8String#compareT…
original-brownbear Sep 16, 2017
f407302
[SPARK-22032][PYSPARK] Speed up StructType conversion
maver1ck Sep 17, 2017
6adf67d
[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
aray Sep 17, 2017
6308c65
[SPARK-21953] Show both memory and disk bytes spilled if either is pr…
ash211 Sep 18, 2017
7c72662
[SPARK-22043][PYTHON] Improves error message for show_profiles and du…
HyukjinKwon Sep 18, 2017
1e978b1
[SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …
Sep 18, 2017
894a756
[SPARK-22047][TEST] ignore HiveExternalCatalogVersionsSuite
cloud-fan Sep 18, 2017
3b049ab
[SPARK-22003][SQL] support array column in vectorized reader with UDF
Sep 18, 2017
c66d64b
[SPARK-14878][SQL] Trim characters string function support
kevinyu98 Sep 18, 2017
94f7e04
[SPARK-22030][CORE] GraphiteSink fails to re-connect to Graphite inst…
Sep 19, 2017
10f45b3
[SPARK-22047][FLAKY TEST] HiveExternalCatalogVersionsSuite
cloud-fan Sep 19, 2017
a11db94
[SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for …
ConeyLiu Sep 19, 2017
7c92351
[MINOR][CORE] Cleanup dead code and duplication in Mem. Management
original-brownbear Sep 19, 2017
1bc17a6
[SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala
Taaffy Sep 19, 2017
581200a
[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actu…
yaooqinn Sep 19, 2017
8319432
[SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn…
jerryshao Sep 19, 2017
2f96242
[MINOR][ML] Remove unnecessary default value setting for evaluators.
yanboliang Sep 19, 2017
d5aefa8
[SPARK-21338][SQL] implement isCascadingTruncateTable() method in Agg…
huaxingao Sep 19, 2017
ee13f3e
[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshT…
Sep 19, 2017
718bbc9
[SPARK-22067][SQL] ArrowWriter should use position when setting UTF8S…
BryanCutler Sep 20, 2017
c6ff59a
[SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.
Sep 20, 2017
280ff52
[SPARK-21977] SinglePartition optimizations break certain Streaming S…
brkyvz Sep 20, 2017
964aef5
Spark on Kubernetes - basic scheduler backend
foxish Sep 15, 2017
5a5ce25
Fix unit tests
foxish Sep 18, 2017
c423539
Cleaned up extraneous constants
foxish Sep 18, 2017
12f9c21
Cleaned POMs
foxish Sep 18, 2017
bb7b0fb
Fix pom
foxish Sep 18, 2017
e2e45dc
Clean up deprecated configuration
foxish Sep 20, 2017
9c3d11e
clean up imports
foxish Sep 20, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ NULL
#'
#' @param x Column to compute on. Note the difference in the following methods:
#' \itemize{
#' \item \code{to_json}: it is the column containing the struct or array of the structs.
#' \item \code{to_json}: it is the column containing the struct, array of the structs,
#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
Expand Down Expand Up @@ -1700,8 +1701,9 @@ setMethod("to_date",
})

#' @details
#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType}
#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered.
#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @rdname column_collection_functions
#' @aliases to_json to_json,Column-method
Expand All @@ -1715,6 +1717,14 @@ setMethod("to_date",
#'
#' # Converts an array of structs into a JSON array
#' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
#' df2 <- sql("SELECT map('name', 'Bob')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
Expand Down
8 changes: 8 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,14 @@ test_that("column functions", {
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

df <- sql("SELECT map('name', 'Bob') as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}")

df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
5, 5, 5, 5,
6, 6};

private static boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final boolean IS_LITTLE_ENDIAN =
ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

private static final UTF8String COMMA_UTF8 = UTF8String.fromString(",");
public static final UTF8String EMPTY_UTF8 = UTF8String.fromString("");
Expand Down Expand Up @@ -220,7 +221,7 @@ public long getPrefix() {
// After getting the data, we use a mask to mask out data that is not part of the string.
long p;
long mask = 0;
if (isLittleEndian) {
if (IS_LITTLE_ENDIAN) {
if (numBytes >= 8) {
p = Platform.getLong(base, offset);
} else if (numBytes > 4) {
Expand Down Expand Up @@ -510,6 +511,21 @@ public UTF8String trim() {
}
}

/**
* Based on the given trim string, trim this string starting from both ends
* This method searches for each character in the source string, removes the character if it is found
* in the trim string, stops at the first not found. It calls the trimLeft first, then trimRight.
* It returns a new string in which both ends trim characters have been removed.
* @param trimString the trim character string
*/
public UTF8String trim(UTF8String trimString) {
if (trimString != null) {
return trimLeft(trimString).trimRight(trimString);
} else {
return null;
}
}

public UTF8String trimLeft() {
int s = 0;
// skip all of the space (0x20) in the left side
Expand All @@ -522,6 +538,40 @@ public UTF8String trimLeft() {
}
}

/**
* Based on the given trim string, trim this string starting from left end
* This method searches each character in the source string starting from the left end, removes the character if it
* is in the trim string, stops at the first character which is not in the trim string, returns the new string.
* @param trimString the trim character string
*/
public UTF8String trimLeft(UTF8String trimString) {
if (trimString == null) return null;
// the searching byte position in the source string
int srchIdx = 0;
// the first beginning byte position of a non-matching character
int trimIdx = 0;

while (srchIdx < numBytes) {
UTF8String searchChar = copyUTF8String(srchIdx, srchIdx + numBytesForFirstByte(this.getByte(srchIdx)) - 1);
int searchCharBytes = searchChar.numBytes;
// try to find the matching for the searchChar in the trimString set
if (trimString.find(searchChar, 0) >= 0) {
trimIdx += searchCharBytes;
} else {
// no matching, exit the search
break;
}
srchIdx += searchCharBytes;
}

if (trimIdx >= numBytes) {
// empty string
return EMPTY_UTF8;
} else {
return copyUTF8String(trimIdx, numBytes - 1);
}
}

public UTF8String trimRight() {
int e = numBytes - 1;
// skip all of the space (0x20) in the right side
Expand All @@ -535,6 +585,50 @@ public UTF8String trimRight() {
}
}

/**
* Based on the given trim string, trim this string starting from right end
* This method searches each character in the source string starting from the right end, removes the character if it
* is in the trim string, stops at the first character which is not in the trim string, returns the new string.
* @param trimString the trim character string
*/
public UTF8String trimRight(UTF8String trimString) {
if (trimString == null) return null;
int charIdx = 0;
// number of characters from the source string
int numChars = 0;
// array of character length for the source string
int[] stringCharLen = new int[numBytes];
// array of the first byte position for each character in the source string
int[] stringCharPos = new int[numBytes];
// build the position and length array
while (charIdx < numBytes) {
stringCharPos[numChars] = charIdx;
stringCharLen[numChars] = numBytesForFirstByte(getByte(charIdx));
charIdx += stringCharLen[numChars];
numChars ++;
}

// index trimEnd points to the first no matching byte position from the right side of the source string.
int trimEnd = numBytes - 1;
while (numChars > 0) {
UTF8String searchChar =
copyUTF8String(stringCharPos[numChars - 1], stringCharPos[numChars - 1] + stringCharLen[numChars - 1] - 1);
if (trimString.find(searchChar, 0) >= 0) {
trimEnd -= stringCharLen[numChars - 1];
} else {
break;
}
numChars --;
}

if (trimEnd < 0) {
// empty string
return EMPTY_UTF8;
} else {
return copyUTF8String(0, trimEnd);
}
}

public UTF8String reverse() {
byte[] result = new byte[this.numBytes];

Expand Down Expand Up @@ -1097,10 +1191,23 @@ public UTF8String copy() {
@Override
public int compareTo(@Nonnull final UTF8String other) {
int len = Math.min(numBytes, other.numBytes);
// TODO: compare 8 bytes as unsigned long
for (int i = 0; i < len; i ++) {
int wordMax = (len / 8) * 8;
long roffset = other.offset;
Object rbase = other.base;
for (int i = 0; i < wordMax; i += 8) {
long left = getLong(base, offset + i);
long right = getLong(rbase, roffset + i);
if (left != right) {
if (IS_LITTLE_ENDIAN) {
return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
} else {
return Long.compareUnsigned(left, right);
}
}
}
for (int i = wordMax; i < len; i++) {
// In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
int res = (getByte(i) & 0xFF) - (other.getByte(i) & 0xFF);
int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
if (res != 0) {
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,4 +730,61 @@ public void testToLong() throws IOException {
assertFalse(negativeInput, UTF8String.fromString(negativeInput).toLong(wrapper));
}
}

@Test
public void trimBothWithTrimString() {
assertEquals(fromString("hello"), fromString(" hello ").trim(fromString(" ")));
assertEquals(fromString("o"), fromString(" hello ").trim(fromString(" hle")));
assertEquals(fromString("h e"), fromString("ooh e ooo").trim(fromString("o ")));
assertEquals(fromString(""), fromString("ooo...oooo").trim(fromString("o.")));
assertEquals(fromString("b"), fromString("%^b[]@").trim(fromString("][@^%")));

assertEquals(EMPTY_UTF8, fromString(" ").trim(fromString(" ")));

assertEquals(fromString("数据砖头"), fromString(" 数据砖头 ").trim());
assertEquals(fromString("数"), fromString("a数b").trim(fromString("ab")));
assertEquals(fromString(""), fromString("a").trim(fromString("a数b")));
assertEquals(fromString(""), fromString("数数 数数数").trim(fromString("数 ")));
assertEquals(fromString("据砖头"), fromString("数]数[数据砖头#数数").trim(fromString("[数]#")));
assertEquals(fromString("据砖头数数 "), fromString("数数数据砖头数数 ").trim(fromString("数")));
}

@Test
public void trimLeftWithTrimString() {
assertEquals(fromString(" hello "), fromString(" hello ").trimLeft(fromString("")));
assertEquals(fromString(""), fromString("a").trimLeft(fromString("a")));
assertEquals(fromString("b"), fromString("b").trimLeft(fromString("a")));
assertEquals(fromString("ba"), fromString("ba").trimLeft(fromString("a")));
assertEquals(fromString(""), fromString("aaaaaaa").trimLeft(fromString("a")));
assertEquals(fromString("trim"), fromString("oabtrim").trimLeft(fromString("bao")));
assertEquals(fromString("rim "), fromString("ooootrim ").trimLeft(fromString("otm")));

assertEquals(EMPTY_UTF8, fromString(" ").trimLeft(fromString(" ")));

assertEquals(fromString("数据砖头 "), fromString(" 数据砖头 ").trimLeft(fromString(" ")));
assertEquals(fromString("数"), fromString("数").trimLeft(fromString("a")));
assertEquals(fromString("a"), fromString("a").trimLeft(fromString("数")));
assertEquals(fromString("砖头数数"), fromString("数数数据砖头数数").trimLeft(fromString("据数")));
assertEquals(fromString("据砖头数数"), fromString(" 数数数据砖头数数").trimLeft(fromString("数 ")));
assertEquals(fromString("据砖头数数"), fromString("aa数数数据砖头数数").trimLeft(fromString("a数砖")));
assertEquals(fromString("$S,.$BR"), fromString(",,,,%$S,.$BR").trimLeft(fromString("%,")));
}

@Test
public void trimRightWithTrimString() {
assertEquals(fromString(" hello "), fromString(" hello ").trimRight(fromString("")));
assertEquals(fromString(""), fromString("a").trimRight(fromString("a")));
assertEquals(fromString("cc"), fromString("ccbaaaa").trimRight(fromString("ba")));
assertEquals(fromString(""), fromString("aabbbbaaa").trimRight(fromString("ab")));
assertEquals(fromString(" he"), fromString(" hello ").trimRight(fromString(" ol")));
assertEquals(fromString("oohell"), fromString("oohellooo../*&").trimRight(fromString("./,&%*o")));

assertEquals(EMPTY_UTF8, fromString(" ").trimRight(fromString(" ")));

assertEquals(fromString(" 数据砖头"), fromString(" 数据砖头 ").trimRight(fromString(" ")));
assertEquals(fromString("数数砖头"), fromString("数数砖头数aa数").trimRight(fromString("a数")));
assertEquals(fromString(""), fromString("数数数据砖ab").trimRight(fromString("数据砖ab")));
assertEquals(fromString("头"), fromString("头a???/").trimRight(fromString("数?/*&^%a")));
assertEquals(fromString("头"), fromString("头数b数数 [").trimRight(fromString(" []数b")));
}
}
Loading