|
| 1 | +import requests |
| 2 | +import concurrent.futures |
| 3 | +import time |
| 4 | +import sys |
| 5 | + |
| 6 | +# Redirect standard output to a file |
| 7 | +original_stdout = sys.stdout |
| 8 | +with open('test_load_details.txt', 'w') as f: |
| 9 | + sys.stdout = f |
| 10 | + |
| 11 | + # The URL of the endpoint you want to test |
| 12 | + url = 'https://bmjg67cbef.execute-api.eu-central-1.amazonaws.com/prod/docs/' |
| 13 | + |
| 14 | + # Function to make a single HTTP request |
| 15 | + def make_request(): |
| 16 | + try: |
| 17 | + start_time = time.time() |
| 18 | + response = requests.get(url) # You can change this to requests.post() if needed |
| 19 | + end_time = time.time() |
| 20 | + |
| 21 | + return { |
| 22 | + 'status_code': response.status_code, |
| 23 | + 'response_time': end_time - start_time |
| 24 | + } |
| 25 | + except Exception as e: |
| 26 | + return { |
| 27 | + 'error': str(e) |
| 28 | + } |
| 29 | + |
| 30 | + # Number of requests you want to make (if negative, resort to time_limit) |
| 31 | + num_requests = 12000 |
| 32 | + |
| 33 | + # Time limit for the load test (2 minutes) |
| 34 | + time_limit = 120 |
| 35 | + |
| 36 | + # List to store response times and status codes |
| 37 | + responses = [] |
| 38 | + |
| 39 | + # Record the start time of the load test |
| 40 | + start_time = time.time() |
| 41 | + |
| 42 | + # Use ThreadPoolExecutor to manage concurrent requests |
| 43 | + if num_requests > 0: |
| 44 | + with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor: |
| 45 | + future_to_request = {executor.submit(make_request): i for i in range(num_requests)} |
| 46 | + for future in concurrent.futures.as_completed(future_to_request): |
| 47 | + response = future.result() |
| 48 | + responses.append(response) |
| 49 | + else: |
| 50 | + with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor: |
| 51 | + # Run the loop for time_limit |
| 52 | + while time.time() - start_time < time_limit: |
| 53 | + # Submit a new request |
| 54 | + future = executor.submit(make_request) |
| 55 | + # Get the result of the request and add it to the list of responses |
| 56 | + try: |
| 57 | + response = future.result(timeout=1) # Set a timeout for each request if needed |
| 58 | + responses.append(response) |
| 59 | + except concurrent.futures.TimeoutError: |
| 60 | + responses.append({'error': 'Request timed out'}) |
| 61 | + |
| 62 | + # Record the end time of the load test |
| 63 | + end_time = time.time() |
| 64 | + |
| 65 | + # Calculate and print the total execution time of the load test |
| 66 | + total_time = end_time - start_time |
| 67 | + print(f"Total execution time: {total_time} seconds") |
| 68 | + |
| 69 | + # Print the total number of requests |
| 70 | + print(f"Total requests: {len(responses)} requests") |
| 71 | + |
| 72 | + # Print the total number of successful requests |
| 73 | + success_responses = sum(1 for d in responses if d.get('status_code') == 200) |
| 74 | + print(f"Total successful requests: {success_responses} requests") |
| 75 | + |
| 76 | + # Calculate and print the average response time |
| 77 | + average_response_time = sum(d['response_time'] for d in responses if 'response_time' in d) / len(responses) |
| 78 | + print(f"Average response time: {average_response_time} seconds") |
| 79 | + |
| 80 | + # Calculate and print success rate |
| 81 | + success_rate = 100.0 * (1.0 * success_responses / len(responses)) |
| 82 | + print(f"Success rate: {success_rate}%") |
| 83 | + |
| 84 | + # Optionally, log the detailed responses or any errors encountered |
| 85 | + for response in responses: |
| 86 | + if 'error' in response: |
| 87 | + print(f"Error: {response['error']}") |
| 88 | + else: |
| 89 | + print(f"Status Code: {response['status_code']}, Response Time: {response['response_time']} seconds") |
| 90 | + |
| 91 | + # Reset standard output to original |
| 92 | + sys.stdout = original_stdout |
0 commit comments