Skip to content

Commit

Permalink
[Monitoring] Add EC2 auto scaling script (#4098)
Browse files Browse the repository at this point in the history
* Add auto-management script to start/stop EC2 workers

Signed-off-by: AyushR1 <[email protected]>

* Fix issues

Signed-off-by: AyushR1 <[email protected]>

* Update serializers.py

* Update EC2 instance auto scaling script

* Fix flake8

* Update auto scaling

* Fix flake8

* Fix tests

---------

Signed-off-by: AyushR1 <[email protected]>
Co-authored-by: Gunjan Chhablani <[email protected]>
  • Loading branch information
AyushR1 and gchhablani committed Aug 11, 2023
1 parent bcc642b commit 7568d9c
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 0 deletions.
2 changes: 2 additions & 0 deletions apps/challenges/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class Meta:
"cpu_only_jobs",
"job_cpu_cores",
"job_memory",
"uses_ec2_worker",
"ec2_storage",
)

Expand Down Expand Up @@ -296,6 +297,7 @@ class Meta:
"cpu_only_jobs",
"job_cpu_cores",
"job_memory",
"uses_ec2_worker",
"ec2_storage",
)

Expand Down
124 changes: 124 additions & 0 deletions scripts/monitoring/auto_scale_ec2_workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import pytz
import warnings
import boto3
from datetime import datetime
from dateutil.parser import parse
from evalai_interface import EvalAI_Interface

# Silence third-party deprecation/warning noise when run unattended from cron.
warnings.filterwarnings("ignore")

# NOTE(review): `utc` appears unused in this module — confirm before removing.
utc = pytz.UTC

# Deployment environment name ("dev" when ENV is unset).
ENV = os.environ.get("ENV", "dev")
# Base URL of the EvalAI API server.
evalai_endpoint = os.environ.get("API_HOST_URL", "http://localhost:8000")
# EvalAI auth token; None when AUTH_TOKEN is not set in the environment.
auth_token = os.environ.get(
    "AUTH_TOKEN",
)


