Skip to content

Commit

Permalink
Support BigQuery JSON column type (#5416)
Browse files Browse the repository at this point in the history
Co-authored-by: Michel Davit <[email protected]>
  • Loading branch information
turb and RustedBones authored Jul 4, 2024
1 parent c4d4554 commit 4e7f918
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.services.bigquery.model.{Dataset, DatasetReference}
import com.google.protobuf.ByteString
import com.spotify.scio.bigquery.client.BigQuery
import com.spotify.scio.bigquery.types.{Geography, Json}
import org.joda.time._
import org.joda.time.format.DateTimeFormat
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -50,7 +51,9 @@ object PopulateTestData {
timestamp: Instant,
date: LocalDate,
time: LocalTime,
datetime: LocalDateTime
datetime: LocalDateTime,
geography: Geography,
json: Json
)

@BigQueryType.toTable
Expand All @@ -64,7 +67,9 @@ object PopulateTestData {
timestamp: Option[Instant],
date: Option[LocalDate],
time: Option[LocalTime],
datetime: Option[LocalDateTime]
datetime: Option[LocalDateTime],
geography: Option[Geography],
json: Option[Json]
)

@BigQueryType.toTable
Expand All @@ -78,7 +83,9 @@ object PopulateTestData {
timestamp: List[Instant],
date: List[LocalDate],
time: List[LocalTime],
datetime: List[LocalDateTime]
datetime: List[LocalDateTime],
geography: List[Geography],
json: List[Json]
)

/** Plain two-field case class: a `Long` named `int` and a `String` named `string`. */
case class Record(int: Long, string: String)
Expand Down Expand Up @@ -200,7 +207,9 @@ object PopulateTestData {
t.plus(Duration.millis(i.toLong)),
dt.toLocalDate.plusDays(i),
dt.toLocalTime.plusMillis(i),
dt.toLocalDateTime.plusMillis(i)
dt.toLocalDateTime.plusMillis(i),
Geography(s"POINT($i $i)"),
Json(s"""{"value": $i}""")
)
}

Expand All @@ -217,7 +226,9 @@ object PopulateTestData {
Some(t.plus(Duration.millis(i.toLong))),
Some(dt.toLocalDate.plusDays(i)),
Some(dt.toLocalTime.plusMillis(i)),
Some(dt.toLocalDateTime.plusMillis(i))
Some(dt.toLocalDateTime.plusMillis(i)),
Some(Geography(s"POINT($i $i)")),
Some(Json(s"""{"value": $i}"""))
)
}

