Commit dd8256c

Bump to Scio v0.14.10 and Beam 2.61
1 parent 8fec3f5 commit dd8256c

File tree

4 files changed: +161 -78 lines changed

build.sbt

+67 -19

@@ -1,18 +1,63 @@
 import sbt._
 import Keys._
-val scioVersion = "0.10.4"
-val beamVersion = "2.30.0"
-lazy val commonSettings = Def.settings(
-  organization := "dev.herraiz",
+import com.here.bom.Bom
+
+val scioVersion = "0.14.10"
+val beamVersion = "2.61.0"
+
+val guavaVersion = "33.1.0-jre"
+val jacksonVersion = "2.15.4"
+val magnolifyVersion = "0.7.4"
+val nettyVersion = "4.1.100.Final"
+val slf4jVersion = "1.7.30"
+val gcpLibrariesVersion = "26.45.0"
+
+lazy val gcpBom = Bom(
+  "com.google.cloud" % "libraries-bom" % gcpLibrariesVersion
+)
+lazy val beamBom = Bom("org.apache.beam" % "beam-sdks-java-bom" % beamVersion)
+lazy val guavaBom = Bom("com.google.guava" % "guava-bom" % guavaVersion)
+lazy val jacksonBom = Bom(
+  "com.fasterxml.jackson" % "jackson-bom" % jacksonVersion
+)
+lazy val magnolifyBom = Bom("com.spotify" % "magnolify-bom" % magnolifyVersion)
+lazy val nettyBom = Bom("io.netty" % "netty-bom" % nettyVersion)
+lazy val scioBom = Bom("com.spotify" % "scio-bom" % scioVersion)
+
+val bomSettings = Def.settings(
+  gcpBom,
+  beamBom,
+  guavaBom,
+  jacksonBom,
+  magnolifyBom,
+  nettyBom,
+  dependencyOverrides ++=
+    gcpBom.key.value.bomDependencies ++
+      beamBom.key.value.bomDependencies ++
+      guavaBom.key.value.bomDependencies ++
+      jacksonBom.key.value.bomDependencies ++
+      magnolifyBom.key.value.bomDependencies ++
+      nettyBom.key.value.bomDependencies
+)
+
+lazy val commonSettings = bomSettings ++ Def.settings(
+  organization := "example",
   // Semantic versioning http://semver.org/
   version := "0.1.0-SNAPSHOT",
-  scalaVersion := "2.13.16",
-  scalacOptions ++= Seq("-target:jvm-1.8",
-                        "-deprecation",
-                        "-feature",
-                        "-unchecked",
-                        "-Ymacro-annotations"),
-  javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
+  scalaVersion := "2.13.15",
+  scalacOptions ++= Seq(
+    "-release",
+    "11",
+    "-deprecation",
+    "-feature",
+    "-unchecked",
+    "-Ymacro-annotations"
+  ),
+  javacOptions ++= Seq("--release", "11"),
+  // add extra resolver and remove exclude if you need kafka
+  // resolvers += "confluent" at "https://packages.confluent.io/maven/",
+  excludeDependencies += "org.apache.beam" % "beam-sdks-java-io-kafka",
+  excludeDependencies += "com.github.luben" % "zstd-jni"
 )
 
 lazy val root: Project = project
@@ -22,19 +67,21 @@ lazy val root: Project = project
     name := "scio-scala-workshop-beam-summit",
     description := "scio-scala-workshop-beam-summit",
     publish / skip := true,
-    run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
+    fork := true,
+    run / outputStrategy := Some(OutputStrategy.StdoutOutput),
     libraryDependencies ++= Seq(
       "com.spotify" %% "scio-core" % scioVersion,
       "com.spotify" %% "scio-google-cloud-platform" % scioVersion,
      "com.spotify" %% "scio-extra" % scioVersion,
       "com.spotify" %% "scio-test" % scioVersion % Test,
-      "org.apache.beam" % "beam-runners-direct-java" % beamVersion,
-      "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
-      "org.slf4j" % "slf4j-simple" % "2.0.16",
-      "com.google.http-client" % "google-http-client-apache-v2" % "1.45.3"
+      "org.slf4j" % "slf4j-api" % slf4jVersion,
+      "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Runtime,
+      "org.apache.beam" % "beam-runners-direct-java" % beamVersion % Test,
+      "com.spotify" %% "scio-test" % scioVersion % Test,
+      "org.slf4j" % "slf4j-simple" % slf4jVersion % Test,
+      "com.github.luben" % "zstd-jni" % "1.5.2-2"
     )
   )
-  .enablePlugins(JavaAppPackaging)
 
 lazy val repl: Project = project
   .in(file(".repl"))
@@ -46,8 +93,9 @@ lazy val repl: Project = project
     "com.spotify" %% "scio-repl" % scioVersion
   ),
   Compile / mainClass := Some("com.spotify.scio.repl.ScioShell"),
-  publish / skip := true
+  publish / skip := true,
+  fork := false
 )
   .dependsOn(root)
 
-resolvers += "confluent" at "https://packages.confluent.io/maven/"
+ThisBuild / versionPolicyIntention := Compatibility.BinaryAndSourceCompatible
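
Note: the commit declares scioBom but never adds it to bomSettings or dependencyOverrides. If it were wired in as well, a minimal sketch following the same pattern used above (hypothetical, not part of this commit) would be:

    // Hypothetical: align Scio artifacts through the declared (but unused) scioBom,
    // exactly as the other BOMs are wired into bomSettings above.
    val scioBomSettings = Def.settings(
      scioBom,
      dependencyOverrides ++= scioBom.key.value.bomDependencies
    )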

project/plugins.sbt

+3 -1

@@ -1 +1,3 @@
-addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.1")
+addDependencyTreePlugin
+addSbtPlugin("com.here.platform" % "sbt-bom" % "1.0.17")
+addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1")

src/main/scala/dev/herraiz/TaxiSessionsPipeline.scala

+44 -19

@@ -16,12 +16,16 @@
 
 package dev.herraiz
 
+import io.circe.Error
+import com.spotify.scio._
 import com.spotify.scio.bigquery._
-import com.spotify.scio.bigquery.{CREATE_IF_NEEDED, Table, WRITE_TRUNCATE}
-import com.spotify.scio.values.{SCollection, WindowOptions}
-import com.spotify.scio.{Args, ContextAndArgs, ScioContext, streaming}
+import com.spotify.scio.pubsub._
+import com.spotify.scio.values._
 import dev.herraiz.data.DataTypes._
-import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
+import org.apache.beam.sdk.transforms.windowing.{
+  AfterProcessingTime,
+  AfterWatermark
+}
 import org.joda.time.Duration
 
 object TaxiSessionsPipeline {
@@ -39,43 +43,64 @@ object TaxiSessionsPipeline {
     val accumTable = opts("accum-table")
 
     val messages: SCollection[String] = getMessagesFromPubSub(pubsubTopic)
-    val (rides, writableErrors) = parseJSONStrings(messages)
+    val (
+      rides: SCollection[PointTaxiRide],
+      writableErrors: SCollection[JsonError]
+    ) = parseJSONStrings(messages)
+
+    rides
+      .saveAsTypedBigQueryTable(
+        Table.Spec(goodTable)
+      )
 
-    rides.saveAsBigQueryTable(Table.Spec(goodTable), WRITE_TRUNCATE, CREATE_IF_NEEDED)
-    writableErrors.saveAsBigQueryTable(Table.Spec(badTable), WRITE_TRUNCATE, CREATE_IF_NEEDED)
+    writableErrors.saveAsTypedBigQueryTable(
+      Table.Spec(badTable)
+    )
 
     // Group by session with a max duration of 5 mins between events
     // Window options
     val wopts: WindowOptions = customWindowOptions
     val groupRides = groupRidesByKey(rides.map(_.toTaxiRide), wopts)
-    groupRides.saveAsBigQueryTable(Table.Spec(accumTable), WRITE_TRUNCATE, CREATE_IF_NEEDED)
+    groupRides.saveAsTypedBigQueryTable(
+      Table.Spec(accumTable)
+    )
 
     sc.run
   }
 
   def customWindowOptions: WindowOptions =
     WindowOptions(
-      trigger = AfterWatermark.pastEndOfWindow()
-        .withEarlyFirings(AfterProcessingTime
-          .pastFirstElementInPane
-          .plusDelayOf(Duration.standardSeconds(EARLY_RESULT)))
-        .withLateFirings(AfterProcessingTime
-          .pastFirstElementInPane()
-          .plusDelayOf(Duration.standardSeconds(LATENESS))),
+      trigger = AfterWatermark
+        .pastEndOfWindow()
+        .withEarlyFirings(
+          AfterProcessingTime.pastFirstElementInPane
+            .plusDelayOf(Duration.standardSeconds(EARLY_RESULT))
+        )
+        .withLateFirings(
+          AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.standardSeconds(LATENESS))
+        ),
       accumulationMode = streaming.ACCUMULATING_FIRED_PANES,
      allowedLateness = Duration.standardSeconds(LATENESS)
     )
 
-  def getMessagesFromPubSub(pubsubTopic: String)(implicit sc: ScioContext): SCollection[String] = {
+  def getMessagesFromPubSub(
+      pubsubTopic: String
+  )(implicit sc: ScioContext): SCollection[String] = {
     ???
   }
 
-  def parseJSONStrings(messages: SCollection[String]):
-    (SCollection[PointTaxiRide], SCollection[JsonError]) = {
+  def parseJSONStrings(
+      messages: SCollection[String]
+  ): (SCollection[PointTaxiRide], SCollection[JsonError]) = {
     ???
   }
 
-  def groupRidesByKey(rides: SCollection[TaxiRide], wopts: WindowOptions): SCollection[TaxiRide] = {
+  def groupRidesByKey(
+      rides: SCollection[TaxiRide],
+      wopts: WindowOptions
+  ): SCollection[TaxiRide] = {
    ???
   }
 }
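
The three ??? bodies are the workshop exercises and remain unimplemented in this commit. As a rough, non-authoritative sketch of what the new imports enable (the scio-pubsub read call, the availability of a circe Decoder[PointTaxiRide] in scope, and the 5-minute session gap taken from the comment above are all assumptions, not the workshop's reference solution):

    // Sketch only; verify the exact scio-pubsub signatures against the Scio 0.14 docs.
    def getMessagesFromPubSub(
        pubsubTopic: String
    )(implicit sc: ScioContext): SCollection[String] =
      sc.read(PubsubIO.string(pubsubTopic))(PubsubIO.ReadParam(PubsubIO.Topic))

    def parseJSONStrings(
        messages: SCollection[String]
    ): (SCollection[PointTaxiRide], SCollection[JsonError]) = {
      // decode comes from io.circe.parser and needs a Decoder[PointTaxiRide] in scope
      val decoded = messages.map(m => decode[PointTaxiRide](m))
      val (oks, kos) = decoded.partition(_.isRight)
      (
        oks.collect { case Right(ride) => ride },
        kos.collect { case Left(e) => JsonError(e.getMessage) }
      )
    }

    def groupRidesByKey(
        rides: SCollection[TaxiRide],
        wopts: WindowOptions
    ): SCollection[TaxiRide] =
      rides
        .withSessionWindows(Duration.standardMinutes(5), wopts) // gap from the comment
        .keyBy(_.ride_id)
        .reduceByKey(_ + _) // relies on TaxiRide's + combinator
        .values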

src/main/scala/dev/herraiz/data/TaxiDataTypes.scala

+47 -39

@@ -16,10 +16,10 @@
 
 package dev.herraiz.data
 
-import com.spotify.scio.bigquery.types.BigQueryType
 import io.circe._
 import io.circe.generic.semiauto._
 import io.circe.parser.decode
+import com.spotify.scio.bigquery.types.BigQueryType
 import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
 import org.joda.time.{Instant, Interval}
 
@@ -46,27 +46,29 @@ object DataTypes {
 
     ti match {
       case Success(instant: Instant) => Right(instant)
-      case Failure(e: Throwable) => Left(e.getMessage)
+      case Failure(e: Throwable)     => Left(e.getMessage)
     }
   }
 
   // Decoder to produce TaxiRide objects from Json
-  protected lazy implicit val taxiRideDecoder: Decoder[PointTaxiRide] = deriveDecoder[PointTaxiRide]
+  protected lazy implicit val taxiRideDecoder: Decoder[PointTaxiRide] =
+    deriveDecoder[PointTaxiRide]
 
   @BigQueryType.toTable
   case class PointTaxiRide(
-    ride_id: String,
-    point_idx: Int,
-    latitude: Double,
-    longitude: Double,
-    timestamp: Instant,
-    meter_reading: Double,
-    meter_increment: Double,
-    ride_status: String,
-    passenger_count: Int
-  ) {
+      ride_id: String,
+      point_idx: Int,
+      latitude: Double,
+      longitude: Double,
+      timestamp: Instant,
+      meter_reading: Double,
+      meter_increment: Double,
+      ride_status: String,
+      passenger_count: Int
+  ) {
     def toTaxiRide: TaxiRide =
-      TaxiRide(this.ride_id,
+      TaxiRide(
+        this.ride_id,
         1,
         this.timestamp,
         None,
@@ -78,14 +80,14 @@
 
   @BigQueryType.toTable
   case class TaxiRide(
-    ride_id: String,
-    n_points: Int,
-    init: Instant,
-    finish: Option[Instant],
-    total_meter: Double,
-    init_status: String,
-    finish_status: Option[String]
-  ) {
+      ride_id: String,
+      n_points: Int,
+      init: Instant,
+      finish: Option[Instant],
+      total_meter: Double,
+      init_status: String,
+      finish_status: Option[String]
+  ) {
     def +(taxiRide: TaxiRide): TaxiRide = {
 
       // Something is missing here
@@ -97,22 +99,28 @@ object DataTypes {
       (this, taxiRide)
     }
 
-
-      val (finishStatus: Option[String], finishInstant: Option[Instant]) = first.finish match {
-        case None =>
-          (Some(second.finish_status.getOrElse(second.init_status)),
-            Some(second.finish.getOrElse(second.init)))
-        case Some(i) =>
-          val interval: Interval = new Interval(first.init, i)
-          val testInstant: Instant = second.finish.getOrElse(second.init)
-          if (interval.contains(testInstant)) {
-            (Some(first.finish_status.getOrElse(first.init_status)),
-              Some(first.finish.getOrElse(first.init)))
-          } else {
-            (Some(second.finish_status.getOrElse(second.init_status)),
-              Some(second.finish.getOrElse(second.init)))
-          }
-      }
+      val (finishStatus: Option[String], finishInstant: Option[Instant]) =
+        first.finish match {
+          case None =>
+            (
+              Some(second.finish_status.getOrElse(second.init_status)),
+              Some(second.finish.getOrElse(second.init))
+            )
+          case Some(i) =>
+            val interval: Interval = new Interval(first.init, i)
+            val testInstant: Instant = second.finish.getOrElse(second.init)
+            if (interval.contains(testInstant)) {
+              (
+                Some(first.finish_status.getOrElse(first.init_status)),
+                Some(first.finish.getOrElse(first.init))
+              )
+            } else {
+              (
+                Some(second.finish_status.getOrElse(second.init_status)),
+                Some(second.finish.getOrElse(second.init))
+              )
+            }
+        }
 
       TaxiRide(
@@ -129,4 +137,4 @@
   @BigQueryType.toTable
   case class JsonError(msg: String)
 
-}
+}
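
For context, a minimal usage sketch of the decoder derived in this file (the sample payload and its timestamp format are hypothetical; the accepted format is whatever the custom Instant decoding above expects, and taxiRideDecoder is only visible inside DataTypes):

    // Hypothetical sample message; field names mirror PointTaxiRide above.
    val raw =
      """{"ride_id": "r-1", "point_idx": 1, "latitude": 40.4, "longitude": -3.7,
        | "timestamp": "2025-01-01 00:00:00", "meter_reading": 1.5,
        | "meter_increment": 0.1, "ride_status": "enroute", "passenger_count": 2}""".stripMargin
    // decode comes from io.circe.parser (already imported in this file)
    val parsed: Either[Error, PointTaxiRide] = decode[PointTaxiRide](raw)
    // Failed messages become JsonError rows, mirroring the pipeline's bad-records table
    val asError: Option[JsonError] = parsed.left.toOption.map(e => JsonError(e.getMessage))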