def get_boto3_client(resource, aws_keys):
    """Build a boto3 client for *resource* with explicit credentials.

    Args:
        resource: AWS service name, e.g. "ec2".
        aws_keys: Mapping holding "AWS_REGION", "AWS_ACCESS_KEY_ID" and
            "AWS_SECRET_ACCESS_KEY" entries.

    Returns:
        A configured boto3 client instance.
    """
    return boto3.client(
        resource,
        region_name=aws_keys["AWS_REGION"],
        aws_access_key_id=aws_keys["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=aws_keys["AWS_SECRET_ACCESS_KEY"],
    )


def get_pending_submission_count(challenge_metrics):
    """Return the number of submissions still awaiting completion.

    Sums the per-status counts for every status treated as pending.
    A status missing from *challenge_metrics* counts as zero.
    """
    pending_statuses = ("running", "submitted", "queued", "resuming")
    return sum(challenge_metrics.get(status, 0) for status in pending_statuses)


def stop_instance(challenge, evalai_interface):
    """Stop the challenge's EC2 instance if it is currently running.

    Looks up the instance state through the EvalAI API; issues a stop
    request only for a "running" instance, otherwise logs a skip.
    """
    details = evalai_interface.get_ec2_instance_details(challenge["id"])
    state_name = details["message"]["State"]["Name"]
    if state_name != "running":
        # Nothing to stop — report and bail out.
        print(
            "No running EC2 instance and pending messages found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        return
    response = evalai_interface.stop_challenge_ec2_instance(challenge["id"])
    print("AWS API Response: {}".format(response))
    print(
        "Stopped EC2 instance for Challenge ID: {}, Title: {}".format(
            challenge["id"], challenge["title"]
        )
    )


def start_instance(challenge, evalai_interface):
    """Start the challenge's EC2 instance if it is currently stopped.

    Looks up the instance state through the EvalAI API; issues a start
    request only for a "stopped" instance, otherwise logs a skip.
    """
    details = evalai_interface.get_ec2_instance_details(challenge["id"])
    state_name = details["message"]["State"]["Name"]
    if state_name != "stopped":
        # Already running (or transitioning) — report and bail out.
        print(
            "Existing running EC2 instance and pending messages found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        return
    response = evalai_interface.start_challenge_ec2_instance(challenge["id"])
    print("AWS API Response: {}".format(response))
    print(
        "Started EC2 instance for Challenge ID: {}, Title: {}.".format(
            challenge["id"], challenge["title"]
        )
    )


def start_or_stop_workers(challenge, challenge_metrics, evalai_interface):
    """Start or stop a challenge's EC2 worker based on pending load.

    The worker is stopped when there are no pending submissions or the
    challenge's end date has passed; otherwise it is (re)started.

    Args:
        challenge: Challenge dict with "id", "title" and "end_date" keys.
        challenge_metrics: Per-status submission counts for the challenge.
        evalai_interface: EvalAI API client used to manage the instance.
    """
    try:
        pending_submissions = get_pending_submission_count(challenge_metrics)
    except Exception:
        print(
            "Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        return

    print("Pending Submissions: {}".format(pending_submissions))

    # datetime.now(tz) replaces the deprecated utcnow()+localize pattern;
    # both produce an aware UTC timestamp for comparison with end_date.
    challenge_has_ended = parse(challenge["end_date"]) < datetime.now(pytz.UTC)
    if pending_submissions == 0 or challenge_has_ended:
        stop_instance(challenge, evalai_interface)
    else:
        start_instance(challenge, evalai_interface)


# TODO: Factor in limits for the APIs
def start_or_stop_workers_for_challenges(response, metrics, evalai_interface):
    """Apply worker auto-scaling to every EC2-backed challenge in *response*.

    Challenges whose "uses_ec2_worker" flag is falsy are skipped; metrics
    are looked up by the challenge id as a string key.
    """
    ec2_challenges = (
        c for c in response["results"] if c["uses_ec2_worker"]
    )
    for c in ec2_challenges:
        start_or_stop_workers(c, metrics[str(c["id"])], evalai_interface)


def create_evalai_interface(auth_token, evalai_endpoint):
    """Return an EvalAI_Interface bound to the given token and endpoint."""
    return EvalAI_Interface(auth_token, evalai_endpoint)


# Cron Job
def start_job():
    """Run one auto-scaling pass over every page of challenges.

    Fetches the submission metrics and the first page of challenges,
    then follows pagination ("next" links) until exhausted, scaling the
    EC2 workers for each page.
    """
    evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
    metrics = evalai_interface.get_challenges_submission_metrics()
    response = evalai_interface.get_challenges()
    while True:
        # Bug fix: the first page was previously processed without the
        # evalai_interface argument, raising a TypeError on every run.
        start_or_stop_workers_for_challenges(response, metrics, evalai_interface)
        next_page = response["next"]
        if next_page is None:
            break
        response = evalai_interface.make_request(next_page, "GET")


# Entry point: run a single auto-scaling pass when executed as a script
# (intended to be invoked periodically via run_auto_scale_ec2_workers.sh).
if __name__ == "__main__":
    print("Starting worker auto scaling script")
    start_job()
    print("Quitting worker auto scaling script!")
24 changes: 24 additions & 0 deletions scripts/monitoring/evalai_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"get_challenges": "/api/challenges/challenge/all/all/all",
"get_submissions_for_challenge": "/api/jobs/challenge/{}/submission/",
"get_challenges_submission_metrics": "/api/challenges/challenge/get_submission_metrics",
"manage_ec2_instance": "/api/challenges/{}/manage_ec2_instance/{}",
"get_ec2_instance_details": "/api/challenges/{}/get_ec2_instance_details/",
}


Expand Down Expand Up @@ -141,3 +143,25 @@ def get_challenges_submission_metrics(self):
url = self.return_url_per_environment(url)
response = self.make_request(url, "GET")
return response

def get_ec2_instance_details(self, challenge_pk):
    """Fetch EC2 instance details for the given challenge.

    Args:
        challenge_pk: Primary key of the challenge.

    Returns:
        API response dict; the instance data lives under the
        "message" key (see callers in the auto-scaling script).
    """
    # Bug fix: the original formatted the two-placeholder
    # "manage_ec2_instance" template with a single argument, which
    # raises IndexError. Use the details-specific URL template instead.
    url = URLS.get("get_ec2_instance_details").format(challenge_pk)
    url = self.return_url_per_environment(url)
    response = self.make_request(url, "GET")
    return response

def start_challenge_ec2_instance(self, challenge_pk):
    """Ask the EvalAI API to start the challenge's EC2 instance."""
    url = self.return_url_per_environment(
        URLS.get("manage_ec2_instance").format(challenge_pk, "start")
    )
    return self.make_request(url, "PUT")

def stop_challenge_ec2_instance(self, challenge_pk):
    """Ask the EvalAI API to stop the challenge's EC2 instance."""
    url = self.return_url_per_environment(
        URLS.get("manage_ec2_instance").format(challenge_pk, "stop")
    )
    return self.make_request(url, "PUT")
30 changes: 30 additions & 0 deletions scripts/monitoring/run_auto_scale_ec2_workers.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
# Cron wrapper for the EC2 worker auto-scaling script.
# Usage: run_auto_scale_ec2_workers.sh [repo_path] [auth_token] [api_host_url] [env]
path=$PWD
auth_token=''
api_host_url=''
# Bug fix: `env` previously had no default, so a missing 4th argument
# exported ENV as an empty string, defeating the Python-side "dev"
# default (os.environ.get("ENV", "dev") returns "" when ENV="").
env='dev'

if [ ! -z "$1" ]
then
    path=$1
fi

if [ ! -z "$2" ]
then
    auth_token=$2
fi

if [ ! -z "$3" ]
then
    api_host_url=$3
fi

if [ ! -z "$4" ]
then
    env=$4
fi

# crontab doesn't have access to env variable, define explicitly
export AUTH_TOKEN=${auth_token};
export API_HOST_URL=${api_host_url};
export ENV=${env}

/home/ubuntu/venv/bin/python ${path}/scripts/monitoring/auto_scale_ec2_workers.py
19 changes: 19 additions & 0 deletions tests/unit/challenges/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def test_get_challenge(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
]
Expand Down Expand Up @@ -533,6 +534,7 @@ def test_get_particular_challenge(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
response = self.client.get(self.url, {})
Expand Down Expand Up @@ -627,6 +629,7 @@ def test_update_challenge_when_user_is_its_creator(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
response = self.client.put(
Expand Down Expand Up @@ -747,6 +750,7 @@ def test_particular_challenge_partial_update(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
response = self.client.patch(self.url, self.partial_update_data)
Expand Down Expand Up @@ -816,6 +820,7 @@ def test_particular_challenge_update(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
response = self.client.put(self.url, self.data)
Expand Down Expand Up @@ -1401,6 +1406,7 @@ def test_get_past_challenges(self):
"cpu_only_jobs": self.challenge3.cpu_only_jobs,
"job_cpu_cores": self.challenge3.job_cpu_cores,
"job_memory": self.challenge3.job_memory,
"uses_ec2_worker": self.challenge3.uses_ec2_worker,
"ec2_storage": self.challenge3.ec2_storage,
}
]
Expand Down Expand Up @@ -1476,6 +1482,7 @@ def test_get_present_challenges(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
}
]
Expand Down Expand Up @@ -1551,6 +1558,7 @@ def test_get_future_challenges(self):
"cpu_only_jobs": self.challenge4.cpu_only_jobs,
"job_cpu_cores": self.challenge4.job_cpu_cores,
"job_memory": self.challenge4.job_memory,
"uses_ec2_worker": self.challenge4.uses_ec2_worker,
"ec2_storage": self.challenge4.ec2_storage,
}
]
Expand Down Expand Up @@ -1626,6 +1634,7 @@ def test_get_all_challenges(self):
"cpu_only_jobs": self.challenge3.cpu_only_jobs,
"job_cpu_cores": self.challenge3.job_cpu_cores,
"job_memory": self.challenge3.job_memory,
"uses_ec2_worker": self.challenge3.uses_ec2_worker,
"ec2_storage": self.challenge3.ec2_storage,
},
{
Expand Down Expand Up @@ -1685,6 +1694,7 @@ def test_get_all_challenges(self):
"cpu_only_jobs": self.challenge3.cpu_only_jobs,
"job_cpu_cores": self.challenge3.job_cpu_cores,
"job_memory": self.challenge3.job_memory,
"uses_ec2_worker": self.challenge3.uses_ec2_worker,
"ec2_storage": self.challenge3.ec2_storage,
},
{
Expand Down Expand Up @@ -1744,6 +1754,7 @@ def test_get_all_challenges(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge2.ec2_storage,
},
]
Expand Down Expand Up @@ -1874,6 +1885,7 @@ def test_get_featured_challenges(self):
"cpu_only_jobs": self.challenge3.cpu_only_jobs,
"job_cpu_cores": self.challenge3.job_cpu_cores,
"job_memory": self.challenge3.job_memory,
"uses_ec2_worker": self.challenge3.uses_ec2_worker,
"ec2_storage": self.challenge3.ec2_storage,
}
]
Expand Down Expand Up @@ -2028,6 +2040,7 @@ def test_get_challenge_by_pk_when_user_is_challenge_host(self):
"cpu_only_jobs": self.challenge3.cpu_only_jobs,
"job_cpu_cores": self.challenge3.job_cpu_cores,
"job_memory": self.challenge3.job_memory,
"uses_ec2_worker": self.challenge3.uses_ec2_worker,
"ec2_storage": self.challenge3.ec2_storage,
}

