
Commit d16f706: add export option
krasinski committed Sep 2, 2024
1 parent 4b83c93 commit d16f706
Showing 18 changed files with 111 additions and 78 deletions.
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/FramesHandler.java
@@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
if (s.parallel) {
Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
}
- s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
+ s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum, s.tz_adjust_from_local));
} else {
Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
.setSeparator(s.separator)
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/ParseHandler.java
@@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
new ParseWriter.ParseErr[0], parse.chunk_size,
parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
- parse.escapechar, parse.force_col_types, parse.tz_adjustment);
+ parse.escapechar, parse.force_col_types, parse.tz_adjust_to_local);

if (parse.source_frames == null)
throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/FramesV3.java
@@ -53,6 +53,9 @@ public class FramesV3 extends RequestSchemaV3<Frames, FramesV3> {
@API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
public boolean write_checksum = true;

@API(help="Specifies if the timezone should be adjusted from local to UTC timezone (parquet only).")
public boolean tz_adjust_from_local = false;

@API(help="Field separator (default ',')")
public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;

h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
@@ -91,7 +91,7 @@ public class ParseSetupV3 extends RequestSchemaV3<ParseSetup, ParseSetupV3> {
public boolean force_col_types;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
- public boolean tz_adjustment;
+ public boolean tz_adjust_to_local;

@Override
public ParseSetup fillImpl(ParseSetup impl) {
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/schemas3/ParseV3.java
@@ -81,5 +81,5 @@ public class ParseV3 extends RequestSchemaV3<Iced, ParseV3> {
public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
- public boolean tz_adjustment;
+ public boolean tz_adjust_to_local;
}
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/fvec/Frame.java
@@ -1614,7 +1614,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr
return job.start(t, fr.anyVec().nChunks());
}

- public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) {
+ public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
// Validate input
if (H2O.getPM().isFileAccessDenied(path)) {
throw new H2OFileAccessDeniedException("File " + path + " access denied");
@@ -1638,7 +1638,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String
}
Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset");

- H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum);
+ H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum, tzAdjustFromLocal);
return job.start(t, fr.anyVec().nChunks());
}

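For reference, a caller-side sketch of the extended signature. This is a hypothetical usage, not code from the commit: the frame key, output path, and codec name are illustrative, and the DKV/Job calls assume H2O's usual in-memory API.

```java
import water.DKV;
import water.Job;
import water.fvec.Frame;

class ParquetExportExample {
    static void exportWithUtcAdjustment() {
        Frame fr = DKV.getGet("prostate.hex");    // assumes a frame with this key is already loaded
        Job job = Frame.exportParquet(
                fr,
                "/tmp/prostate.parquet",          // target path (hypothetical)
                true,                             // overwrite existing output
                "gzip",                           // compression codec name (assumption)
                true,                             // write checksum files next to the data
                true);                            // NEW: shift timestamps from cluster-local time back to UTC
        job.get();                                // block until the export task finishes
    }
}
```

Passing `false` for the new argument keeps the old behavior, since the exporter then applies a zero adjustment (see `FrameParquetExporter` below).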
BinaryFormatExporter.java
@@ -6,7 +6,7 @@

public interface BinaryFormatExporter {

- H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum);
+ H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal);

boolean supports(ExportFileFormat format);
}
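Every `BinaryFormatExporter` implementation must adopt the widened method. A minimal, hypothetical stub illustrating the required shape (not part of this commit; the package declarations are assumptions, and a real exporter would do actual work in `compute2`):

```java
package water; // assumption: same package as the BinaryFormatExporter interface

import water.fvec.Frame;
import water.util.ExportFileFormat; // assumed location of the format enum

public class NoopExporter implements BinaryFormatExporter {

    @Override
    public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force,
                                          String compression, boolean writeChecksum,
                                          boolean tzAdjustFromLocal) {
        return new H2O.H2OCountedCompleter() {
            @Override
            public void compute2() {
                tryComplete(); // a real exporter would write `frame` to `path` here
            }
        };
    }

    @Override
    public boolean supports(ExportFileFormat format) {
        return false; // this stub handles no formats
    }
}
```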
48 changes: 24 additions & 24 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
@@ -44,7 +44,7 @@ public class ParseSetup extends Iced {
int[] _parse_columns_indices; // store column indices to be parsed into the final file
byte[] _nonDataLineMarkers;
boolean _force_col_types = false; // at end of parsing, change column type to users specified ones
- boolean _tz_adjustment = false;
+ boolean _tz_adjust_to_local = false;
String[] _orig_column_types; // copy over the original column type setup before translating to byte[]

String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame
@@ -74,7 +74,7 @@ public ParseSetup(ParseSetup ps) {
ps._separator, ps._single_quotes, ps._check_header, ps._number_columns,
ps._column_names, ps._column_types, ps._domains, ps._na_strings, ps._data,
new ParseWriter.ParseErr[0], ps._chunk_size, ps._decrypt_tool, ps._skipped_columns,
- ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjustment);
+ ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjust_to_local);
}

public static ParseSetup makeSVMLightSetup(){
@@ -87,22 +87,22 @@ public static ParseSetup makeSVMLightSetup(){
// when it is called again, it either contains the guess column types or it will have user defined column types
public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
- int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
+ int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
- chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustment);
+ chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
- int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
+ int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
- chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustment);
+ chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers,
- byte escapeChar, boolean force_col_types, boolean tz_adjustment) {
+ byte escapeChar, boolean force_col_types, boolean tz_adjust_to_local) {
_parse_type = parse_type;
_separator = sep;
_nonDataLineMarkers = nonDataLineMarkers;
@@ -120,7 +120,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
_skipped_columns = skipped_columns;
_escapechar = escapeChar;
_force_col_types = force_col_types;
- _tz_adjustment = tz_adjustment;
+ _tz_adjust_to_local = tz_adjust_to_local;
setParseColumnIndices(ncols, _skipped_columns);
}

@@ -174,7 +174,7 @@ ps.column_names, strToColumnTypes(ps.column_types),
ps.chunk_size,
ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns,
ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null,
- ps.escapechar, ps.force_col_types, ps.tz_adjustment);
+ ps.escapechar, ps.force_col_types, ps.tz_adjust_to_local);
this._force_col_types = ps.force_col_types;
this._orig_column_types = this._force_col_types ? (ps.column_types == null ? null : ps.column_types.clone()) : null;
}
@@ -187,9 +187,9 @@ ps.column_names, strToColumnTypes(ps.column_types),
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
- String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
+ String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
- domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustment);
+ domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

/**
@@ -200,23 +200,23 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
- String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustment) {
+ String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
- domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustment);
+ domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
- String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustment) {
+ String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
- domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR,tzAdjustment);
+ domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR,tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
- String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustment) {
+ String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
- domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustment);
+ domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
@@ -261,8 +261,8 @@ public boolean getForceColTypes() {
return _force_col_types;
}

- public boolean getTzAdjustment() {
-     return _tz_adjustment;
+ public boolean gettzAdjustToLocal() {
+     return _tz_adjust_to_local;
}

public byte[] getColumnTypes() { return _column_types; }
@@ -564,7 +564,7 @@ public GuessSetupTsk(ParseSetup userSetup) {
}
if (_gblSetup==null)
throw new RuntimeException("This H2O node couldn't find the file(s) to parse. Please check files and/or working directories.");
- _gblSetup.setTzAdjustment(_userSetup.getTzAdjustment());
+ _gblSetup.settzAdjustToLocal(_userSetup.gettzAdjustToLocal());
_gblSetup.setFileName(FileUtils.keyToFileName(key));
}

@@ -594,7 +594,7 @@ public void reduce(GuessSetupTsk other) {
else
_gblSetup._na_strings = _userSetup._na_strings;
}
- _gblSetup._tz_adjustment = _gblSetup._tz_adjustment || _userSetup._tz_adjustment;
+ _gblSetup._tz_adjust_to_local = _gblSetup._tz_adjust_to_local || _userSetup._tz_adjust_to_local;
// if(_gblSetup._errs != null)
for(ParseWriter.ParseErr err:_gblSetup._errs)
Log.warn("ParseSetup: " + err.toString());
Expand All @@ -608,7 +608,7 @@ private ParseSetup mergeSetups(ParseSetup setupA, ParseSetup setupB, String file
}
ParseSetup mergedSetup = setupA;

- mergedSetup._tz_adjustment = setupA._tz_adjustment || setupB._tz_adjustment;
+ mergedSetup._tz_adjust_to_local = setupA._tz_adjust_to_local || setupB._tz_adjust_to_local;
mergedSetup._check_header = unifyCheckHeader(setupA._check_header, setupB._check_header);

mergedSetup._separator = unifyColumnSeparators(setupA._separator, setupB._separator);
@@ -887,8 +887,8 @@ public ParseSetup setForceColTypes(boolean force_col_types) {
return this;
}

- public ParseSetup setTzAdjustment(boolean tz_adjustment) {
-     this._tz_adjustment = tz_adjustment;
+ public ParseSetup settzAdjustToLocal(boolean tz_adjust_to_local) {
+     this._tz_adjust_to_local = tz_adjust_to_local;
return this;
}

2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ParserProvider.java
@@ -37,7 +37,7 @@ public final ParseSetup guessSetup(ByteVec v, byte[] bits, ParseSetup userSetup)
*/
protected ParseSetup guessSetup_impl(ByteVec v, byte[] bits, ParseSetup userSetup) {
ParseSetup ps = guessInitSetup(v, bits, userSetup);
- return guessFinalSetup(v, bits, ps).setTzAdjustment(userSetup._tz_adjustment);
+ return guessFinalSetup(v, bits, ps).settzAdjustToLocal(userSetup._tz_adjust_to_local);
}

/**
water/parser/parquet/ChunkConverter.java
@@ -10,13 +10,16 @@
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import water.fvec.Vec;
import water.logging.Logger;
import water.parser.BufferedString;
import water.parser.ParseTime;
import water.parser.parquet.ext.DecimalUtils;
import water.util.StringUtils;

import java.time.Instant;

+ import static water.parser.parquet.TypeUtils.getTimestampAdjustmentFromUtcToLocalInMillis;

/**
* Implementation of Parquet's GroupConverter for H2O's chunks.
*
@@ -141,9 +144,7 @@ private PrimitiveConverter newConverter(int colIdx, byte vecType, PrimitiveType
case Vec.T_TIME:
if (OriginalType.TIMESTAMP_MILLIS.equals(parquetType.getOriginalType()) || parquetType.getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96)) {
if (_adjustTimezone) {
- DateTimeZone parsingTimezone = ParseTime.getTimezone();
- long currentTimeMillisInParsingTimezone = new DateTime(parsingTimezone).getMillis();
- long timestampAdjustmentMillis = parsingTimezone.getOffset(currentTimeMillisInParsingTimezone);
+ long timestampAdjustmentMillis = getTimestampAdjustmentFromUtcToLocalInMillis();
return new TimestampConverter(colIdx, _writer, timestampAdjustmentMillis);
} else {
return new TimestampConverter(colIdx, _writer, 0L);
@@ -339,7 +340,7 @@ public void addBinary(Binary value) {
}

private long adjustTimeStamp(long ts) {
- return ts - timestampAdjustmentMillis;
+ return ts + timestampAdjustmentMillis;
}

}
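The inline offset computation deleted above now lives in a shared `TypeUtils` helper that this page does not show. A plausible reconstruction of that helper, based directly on the three removed lines (an assumption about the actual `TypeUtils` body):

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import water.parser.ParseTime;

final class TypeUtilsSketch {
    /**
     * Offset in milliseconds between UTC and the cluster's parsing timezone,
     * evaluated at the current instant so that DST is taken into account.
     */
    static long getTimestampAdjustmentFromUtcToLocalInMillis() {
        DateTimeZone parsingTimezone = ParseTime.getTimezone();
        long nowInParsingTimezone = new DateTime(parsingTimezone).getMillis();
        return parsingTimezone.getOffset(nowInParsingTimezone);
    }
}
```

The sign flip in `adjustTimeStamp` (from `ts - timestampAdjustmentMillis` to `ts + timestampAdjustmentMillis`) is the substantive import-side fix: for a zone east of UTC the offset is positive, and it must be added to a UTC timestamp to obtain local time.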
water/parser/parquet/FrameParquetExporter.java
@@ -19,6 +19,7 @@
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static water.fvec.Vec.*;
+ import static water.parser.parquet.TypeUtils.getTimestampAdjustmentFromUtcToLocalInMillis;

import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.hadoop.fs.Path;
@@ -30,7 +31,7 @@

public class FrameParquetExporter {

- public void export(H2O.H2OCountedCompleter<?> completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum) {
+ public void export(H2O.H2OCountedCompleter<?> completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
File f = new File(path);
new FrameParquetExporter.PartExportParquetTask(
completer,
Expand All @@ -41,7 +42,8 @@ public void export(H2O.H2OCountedCompleter<?> completer, String path, Frame fram
frame.domains(),
force,
compression,
- writeChecksum
+ writeChecksum,
+ tzAdjustFromLocal
).dfork(frame);
}

@@ -54,10 +56,11 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
final String[][] _domains;
final boolean _force;
final boolean _writeChecksum;
+ final boolean _tzAdjustFromLocal;

PartExportParquetTask(H2O.H2OCountedCompleter<?> completer, String path, String messageTypeString,
String[] colNames, byte[] colTypes, String[][] domains,
- boolean force, String compression, boolean writeChecksum) {
+ boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
super(completer);
_path = path;
_compressionCodecName = getCompressionCodecName(compression);
@@ -67,6 +70,7 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
_domains = domains;
_force = force;
_writeChecksum = writeChecksum;
+ _tzAdjustFromLocal = tzAdjustFromLocal;
}

CompressionCodecName getCompressionCodecName(String compression) {
@@ -100,7 +104,7 @@ public void map(Chunk[] cs) {
try (ParquetWriter<Group> writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force), _writeChecksum)) {
String currColName;
byte currColType;

+ long timeStampAdjustment = _tzAdjustFromLocal ? getTimestampAdjustmentFromUtcToLocalInMillis() : 0L;
for (int i = 0; i < anyChunk._len; i++) {
Group group = fact.newGroup();
for (int j = 0; j < cs.length; j++) {
Expand All @@ -109,7 +113,9 @@ public void map(Chunk[] cs) {
switch (currColType) {
case (T_UUID):
case (T_TIME):
- group = group.append(currColName, cs[j].at8(i));
+ long timestamp = cs[j].at8(i);
+ long adjustedTimestamp = timestamp - timeStampAdjustment;
+ group = group.append(currColName, adjustedTimestamp);
break;
case (T_STR):
if (!cs[j].isNA(i)) {
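Note the symmetry with the parquet parser change above: import (`tz_adjust_to_local`) adds the UTC-to-local offset in `ChunkConverter`, while export (`tz_adjust_from_local`) subtracts the same offset here, so enabling both flags round-trips timestamps unchanged. A toy illustration with made-up values, assuming the offset stays constant between import and export (i.e. no DST transition in between):

```java
long offsetMillis = 3_600_000L;               // pretend the cluster timezone is UTC+1
long utcInFile    = 1_725_235_200_000L;       // timestamp stored in the source parquet file
long inH2O        = utcInFile + offsetMillis; // after import with tz_adjust_to_local
long exported     = inH2O - offsetMillis;     // after export with tz_adjust_from_local
assert exported == utcInFile;                 // original UTC value restored
```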
