diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryUtil.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryUtil.java index 5c85098b4..7498079bf 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryUtil.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryUtil.java @@ -32,6 +32,8 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Instant; +import java.time.LocalDate; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ import java.util.Properties; import java.util.ServiceLoader; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import javax.validation.constraints.NotNull; import org.apache.hadoop.conf.Configuration; @@ -59,6 +62,9 @@ public class SparkBigQueryUtil { private static final String SPARK_YARN_TAGS = "spark.yarn.tags"; + private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1); + private static final long MIN_SECONDS = Math.floorDiv(Long.MIN_VALUE, MICROS_PER_SECOND); + static final Properties BUILD_PROPERTIES = loadBuildProperties(); static final String CONNECTOR_VERSION = BUILD_PROPERTIES.getProperty("connector.version"); @@ -199,6 +205,20 @@ public static long sparkTimestampToBigQuery(Object sparkValue) { if (sparkValue instanceof Long) { return ((Number) sparkValue).longValue(); } + + if (sparkValue instanceof Instant) { + Instant instant = (Instant) sparkValue; + long epochSeconds = instant.getEpochSecond(); + if (epochSeconds == MIN_SECONDS) { + long us = Math.multiplyExact(epochSeconds + 1, MICROS_PER_SECOND); + return Math.addExact( + us, TimeUnit.NANOSECONDS.toMicros(instant.getNano()) - MICROS_PER_SECOND); + } else { + long us = Math.multiplyExact(epochSeconds, MICROS_PER_SECOND); + return Math.addExact(us, TimeUnit.NANOSECONDS.toMicros(instant.getNano())); + } + } + // need to return timestamp in epoch microseconds java.sql.Timestamp timestamp = (java.sql.Timestamp) sparkValue; long epochMillis = timestamp.getTime(); @@ -210,6 +230,12 @@ public static int sparkDateToBigQuery(Object sparkValue) { if (sparkValue instanceof Number) { return ((Number) sparkValue).intValue(); } + + if (sparkValue instanceof LocalDate) { + LocalDate localDate = (LocalDate) sparkValue; + return Math.toIntExact(localDate.toEpochDay()); + } + java.sql.Date sparkDate = (java.sql.Date) sparkValue; return (int) sparkDate.toLocalDate().toEpochDay(); } diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryUtilTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryUtilTest.java index bcd8af7ea..1d3324ccb 100644 --- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryUtilTest.java +++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryUtilTest.java @@ -25,6 +25,11 @@ import com.google.cloud.bigquery.TimePartitioning; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.util.TimeZone; import org.apache.spark.SparkConf; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -150,4 +155,23 @@ public void testExtractJobLabels_with_labels() { assertThat(labels).containsEntry("dataproc_job_id", "d8f27392957446dbbd7dc28df568e4eb"); assertThat(labels).containsEntry("dataproc_job_uuid", "df379ef3-eeda-3234-8941-e1a36a1959a3"); } + + @Test + public void testSparkDateToBigQuery() { + assertThat(SparkBigQueryUtil.sparkDateToBigQuery(16929L)).isEqualTo(16929L); + assertThat(SparkBigQueryUtil.sparkDateToBigQuery(Date.valueOf("2016-05-08"))).isEqualTo(16929); + assertThat(SparkBigQueryUtil.sparkDateToBigQuery(LocalDate.of(2016, 5, 8))).isEqualTo(16929); + } + + @Test + public void testSparkTimestampToBigQuery() { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + assertThat(SparkBigQueryUtil.sparkTimestampToBigQuery(10L)).isEqualTo(10L); + assertThat( + SparkBigQueryUtil.sparkTimestampToBigQuery(Timestamp.valueOf("2016-05-08 00:00:01.01"))) + .isEqualTo(1462665601010000L); + assertThat( + SparkBigQueryUtil.sparkTimestampToBigQuery(Instant.parse("2016-05-08T00:00:01.010Z"))) + .isEqualTo(1462665601010000L); + } }