Skip to content

Commit

Permalink
Merge pull request #711 from data-integrations/cherrypick-710
Browse files Browse the repository at this point in the history
[🍒] Use Kryo for serialization
  • Loading branch information
samdgupi authored May 1, 2024
2 parents 2ded7ed + 3d839d4 commit fd35a93
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<aws.sdk.version>1.11.133</aws.sdk.version>
<bigquery.connector.hadoop2.version>0.10.2-hadoop2</bigquery.connector.hadoop2.version>
<bouncycastle.version>1.56</bouncycastle.version>
<cdap.version>6.10.0</cdap.version>
<cdap.version>6.10.1-SNAPSHOT</cdap.version>
<chlorine.version>1.1.5</chlorine.version>
<commons.validator.version>1.6</commons.validator.version>
<commons-io.version>2.5</commons-io.version>
Expand Down
5 changes: 5 additions & 0 deletions wrangler-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@
<artifactId>guava-retrying</artifactId>
<version>${guava.retrying.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler.utils;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.cdap.wrangler.api.Row;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * A helper class which allows serialization and deserialization of {@link Row} lists using Kryo.
 *
 * <p>All schema classes supported by {@link SchemaConverter} should be registered in the
 * constructor. Kryo assigns class ids in registration order, so new registrations should be
 * appended rather than inserted, and the writing and reading side must use the same version of
 * this class.
 *
 * <p>The underlying {@link Kryo} instance is not thread safe, so a {@code RowSerializer} must not
 * be shared between threads.
 **/
public class RowSerializer {

  private static final Gson GSON = new Gson();
  // Initial size of the in-memory output buffer; it grows on demand.
  private static final int INITIAL_BUFFER_SIZE = 1024;
  // Kryo convention: a max buffer size of -1 means "no upper bound".
  private static final int UNBOUNDED_BUFFER_SIZE = -1;

  private final Kryo kryo;

  /**
   * Creates a serializer with every class known to {@link SchemaConverter} registered.
   */
  public RowSerializer() {
    kryo = new Kryo();
    // Register all classes from SchemaConverter
    kryo.register(Row.class);
    kryo.register(ArrayList.class);
    kryo.register(LocalDate.class);
    kryo.register(LocalTime.class);
    kryo.register(ZonedDateTime.class);
    kryo.register(Map.class);
    kryo.register(JsonNull.class);
    // JsonPrimitive does not have no-arg constructor hence we need a
    // custom serializer
    kryo.register(JsonPrimitive.class, new JsonSerializer());
    kryo.register(JsonArray.class);
    kryo.register(JsonObject.class);
    // Support deprecated util.date classes
    kryo.register(Date.class);
    kryo.register(java.sql.Date.class);
    kryo.register(Time.class);
    kryo.register(Timestamp.class);
  }

  /**
   * Serializes the given rows into a byte array.
   *
   * @param rows the rows to serialize
   * @return a byte array containing exactly the serialized representation of {@code rows}
   */
  public byte[] fromRows(List<Row> rows) {
    Output output = new Output(INITIAL_BUFFER_SIZE, UNBOUNDED_BUFFER_SIZE);
    kryo.writeClassAndObject(output, rows);
    // toBytes() copies only the bytes actually written; getBuffer() would expose the whole
    // backing array, including the unused capacity after the write position.
    return output.toBytes();
  }

  /**
   * Deserializes rows previously produced by {@link #fromRows(List)}.
   *
   * @param bytes the serialized rows
   * @return the deserialized list of rows
   */
  @SuppressWarnings("unchecked") // fromRows only ever writes a List<Row>
  public List<Row> toRows(byte[] bytes) {
    Input input = new Input(bytes);
    return (List<Row>) kryo.readClassAndObject(input);
  }

  /**
   * Kryo serializer which round-trips a {@link JsonElement} through its JSON string form.
   */
  static class JsonSerializer extends Serializer<JsonElement> {

    @Override
    public void write(Kryo kryo, Output output, JsonElement object) {
      output.writeString(GSON.toJson(object));
    }

    @Override
    public JsonElement read(Kryo kryo, Input input, Class<JsonElement> type) {
      return GSON.fromJson(input.readString(), type);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public Schema getSchema(Object value, String name) throws RecordConvertorExcepti
* @param name name of the field
* @param recordPrefix prefix to append at the beginning of a custom record
* @return the schema of this object
* NOTE: ANY NEWLY SUPPORTED DATATYPE SHOULD ALSO BE REGISTERED IN {@link RowSerializer}
*/
@Nullable
public Schema getSchema(Object value, String name, @Nullable String recordPrefix) throws RecordConvertorException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,5 @@ public final class JsonTestData {
+ " }"
+ " }";
public static final String EMPTY_OBJECT = "{ \"dividesplitdetails\":{\"type0\":[]}}";
public static final String NULL_OBJECT = "{ \"dividesplitdetails\":{\"type0\":null, \"type1\":0}}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler.utils;

import com.google.common.collect.Lists;
import com.google.gson.JsonParser;
import io.cdap.wrangler.TestingRig;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.Row;
import org.junit.Assert;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Tests that {@link RowSerializer} round-trips rows containing JSON, logical and collection
 * typed values.
 */
public class RowSerializerTest {

  private static final String[] TESTS = new String[]{
    JsonTestData.BASIC,
    JsonTestData.SIMPLE_JSON_OBJECT,
    JsonTestData.ARRAY_OF_OBJECTS,
    JsonTestData.JSON_ARRAY_WITH_OBJECT,
    JsonTestData.COMPLEX_1,
    JsonTestData.ARRAY_OF_NUMBERS,
    JsonTestData.ARRAY_OF_STRING,
    JsonTestData.COMPLEX_2,
    JsonTestData.EMPTY_OBJECT,
    JsonTestData.NULL_OBJECT,
    JsonTestData.FB_JSON
  };

  private static final String[] DIRECTIVES = new String[]{
    "set-column body json:Parse(body)"
  };

  /**
   * Serializes the rows with one {@link RowSerializer} and deserializes them with another,
   * then asserts that the result equals the input. Separate instances are used for each
   * direction, as in the three original tests.
   */
  private static void assertRoundTrip(List<Row> expectedRows) {
    byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
    List<Row> gotRows = new RowSerializer().toRows(serializedRows);
    Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
  }

  @Test
  public void testJsonTypes() throws Exception {
    RecipePipeline executor = TestingRig.execute(DIRECTIVES);
    for (String test : TESTS) {
      Row row = new Row("body", test);
      // Parse the JSON text so the row carries Gson JsonElement values.
      List<Row> expectedRows = executor.execute(Lists.newArrayList(row));
      assertRoundTrip(expectedRows);
    }
  }

  @Test
  public void testLogicalTypes() throws Exception {
    Row testRow = new Row();
    testRow.add("id", 1);
    testRow.add("name", "abc");
    testRow.add("date", LocalDate.of(2018, 11, 11));
    testRow.add("time", LocalTime.of(11, 11, 11));
    testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC")));
    testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5));
    // Fixed value instead of LocalDateTime.now() so the test input is the same on every run.
    testRow.add("datetime", LocalDateTime.of(2018, 11, 11, 11, 11, 11));
    assertRoundTrip(Collections.singletonList(testRow));
  }

  @Test
  public void testCollectionTypes() throws Exception {
    // Every collection includes a null element to cover null handling.
    List<Integer> list = new ArrayList<>();
    list.add(null);
    list.add(1);
    list.add(2);
    Set<Integer> set = new HashSet<>();
    set.add(null);
    set.add(1);
    set.add(2);
    Map<String, Integer> map = new HashMap<>();
    map.put("null", null);
    map.put("1", 1);
    map.put("2", 2);

    Row testRow = new Row();
    testRow.add("list", list);
    testRow.add("set", set);
    testRow.add("map", map);

    assertRoundTrip(Collections.singletonList(testRow));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import io.cdap.cdap.api.service.worker.SystemAppTaskContext;
import io.cdap.cdap.features.Feature;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.CompileException;
Expand All @@ -44,6 +45,7 @@
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.utils.ObjectSerDe;

import io.cdap.wrangler.utils.RowSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -57,6 +59,7 @@ public class RemoteExecutionTask implements RunnableTask {

private static final Gson GSON = new Gson();


@Override
public void run(RunnableTaskContext runnableTaskContext) throws Exception {
RemoteDirectiveRequest directiveRequest = GSON.fromJson(runnableTaskContext.getParam(),
Expand Down Expand Up @@ -121,7 +124,12 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
}

runnableTaskContext.setTerminateOnComplete(hasUDD.get() || EL.isUsed());
runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));

if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(systemAppContext)) {
runnableTaskContext.writeResult(new RowSerializer().fromRows(rows));
} else {
runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));
}
} catch (DirectiveParseException | ClassNotFoundException | CompileException e) {
throw new BadRequestException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
import io.cdap.cdap.etl.proto.connection.ConnectorDetail;
import io.cdap.cdap.etl.proto.connection.SampleResponse;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.wrangler.PropertyIds;
Expand Down Expand Up @@ -79,6 +80,7 @@
import io.cdap.wrangler.store.workspace.WorkspaceStore;
import io.cdap.wrangler.utils.ObjectSerDe;
import io.cdap.wrangler.utils.RowHelper;
import io.cdap.wrangler.utils.RowSerializer;
import io.cdap.wrangler.utils.SchemaConverter;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import org.apache.commons.lang3.StringEscapeUtils;
Expand Down Expand Up @@ -624,7 +626,11 @@ private <E extends Exception> List<Row> executeRemotely(String namespace, List<S
.withNamespace(namespace)
.build();
byte[] bytes = getContext().runTask(runnableTaskRequest);
return new ObjectSerDe<List<Row>>().toObject(bytes);
if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(getContext())) {
return new RowSerializer().toRows(bytes);
} else {
return new ObjectSerDe<List<Row>>().toObject(bytes);
}
}

private List<Row> getSample(SampleResponse sampleResponse) {
Expand Down

0 comments on commit fd35a93

Please sign in to comment.