18
18
19
19
package com .github .novaonline
20
20
21
+ import java .time .OffsetDateTime
21
22
import java .util .Properties
22
23
23
24
import com .github .novaonline .cassandra .HueLoggingLocalCluster
24
- import com .github .novaonline .map .JsonToLightEvent
25
25
import com .github .novaonline .model .light ._
26
+ import com .github .novaonline .serialization .SimpleJsonSchema
26
27
import org .apache .flink .api .java .utils .ParameterTool
27
28
import org .apache .flink .streaming .api .TimeCharacteristic
28
29
import org .apache .flink .streaming .api .functions .timestamps .BoundedOutOfOrdernessTimestampExtractor
29
30
import org .apache .flink .streaming .api .scala ._
30
31
import org .apache .flink .streaming .api .windowing .time .Time
31
32
import org .apache .flink .streaming .connectors .cassandra .CassandraSink
32
- import org .apache .flink .streaming .connectors .kafka .FlinkKafkaConsumer010
33
- import org .apache .flink .streaming .util .serialization .JSONKeyValueDeserializationSchema
33
+ import org .apache .flink .streaming .connectors .kafka .{FlinkKafkaConsumer010 , FlinkKafkaProducer010 }
34
34
import org .slf4j .LoggerFactory
35
35
36
36
@@ -50,32 +50,39 @@ object LightEvents {
50
50
val env = StreamExecutionEnvironment .getExecutionEnvironment
51
51
env.setStreamTimeCharacteristic(TimeCharacteristic .EventTime )
52
52
53
+ val lightEventSchema = new SimpleJsonSchema [LightEvent ]
54
+ val lightSessionSchema = new SimpleJsonSchema [LightSession ]
55
+ val lightAccumlatedSchema = new SimpleJsonSchema [LightAccumulated ]
56
+
53
57
val properties = new Properties ()
54
58
properties.setProperty(" bootstrap.servers" , s " $kafkaHost:9092 " )
55
59
properties.setProperty(" group.id" , " Hue-Logging" )
56
- val lightEventSource = new FlinkKafkaConsumer010 (" hue-logging-light-event" , new JSONKeyValueDeserializationSchema ( false ) , properties)
60
+ val lightEventSource = new FlinkKafkaConsumer010 (" hue-logging-light-event" , lightEventSchema , properties)
57
61
lightEventSource.setStartFromEarliest()
58
62
59
63
val cassandraCluster = new HueLoggingLocalCluster (cassandraHost)
60
64
61
65
// Start Map Reduce
62
- val lightEventJsonStream = env.addSource(lightEventSource)
63
- val lightEventStream = lightEventJsonStream.map(JsonToLightEvent )
66
+ val lightEventStream = env.addSource(lightEventSource).map(x => {
67
+ // Enrich fields not populated during JSON deserialization: set state.addDate to the event's addDate as epoch seconds
68
+ x.copy(state = x.state.copy(addDate = OffsetDateTime .parse(x.addDate).toEpochSecond))
69
+ })
64
70
65
71
val watermarkedLightEventsStream = lightEventStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor [LightEvent ](Time .seconds(10 )) {
66
- override def extractTimestamp (t : LightEvent ): Long = t.state. addDate
72
+ // Event time for a LightEvent is its addDate string parsed as an OffsetDateTime.
+ // Flink timestamps/watermarks are epoch *milliseconds* (the Time.seconds(10) bound above is applied in ms),
+ // so return millis here; returning epoch seconds would stretch the lateness bound and any windows by 1000x.
+ override def extractTimestamp(t: LightEvent): Long = OffsetDateTime.parse(t.addDate).toInstant.toEpochMilli
67
73
})
68
74
69
75
val lightSessionsStream = watermarkedLightEventsStream
70
76
.keyBy(x => x.light.id)
71
- .flatMapWithState[LightSession , LightState ]((light : LightEvent , state : Option [LightState ]) => {
72
- if (state.isEmpty && light.state.on) {
77
+ .flatMapWithState[LightSession , LightState ]((light : LightEvent , prevState : Option [LightState ]) => {
78
+ val currState = light.state
79
+ if (prevState.isEmpty && currState.on) {
73
80
LOG .info(" Opened Light Session" )
74
- (Iterator (LightSession (light.light, light.state , None , 0 )), Some (light.state ))
75
- } else if (state .isDefined && state .get.on && ! light.state .on) {
81
+ (Iterator (LightSession (light.light, currState , None , 0 )), Some (currState ))
82
+ } else if (prevState .isDefined && prevState .get.on && ! currState .on) {
76
83
LOG .info(" Closed Light Session" )
77
- val duration = light.state. addDate - state .get.addDate
78
- (Iterator (LightSession (light.light, light.state , Some (light.state ), duration)), None )
84
+ val duration = currState. addDate - prevState .get.addDate
85
+ (Iterator (LightSession (light.light, currState , Some (currState ), duration)), None )
79
86
} else {
80
87
// LOG.info("Unknown State for Light Session")
81
88
(Iterator .empty, None )
@@ -89,8 +96,17 @@ object LightEvents {
89
96
.map(x => LightAccumulated (x.light, x.durationSeconds))
90
97
91
98
99
+ // Publish Processed Streams to Kafka
100
+ val kafkaLightSessionProducer = new FlinkKafkaProducer010 (" hue-logging-light-session" , lightSessionSchema, properties)
101
+
102
+ val kafkaLightAccumulatedProducer = new FlinkKafkaProducer010 (" hue-logging-light-accumulated" , lightAccumlatedSchema, properties)
103
+
104
+ lightSessionsStream.addSink(kafkaLightSessionProducer)
105
+ lightAccumulated.addSink(kafkaLightAccumulatedProducer)
106
+
107
+
92
108
CassandraSink .addSink(lightEventStream.map(x => {
93
- val r = x.toCassandraTuple()
109
+ val r = x.toCassandraTuple
94
110
LOG .info(s " mapped light event stream ${r.toString()}" )
95
111
r
96
112
}))
0 commit comments