-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpsutil.py
81 lines (56 loc) · 2.29 KB
/
psutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from locust import HttpUser, task, between
import sqlite3
import time
class MyUser(HttpUser):
wait_time = between(1, 3)
host = "http://127.0.0.1:3000/"
host_secondary = "http://127.0.0.1:3001/"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.max_request_names = []
def fetch_response_time(self):
connection = sqlite3.connect("locust_data.db")
cursor = connection.cursor()
cursor.execute("SELECT DISTINCT Name, Average_Response_Time FROM response_times ORDER BY Average_Response_Time DESC LIMIT 2")
top_response_time = cursor.fetchall()
# print(top_response_time)
connection.close()
return top_response_time
def on_start(self):
response_time_data = self.fetch_response_time()
self.max_request_names = [name.lstrip('/') for name, _ in response_time_data]
@task
def get_post_1(self):
self.perform_request("get_post_1", "posts/1")
@task
def get_post_2(self):
self.perform_request("get_post_2", "posts/2")
@task
def get_albums(self):
self.perform_request("get_albums", "albums")
@task
def get_photos(self):
self.perform_request("get_photos", "photos")
@task
def get_comments(self):
self.perform_request("get_comments", "comments")
@task
def get_comments1(self):
self.perform_request("get_comments1", "comments/1")
@task
def get_comments2(self):
self.perform_request("get_comments2", "comments/2")
def perform_request(self, name, endpoint):
if endpoint in self.max_request_names:
self.client.base_url = self.host_secondary
print(f"Redirecting request {endpoint} to the secondary server")
with self.client.get(endpoint, catch_response=True) as response:
self.handle_response(response, name)
else:
self.client.base_url = self.host
print(f"Redirecting request {endpoint} to the primary server")
with self.client.get(endpoint, catch_response=True) as response:
self.handle_response(response, name)
def handle_response(self, response, name):
response_time = response.elapsed.total_seconds()
print(name, response_time)