Expand All @@ -234,7 +245,9 @@ object PopulateTestData {
List(t.plus(Duration.millis(i.toLong))),
List(dt.toLocalDate.plusDays(i)),
List(dt.toLocalTime.plusMillis(i)),
List(dt.toLocalDateTime.plusMillis(i))
List(dt.toLocalDateTime.plusMillis(i)),
List(Geography(s"POINT($i $i)")),
List(Json(s"""{"value": $i}"""))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class BigQueryAvroUtilsWrapper {
ImmutableMap.<String, Type>builder()
.put("STRING", Type.STRING)
.put("GEOGRAPHY", Type.STRING)
.put("JSON", Type.STRING)
.put("BYTES", Type.BYTES)
.put("INTEGER", Type.LONG)
.put("FLOAT", Type.DOUBLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object StorageUtil {
case null => "STRING"
case t if t.getName == "datetime" => "DATETIME"
case t if t.getName == "geography" => "GEOGRAPHY"
case t if t.getName == "json" => "JSON"
case t =>
throw new IllegalStateException(s"Unsupported logical type: $t")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ private[types] object ConverterProvider {
q"_root_.com.spotify.scio.bigquery.DateTime.parse($tree.toString)"
case t if t =:= typeOf[Geography] =>
q"_root_.com.spotify.scio.bigquery.types.Geography($tree.toString)"
case t if t =:= typeOf[Json] =>
q"_root_.com.spotify.scio.bigquery.types.Json($tree.toString)"

case t if isCaseClass(c)(t) =>
val fn = TermName("r" + t.typeSymbol.name)
Expand Down Expand Up @@ -187,8 +189,10 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[LocalDateTime] =>
q"_root_.com.spotify.scio.bigquery.DateTime($tree)"

// different than nested record match below, even though those are case classes
case t if t =:= typeOf[Geography] =>
// different than nested record match below, even though this is a case class
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"

case t if isCaseClass(c)(t) => // nested records
Expand Down Expand Up @@ -289,9 +293,11 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[LocalDateTime] =>
q"_root_.com.spotify.scio.bigquery.DateTime.parse($s)"

// different than nested record match below, even though those are case classes
case t if t =:= typeOf[Geography] =>
// different than nested record match below, even though this is a case class
q"_root_.com.spotify.scio.bigquery.types.Geography($s)"
case t if t =:= typeOf[Json] =>
q"_root_.com.spotify.scio.bigquery.types.Json($s)"

case t if isCaseClass(c)(t) => // nested records
val fn = TermName("r" + t.typeSymbol.name)
Expand Down Expand Up @@ -392,8 +398,10 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[LocalDateTime] =>
q"_root_.com.spotify.scio.bigquery.DateTime($tree)"

// different than nested record match below, even though those are case classes
case t if t =:= typeOf[Geography] =>
// different than nested record match below, even though this is a case class
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"

case t if isCaseClass(c)(t) => // nested records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private[types] object SchemaProvider {
case t if t =:= typeOf[LocalTime] => ("TIME", Iterable.empty)
case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty)
case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty)
case t if t =:= typeOf[Json] => ("JSON", Iterable.empty)

case t if isCaseClass(t) => ("RECORD", toFields(t))
case _ => throw new RuntimeException(s"Unsupported type: $tpe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,14 @@ private[types] object TypeProvider {
tq"${TypeName(s"Function${fields.size}")}[..${fields.map(_.children.head)}, $cName]"
val traits = (if (fields.size <= 22) Seq(fnTrait) else Seq()) ++ defTblDesc
.map(_ => tq"${p(c, SType)}.HasTableDescription")
val taggedFields = fields.map {
case q"$m val $n: _root_.com.spotify.scio.bigquery.types.Geography = $rhs" =>
provider.initializeToTable(c)(m, n, tq"_root_.java.lang.String")
c.universe.ValDef(
c.universe.Modifiers(m.flags, m.privateWithin, m.annotations),
n,
tq"_root_.java.lang.String @${typeOf[BigQueryTag]}",
q"{$rhs}.wkt"
)
case ValDef(m, n, tpt, rhs) =>
provider.initializeToTable(c)(m, n, tpt)
c.universe.ValDef(
c.universe.Modifiers(m.flags, m.privateWithin, m.annotations),
n,
tq"$tpt @${typeOf[BigQueryTag]}",
rhs
)
val taggedFields = fields.map { case ValDef(m, n, tpt, rhs) =>
provider.initializeToTable(c)(m, n, tpt)
c.universe.ValDef(
c.universe.Modifiers(m.flags, m.privateWithin, m.annotations),
n,
tq"$tpt @${typeOf[BigQueryTag]}",
rhs
)
}
val caseClassTree =
q"""${caseClass(c)(mods, cName, taggedFields, body)}"""
Expand Down Expand Up @@ -288,6 +279,8 @@ private[types] object TypeProvider {
case "DATETIME" => (tq"_root_.org.joda.time.LocalDateTime", Nil)
case "GEOGRAPHY" =>
(tq"_root_.com.spotify.scio.bigquery.types.Geography", Nil)
case "JSON" =>
(tq"_root_.com.spotify.scio.bigquery.types.Json", Nil)
case "RECORD" | "STRUCT" =>
val name = NameProvider.getUniqueName(tfs.getName)
val (fields, records) = toFields(tfs.getFields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ package object types {
* Well Known Text formatted string that BigQuery displays for Geography
*/
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances to distinguish them from Strings.
*
* The value is held as an ordinary `String`; this class itself performs no parsing or validation.
*
* See also https://cloud.google.com/bigquery/docs/json-data
*
* @param wkt
* The JSON text of the value, e.g. `{"value": 1}`. NOTE: despite the name (which mirrors
* `Geography.wkt`), this is not Well Known Text. The name is kept because the macro-generated
* converters access the field as `.wkt`.
*/
case class Json(wkt: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ final class ConverterProviderSpec
o.datetimeF.isDefined shouldBe r.containsKey("datetimeF")
o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF")
o.geographyF.isDefined shouldBe r.containsKey("geographyF")
o.jsonF.isDefined shouldBe r.containsKey("jsonF")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
BigQueryType.toTableRow[RequiredGeo](RequiredGeo(Geography(wkt))) shouldBe TableRow("a" -> wkt)
}

it should "handle required json type" in {
  // One JSON document is used for both conversion directions.
  val jsonText = """{"name": "Alice", "age": 30}"""
  val typed = RequiredJson(Json(jsonText))
  val row = TableRow("a" -> jsonText)

  // TableRow -> case class
  RequiredJson.fromTableRow(row) shouldBe typed
  // case class -> TableRow
  BigQueryType.toTableRow[RequiredJson](typed) shouldBe row
}

it should "handle case classes with methods" in {
RequiredWithMethod.fromTableRow(TableRow("a" -> "")) shouldBe RequiredWithMethod("")
BigQueryType.toTableRow[RequiredWithMethod](RequiredWithMethod("")) shouldBe TableRow("a" -> "")
Expand All @@ -57,6 +63,9 @@ object ConverterProviderTest {
// Fixture with a single non-optional `Geography` field `a`.
@BigQueryType.toTable
case class RequiredGeo(a: Geography)

// Fixture with a single non-optional `Json` field `a`; exercised by the
// "handle required json type" test in both TableRow conversion directions.
@BigQueryType.toTable
case class RequiredJson(a: Json)

// Fixture with a single non-optional `String` field `a`.
@BigQueryType.toTable
case class Required(a: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers {
| {"mode": "$mode", "name": "timeF", "type": "TIME"},
| {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"},
| {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"},
| {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"}
| {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"},
| {"mode": "$mode", "name": "jsonF", "type": "JSON"}
|]
|""".stripMargin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ object Schemas {
timeF: LocalTime,
datetimeF: LocalDateTime,
bigDecimalF: BigDecimal,
geographyF: Geography
geographyF: Geography,
jsonF: Json
)
case class Optional(
boolF: Option[Boolean],
Expand All @@ -52,7 +53,8 @@ object Schemas {
timeF: Option[LocalTime],
datetimeF: Option[LocalDateTime],
bigDecimalF: Option[BigDecimal],
geographyF: Option[Geography]
geographyF: Option[Geography],
jsonF: Option[Json]
)
case class Repeated(
boolF: List[Boolean],
Expand All @@ -68,7 +70,8 @@ object Schemas {
timeF: List[LocalTime],
datetimeF: List[LocalDateTime],
bigDecimalF: List[BigDecimal],
geographyF: List[Geography]
geographyF: List[Geography],
jsonF: List[Json]
)

// records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,4 +619,19 @@ class TypeProviderTest extends AnyFlatSpec with Matchers {
cc.f1 shouldBe Geography(wkt)
GeoRecordTo(Geography(wkt))
}

// Class generated from an inline schema declaring one REQUIRED field `f1` of type JSON;
// the test below constructs it with a `Json` value and reads `f1` back.
@BigQueryType.fromSchema("""
|{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "JSON"}]}
""".stripMargin)
class JsonRecordFrom

// Hand-written counterpart of JsonRecordFrom: a `toTable` case class with a single `Json` field.
@BigQueryType.toTable
case class JsonRecordTo(f1: Json)

it should "support JSON type" in {
  val jsonText = """{"name": "Alice", "age": 30}"""
  val expected = Json(jsonText)

  // Schema-derived class: the JSON column is surfaced as a `Json` field.
  val fromSchema = JsonRecordFrom(expected)
  fromSchema.f1 shouldBe expected

  // Annotated case class: only needs to compile and construct with a `Json` field.
  JsonRecordTo(expected)
}
}

0 comments on commit 4e7f918

Please sign in to comment.