Expand Down Expand Up @@ -2111,6 +2124,7 @@ def test_get_challenge_by_pk_when_user_is_participant(self):
"cpu_only_jobs": self.challenge4.cpu_only_jobs,
"job_cpu_cores": self.challenge4.job_cpu_cores,
"job_memory": self.challenge4.job_memory,
"uses_ec2_worker": self.challenge4.uses_ec2_worker,
"ec2_storage": self.challenge4.ec2_storage,
}

Expand Down Expand Up @@ -2254,6 +2268,7 @@ def test_get_challenge_when_host_team_is_given(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge2.ec2_storage,
}
]
Expand Down Expand Up @@ -2325,6 +2340,7 @@ def test_get_challenge_when_participant_team_is_given(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge2.ec2_storage,
}
]
Expand Down Expand Up @@ -2396,6 +2412,7 @@ def test_get_challenge_when_mode_is_participant(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge2.ec2_storage,
}
]
Expand Down Expand Up @@ -2465,6 +2482,7 @@ def test_get_challenge_when_mode_is_host(self):
"cpu_only_jobs": self.challenge.cpu_only_jobs,
"job_cpu_cores": self.challenge.job_cpu_cores,
"job_memory": self.challenge.job_memory,
"uses_ec2_worker": self.challenge.uses_ec2_worker,
"ec2_storage": self.challenge.ec2_storage,
},
{
Expand Down Expand Up @@ -2524,6 +2542,7 @@ def test_get_challenge_when_mode_is_host(self):
"cpu_only_jobs": self.challenge2.cpu_only_jobs,
"job_cpu_cores": self.challenge2.job_cpu_cores,
"job_memory": self.challenge2.job_memory,
"uses_ec2_worker": self.challenge2.uses_ec2_worker,
"ec2_storage": self.challenge2.ec2_storage,
},
]
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/participants/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ def test_get_teams_and_corresponding_challenges_for_a_participant(self):
"cpu_only_jobs": self.challenge1.cpu_only_jobs,
"job_cpu_cores": self.challenge1.job_cpu_cores,
"job_memory": self.challenge1.job_memory,
"uses_ec2_worker": self.challenge1.uses_ec2_worker,
"ec2_storage": self.challenge1.ec2_storage,
},
"participant_team": {
Expand Down Expand Up @@ -961,6 +962,7 @@ def test_get_participant_team_challenge_list(self):
"cpu_only_jobs": self.challenge1.cpu_only_jobs,
"job_cpu_cores": self.challenge1.job_cpu_cores,
"job_memory": self.challenge1.job_memory,
"uses_ec2_worker": self.challenge1.uses_ec2_worker,
"ec2_storage": self.challenge1.ec2_storage,
}
]
Expand Down

0 comments on commit 7568d9c

Please sign in to comment.