-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_load.py
142 lines (128 loc) · 5.29 KB
/
generate_load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import barnum, random, time, json
from mysql.connector import connect, Error
from kafka import KafkaProducer
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
# How many users / items are seeded into MySQL before the load loop starts.
userSeedCount = 10_000
itemSeedCount = 1_000
# Number of purchases generated by the load loop, and the pause (in
# milliseconds) between consecutive purchases.
purchaseGenCount = 100_000
purchaseGenEveryMS = 100
# Extra random pageviews emitted to Kafka for every purchase.
pageviewMultiplier = 75
# Inclusive bounds used with random.randint when seeding items.
itemInventoryMin, itemInventoryMax = 1_000, 5_000
itemPriceMin, itemPriceMax = 5, 500
# Kafka bootstrap server and destination topic for pageview events.
kafkaHost = 'kafka:9092'
kafkaTopic = 'pageviews'
# Traffic channels; one is picked at random for each pageview.
channels = [
    'organic search',
    'paid search',
    'referral',
    'social',
    'display',
]

# ---------------------------------------------------------------------------
# INSERT TEMPLATES (parameterized; values are bound by mysql.connector)
# ---------------------------------------------------------------------------
item_insert = "INSERT INTO shop.items (name, price, inventory) VALUES ( %s, %s, %s )"
user_insert = "INSERT INTO shop.users (email, is_vip) VALUES ( %s, %s )"
purchase_insert = "INSERT INTO shop.purchases (user_id, item_id, quantity, purchase_price) VALUES ( %s, %s, %s, %s )"
# Initialize Kafka: a single producer shared by the load loop. Message values
# are passed through the serializer below before being sent.
def _encode_json_value(value):
    """Serialize *value* to UTF-8 encoded JSON bytes for Kafka."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers=[kafkaHost],
    value_serializer=_encode_json_value,
)
def generatePageview(user_id, product_id):
    """Build one synthetic pageview event.

    Args:
        user_id: id placed in the event's ``user_id`` field.
        product_id: id interpolated into the ``/products/<id>`` URL.

    Returns:
        A dict with keys ``user_id``, ``url``, ``channel`` (a random entry
        of the module-level ``channels`` list) and ``received_at`` (current
        Unix time truncated to whole seconds).
    """
    product_url = '/products/{}'.format(product_id)
    traffic_channel = random.choice(channels)
    received_ts = int(time.time())
    return dict(
        user_id=user_id,
        url=product_url,
        channel=traffic_channel,
        received_at=received_ts,
    )
def _create_schema(cursor):
    """Drop and recreate the ``shop`` database with its users/items/purchases tables.

    Destructive: any existing ``shop`` database is dropped first.
    """
    cursor.execute('DROP DATABASE IF EXISTS shop;')
    cursor.execute('CREATE DATABASE shop;')
    cursor.execute(
        """CREATE TABLE shop.users
        (
            id SERIAL PRIMARY KEY,
            email VARCHAR(255),
            is_vip BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        );"""
    )
    cursor.execute(
        """CREATE TABLE shop.items
        (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            price DECIMAL(7,2),
            inventory INT,
            inventory_updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        );"""
    )
    # The inline REFERENCES clauses previously named non-existent tables
    # (`user`, `item`). They now point at the real tables. Depending on the
    # MySQL version, inline REFERENCES may be parsed but not enforced.
    cursor.execute(
        """CREATE TABLE shop.purchases
        (
            id SERIAL PRIMARY KEY,
            user_id BIGINT UNSIGNED REFERENCES shop.users(id),
            item_id BIGINT UNSIGNED REFERENCES shop.items(id),
            status TINYINT UNSIGNED DEFAULT 1,
            quantity INT UNSIGNED DEFAULT 1,
            purchase_price DECIMAL(12,2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        );"""
    )


def _seed_data(cursor):
    """Insert ``itemSeedCount`` random items and ``userSeedCount`` random users."""
    cursor.executemany(
        item_insert,
        [
            (
                barnum.create_nouns(),
                # Price in whole cents, converted to a 2-decimal amount.
                random.randint(itemPriceMin * 100, itemPriceMax * 100) / 100,
                random.randint(itemInventoryMin, itemInventoryMax),
            )
            for _ in range(itemSeedCount)
        ],
    )
    cursor.executemany(
        user_insert,
        [
            # randint(0, 10) > 8 is true for 9 and 10, i.e. 2 of 11 users are VIP.
            (barnum.create_email(), random.randint(0, 10) > 8)
            for _ in range(userSeedCount)
        ],
    )


def _generate_load(connection, cursor, item_prices, user_ids):
    """Emit pageviews to Kafka and insert purchases at a fixed pace.

    Each of ``purchaseGenCount`` iterations sends one pageview for the
    purchasing user, ``pageviewMultiplier`` random pageviews, inserts and
    commits one purchase row, then sleeps ``purchaseGenEveryMS`` ms.

    Args:
        connection: open MySQL connection, committed after every purchase.
        cursor: cursor on ``connection`` used for the INSERTs.
        item_prices: ``(item_id, price)`` pairs read back from ``shop.items``.
        user_ids: ids read back from ``shop.users``.
    """
    item_ids = [item_id for item_id, _ in item_prices]
    for _ in range(purchaseGenCount):
        # Pick a user and item to purchase from ids that actually exist.
        # Ids come from SERIAL columns, which start at 1, so ranges starting
        # at 0 would produce rows and events for non-existent entities.
        purchase_item_id, purchase_item_price = random.choice(item_prices)
        purchase_user = random.choice(user_ids)
        purchase_quantity = random.randint(1, 5)

        # Pageview from the purchaser for the purchased product.
        producer.send(
            kafkaTopic,
            key=b'test',
            value=generatePageview(purchase_user, purchase_item_id),
        )
        # Background noise: random (existing) users viewing random products.
        for _ in range(pageviewMultiplier):
            producer.send(
                kafkaTopic,
                key=b'test',
                value=generatePageview(random.choice(user_ids), random.choice(item_ids)),
            )

        cursor.execute(
            purchase_insert,
            (
                purchase_user,
                purchase_item_id,
                purchase_quantity,
                purchase_item_price * purchase_quantity,
            ),
        )
        connection.commit()
        time.sleep(purchaseGenEveryMS / 1000)


try:
    # The context managers close the cursor and the connection on exit, so no
    # explicit connection.close() is needed.
    with connect(
        host="mysql",
        user='root',
        password='debezium',
    ) as connection:
        with connection.cursor() as cursor:
            print("Initializing shop database...")
            _create_schema(cursor)
            connection.commit()

            print("Seeding data...")
            _seed_data(cursor)
            connection.commit()

            print("Getting item ID and PRICEs...")
            cursor.execute("SELECT id, price FROM shop.items")
            item_prices = [(row[0], row[1]) for row in cursor]
            cursor.execute("SELECT id FROM shop.users")
            user_ids = [row[0] for row in cursor]

            print("Preparing to loop + seed kafka pageviews and purchases")
            _generate_load(connection, cursor, item_prices, user_ids)
    # producer.send() is asynchronous; block until buffered pageviews are sent.
    producer.flush()
except Error as e:
    print(e)