“每小时小费”练习的任务是确定每小时赚取最多小费的司机。 最简单的方法是通过两个步骤来解决这个问题:首先使用一个小时长的窗口来计算每个司机在一小时内的总小费,然后从该窗口结果流中找到每小时总小费最多的司机。
请注意,该程序应使用事件时间(event time)。
本练习的输入数据是由出租车车费流生成器生成的 TaxiFare
事件流。
TaxiFareGenerator
用时间戳和水位线(watermark)注解生成的 DataStream<TaxiFare>
。
因此,无需提供自定义的时间戳和水印分配器即可正确使用事件时间。
所希望的结果是每小时产生一个 Tuple3<Long, Long, Float>
记录的数据流。
这个记录(Tuple3<Long, Long, Float>
)应包含该小时结束时的时间戳(对应三元组的第一个元素)、
该小时内获得小费最多的司机的 driverId(对应三元组的第二个元素)以及他的实际小费总数(对应三元组的第三个元素))。
结果流应打印到标准输出。
ℹ️ 最好在 IDE 的 flink-training 项目中找到这些类,而不是使用本节中源文件的链接。
- Java:
org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise
- Scala:
org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsExercise
- Java:
org.apache.flink.training.exercises.hourlytips.HourlyTipsTest
- Scala:
org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsTest
程序结构
请注意,可以将一组时间窗口逐个级联,只要时间帧兼容(第二组窗口的持续时间需要是第一组的倍数)。
因此,首先可以得到一个由 driverId
键值分隔的具有一小时窗口的初始数据集,并使用它来创建一个 (endOfHourTimestamp,driverId,totalTips)
流。
然后使用另一个一小时窗口(该窗口不是用键值分隔的),从第一个窗口中查找具有最大 totalTips
的记录。
项目中提供了参考解决方案:
- Java:
org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution
- Scala:
org.apache.flink.training.solutions.hourlytips.scala.HourlyTipsSolution