Fix integration tests after revert (#5601)
Co-authored-by: Claire McGinty <[email protected]>

🔥 🔥 🔥
kellen authored Feb 7, 2025
1 parent fe41bb3 commit 9e91a20
Showing 15 changed files with 173 additions and 196 deletions.
build.sbt: 3 additions & 0 deletions
@@ -494,6 +494,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.spotify.scio.bigquery.types.package#Json.parse"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.bigquery.types.package#BigNumeric.bytes"
)
)

@@ -48,11 +48,7 @@ object TypedBigQueryIT {
timestamp: Instant,
date: LocalDate,
time: LocalTime,
// BQ DATETIME is problematic with avro as BQ api uses different representations:
// - BQ export uses 'string(datetime)'
// - BQ load uses 'long(local-timestamp-micros)'
// BigQueryType avroSchema favors read with string type
// datetime: LocalDateTime,
datetime: LocalDateTime,
geography: Geography,
json: Json,
bigNumeric: BigNumeric
@@ -81,23 +77,12 @@ object TypedBigQueryIT {
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
)
implicit val arbJson: Arbitrary[Json] = Arbitrary {
import Arbitrary._
import Gen._
Gen
.oneOf(
// json object
alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
// json array
alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
// json literals
alphaLowerStr.map(str => s""""$str""""),
arbInt.arbitrary.map(_.toString),
arbBool.arbitrary.map(_.toString),
Gen.const("null")
)
.map(wkt => Json(wkt))
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
@@ -119,7 +104,7 @@ object TypedBigQueryIT {
private val tableRowTable = table("records_tablerow")
private val avroTable = table("records_avro")

private val records = Gen.listOfN(5, recordGen).sample.get
private val records = Gen.listOfN(100, recordGen).sample.get
private val options = PipelineOptionsFactory
.fromArgs(
"--project=data-integration-test",
@@ -131,9 +116,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
import TypedBigQueryIT._

private val bq = BigQuery.defaultInstance()

override protected def afterAll(): Unit = {
val bq = BigQuery.defaultInstance()
// best effort cleanup
Try(bq.tables.delete(typedTable.ref))
Try(bq.tables.delete(tableRowTable.ref))
@@ -156,14 +140,6 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toTableRow)
.map { row =>
// The TableRow BQ save API uses json.
// To disambiguate from a literal json string,
// the field MUST be converted to parsed JSON
val jsonLoadRow = new TableRow()
jsonLoadRow.putAll(row.asInstanceOf[java.util.Map[String, _]]) // cast for 2.12
jsonLoadRow.set("json", Json.parse(row.getJson("json")))
}
.saveAsBigQueryTable(
tableRowTable,
schema = Record.schema,
@@ -177,9 +153,9 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
}
}

it should "handle records as avro format" in {
// TODO: fix if resolved in beam 2.61
ignore should "handle records as avro format" in {
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)

runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toAvro)
@@ -191,8 +167,7 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data =
sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}
@@ -228,6 +228,57 @@ final private[client] class TableOps(client: Client) {
def exists(tableSpec: String): Boolean =
exists(bq.BigQueryHelpers.parseTableSpec(tableSpec))

/**
 * This is annoying, but the GCP BQ client v2 does not accept BQ JSON rows in the same format as
 * BQ load: JSON columns are expected as strings instead of parsed JSON.
 */
private def normalizeRows(schema: TableSchema)(tableRow: TableRow): TableRow =
normalizeRows(schema.getFields.asScala.toList)(tableRow)

private def normalizeRows(fields: List[TableFieldSchema])(tableRow: TableRow): TableRow = {
import com.spotify.scio.bigquery._

fields.foldLeft(tableRow) { (row, f) =>
f.getType match {
case "JSON" =>
val name = f.getName
f.getMode match {
case "REQUIRED" =>
row.set(name, row.getJson(name).wkt)
case "NULLABLE" =>
row.getJsonOpt(name).fold(row) { json =>
row.set(name, json.wkt)
}
case "REPEATED" =>
row.set(name, row.getJsonList(name).map(_.wkt).asJava)
}
case "RECORD" | "STRUCT" =>
val name = f.getName
val nestedFields = f.getFields.asScala.toList
f.getMode match {
case "REQUIRED" =>
row.set(name, normalizeRows(nestedFields)(row.getRecord(name)))
case "NULLABLE" =>
row.getRecordOpt(name).fold(row) { nestedRow =>
row.set(name, normalizeRows(nestedFields)(nestedRow))
}
case "REPEATED" =>
row.set(
name,
row
.getRecordList(name)
.map { nestedRow =>
normalizeRows(nestedFields)(nestedRow)
}
.asJava
)
}
case _ =>
row
}
}
}

/** Write rows to a table. */
def writeRows(
tableReference: TableReference,
@@ -262,7 +313,7 @@ final private[client] class TableOps(client: Client) {
case WriteDisposition.WRITE_APPEND =>
}

service.insertAll(tableReference, rows.asJava)
service.insertAll(tableReference, rows.map(normalizeRows(schema)).asJava)
}
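Together with writeRows, the effect of normalizeRows is to re-encode any parsed JSON values as plain strings just before the streaming insert. Below is a minimal sketch of the two encodings involved, assuming a REQUIRED column of BQ type JSON; the field name and values are illustrative, not from the commit:

import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow}

// Hypothetical schema field for illustration:
val payloadField: TableFieldSchema =
  new TableFieldSchema().setName("payload").setType("JSON").setMode("REQUIRED")

// BQ load jobs accept the JSON column as a parsed structure...
val loadJobRow: TableRow = new TableRow().set("payload", new TableRow().set("a", 1))

// ...while the v2 client's insertAll expects the same column as a JSON string,
// which is what normalizeRows produces via row.getJson(name).wkt:
val insertAllRow: TableRow = new TableRow().set("payload", """{"a":1}""")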

/** Write rows to a table. */
@@ -121,9 +121,10 @@ object TableRowOps {
}

def json(value: AnyRef): Json = value match {
case x: Json => x
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
case x: Json => x
case x: TableRow => Json(x)
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
}

def bignumeric(value: AnyRef): BigNumeric = value match {
@@ -180,30 +180,27 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[String] => tree

case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric.bytes($tree)"
q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
case t if t =:= typeOf[ByteString] =>
q"_root_.java.nio.ByteBuffer.wrap($tree.toByteArray)"
case t if t =:= typeOf[Array[Byte]] =>
q"_root_.java.nio.ByteBuffer.wrap($tree)"

case t if t =:= typeOf[Instant] =>
q"_root_.com.spotify.scio.bigquery.Timestamp.micros($tree)"
case t if t =:= typeOf[Instant] => q"$tree.getMillis * 1000"
case t if t =:= typeOf[LocalDate] =>
q"_root_.com.spotify.scio.bigquery.Date.days($tree)"
q"_root_.com.spotify.scio.bigquery.Date($tree)"
case t if t =:= typeOf[LocalTime] =>
q"_root_.com.spotify.scio.bigquery.Time.micros($tree)"
q"_root_.com.spotify.scio.bigquery.Time($tree)"
case t if t =:= typeOf[LocalDateTime] =>
// LocalDateTime is read as avro string
// on write we should use `local-timestamp-micros`
q"_root_.com.spotify.scio.bigquery.DateTime.format($tree)"
q"_root_.com.spotify.scio.bigquery.DateTime($tree)"

// different from the nested record match below, even though those are case classes
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
case t if t =:= typeOf[BigNumeric] =>
q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)"
q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"

// nested records
case t if isCaseClass(c)(t) =>
@@ -377,7 +374,8 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
// for TableRow/json, use parsed JSON to prevent escaping
q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
case t if t =:= typeOf[BigNumeric] =>
// for TableRow/json, use string to avoid precision loss (like numeric)
q"$tree.wkt.toString"
@@ -37,7 +37,7 @@ private[types] object SchemaProvider {
def avroSchemaOf[T: TypeTag]: Schema =
AvroSchemaCache.get(
typeTag[T].tpe.toString,
BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true)
BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields)
)

def schemaOf[T: TypeTag]: TableSchema =
@@ -43,9 +43,6 @@ object SchemaUtil {
case "DATE" => "LocalDate"
case "TIME" => "LocalTime"
case "DATETIME" => "LocalDateTime"
case "GEOGRAPHY" => "Geography"
case "JSON" => "Json"
case "BIGNUMERIC" => "BigNumeric"
case "RECORD" | "STRUCT" => NameProvider.getUniqueName(tfs.getName)
case t => throw new IllegalArgumentException(s"Type: $t not supported")
}
@@ -335,7 +335,7 @@ private[types] object TypeProvider {
q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})"
}
val defAvroSchema =
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)"
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)"
val defToPrettyString =
q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)"

@@ -72,7 +72,8 @@ package object types {
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row))
def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow])
}

/**
@@ -116,8 +117,5 @@ package object types {
case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType))
case _ => apply(value.toString)
}

def bytes(value: BigNumeric): ByteBuffer =
DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType)
}
}
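A rough usage sketch of the reworked Json companion, assuming the Jackson mapper configuration shown above and the wkt string field on Json:

import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.bigquery.types.Json

// Round-trip: TableRow -> JSON string -> TableRow
val row: TableRow = new TableRow().set("name", "Alice").set("age", 30)
val json: Json = Json(row) // e.g. Json("""{"name":"Alice","age":30}""")
val back: TableRow = Json.parse(json) // parsed back into a TableRow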
@@ -38,11 +38,6 @@ final class ConverterProviderSpec

import Schemas._

def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
val max = BigInt(10).pow(precision) - 1
Gen.choose(-max, max).map(BigDecimal(_, scale))
}

implicit val arbByteArray: Arbitrary[Array[Byte]] = Arbitrary(Gen.alphaStr.map(_.getBytes))
implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
Gen.alphaStr.map(ByteString.copyFromUtf8)
@@ -51,37 +46,19 @@ final class ConverterProviderSpec
implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(Gen.const(LocalDate.now()))
implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(Gen.const(LocalTime.now()))
implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(Gen.const(LocalDateTime.now()))
implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] =
arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = Arbitrary {
Arbitrary.arbBigDecimal.arbitrary
.retryUntil(_.precision <= Numeric.MaxNumericPrecision)
.map(Numeric.apply)
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
x <- Gen.numChar
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
// f is a key field from TableRow. It cannot be used as a column name
// see https://github.com/apache/beam/issues/33531
key <- Gen.alphaStr.retryUntil(_ != "f")
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)
implicit val arbJson: Arbitrary[Json] = Arbitrary {
import Arbitrary._
import Gen._
Gen
.oneOf(
// json object
alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
// json array
alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
// json literals
alphaLowerStr.map(str => s""""$str""""),
arbInt.arbitrary.map(_.toString),
arbBool.arbitrary.map(_.toString),
Gen.const("null")
)
.map(wkt => Json(wkt))
}

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
.map(BigNumeric.apply)
}

implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
@@ -137,7 +114,6 @@ final class ConverterProviderSpec
o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF")
o.geographyF.isDefined shouldBe r.containsKey("geographyF")
o.jsonF.isDefined shouldBe r.containsKey("jsonF")
o.bigNumericF.isDefined shouldBe r.containsKey("bigNumericF")
}
}

@@ -50,9 +50,12 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {

it should "handle required json type" in {
val wkt = """{"name":"Alice","age":30}"""
val parsed = new TableRow()
.set("name", "Alice")
.set("age", 30)

RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed)
}

it should "handle required big numeric type" in {
@@ -42,8 +42,7 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers {
| {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"},
| {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"},
| {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"},
| {"mode": "$mode", "name": "jsonF", "type": "JSON"},
| {"mode": "$mode", "name": "bigNumericF", "type": "BIGNUMERIC"}
| {"mode": "$mode", "name": "jsonF", "type": "JSON"}
|]
|""".stripMargin
