diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 477c4964e8..bd0921661a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,11 +120,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target 
scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/README.md b/README.md index c13c9467fe..58ffdf0a41 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Scio includes the following artifacts: - `scio-google-cloud-platform`: add-on for Google Cloud IO's: BigQuery, Bigtable, Pub/Sub, Datastore, Spanner - `scio-grpc`: add-on for gRPC service calls - `scio-jdbc`: add-on for JDBC IO +- `scio-managed`: add-on for Beam's managed transforms. 
Includes Iceberg - `scio-neo4j`: add-on for Neo4J IO - `scio-parquet`: add-on for Parquet - `scio-redis`: add-on for Redis diff --git a/build.sbt b/build.sbt index 7bb3bbc7df..f81948f050 100644 --- a/build.sbt +++ b/build.sbt @@ -90,6 +90,7 @@ val elasticsearch8Version = "8.15.1" val fansiVersion = "0.5.0" val featranVersion = "0.8.0" val httpAsyncClientVersion = "4.1.5" +val icebergVersion = "1.4.2" val jakartaJsonVersion = "2.1.3" val javaLshVersion = "0.12" val jedisVersion = "5.1.5" @@ -100,7 +101,7 @@ val kantanCodecsVersion = "0.5.3" val kantanCsvVersion = "0.7.0" val kryoVersion = "4.0.3" val magnoliaVersion = "1.1.10" -val magnolifyVersion = "0.7.4" +val magnolifyVersion = "0.7.4-34-a3708ba-SNAPSHOT" val metricsVersion = "4.2.27" val munitVersion = "1.0.1" val neo4jDriverVersion = "4.4.18" @@ -699,6 +700,7 @@ lazy val `scio-bom` = project `scio-grpc`, `scio-jdbc`, `scio-macros`, + `scio-managed`, `scio-neo4j`, `scio-parquet`, `scio-redis`, @@ -1171,6 +1173,23 @@ lazy val `scio-grpc` = project ) ) +lazy val `scio-managed` = project + .in(file("scio-managed")) + .dependsOn( + `scio-core` % "compile;test->test" + ) + .settings(commonSettings) + .settings( + description := "Scio add-on for Beam's managed transforms", + libraryDependencies ++= Seq( + // compile + "org.apache.beam" % "beam-sdks-java-core" % beamVersion, + "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, + "com.spotify" %% "magnolify-beam" % magnolifyVersion + // test + ) + ) + lazy val `scio-jdbc` = project .in(file("scio-jdbc")) .dependsOn( @@ -1340,17 +1359,18 @@ lazy val `scio-examples` = project .enablePlugins(NoPublishPlugin) .disablePlugins(ScalafixPlugin) .dependsOn( - `scio-core` % "compile->test", `scio-avro` % "compile->test", + `scio-core` % "compile->test", + `scio-elasticsearch8`, + `scio-extra`, `scio-google-cloud-platform`, `scio-jdbc`, - `scio-extra`, - `scio-elasticsearch8`, + `scio-managed`, `scio-neo4j`, - `scio-tensorflow`, - `scio-smb`, + `scio-parquet`, 
`scio-redis`, - `scio-parquet` + `scio-smb`, + `scio-tensorflow` ) .settings(commonSettings) .settings(soccoSettings) @@ -1405,6 +1425,7 @@ lazy val `scio-examples` = project "com.mysql" % "mysql-connector-j" % "9.0.0", "com.softwaremill.magnolia1_2" %% "magnolia" % magnoliaVersion, "com.spotify" %% "magnolify-avro" % magnolifyVersion, + "com.spotify" %% "magnolify-beam" % magnolifyVersion, "com.spotify" %% "magnolify-bigtable" % magnolifyVersion, "com.spotify" %% "magnolify-datastore" % magnolifyVersion, "com.spotify" %% "magnolify-guava" % magnolifyVersion, @@ -1422,6 +1443,7 @@ lazy val `scio-examples` = project "org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion, + "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.apache.httpcomponents" % "httpcore" % httpCoreVersion, "org.apache.parquet" % "parquet-column" % parquetVersion, @@ -1690,6 +1712,7 @@ lazy val integration = project `scio-extra` % "test->test", `scio-google-cloud-platform` % "compile;test->test", `scio-jdbc` % "compile;test->test", + `scio-managed` % "test->test", `scio-neo4j` % "test->test", `scio-smb` % "test->provided,test" ) @@ -1736,7 +1759,13 @@ lazy val integration = project "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion % Test, "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion % Test, "com.spotify" %% "magnolify-datastore" % magnolifyVersion % Test, - "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test + "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test, + "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion % Test, + "org.apache.iceberg" % "iceberg-common" % icebergVersion % Test, + "org.apache.iceberg" % "iceberg-core" % icebergVersion % Test, + 
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion % Test, + "org.apache.parquet" % "parquet-common" % parquetVersion % Test, + "org.apache.parquet" % "parquet-column" % parquetVersion % Test ) ) @@ -1764,6 +1793,7 @@ lazy val site = project `scio-grpc` % "compile->test", `scio-jdbc`, `scio-macros`, + `scio-managed`, `scio-neo4j`, `scio-parquet`, `scio-redis`, diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala new file mode 100644 index 0000000000..f2f7fa2c0f --- /dev/null +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.iceberg + +import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} +import com.spotify.scio.testing.PipelineSpec +import magnolify.beam._ +import org.apache.iceberg.catalog.{Namespace, TableIdentifier} +import org.apache.iceberg.rest.RESTCatalog +import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType} +import org.apache.iceberg.{CatalogProperties, CatalogUtil, PartitionSpec, Schema} +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy + +import java.time.Duration +import java.io.File +import java.nio.file.Files +import scala.jdk.CollectionConverters._ + +case class IcebergIOITRecord(a: Int, b: String) +object IcebergIOITRecord { + implicit val icebergIOITRecordRowType: RowType[IcebergIOITRecord] = RowType[IcebergIOITRecord] +} + +class IcebergIOIT extends PipelineSpec with ForAllTestContainer { + val ContainerPort = 8181 + val CatalogName = "iceberg_it" + val NamespaceName = "iceberg_it_ns" + val TableName = s"${NamespaceName}.iceberg_records" + + lazy val tempDir: File = { + val t = Files.createTempDirectory("iceberg-it").toFile + t.deleteOnExit() + t + } + + override val container: GenericContainer = + GenericContainer( + GenericContainer.stringToDockerImage("tabulario/iceberg-rest:1.6.0"), + exposedPorts = Seq(ContainerPort), + waitStrategy = new HostPortWaitStrategy() + .forPorts(ContainerPort) + .withStartupTimeout(Duration.ofSeconds(180)) + ) + + lazy val uri = s"http://${container.containerIpAddress}:${container.mappedPort(ContainerPort)}" + + override def afterStart(): Unit = { + val cat = new RESTCatalog() + cat.initialize(CatalogName, Map("uri" -> uri).asJava) + + cat.createNamespace(Namespace.of(NamespaceName)) + cat.createTable( + TableIdentifier.parse(TableName), + new Schema( + NestedField.required(0, "a", IntegerType.get()), + NestedField.required(1, "b", StringType.get()) + ), + PartitionSpec.unpartitioned() + ) + } + + "IcebergIO" should "work" in { + val 
catalogProperties = Map( + CatalogUtil.ICEBERG_CATALOG_TYPE -> CatalogUtil.ICEBERG_CATALOG_TYPE_REST, + CatalogProperties.URI -> uri + ) + val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i")) + + runWithRealContext() { sc => + sc.parallelize(elements) + .saveAsIceberg(TableName, catalogProperties = catalogProperties) + } + + runWithRealContext() { sc => + sc.iceberg[IcebergIOITRecord]( + TableName, + catalogProperties = catalogProperties + ) should containInAnyOrder(elements) + } + } +} diff --git a/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala b/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala index d66e1f3ec2..decf98bf41 100644 --- a/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala +++ b/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo} +@FunctionalInterface trait FilenamePolicySupplier { def apply(path: String, suffix: String): FilenamePolicy } diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index f7f3221493..ab03774f07 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -1737,15 +1737,15 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { } /** - * Generic write method for all `ScioIO[T]` implementations, if it is test pipeline this will + * Generic write method for all `ScioIO[T]` implementations, if it is a test pipeline this will * evaluate pre-registered output IO implementation which match for the passing `ScioIO[T]` - * implementation. 
if not this will invoke [[com.spotify.scio.io.ScioIO[T]#write]] method along - * with write configurations passed by. + * implementation. If not, this will invoke [[com.spotify.scio.io.ScioIO[T]#write]] with the + * provided write configuration. * * @param io * an implementation of `ScioIO[T]` trait * @param params - * configurations need to pass to perform underline write implementation + * configuration required to perform the underlying write implementation */ def write(io: ScioIO[T])(params: io.WriteP): ClosedTap[io.tapT.T] = io.writeWithContext(this, params) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala new file mode 100644 index 0000000000..b3ffc386e5 --- /dev/null +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio.ContextAndArgs +import com.spotify.scio.iceberg._ +import magnolify.beam._ + +// Example: Apache Iceberg read/write Example + +// Usage: + +// `sbt "runMain com.spotify.scio.examples.extra.IcebergExample +// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] +// --inputTable=[INPUT TABLE] --catalogName=[CATALOG NAME] +// --catalogType=[CATALOG TYPE] --catalogUri=[CATALOG URI] +// --catalogWarehouse=[CATALOG WAREHOUSE] --outputTable=[OUTPUT TABLE]"` +object IcebergExample { + + case class Record(a: Int, b: String) + + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + + // Catalog configuration + val catalogConfig = Map( + "type" -> args("catalogType"), + "uri" -> args("catalogUri"), + "warehouse" -> args("catalogWarehouse") + ) + + // Derive a conversion between Record and Beam Row + implicit val rt: RowType[Record] = RowType[Record] + + sc + // Read Records from Iceberg + .iceberg[Record]( + args("inputTable"), + args.optional("catalogName").orNull, + catalogConfig + ) + .map(r => r.copy(a = r.a + 1)) + // Write Records to Iceberg + .saveAsIceberg( + args("outputTable"), + args.optional("catalogName").orNull, + catalogConfig + ) + + sc.run() + } +} diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala new file mode 100644 index 0000000000..efe825e617 --- /dev/null +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio.ContextAndArgs +import com.spotify.scio.coders.Coder +import com.spotify.scio.managed._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.values.Row + +// Example: Beam's Managed IO + +// Usage: + +// `sbt "runMain com.spotify.scio.examples.extra.ManagedExample +// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] +// --table=[TABLE] --catalogName=[CATALOG] --catalogType=[CATALOG TYPE] +// --catalogUri=[CATALOG URI] +// --catalogWarehouse=[CATALOG WAREHOUSE]"` +object ManagedExample { + + case class Record(a: Int, b: String) + + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + + val config: Map[String, Object] = Map( + "table" -> args("table"), + "catalog_name" -> args("catalogName"), + "catalog_properties" -> + Map( + "type" -> args("catalogType"), + "uri" -> args("catalogUri"), + "warehouse" -> args("catalogWarehouse") + ) + ) + + val rt = RowType[Record] + // Provide an implicit coder for Row with the schema derived from the Record case class + implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema) + + // Read Beam Row instances from Iceberg + val records: SCollection[Record] = sc + .managed( + Managed.ICEBERG, + // Schema derived from the Record case class + rt.schema, + config + ) + // Convert the Row instance to a Record + .map(rt.apply) + + records + .map(r => r.copy(a = r.a + 1)) + // 
Convert the Record to a Row + .map(rt.apply) + // Save Row instances to Iceberg + .saveAsManaged(Managed.ICEBERG, config) + + sc.run() + } +} diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala index bbc712b7ae..a1c476903b 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala @@ -70,7 +70,7 @@ object RedisReadStringsExample { // `sbt "runMain com.spotify.scio.examples.extra.RedisWriteBatchExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT]` +// --redisPort=[REDIS_PORT]"` object RedisWriteBatchExample { def main(cmdlineArgs: Array[String]): Unit = { @@ -102,7 +102,7 @@ object RedisWriteBatchExample { // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --subscription=[PUBSUB_SUBSCRIPTION] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT]` +// --redisPort=[REDIS_PORT]"` object RedisWriteStreamingExample { def main(cmdlineArgs: Array[String]): Unit = { @@ -139,7 +139,7 @@ object RedisWriteStreamingExample { // `sbt "runMain com.spotify.scio.examples.extra.RedisLookUpStringsExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT]` +// --redisPort=[REDIS_PORT]"` object RedisLookUpStringsExample { def main(cmdlineArgs: Array[String]): Unit = { diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala new file mode 100644 index 0000000000..4343cd3244 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the 
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.iceberg + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType +import org.apache.beam.sdk.managed.Managed +import com.spotify.scio.managed.ManagedIO +import org.apache.beam.sdk.coders.RowCoder +import org.apache.beam.sdk.values.Row + +final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option[String]) + extends ScioIO[T] { + override type ReadP = IcebergIO.ReadParam + override type WriteP = IcebergIO.WriteParam + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + private lazy val rowType: RowType[T] = implicitly + private lazy val beamRowCoder: RowCoder = RowCoder.of(rowType.schema) + implicit private lazy val rowCoder: Coder[Row] = Coder.beam(beamRowCoder) + + override def testId: String = s"IcebergIO(${(Some(table) ++ catalogName).mkString(", ")})" + + private def config( + catalogProperties: Map[String, String], + configProperties: Map[String, String] + ): Map[String, AnyRef] = { + val b = Map.newBuilder[String, AnyRef] + b += ("table" -> table) + catalogName.foreach(name => b += ("catalog_name" -> name)) + Option(catalogProperties).foreach(p => b += ("catalog_properties" -> p)) + Option(configProperties).foreach(p => b += ("config_properties" -> p)) + b.result() + } + + override protected def read(sc: ScioContext, params: 
IcebergIO.ReadParam): SCollection[T] = { + val io = ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + sc.transform(_.read(io)(ManagedIO.ReadParam(rowType.schema)).map(rowType.from)) + } + + override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = { + val io = ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + data.map(rowType.to).setCoder(beamRowCoder).write(io).underlying + } + + override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = EmptyTap +} + +object IcebergIO { + case class ReadParam private ( + catalogProperties: Map[String, String] = ReadParam.DefaultCatalogProperties, + configProperties: Map[String, String] = ReadParam.DefaultConfigProperties + ) + object ReadParam { + val DefaultCatalogProperties: Map[String, String] = null + val DefaultConfigProperties: Map[String, String] = null + } + case class WriteParam private ( + catalogProperties: Map[String, String] = WriteParam.DefaultCatalogProperties, + configProperties: Map[String, String] = WriteParam.DefaultConfigProperties + ) + object WriteParam { + val DefaultCatalogProperties: Map[String, String] = null + val DefaultConfigProperties: Map[String, String] = null + } +} diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/package.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/package.scala new file mode 100644 index 0000000000..579f505a1a --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/package.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.iceberg.syntax.{SCollectionSyntax, ScioContextSyntax} + +/** + * Iceberg IO APIs. Import all. + * + * {{{ + * import com.spotify.scio.iceberg._ + * }}} + */ +package object iceberg extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala new file mode 100644 index 0000000000..86e3c6e007 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.iceberg.syntax + +import com.spotify.scio.coders.Coder +import com.spotify.scio.iceberg.IcebergIO +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType + +class IcebergSCollectionSyntax[T: RowType: Coder](self: SCollection[T]) { + def saveAsIceberg( + table: String, + catalogName: String = null, + catalogProperties: Map[String, String] = IcebergIO.WriteParam.DefaultCatalogProperties, + configProperties: Map[String, String] = IcebergIO.WriteParam.DefaultConfigProperties + ): ClosedTap[Nothing] = { + val params = IcebergIO.WriteParam(catalogProperties, configProperties) + self.write(IcebergIO(table, Option(catalogName)))(params) + } +} + +trait SCollectionSyntax { + implicit def icebergSCollectionSyntax[T: RowType: Coder]( + self: SCollection[T] + ): IcebergSCollectionSyntax[T] = + new IcebergSCollectionSyntax(self) +} diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala new file mode 100644 index 0000000000..21ce3b698e --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.iceberg.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.iceberg.IcebergIO +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType + +class IcebergScioContextSyntax(self: ScioContext) { + + /** + * @see + * [[org.apache.beam.sdk.io.iceberg.SchemaTransformConfiguration SchemaTransformConfiguration]] + */ + def iceberg[T: Coder]( + table: String, + catalogName: String = null, + catalogProperties: Map[String, String] = IcebergIO.ReadParam.DefaultCatalogProperties, + configProperties: Map[String, String] = IcebergIO.ReadParam.DefaultConfigProperties + )(implicit rt: RowType[T]): SCollection[T] = { + val params = IcebergIO.ReadParam(catalogProperties, configProperties) + self.read(IcebergIO(table, Option(catalogName)))(params) + } +} + +trait ScioContextSyntax { + implicit def icebergScioContextSyntax(self: ScioContext): IcebergScioContextSyntax = + new IcebergScioContextSyntax(self) +} diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala new file mode 100644 index 0000000000..557bed16f2 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.managed + +import com.spotify.scio.ScioContext +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.{PCollectionRowTuple, Row} +import scala.jdk.CollectionConverters._ + +final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] { + override type ReadP = ManagedIO.ReadParam + override type WriteP = ManagedIO.WriteParam + override val tapT: TapT.Aux[Row, Nothing] = EmptyTapOf[Row] + + private lazy val _config: java.util.Map[String, Object] = { + // recursively convert this yaml-compatible nested scala map to java map + // we either do this or the user has to create nested java maps in scala code + // both are bad + def _convert(a: Object): Object = { + a match { + case m: Map[_, _] => + m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava + case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava + case _ => a + } + } + config.map { case (k, v) => k -> _convert(v) }.asJava + } + + // not-ideal IO naming, but we have no identifier except the config map + override def testId: String = s"ManagedIO($ioName, ${config.toString})" + override protected def read(sc: ScioContext, params: ManagedIO.ReadParam): SCollection[Row] = { + sc.wrap( + sc.applyInternal[PCollectionRowTuple]( + Managed.read(ioName).withConfig(_config) + ).getSinglePCollection + ) + } + + override protected def write( + data: SCollection[Row], + params: ManagedIO.WriteParam + ): Tap[tapT.T] = { + data.applyInternal(Managed.write(ioName).withConfig(_config)) + EmptyTap + } + + override def tap(read: ManagedIO.ReadParam): Tap[tapT.T] = EmptyTap +} + +object ManagedIO { + final case class ReadParam(schema: Schema) + type WriteParam = Unit +} diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/package.scala 
b/scio-managed/src/main/scala/com/spotify/scio/managed/package.scala new file mode 100644 index 0000000000..dd0def4dd3 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/package.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.managed.syntax.{SCollectionSyntax, ScioContextSyntax} + +/** + * Managed IO APIs. Import all. + * + * {{{ + * import com.spotify.scio.managed._ + * }}} + */ +package object managed extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala new file mode 100644 index 0000000000..076c112c03 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.managed.syntax + +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.values.Row + +class ManagedSCollectionSyntax(self: SCollection[Row]) { + def saveAsManaged(sink: String, config: Map[String, AnyRef] = Map.empty): ClosedTap[Nothing] = + self.write(ManagedIO(sink, config)) +} + +trait SCollectionSyntax { + implicit def managedSCollectionSyntax(self: SCollection[Row]): ManagedSCollectionSyntax = + new ManagedSCollectionSyntax(self) +} diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala new file mode 100644 index 0000000000..a00b7f45b2 --- /dev/null +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.managed.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row + +class ManagedScioContextSyntax(self: ScioContext) { + def managed( + source: String, + schema: Schema, + config: Map[String, Object] = Map.empty + ): SCollection[Row] = + self.read[Row](ManagedIO(source, config))(ManagedIO.ReadParam(schema)) +} + +trait ScioContextSyntax { + implicit def managedScioContextSyntax(self: ScioContext): ManagedScioContextSyntax = + new ManagedScioContextSyntax(self) +} diff --git a/site/src/main/paradox/io/Iceberg.md b/site/src/main/paradox/io/Iceberg.md new file mode 100644 index 0000000000..c01adcd865 --- /dev/null +++ b/site/src/main/paradox/io/Iceberg.md @@ -0,0 +1,47 @@ +# Iceberg + +Scio supports reading from and writing to [Apache Iceberg](https://iceberg.apache.org/) via Beam's @ref:[Managed transforms](Managed.md). +[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between Scala case classes and Beam's `Row`, used by the underlying managed transform. See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md). + +To read: + +```scala mdoc:compile-only +import com.spotify.scio.ScioContext +import com.spotify.scio.iceberg._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ + +case class Record(a: Int, b: String) +implicit val rt: RowType[Record] = RowType[Record] + +val sc: ScioContext = ??? +val table: String = ??? +val catalogName: String = ??? +val catalogConfig: Map[String, String] = ??? 
+ +val records: SCollection[Record] = sc.iceberg[Record]( + table, + catalogName, + catalogConfig +) +``` + +To write: + +```scala mdoc:invisible +import com.spotify.scio.iceberg._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ +case class Record(a: Int, b: String) +implicit val rt: RowType[Record] = RowType[Record] +``` + +```scala mdoc:compile-only +val records: SCollection[Record] = ??? + +val table: String = ??? +val catalogName: String = ??? +val catalogConfig: Map[String, String] = ??? + +records.saveAsIceberg(table, catalogName, catalogConfig) +``` diff --git a/site/src/main/paradox/io/Managed.md b/site/src/main/paradox/io/Managed.md new file mode 100644 index 0000000000..c6f2f6275a --- /dev/null +++ b/site/src/main/paradox/io/Managed.md @@ -0,0 +1,74 @@ +# Managed IO + +Beam's Managed transforms move responsibility for the creation of transform classes from user code to the runner, allowing runner-specific optimizations like hot-swapping an instance of a transform with an updated one. +Beam currently supports Iceberg and Kafka managed transforms. +See also [Dataflow's supported transforms](https://cloud.google.com/dataflow/docs/guides/managed-io). + +A Scio @ref:[Coder](../internals/Coders.md) must be defined for the Beam @javadoc[Row](org.apache.beam.sdk.values.Row), derived from the Beam @javadoc[Schema](org.apache.beam.sdk.schemas.Schema) expected from the data source. +If you have more than one type of data being read into Beam Rows, you will need to provide the coders explicitly instead of implicitly. + +The source and sink parameters should be imported from Beam's @javadoc[Managed](org.apache.beam.sdk.managed.Managed). 
+ +```scala mdoc:compile-only +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.managed._ +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row + +val sc: ScioContext = ??? + +val rowSchema: Schema = ??? +implicit val rowCoder: Coder[Row] = Coder.row(rowSchema) + +val config: Map[String, Object] = ??? +val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, rowSchema, config) +``` + +Saving data to a Managed IO is similar: +```scala mdoc:invisible +import com.spotify.scio.managed._ +import com.spotify.scio.coders.Coder +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row +``` + +```scala mdoc:compile-only +val rows: SCollection[Row] = ??? +val config: Map[String, Object] = ??? + +rows.saveAsManaged(Managed.ICEBERG, config) +``` + +[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between Beam's `Row` and Scala case classes. See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md). + +```scala mdoc:invisible +import com.spotify.scio.ScioContext +import com.spotify.scio.managed._ +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row +``` + +```scala mdoc:compile-only +import magnolify.beam._ + +val config: Map[String, Object] = ??? + +case class Record(a: Int, b: String) +val rt = RowType[Record] +implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema) + +val sc: ScioContext = ??? 
+sc.managed(Managed.ICEBERG, rt.schema, config) + // convert the Row instance to a Record + .map(rt.apply) + .map(r => r.copy(a = r.a + 1)) + // convert the Record to a Row + .map(rt.apply) + .saveAsManaged(Managed.ICEBERG, config) +``` diff --git a/site/src/main/paradox/io/Neo4J.md b/site/src/main/paradox/io/Neo4J.md index 8a932965fc..40d4977d46 100644 --- a/site/src/main/paradox/io/Neo4J.md +++ b/site/src/main/paradox/io/Neo4J.md @@ -1,6 +1,6 @@ # Neo4J -Scio provides support [Neo4J](https://neo4j.com/) in the `scio-neo4j` artifact. +Scio provides support for [Neo4J](https://neo4j.com/) in the `scio-neo4j` artifact. Scio uses [magnolify's](https://github.com/spotify/magnolify) `magnolify-neo4j` to convert to and from Neo4J types. diff --git a/site/src/main/paradox/io/index.md b/site/src/main/paradox/io/index.md index 72a47d427a..422f8a6023 100644 --- a/site/src/main/paradox/io/index.md +++ b/site/src/main/paradox/io/index.md @@ -11,10 +11,12 @@ * @ref:[Cassandra](Cassandra.md) * @ref:[CSV](Csv.md) * @ref:[Datastore](Datastore.md) -* @ref:[Grpc](Grpc.md) * @ref:[Elasticsearch](Elasticsearch.md) +* @ref:[Grpc](Grpc.md) +* @ref:[Iceberg](Iceberg.md) * @ref:[JDBC](Jdbc.md) * @ref:[Json](Json.md) +* @ref:[Managed](Managed.md) * @ref:[Neo4J](Neo4J.md) * @ref:[Object](Object.md) * @ref:[Parquet](Parquet.md)