16
16
17
17
package dev .herraiz
18
18
19
+ import io .circe .Error
20
+ import com .spotify .scio ._
19
21
import com .spotify .scio .bigquery ._
20
22
import com .spotify .scio .pubsub ._
21
- import com .spotify .scio .values .{SCollection , WindowOptions }
22
- import com .spotify .scio .{Args , ContextAndArgs , ScioContext , streaming }
23
+ import com .spotify .scio .values ._
23
24
import dev .herraiz .data .DataTypes ._
24
- import io .circe
25
- import org .apache .beam .sdk .transforms .windowing .{AfterProcessingTime , AfterWatermark }
25
+ import org .apache .beam .sdk .transforms .windowing .{
26
+ AfterProcessingTime ,
27
+ AfterWatermark
28
+ }
26
29
import org .joda .time .Duration
27
30
28
31
object TaxiSessionsPipeline {
@@ -40,67 +43,96 @@ object TaxiSessionsPipeline {
40
43
val accumTable = opts(" accum-table" )
41
44
42
45
val messages : SCollection [String ] = getMessagesFromPubSub(pubsubTopic)
43
- val (rides : SCollection [PointTaxiRide ], writableErrors : SCollection [JsonError ]) = parseJSONStrings(messages)
44
-
45
- rides.saveAsBigQueryTable(Table .Spec (goodTable), WRITE_APPEND , CREATE_IF_NEEDED )
46
- writableErrors.saveAsBigQueryTable(Table .Spec (badTable), WRITE_APPEND , CREATE_IF_NEEDED )
46
+ val (
47
+ rides : SCollection [PointTaxiRide ],
48
+ writableErrors : SCollection [JsonError ]
49
+ ) = parseJSONStrings(messages)
50
+
51
+ rides
52
+ .saveAsTypedBigQueryTable(
53
+ Table .Spec (goodTable)
54
+ )
55
+
56
+ writableErrors.saveAsTypedBigQueryTable(
57
+ Table .Spec (badTable)
58
+ )
47
59
48
60
// Group by session with a max duration of 5 mins between events
49
61
// Window options
50
62
val wopts : WindowOptions = customWindowOptions
51
63
val groupRides = groupRidesByKey(rides.map(_.toTaxiRide), wopts)
52
- groupRides.saveAsBigQueryTable(Table .Spec (accumTable), WRITE_APPEND , CREATE_IF_NEEDED )
64
+ groupRides.saveAsTypedBigQueryTable(
65
+ Table .Spec (accumTable)
66
+ )
53
67
54
68
sc.run
55
69
}
56
70
57
71
/** Builds the window options used when grouping rides into sessions.
  *
  * The trigger fires when the watermark passes the end of the window. Early
  * panes fire `EARLY_RESULT` seconds of processing time after the first
  * element in a pane, and late panes fire `LATENESS` seconds after the first
  * element in a pane. Fired panes are accumulated, and the allowed lateness
  * is `LATENESS` seconds.
  *
  * @return the configured [[WindowOptions]]
  */
def customWindowOptions: WindowOptions = {
  // Speculative results emitted before the watermark closes the window.
  val earlyFiring = AfterProcessingTime
    .pastFirstElementInPane()
    .plusDelayOf(Duration.standardSeconds(EARLY_RESULT))

  // Refinements emitted for data arriving after the watermark.
  val lateFiring = AfterProcessingTime
    .pastFirstElementInPane()
    .plusDelayOf(Duration.standardSeconds(LATENESS))

  val sessionTrigger = AfterWatermark
    .pastEndOfWindow()
    .withEarlyFirings(earlyFiring)
    .withLateFirings(lateFiring)

  WindowOptions(
    trigger = sessionTrigger,
    accumulationMode = streaming.ACCUMULATING_FIRED_PANES,
    allowedLateness = Duration.standardSeconds(LATENESS)
  )
}
69
87
70
/** Reads string messages from a Pub/Sub topic.
  *
  * @param pubsubTopic topic handed to `PubsubIO.string`
  * @param sc implicit Scio context that performs the read
  * @return the collection of message payloads produced by the read
  */
def getMessagesFromPubSub(
    pubsubTopic: String
)(implicit sc: ScioContext): SCollection[String] = {
  // NOTE(review): the timestamp attribute literal is reproduced exactly as it
  // appears in the source, including the leading space — confirm it is intended.
  val topicSource: PubsubIO[String] =
    PubsubIO.string(pubsubTopic, timestampAttribute = " ts")

  // Read parameters built from PubsubIO.Topic.
  val readAsTopic: PubsubIO.ReadParam = PubsubIO.ReadParam(PubsubIO.Topic)

  /*_*/
  sc.read(topicSource)(readAsTopic) /*_*/
}
77
98
78
/** Decodes raw JSON messages and separates successes from failures.
  *
  * Each message is decoded with `json2TaxiRide`, which yields an `Either`.
  * `Right` values become the collection of points; `Left` values are
  * converted with `circeErrorToCustomError`.
  *
  * @param messages JSON strings to decode
  * @return a pair `(points, errors)`: the successfully decoded rides and the
  *         decoding failures as [[JsonError]]
  */
def parseJSONStrings(
    messages: SCollection[String]
): (SCollection[PointTaxiRide], SCollection[JsonError]) = {
  val jsons: SCollection[Either[Error, PointTaxiRide]] =
    messages.map { s: String => json2TaxiRide(s) }

  // Pattern-matching with `collect` selects and unwraps each side in one step,
  // avoiding the partial `.left.get` / `.right.get` accessors and the
  // refutable `a :: b :: Nil` destructuring of a `Seq`.
  val errors: SCollection[Error] = jsons.collect { case Left(e) => e }
  val points: SCollection[PointTaxiRide] = jsons.collect { case Right(p) => p }

  val jsonErrors: SCollection[JsonError] = errors.map(circeErrorToCustomError)

  (points, jsonErrors)
}
96
123
97
- def groupRidesByKey (rides : SCollection [TaxiRide ], wopts : WindowOptions ): SCollection [TaxiRide ] = {
124
+ def groupRidesByKey (
125
+ rides : SCollection [TaxiRide ],
126
+ wopts : WindowOptions
127
+ ): SCollection [TaxiRide ] = {
98
128
val ridesWithKey : SCollection [(String , TaxiRide )] =
99
129
rides.keyBy(_.ride_id)
100
130
101
131
val afterWindow : SCollection [(String , TaxiRide )] =
102
- ridesWithKey.withSessionWindows(Duration .standardSeconds(SESSION_GAP ), options = wopts)
103
-
132
+ ridesWithKey.withSessionWindows(
133
+ Duration .standardSeconds(SESSION_GAP ),
134
+ options = wopts
135
+ )
104
136
105
137
val agg : SCollection [TaxiRide ] = afterWindow.reduceByKey(_ + _).map(_._2)
106
138
0 commit comments