Skip to content

Commit

Permalink
Series/2.x CE interop module (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
googley42 authored Nov 11, 2023
1 parent 388ca9c commit e4cc3ef
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
fail-fast: false
matrix:
java: ["[email protected]", "[email protected]"]
scala: ["3.2.1", "2.13.8", "2.12.17"]
scala: ["3.3.0", "2.13.8", "2.12.17"]
steps:
- uses: actions/[email protected]
- uses: olafurpg/setup-scala@v13
Expand Down
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,29 @@ Under the hood we use the excellent [ZIO AWS](https://zio.dev/zio-aws) library f

## Installation

To use ZIO DynamoDB, we need to add the following lines to our `build.sbt` file:
To use ZIO DynamoDB, we need to add the following line to our `build.sbt` file:

```scala
libraryDependencies ++= Seq(
"dev.zio" %% "zio-dynamodb" % "0.2.11"
"dev.zio" %% "zio-dynamodb" % "0.2.13"
)
```

### Cats Effect Interop

To use the new Cats Effect 3 interop module, we need to also add the following line to our `build.sbt` file:

```scala
libraryDependencies ++= Seq(
"dev.zio" %% "zio-dynamodb-ce" % "0.2.13"
)
```

For CE interop examples please see [examples sbt module](examples/src/main/scala/zio/dynamodb/examples/dynamodblocal/interop/CeInteropExample.scala).

## Example

For examples please see [examples sbt module](../examples/src/main/scala/zio/dynamodb/examples). Below is `Main.scala` from that module:
For examples please see examples sbt module. Below is `Main.scala` from that module:

```scala
import zio.aws.core.config
Expand Down
28 changes: 26 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ val zioVersion = "2.0.13"
val zioAwsVersion = "5.20.42.1"
val zioSchemaVersion = "0.4.15"
val zioPreludeVersion = "1.0.0-RC19"
val zioInteropCats3Version = "23.0.0.8"
val catsEffect3Version = "3.5.1"
val fs2Version = "3.9.2"

lazy val root =
project
.in(file("."))
.settings(skip in publish := true)
.aggregate(zioDynamodb, examples /*, docs */ )
.aggregate(zioDynamodb, zioDynamodbCe, examples /*, docs */ )

lazy val zioDynamodb = module("zio-dynamodb", "dynamodb")
.enablePlugins(BuildInfoPlugin)
Expand Down Expand Up @@ -267,13 +270,34 @@ lazy val examples = module("zio-dynamodb-examples", "examples")
skip in publish := true,
fork := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % catsEffect3Version,
"co.fs2" %% "fs2-core" % fs2Version,
"dev.zio" %% "zio-test" % zioVersion % "test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "test",
"software.amazon.awssdk" % "dynamodb" % "2.17.295"
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
.dependsOn(zioDynamodb)
.dependsOn(zioDynamodb, zioDynamodbCe)

lazy val zioDynamodbCe =
module("zio-dynamodb-ce", "interop/dynamodb-ce")
.enablePlugins(BuildInfoPlugin)
.settings(buildInfoSettings("zio.dynamodb"))
.configs(IntegrationTest)
.settings(
resolvers += Resolver.sonatypeRepo("releases"),
fork := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % catsEffect3Version,
"co.fs2" %% "fs2-core" % fs2Version,
"dev.zio" %% "zio-test" % zioVersion % "test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "test",
"dev.zio" %% "zio-interop-cats" % zioInteropCats3Version
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
.dependsOn(zioDynamodb)

def module(moduleName: String, fileName: String): Project =
Project(moduleName, file(fileName))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package zio.dynamodb.examples.dynamodblocal.interop

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import zio.dynamodb.DynamoDBQuery.{ createTable, deleteTable, get, put }

import cats.effect.std.Console
import cats.effect.IO
import cats.effect.IOApp
import cats.syntax.all._

import java.net.URI

import zio.dynamodb.interop.ce.syntax._
import zio.dynamodb.ProjectionExpression
import zio.schema.DeriveSchema
import zio.schema.Schema
import zio.dynamodb.KeySchema
import zio.dynamodb.BillingMode
import zio.dynamodb.AttributeDefinition
import zio.dynamodb.DynamoDBQuery
import cats.effect.kernel.Async

/**
* example cats effect interop application
*
* to run in the sbt console:
* {{{
* zio-dynamodb-examples/runMain zio.dynamodb.examples.dynamodblocal.CeInteropExample
* }}}
*/
object CeInteropExample extends IOApp.Simple {

final case class Person(id: String, name: String)
object Person {
implicit val schema: Schema.CaseClass2[String, String, Person] = DeriveSchema.gen[Person]
val (id, name) = ProjectionExpression.accessors[Person]
}

def program[F[_]](implicit F: Async[F]) = {
val console = Console.make[F]

for {
_ <- DynamoDBExceutorF
.ofCustomised[F] { builder => // note only AWS SDK model is exposed here, not zio.aws
builder
.endpointOverride(URI.create("http://localhost:8000"))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy")))
}
.use { implicit dynamoDBExecutorF => // To use extension method "executeToF" we need implicit here
for {
_ <- createTable("Person", KeySchema("id"), BillingMode.PayPerRequest)(
AttributeDefinition.attrDefnString("id")
).executeToF
_ <- put(tableName = "Person", Person(id = "avi", name = "Avinder")).executeToF
result <- get(tableName = "Person")(Person.id.partitionKey === "avi").executeToF
_ <- console.println(s"found=$result")
fs2Stream <- DynamoDBQuery
.scanAll[Person](tableName = "Person")
.parallel(50) // server side parallel scan
.filter(Person.name.beginsWith("Avi") && Person.name.contains("de"))
.executeToF
_ <- fs2Stream.evalTap(person => console.println(s"person=$person")).compile.drain
_ <- deleteTable("Person").executeToF
} yield ()
}
} yield ()
}

val run = program[IO]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package zio.dynamodb.examples.dynamodblocal.interop

import cats.effect.IO
import cats.effect.IOApp
import cats.effect.kernel.Async
import cats.effect.std.Console
import cats.effect.std.Dispatcher
import cats.syntax.all._
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import zio.dynamodb.AttributeDefinition
import zio.dynamodb.BillingMode
import zio.dynamodb.DynamoDBQuery._
import zio.dynamodb.KeySchema
import zio.dynamodb.PrimaryKey
import zio.dynamodb.ProjectionExpression
import zio.dynamodb.interop.ce.syntax._
import zio.schema.DeriveSchema
import zio.schema.Schema

import java.net.URI

/**
* example interop app for stream utils
*
* to run in the sbt console:
* {{{
* zio-dynamodb-examples/runMain zio.dynamodb.examples.dynamodblocal.CeInteropStreamUtilsExample
* }}}
*/
object CeInteropStreamUtilsExample extends IOApp.Simple {

final case class Person(id: String, name: String)
object Person {
implicit val schema: Schema.CaseClass2[String, String, Person] = DeriveSchema.gen[Person]
val (id, name) = ProjectionExpression.accessors[Person]
}

def program[F[_]](implicit F: Async[F]) = {

val dynamoDBExceutorF = DynamoDBExceutorF
.ofCustomised[F] { builder =>
builder
.endpointOverride(URI.create("http://localhost:8000"))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy")))
}

val resources = for {
dynamo <- dynamoDBExceutorF
dispatcher <- Dispatcher.parallel[F] // required by batchReadXXX and batchWriteXXX utilities
} yield (dynamo, dispatcher)

for {
_ <- resources.use {
case (dynamoDBExceutorF, dispatcher) =>
implicit val dynamo_ = dynamoDBExceutorF // To use executeToF extension method we need this implicit here
implicit val dispatcher_ = dispatcher

for {
_ <- createTable("Person", KeySchema("id"), BillingMode.PayPerRequest)(
AttributeDefinition.attrDefnString("id")
).executeToF
fs2Input = fs2.Stream(Person("avi", "avi")).covary[F]
_ <- batchWriteFromStreamF(fs2Input)(p => put("Person", p)).compile.drain
console = Console.make[F]
fs2Stream <- scanAll[Person]("Person").executeToF
_ <- fs2Stream.evalTap(p => console.println(s"scanned $p")).compile.drain
_ <- batchReadFromStreamF("Person", fs2Input) { p =>
Person.id.partitionKey === p.id
}.evalTap(p => console.println(s"person $p")).compile.toList
_ <- batchReadItemFromStreamF("Person", fs2Input) { p =>
PrimaryKey("id" -> p.id)
}.evalTap(item => console.println(s"item $item")).compile.toList
_ <- deleteTable("Person").executeToF
} yield ()
}
} yield ()
}

def run = program[IO]

}
Loading

0 comments on commit e4cc3ef

Please sign in to comment.