forked from Rishav273/kafkaPysparkAnalytics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_stream_app.py
135 lines (102 loc) · 4.14 KB
/
spark_stream_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# importing spark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sum, count, desc, window
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, LongType, DateType, FloatType, TimestampType
from lib.utils import read_events_from_kafka, write_streaming_df_to_file_sink, process_raw_df_from_kafka_source, define_schema, load_config
import yaml
# read config file
config = load_config()
# importing logging modules
import logging
from lib.logger import Logger
# define explicit schema
schema = define_schema()
if __name__ == "__main__":
# create logger instance
logger = Logger(
'SparkStreamingApp',
'application_logs/log.txt',
level=logging.INFO
)
logger.info('Starting SparkSession...')
# create a spark session for handling transactions data
spark = SparkSession.builder.master('local[3]').appName('transactionsApp').getOrCreate()
# stream from source to an input df
df = read_events_from_kafka(
spark,
topic='transactions',
starting_offset='latest'
)
logger.info('Displaying raw streaming dataframe schema...')
df.printSchema()
# process the raw dataframe
processed_df = process_raw_df_from_kafka_source(df, schema)
logger.info('Displaying raw streaming dataframe schema...')
processed_df.printSchema()
# total fraud transactions per transaction category
fraud_transactions_per_category_df = processed_df \
.withWatermark('timestamp', '10 seconds') \
.groupBy(
window(col('timestamp'), '10 seconds'),
col('category')
).agg(
sum('is_fraud').alias('total_frauds')
).select(
'window.start',
'window.end',
'category',
'total_frauds'
)
logger.info('Displaying total frauds per category schema...')
fraud_transactions_per_category_df.printSchema()
# write df to sink
query1 = write_streaming_df_to_file_sink(spark,
fraud_transactions_per_category_df,
checkpoint_string='frauds-per-category',
path='frauds-per-category',
outputMode='update',
truncate='false',
format='console')
#############################################################################
# total frauds per window
frauds_per_window = processed_df \
.withWatermark('timestamp', '10 seconds') \
.groupBy(window(col('timestamp'), '10 seconds')) \
.agg(sum('is_fraud').alias('total_frauds')) \
.select(
'window.start', 'window.end', 'total_frauds'
)
logger.info('Displaying total frauds per window schema...')
frauds_per_window.printSchema()
# write df to sink
query2 = write_streaming_df_to_file_sink(spark,
frauds_per_window,
checkpoint_string='frauds-per-window',
path='frauds-per-window',
outputMode='update',
truncate='false',
format='console')
# #############################################################################
#frauds per merchant
frauds_per_merchant = processed_df \
.withWatermark('timestamp', '10 seconds') \
.groupBy(window(col('timestamp'), '10 seconds'), 'merchant') \
.agg(
sum('is_fraud').alias('total_frauds')
).select(
'window.start', 'window.end', 'merchant', 'total_frauds'
)
logger.info('Displaying total frauds per merchant schema...')
frauds_per_merchant.printSchema()
# write df to sink
query3 = write_streaming_df_to_file_sink(spark,
frauds_per_merchant,
checkpoint_string='frauds-per-merchant',
path='frauds-per-merchant',
outputMode='update',
truncate='false',
format='console')
# Terminate streaming when any one of the jobs fail
logger.info('Starting stream...')
spark.streams.awaitAnyTermination()
#############################################################################