Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
jnturton authored and cgivre committed Jan 3, 2024
1 parent c49a872 commit 25c9069
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

@SuppressWarnings("unused")
public class JsonConvertFrom {

private JsonConvertFrom() {}
Expand All @@ -55,38 +55,39 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {
ResultSetLoader rsLoader;

@Workspace
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> streamIter;

@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;

@Override
public void setup() {
streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
rsLoader.startBatch();
}

@Override
public void eval() {
if (in.end == 0) {
// Return empty map
// If the input is null or empty, return an empty map
if (in.isSet == 0 || in.start == in.end) {
return;
}

java.io.InputStream inputStream = org.apache.drill.exec.vector.complex.fn.DrillBufInputStream.getStream(in.start, in.end, in.buffer);

try {
stream.setValue(inputStream);
streamIter.setValue(inputStream);

if (jsonLoader == null) {
jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream);
jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, streamIter);
}

org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
rowWriter.start();
if (jsonLoader.parser().next()) {
rowWriter.save();
}
inputStream.close();
//inputStream.close();

} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
Expand All @@ -108,7 +109,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {
ComplexWriter writer;

@Workspace
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> streamIter;

@Inject
OptionManager options;
Expand All @@ -119,25 +120,25 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {
@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;


@Override
public void setup() {
streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
rsLoader.startBatch();
}

@Override
public void eval() {
String jsonString = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(in.start, in.end, in.buffer);

// If the input is null or empty, return an empty map
if (jsonString.length() == 0) {
if (in.isSet == 0 || in.start == in.end) {
return;
}

java.io.InputStream inputStream = org.apache.drill.exec.vector.complex.fn.DrillBufInputStream.getStream(in.start, in.end, in.buffer);

try {
stream.setValue(org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertStringToInputStream(jsonString));
streamIter.setValue(inputStream);
if (jsonLoader == null) {
jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream);
jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, streamIter);
}
org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
rowWriter.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.drill.exec.expr.fn.impl.conv;


import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
Expand All @@ -29,14 +27,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

public class JsonConverterUtils {

private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class);

/*
public static InputStream convertStringToInputStream(String input) {
try (InputStream stream = IOUtils.toInputStream(input, Charset.defaultCharset())) {
return stream;
Expand All @@ -46,6 +43,7 @@ public static InputStream convertStringToInputStream(String input) {
.build(logger);
}
}
*/

/**
* Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,20 @@ public static void setup() throws Exception {
}

@Test
public void testConvertFromJsonFunctionWithBinaryInput() throws Exception {
public void testConvertFromJsonVarBinary() throws Exception {
client.alterSession(ExecConstants.JSON_READER_NAN_INF_NUMBERS, true);
String sql = "SELECT string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col FROM cp.`jsoninput/nan_test.csv`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
assertEquals("Query result must contain 1 row", 1, results.rowCount());

results.print();
results.clear();
}

@Test
public void testConvertFromJSONWithStringInput() throws Exception {
public void testConvertFromJsonVarChar() throws Exception {
// String sql = "SELECT *, convert_FromJSON('{\"foo\":\"bar\"}') FROM cp.`jsoninput/allTypes.csv`";
String sql = "SELECT convert_FromJSON('{\"foo\":\"bar\"}') FROM (VALUES(1))";
RowSet results = client.queryBuilder().sql(sql).rowSet();
results.print();
results.clear();
}

/*
Expand Down

0 comments on commit 25c9069

Please sign in to comment.