diff --git a/h2o-core/src/main/java/water/api/FramesHandler.java b/h2o-core/src/main/java/water/api/FramesHandler.java
index 5ac14ff7eb10..283d4dc9f9c5 100644
--- a/h2o-core/src/main/java/water/api/FramesHandler.java
+++ b/h2o-core/src/main/java/water/api/FramesHandler.java
@@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
       if (s.parallel) {
         Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
       }
-      s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
+      s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum, s.tz_adjust_from_local));
     } else {
       Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
         .setSeparator(s.separator)
diff --git a/h2o-core/src/main/java/water/api/ParseHandler.java b/h2o-core/src/main/java/water/api/ParseHandler.java
index 54420e41e8f9..105cd64e50dd 100644
--- a/h2o-core/src/main/java/water/api/ParseHandler.java
+++ b/h2o-core/src/main/java/water/api/ParseHandler.java
@@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
         new ParseWriter.ParseErr[0], parse.chunk_size,
         parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
         parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
-        parse.escapechar, parse.force_col_types, parse.tz_adjustment);
+        parse.escapechar, parse.force_col_types, parse.tz_adjust_to_local);
 
     if (parse.source_frames == null)
       throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
diff --git a/h2o-core/src/main/java/water/api/schemas3/FramesV3.java b/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
index 610ee0adf1de..3be470401f51 100644
--- a/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
+++ b/h2o-core/src/main/java/water/api/schemas3/FramesV3.java
@@ -53,6 +53,9 @@ public class FramesV3 extends RequestSchemaV3 {
   @API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
   public boolean write_checksum = true;
 
+  @API(help="Specifies if timestamps should be adjusted from the local timezone to UTC on export (Parquet only).")
+  public boolean tz_adjust_from_local = false;
+
   @API(help="Field separator (default ',')")
   public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;
 
diff --git a/h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java b/h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
index cb6fe9eb411c..a51318432036 100644
--- a/h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
+++ b/h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
@@ -91,7 +91,7 @@ public class ParseSetupV3 extends RequestSchemaV3 {
   public boolean force_col_types;
 
   @API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
-  public boolean tz_adjustment;
+  public boolean tz_adjust_to_local;
 
   @Override
   public ParseSetup fillImpl(ParseSetup impl) {
diff --git a/h2o-core/src/main/java/water/api/schemas3/ParseV3.java b/h2o-core/src/main/java/water/api/schemas3/ParseV3.java
index 8ea21f77efe8..b6eafb3c1002 100644
--- a/h2o-core/src/main/java/water/api/schemas3/ParseV3.java
+++ b/h2o-core/src/main/java/water/api/schemas3/ParseV3.java
@@ -81,5 +81,5 @@ public class ParseV3 extends RequestSchemaV3 {
   public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;
@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT) - public boolean tz_adjustment; + public boolean tz_adjust_to_local; } diff --git a/h2o-core/src/main/java/water/fvec/Frame.java b/h2o-core/src/main/java/water/fvec/Frame.java index a1ac60dae984..a600f5ef12be 100644 --- a/h2o-core/src/main/java/water/fvec/Frame.java +++ b/h2o-core/src/main/java/water/fvec/Frame.java @@ -1614,7 +1614,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr return job.start(t, fr.anyVec().nChunks()); } - public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) { + public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) { // Validate input if (H2O.getPM().isFileAccessDenied(path)) { throw new H2OFileAccessDeniedException("File " + path + " access denied"); @@ -1638,7 +1638,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String } Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset"); - H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum); + H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum, tzAdjustFromLocal); return job.start(t, fr.anyVec().nChunks()); } diff --git a/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java b/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java index f90fc9803735..ac21e3452b88 100644 --- a/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java +++ b/h2o-core/src/main/java/water/parser/BinaryFormatExporter.java @@ -6,7 +6,7 @@ public interface BinaryFormatExporter { - H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum); + H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal); boolean supports(ExportFileFormat format); } diff --git a/h2o-core/src/main/java/water/parser/ParseSetup.java b/h2o-core/src/main/java/water/parser/ParseSetup.java index 2885ea643b39..1c7117f9dc66 100644 --- a/h2o-core/src/main/java/water/parser/ParseSetup.java +++ b/h2o-core/src/main/java/water/parser/ParseSetup.java @@ -44,7 +44,7 @@ public class ParseSetup extends Iced { int[] _parse_columns_indices; // store column indices to be parsed into the final file byte[] _nonDataLineMarkers; boolean _force_col_types = false; // at end of parsing, change column type to users specified ones - boolean _tz_adjustment = false; + boolean _tz_adjust_to_local = false; String[] _orig_column_types; // copy over the original column type setup before translating to byte[] String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame @@ -74,7 +74,7 @@ public ParseSetup(ParseSetup ps) { ps._separator, ps._single_quotes, ps._check_header, ps._number_columns, ps._column_names, ps._column_types, ps._domains, ps._na_strings, ps._data, new ParseWriter.ParseErr[0], ps._chunk_size, ps._decrypt_tool, ps._skipped_columns, - ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjustment); + ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjust_to_local); } public static ParseSetup makeSVMLightSetup(){ @@ -87,22 +87,22 @@ public static ParseSetup makeSVMLightSetup(){ // when it is called again, it either contains the guess column types or it will have user defined column 
types public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, - int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) { + int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) { this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs, - chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustment); + chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustToLocal); } public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, - int chunkSize, Key decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) { + int chunkSize, Key decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) { this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs, - chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustment); + chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustToLocal); } public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, int chunkSize, Key decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, - byte escapeChar, boolean force_col_types, boolean tz_adjustment) { + byte escapeChar, boolean force_col_types, boolean tz_adjust_to_local) { _parse_type = parse_type; _separator = sep; _nonDataLineMarkers = nonDataLineMarkers; @@ -120,7 +120,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che _skipped_columns = skipped_columns; _escapechar = escapeChar; _force_col_types = force_col_types; - _tz_adjustment = tz_adjustment; + _tz_adjust_to_local = tz_adjust_to_local; setParseColumnIndices(ncols, _skipped_columns); } @@ -174,7 +174,7 @@ ps.column_names, strToColumnTypes(ps.column_types), ps.chunk_size, ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns, ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null, - ps.escapechar, ps.force_col_types, ps.tz_adjustment); + ps.escapechar, ps.force_col_types, ps.tz_adjust_to_local); this._force_col_types = ps.force_col_types; this._orig_column_types = this._force_col_types ? (ps.column_types == null ? 
null : ps.column_types.clone()) : null; } @@ -187,9 +187,9 @@ ps.column_names, strToColumnTypes(ps.column_types), */ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, - String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) { + String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) { this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, - domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustment); + domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustToLocal); } /** @@ -200,23 +200,23 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec */ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, - String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustment) { + String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustToLocal) { this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, - domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustment); + domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustToLocal); } public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, - String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustment) { + String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustToLocal) { this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, - domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR,tzAdjustment); + domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR,tzAdjustToLocal); } public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames, byte[] ctypes, - String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustment) { + String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustToLocal) { this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, - domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustment); + domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal); } public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, @@ -261,8 +261,8 @@ public boolean getForceColTypes() { return _force_col_types; } - public boolean getTzAdjustment() { - return _tz_adjustment; + public boolean gettzAdjustToLocal() { + return _tz_adjust_to_local; } public byte[] getColumnTypes() { return _column_types; } @@ -564,7 +564,7 @@ public GuessSetupTsk(ParseSetup userSetup) { } if (_gblSetup==null) throw new RuntimeException("This H2O node couldn't find the file(s) to 
parse. Please check files and/or working directories."); - _gblSetup.setTzAdjustment(_userSetup.getTzAdjustment()); + _gblSetup.settzAdjustToLocal(_userSetup.gettzAdjustToLocal()); _gblSetup.setFileName(FileUtils.keyToFileName(key)); } @@ -594,7 +594,7 @@ public void reduce(GuessSetupTsk other) { else _gblSetup._na_strings = _userSetup._na_strings; } - _gblSetup._tz_adjustment = _gblSetup._tz_adjustment || _userSetup._tz_adjustment; + _gblSetup._tz_adjust_to_local = _gblSetup._tz_adjust_to_local || _userSetup._tz_adjust_to_local; // if(_gblSetup._errs != null) for(ParseWriter.ParseErr err:_gblSetup._errs) Log.warn("ParseSetup: " + err.toString()); @@ -608,7 +608,7 @@ private ParseSetup mergeSetups(ParseSetup setupA, ParseSetup setupB, String file } ParseSetup mergedSetup = setupA; - mergedSetup._tz_adjustment = setupA._tz_adjustment || setupB._tz_adjustment; + mergedSetup._tz_adjust_to_local = setupA._tz_adjust_to_local || setupB._tz_adjust_to_local; mergedSetup._check_header = unifyCheckHeader(setupA._check_header, setupB._check_header); mergedSetup._separator = unifyColumnSeparators(setupA._separator, setupB._separator); @@ -887,8 +887,8 @@ public ParseSetup setForceColTypes(boolean force_col_types) { return this; } - public ParseSetup setTzAdjustment(boolean tz_adjustment) { - this._tz_adjustment = tz_adjustment; + public ParseSetup settzAdjustToLocal(boolean tz_adjust_to_local) { + this._tz_adjust_to_local = tz_adjust_to_local; return this; } diff --git a/h2o-core/src/main/java/water/parser/ParserProvider.java b/h2o-core/src/main/java/water/parser/ParserProvider.java index f8b2f74f1551..376478d74bcb 100644 --- a/h2o-core/src/main/java/water/parser/ParserProvider.java +++ b/h2o-core/src/main/java/water/parser/ParserProvider.java @@ -37,7 +37,7 @@ public final ParseSetup guessSetup(ByteVec v, byte[] bits, ParseSetup userSetup) */ protected ParseSetup guessSetup_impl(ByteVec v, byte[] bits, ParseSetup userSetup) { ParseSetup ps = guessInitSetup(v, bits, userSetup); - return guessFinalSetup(v, bits, ps).setTzAdjustment(userSetup._tz_adjustment); + return guessFinalSetup(v, bits, ps).settzAdjustToLocal(userSetup._tz_adjust_to_local); } /** diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java index 73490898d9a9..5dc09579fe65 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java +++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java @@ -10,6 +10,7 @@ import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import water.fvec.Vec; +import water.logging.Logger; import water.parser.BufferedString; import water.parser.ParseTime; import water.parser.parquet.ext.DecimalUtils; @@ -17,6 +18,8 @@ import java.time.Instant; +import static water.parser.parquet.TypeUtils.getTimestampAdjustmentFromUtcToLocalInMillis; + /** * Implementation of Parquet's GroupConverter for H2O's chunks. 
 *
@@ -141,9 +144,7 @@ private PrimitiveConverter newConverter(int colIdx, byte vecType, PrimitiveType
       case Vec.T_TIME:
         if (OriginalType.TIMESTAMP_MILLIS.equals(parquetType.getOriginalType()) || parquetType.getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96)) {
           if (_adjustTimezone) {
-            DateTimeZone parsingTimezone = ParseTime.getTimezone();
-            long currentTimeMillisInParsingTimezone = new DateTime(parsingTimezone).getMillis();
-            long timestampAdjustmentMillis = parsingTimezone.getOffset(currentTimeMillisInParsingTimezone);
+            long timestampAdjustmentMillis = getTimestampAdjustmentFromUtcToLocalInMillis();
             return new TimestampConverter(colIdx, _writer, timestampAdjustmentMillis);
           } else {
             return new TimestampConverter(colIdx, _writer, 0L);
@@ -339,7 +340,7 @@ public void addBinary(Binary value) {
     }
 
     private long adjustTimeStamp(long ts) {
-      return ts - timestampAdjustmentMillis;
+      return ts + timestampAdjustmentMillis;
     }
   }
 
diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
index e1fc1f8a85ed..845997b5e8b2 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/FrameParquetExporter.java
@@ -19,6 +19,7 @@
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static water.fvec.Vec.*;
+import static water.parser.parquet.TypeUtils.getTimestampAdjustmentFromUtcToLocalInMillis;
 
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +31,7 @@
 
 public class FrameParquetExporter {
 
-    public void export(H2O.H2OCountedCompleter completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum) {
+    public void export(H2O.H2OCountedCompleter completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
         File f = new File(path);
         new FrameParquetExporter.PartExportParquetTask(
                 completer,
@@ -41,7 +42,8 @@ public void export(H2O.H2OCountedCompleter completer, String path, Frame fram
                 frame.domains(),
                 force,
                 compression,
-                writeChecksum
+                writeChecksum,
+                tzAdjustFromLocal
         ).dfork(frame);
     }
 
@@ -54,10 +56,11 @@ private static class PartExportParquetTask extends MRTask
         final String[][] _domains;
         final boolean _force;
         final boolean _writeChecksum;
+        final boolean _tzAdjustFromLocal;
 
         PartExportParquetTask(H2O.H2OCountedCompleter completer, String path, String messageTypeString,
                               String[] colNames, byte[] colTypes, String[][] domains,
-                              boolean force, String compression, boolean writeChecksum) {
+                              boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
             super(completer);
             _path = path;
             _compressionCodecName = getCompressionCodecName(compression);
@@ -67,6 +70,7 @@ private static class PartExportParquetTask extends MRTask
             _domains = domains;
             _force = force;
             _writeChecksum = writeChecksum;
+            _tzAdjustFromLocal = tzAdjustFromLocal;
         }
 
         CompressionCodecName getCompressionCodecName(String compression) {
@@ -100,7 +104,7 @@ public void map(Chunk[] cs) {
             try (ParquetWriter writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force), _writeChecksum)) {
                 String currColName;
                 byte currColType;
-
+                long timeStampAdjustment = _tzAdjustFromLocal ? getTimestampAdjustmentFromUtcToLocalInMillis() : 0L;
                 for (int i = 0; i < anyChunk._len; i++) {
                     Group group = fact.newGroup();
                     for (int j = 0; j < cs.length; j++) {
@@ -109,7 +113,9 @@ public void map(Chunk[] cs) {
                         switch (currColType) {
                             case (T_UUID):
                             case (T_TIME):
-                                group = group.append(currColName, cs[j].at8(i));
+                                long timestamp = cs[j].at8(i);
+                                long adjustedTimestamp = currColType == T_TIME ? timestamp - timeStampAdjustment : timestamp; // only shift time columns, never UUID longs
+                                group = group.append(currColName, adjustedTimestamp);
                                 break;
                             case (T_STR):
                                 if (!cs[j].isNA(i)) {
diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
index 76559c20eb15..c42c483a3015 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ParquetExporter.java
@@ -8,8 +8,8 @@ public class ParquetExporter implements BinaryFormatExporter {
 
     @Override
-    public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum) {
-        return new ExportParquetDriver(frame, path, force, compression, writeChecksum);
+    public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
+        return new ExportParquetDriver(frame, path, force, compression, writeChecksum, tzAdjustFromLocal);
     }
 
     @Override
@@ -25,19 +25,22 @@ private class ExportParquetDriver extends H2O.H2OCountedCompleter
jobKey) {
 
   @Override
   public ParseSetup guessInitSetup(ByteVec v, byte[] bits, ParseSetup userSetup) {
-    return ParquetParser.guessFormatSetup(v, bits, userSetup.getTzAdjustment());
+    return ParquetParser.guessFormatSetup(v, bits, userSetup.getTzAdjustToLocal());
   }
 
   @Override
diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/TypeUtils.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/TypeUtils.java
index 27b6234eea9d..295ee0c18c24 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/TypeUtils.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/TypeUtils.java
@@ -14,6 +14,8 @@
 
 package water.parser.parquet;
 
+import org.joda.time.DateTimeZone;
+
 /**
  * Several helper methods inspired by Guava library - https://github.com/google/guava/. We want to avoid bringing guava dependency when possible.
 *
@@ -44,4 +46,9 @@ public static long longFromBytes(
   public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) {
     return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
   }
+
+  public static int getTimestampAdjustmentFromUtcToLocalInMillis() {
+    DateTimeZone clusterLocalTimezone = DateTimeZone.getDefault();
+    return clusterLocalTimezone.getOffset(null); // in Joda-Time, a null instant means "now"
+  }
 }
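Reviewer note: the helper above returns the cluster JVM's current UTC-to-local offset (Joda's `DateTimeZone.getDefault().getOffset(null)`). For intuition, here is a minimal sketch of the same quantity in Python; the function name and comments are mine, and it assumes the machine running it has the same timezone as the cluster:

```python
# Illustration only (not part of the patch): the Python analogue of
# TypeUtils.getTimestampAdjustmentFromUtcToLocalInMillis().
from datetime import datetime, timezone

def utc_to_local_offset_millis():
    """Current UTC->local offset of this machine, in milliseconds."""
    local_now = datetime.now(timezone.utc).astimezone()  # re-express "now" in the local zone
    return int(local_now.utcoffset().total_seconds() * 1000)

# Example: under America/Chicago in August (CDT, UTC-5) this returns -18000000,
# which is why importing 12:00:00 UTC with tz_adjust_to_local=True yields 07:00:00.
```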
diff --git a/h2o-py/h2o/frame.py b/h2o-py/h2o/frame.py
index 340163fd254a..802cdac82575 100644
--- a/h2o-py/h2o/frame.py
+++ b/h2o-py/h2o/frame.py
@@ -452,13 +452,13 @@ def type(self, col):
 
     def _import_parse(self, path, pattern, destination_frame, header, separator, column_names, column_types, na_strings,
                       skipped_columns=None, force_col_types=False, custom_non_data_line_markers=None, partition_by=None,
-                      quotechar=None, escapechar=None, tz_adjustment=False):
+                      quotechar=None, escapechar=None, tz_adjust_to_local=False):
         if H2OFrame.__LOCAL_EXPANSION_ON_SINGLE_IMPORT__ and is_type(path, str) and "://" not in path:  # fixme: delete those 2 lines, cf. https://github.com/h2oai/h2o-3/issues/12573
             path = os.path.abspath(path)
         rawkey = h2o.lazy_import(path, pattern)
         self._parse(rawkey, destination_frame, header, separator, column_names, column_types, na_strings,
                     skipped_columns, force_col_types, custom_non_data_line_markers, partition_by, quotechar,
-                    escapechar, tz_adjustment)
+                    escapechar, tz_adjust_to_local)
         return self
 
     def _upload_parse(self, path, destination_frame, header, sep, column_names, column_types, na_strings,
@@ -471,9 +471,9 @@ def _upload_parse(self, path, destination_frame, header, sep, column_names, colu
 
     def _parse(self, rawkey, destination_frame="", header=None, separator=None, column_names=None, column_types=None,
                na_strings=None, skipped_columns=None, force_col_types=False, custom_non_data_line_markers=None, partition_by=None, quotechar=None,
-               escapechar=None, tz_adjustment=False):
+               escapechar=None, tz_adjust_to_local=False):
         setup = h2o.parse_setup(rawkey, destination_frame, header, separator, column_names, column_types, na_strings,
-                                skipped_columns, force_col_types, custom_non_data_line_markers, partition_by, quotechar, escapechar, tz_adjustment)
+                                skipped_columns, force_col_types, custom_non_data_line_markers, partition_by, quotechar, escapechar, tz_adjust_to_local)
         return self._parse_raw(setup)
 
     def _parse_raw(self, setup):
@@ -493,7 +493,7 @@ def _parse_raw(self, setup):
              "partition_by": None,
              "single_quotes": None,
              "escapechar": None,
-             "tz_adjustment": False
+             "tz_adjust_to_local": False
              }
         if setup["column_names"]: p["column_names"] = None
diff --git a/h2o-py/h2o/h2o.py b/h2o-py/h2o/h2o.py
index dd9ed9f14a8b..60408518d50a 100644
--- a/h2o-py/h2o/h2o.py
+++ b/h2o-py/h2o/h2o.py
@@ -414,7 +414,7 @@ def upload_file(path, destination_frame=None, header=0, sep=None, col_names=None
 
 def import_file(path=None, destination_frame=None, parse=True, header=0, sep=None, col_names=None, col_types=None,
                 na_strings=None, pattern=None, skipped_columns=None, force_col_types=False, custom_non_data_line_markers=None,
-                partition_by=None, quotechar=None, escapechar=None, tz_adjustment=False):
+                partition_by=None, quotechar=None, escapechar=None, tz_adjust_to_local=False):
     """
     Import files into an H2O cluster. The default behavior is to pass-through to the parse phase automatically.
@@ -490,7 +490,7 @@ def import_file(path=None, destination_frame=None, parse=True, header=0, sep=Non
     assert_is_type(partition_by, None, [str], str)
     assert_is_type(quotechar, None, U("'", '"'))
     assert_is_type(escapechar, None, I(str, lambda s: len(s) == 1))
-    assert_is_type(tz_adjustment, bool)
+    assert_is_type(tz_adjust_to_local, bool)
     assert isinstance(skipped_columns, (type(None), list)), "The skipped_columns should be an list of column names!"
     check_frame_id(destination_frame)
     patharr = path if isinstance(path, list) else [path]
@@ -501,7 +501,7 @@ def import_file(path=None, destination_frame=None, parse=True, header=0, sep=Non
         return lazy_import(path, pattern)
     else:
         return H2OFrame()._import_parse(path, pattern, destination_frame, header, sep, col_names, col_types, na_strings,
-                                        skipped_columns, force_col_types, custom_non_data_line_markers, partition_by, quotechar, escapechar, tz_adjustment)
+                                        skipped_columns, force_col_types, custom_non_data_line_markers, partition_by, quotechar, escapechar, tz_adjust_to_local)
 
 
 def load_grid(grid_file_path, load_params_references=False):
@@ -745,7 +745,7 @@ def import_sql_select(connection_url, select_query, username, password, optimize
 
 def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, column_names=None, column_types=None,
                 na_strings=None, skipped_columns=None, force_col_types=False, custom_non_data_line_markers=None,
                 partition_by=None, quotechar=None, escapechar=None,
-                tz_adjustment=False):
+                tz_adjust_to_local=False):
     """
     Retrieve H2O's best guess as to what the structure of the data file is.
 
@@ -797,7 +797,7 @@ def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, co
     :param partition_by: A list of columns the dataset has been partitioned by. None by default.
     :param quotechar: A hint for the parser which character to expect as quoting character. Only single quote, double quote or None (default) are allowed. None means automatic detection.
     :param escapechar: (Optional) One ASCII character used to escape other characters.
-    :param tz_adjustment: (Optional) Adjust the imported data timezone from GMT to cluster timezone.
+    :param tz_adjust_to_local: (Optional) Adjust the imported data timezone from GMT to cluster timezone.
 
     :returns: a dictionary containing parse parameters guessed by the H2O backend.
 
@@ -832,7 +832,7 @@ def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, co
     assert_is_type(partition_by, None, [str], str)
     assert_is_type(quotechar, None, U("'", '"'))
     assert_is_type(escapechar, None, I(str, lambda s: len(s) == 1))
-    assert_is_type(tz_adjustment, bool)
+    assert_is_type(tz_adjust_to_local, bool)
     check_frame_id(destination_frame)
 
     # The H2O backend only accepts things that are quoted
@@ -842,7 +842,7 @@ def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, co
     kwargs = {"check_header": header,
               "source_frames": [quoted(frame_id) for frame_id in raw_frames],
               "single_quotes": quotechar == "'",
-              "tz_adjustment": tz_adjustment}
+              "tz_adjust_to_local": tz_adjust_to_local}
 
     if separator:
         kwargs["separator"] = ord(separator)
@@ -1609,7 +1609,7 @@ def load_model(path):
 
 def export_file(frame, path, force=False, sep=",", compression=None, parts=1, header=True, quote_header=True,
-                parallel=False, format="csv", write_checksum=True):
+                parallel=False, format="csv", write_checksum=True, tz_adjust_from_local=False):
     """
     Export a given H2OFrame to a path on the machine this python session is currently connected to.
@@ -1664,7 +1664,7 @@ def export_file(frame, path, force=False, sep=",", compression=None, parts=1, he
                     data={"path": path, "num_parts": parts, "force": force,
                           "compression": compression, "separator": ord(sep),
                           "header": header, "quote_header": quote_header, "parallel": parallel,
-                          "format": format, "write_checksum": write_checksum}
+                          "format": format, "write_checksum": write_checksum, "tz_adjust_from_local": tz_adjust_from_local}
                     ), "Export File").poll()
 
 
diff --git a/h2o-py/tests/testdir_parser/pyunit_parquet_adjust_timezone.py b/h2o-py/tests/testdir_parser/pyunit_parquet_adjust_timezone.py
index 196328ea770d..d5a00e1a2095 100644
--- a/h2o-py/tests/testdir_parser/pyunit_parquet_adjust_timezone.py
+++ b/h2o-py/tests/testdir_parser/pyunit_parquet_adjust_timezone.py
@@ -10,17 +10,30 @@
 Adjust timestamp parquet
 '''
 
+test_local_user_timezone = "America/Chicago"
+
+
 def adjust_timestamp_parquet():
     with tempfile.TemporaryDirectory() as dir:
+        # prepare the file which will be imported
         input_timestamp = '2024-08-02 12:00:00'
-        timestamp_df = H2OFrame({"timestamp": input_timestamp})
-        h2o.export_file(timestamp_df, path=dir, format="parquet", write_checksum=False)
-        h2o.rapids(expr='(setTimeZone "America/Los_Angeles")')
-        expected_df = H2OFrame({"timestamp": '2024-08-02 12:00:00'})
-        imported_df = h2o.import_file(dir, tz_adjustment=True)
+        original_timestamp_df = H2OFrame({"timestamp": input_timestamp})
+        h2o.export_file(original_timestamp_df, path=dir + "/import", format="parquet", write_checksum=False)
+
+        # import the file and check that tz_adjust_to_local works
+        imported_df = h2o.import_file(dir + "/import", tz_adjust_to_local=True)
+        expected_df = H2OFrame({"timestamp": '2024-08-02 07:00:00'})
         assert imported_df[0, 0] == expected_df[0, 0]
 
+        # export the frame and check that tz_adjust_from_local shifts the values back
+        h2o.export_file(imported_df, path=dir + "/export", format="parquet", write_checksum=False, tz_adjust_from_local=True)
+        reimported_without_adjustment_df = h2o.import_file(dir + "/export")
+        assert original_timestamp_df[0, 0] == reimported_without_adjustment_df[0, 0]
+
+
 if __name__ == "__main__":
-    pyunit_utils.standalone_test(adjust_timestamp_parquet)
+    pyunit_utils.standalone_test(adjust_timestamp_parquet, init_options={"jvm_custom_args": [
+        ("-Duser.timezone=%s" % test_local_user_timezone)]})
+
 else:
     adjust_timestamp_parquet()
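For reviewers who want to exercise the new flags end to end outside the test harness, here is a minimal round-trip sketch. It assumes a running cluster whose JVM timezone differs from UTC; the `/tmp/tz_roundtrip` paths are placeholders of my own choosing:

```python
import h2o
from h2o import H2OFrame

h2o.init()

# Build a one-row frame with a timestamp column and export it as Parquet.
df = H2OFrame({"timestamp": "2024-08-02 12:00:00"})
h2o.export_file(df, path="/tmp/tz_roundtrip/utc", format="parquet", write_checksum=False)

# Re-import, shifting the stored UTC instants into the cluster's local timezone.
local_df = h2o.import_file("/tmp/tz_roundtrip/utc", tz_adjust_to_local=True)

# Export again, shifting local instants back to UTC (Parquet-only flag).
h2o.export_file(local_df, path="/tmp/tz_roundtrip/utc_again", format="parquet",
                write_checksum=False, tz_adjust_from_local=True)

# A plain import of the second export should match the original frame.
roundtrip_df = h2o.import_file("/tmp/tz_roundtrip/utc_again")
assert df[0, 0] == roundtrip_df[0, 0]
```

The two flags are deliberately asymmetric: import adds the UTC-to-local offset (`ts + timestampAdjustmentMillis` in `ChunkConverter`), while export subtracts it (`timestamp - timeStampAdjustment` in `FrameParquetExporter`), so a `tz_adjust_to_local` import followed by a `tz_adjust_from_local` export preserves the stored instants.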