tuning-halving: Driver supports multi-cloud
Signed-off-by: Alan Nair <[email protected]>
alannair committed Dec 2, 2022
1 parent 07fb731 commit 277b823
Showing 3 changed files with 155 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tuning-halving.yml
@@ -153,7 +153,7 @@ jobs:
- name: Deploy Functions as Knative Services
env:
-BUCKET_NAME: vhive-stacking
BUCKET_NAME: vhive-tuning
run: tools/kn_deploy.sh benchmarks/tuning-halving/knative_yamls/s3/*

- name: Check if the Service is Ready
115 changes: 115 additions & 0 deletions benchmarks/tuning-halving/driver/driver.py
@@ -0,0 +1,115 @@
# MIT License
#
# Copyright (c) 2022 Alan Nair and The vHive Ecosystem
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os

from storage import Storage

import itertools
import logging as log
import numpy as np
import pickle
import sklearn.datasets as datasets
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import StratifiedShuffleSplit

def generate_dataset():
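    # Synthetic, imbalanced binary-classification data (90/10 class weights,
    # 10% label noise); random_state=42 keeps every benchmark run identical.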
n_samples = 1000
n_features = 1024
X, y = datasets.make_classification(n_samples,
n_features,
n_redundant=0,
n_clusters_per_class=2,
weights=[0.9, 0.1],
flip_y=0.1,
random_state=42)
return {'features': X, 'labels': y}

def generate_hyperparam_sets(param_config):
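    # Yields every combination in the grid, e.g. {'a': [1, 2], 'b': [3]}
    # produces {'a': 1, 'b': 3} then {'a': 2, 'b': 3} (Cartesian product).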
keys = list(param_config.keys())
values = [param_config[k] for k in keys]
for elements in itertools.product(*values):
yield dict(zip(keys, elements))

class Driver:
def __init__(self, XDTconfig=None):
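        # BUCKET_NAME lets each cloud point at its own bucket; the default
        # matches the vhive-tuning bucket set in the CI workflow above.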
bucket = os.getenv('BUCKET_NAME', 'vhive-tuning')
self.storageBackend = Storage(bucket, XDTconfig)

def handler_broker(self, event, context):
dataset = generate_dataset()
hyperparam_config = {
'model': 'RandomForestRegressor',
'params': {
'n_estimators': [5, 10, 20],
'min_samples_split': [2, 4],
'random_state': [42]
}
}
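        # 3 n_estimators x 2 min_samples_split x 1 random_state = 6 configs
        # enter the first halving round.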
models_config = {
'models': [
{
'model': 'RandomForestRegressor',
'params': hyperparam
} for hyperparam in generate_hyperparam_sets(hyperparam_config['params'])
]
}
key = self.storageBackend.put('dataset_key', pickle.dumps(dataset))
return {
'dataset_key': key,
'models_config': models_config
}

def drive(self, driveArgs):
event = self.handler_broker({}, {})
models = event['models_config']['models']
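        # Successive halving: with the 6 configs above, the rounds run
        # 6 models at sample_rate 1/6, then 3 at 1/3, then the single
        # survivor is retrained below on the full dataset.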
while len(models) > 1:
sample_rate = 1 / len(models)
log.info(f"Running {len(models)} models at sample rate {sample_rate}")

training_responses = []
for count, model_config in enumerate(models):
training_responses.append(
driveArgs['trainerfn']({
'dataset_key': event['dataset_key'],
'model_config': model_config,
'count': count,
'sample_rate': sample_rate
})
)

# Keep models with the best score
top_number = len(training_responses) // 2
sorted_responses = sorted(training_responses, key=lambda result: result['score'], reverse=True)
models = [resp['params'] for resp in sorted_responses[:top_number]]

log.info(f"Training final model {models[0]} on the full dataset")
final_response = driveArgs['trainerfn']({
'dataset_key': event['dataset_key'],
'model_config': models[0],
'count': 0,
'sample_rate': 1.0
})
log.info(f"Final result: score {final_response['score']}, model {final_response['params']['model']}")
return
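
A minimal sketch of how this new Driver could be exercised locally with a stub trainer (the stub and its random scores are hypothetical, and a reachable storage backend for the configured bucket is assumed):

import random
from driver import Driver

def stub_trainer(arg):
    # A real trainer would fit arg['model_config'] on a sample_rate
    # fraction of the dataset and return a cross-validated score.
    return {'score': random.random(), 'params': arg['model_config']}

Driver().drive({'trainerfn': stub_trainer})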
194 changes: 39 additions & 155 deletions benchmarks/tuning-halving/driver/main.py
@@ -20,53 +20,48 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

-from __future__ import print_function

-import sys
import os
import sys

# add the proto, tracing, and storage helper sources to the system path
sys.path.insert(0, os.getcwd() + '/../proto/')
sys.path.insert(0, os.getcwd() + '/../../../../utils/tracing/python')
sys.path.insert(0, os.getcwd() + '/../../../../utils/storage/python')
from driver import Driver
import tracing
-from storage import Storage
-import helloworld_pb2_grpc
-import helloworld_pb2
-import tuning_pb2_grpc
-import tuning_pb2
-import destination as XDTdst
-import source as XDTsrc
-import utils as XDTutil

-import grpc
-from grpc_reflection.v1alpha import reflection
-import argparse
-import boto3

import logging as log
-import socket

-import sklearn.datasets as datasets
-from sklearn.ensemble import RandomForestRegressor
-from sklearn.model_selection import cross_val_predict
-from sklearn.metrics import roc_auc_score
-import itertools
-import numpy as np
-import pickle
-from sklearn.model_selection import StratifiedShuffleSplit

-from concurrent import futures
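# Lambda mode: IS_LAMBDA accepts 'true', 'yes', or '1' (case-insensitive);
# any other value selects the Knative/gRPC path guarded below.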
LAMBDA = os.environ.get('IS_LAMBDA', 'no').lower() in ['true', 'yes', '1']

if LAMBDA:
import boto3

if not LAMBDA:
import helloworld_pb2_grpc
import helloworld_pb2
import tuning_pb2_grpc
import tuning_pb2
import destination as XDTdst
import source as XDTsrc
import utils as XDTutil

import grpc
from grpc_reflection.v1alpha import reflection
import argparse
import socket

from concurrent import futures

-parser = argparse.ArgumentParser()
-parser.add_argument("-dockerCompose", "--dockerCompose", dest="dockerCompose", default=False, help="Env docker compose")
-parser.add_argument("-tAddr", "--tAddr", dest="tAddr", default="trainer.default.127.0.0.1.nip.io:80",
-                    help="trainer address")
-parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port")
-parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL",
-                    default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
-                    help="Zipkin endpoint url")
parser = argparse.ArgumentParser()
parser.add_argument("-dockerCompose", "--dockerCompose",
dest="dockerCompose", default=False, help="Env docker compose")
parser.add_argument("-tAddr", "--tAddr", dest="tAddr",
default="trainer.default.127.0.0.1.nip.io:80",
help="trainer address")
parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port")
parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL",
default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
help="Zipkin endpoint url")

-args = parser.parse_args()
args = parser.parse_args()

if tracing.IsTracingEnabled():
tracing.initTracer("driver", url=args.zipkinURL)
@@ -76,85 +76,10 @@
INLINE = "INLINE"
S3 = "S3"
XDT = "XDT"
-storageBackend = None

-# set aws credentials:
-AWS_ID = os.getenv('AWS_ACCESS_KEY', "")
-AWS_SECRET = os.getenv('AWS_SECRET_KEY', "")
-# set aws bucket name:
-BUCKET_NAME = os.getenv('BUCKET_NAME','vhive-tuning')

-def get_self_ip():
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    try:
-        # doesn't even have to be reachable
-        s.connect(('10.255.255.255', 1))
-        IP = s.getsockname()[0]
-    except Exception:
-        IP = '127.0.0.1'
-    finally:
-        s.close()
-    return IP

-def generate_dataset():
-    n_samples = 1000
-    n_features = 1024
-    X, y = datasets.make_classification(n_samples,
-                                        n_features,
-                                        n_redundant=0,
-                                        n_clusters_per_class=2,
-                                        weights=[0.9, 0.1],
-                                        flip_y=0.1,
-                                        random_state=42)
-    return {'features': X, 'labels': y}

-def generate_hyperparam_sets(param_config):
-    keys = list(param_config.keys())
-    values = [param_config[k] for k in keys]
-
-    for elements in itertools.product(*values):
-        yield dict(zip(keys, elements))

class GreeterServicer(helloworld_pb2_grpc.GreeterServicer):
-    def __init__(self, transferType, XDTconfig=None):
-
-        self.benchName = BUCKET_NAME
-        self.transferType = transferType
-        if transferType == S3:
-            self.s3_client = boto3.resource(
-                service_name='s3',
-                region_name=os.getenv("AWS_REGION", 'us-west-1'),
-                aws_access_key_id=AWS_ID,
-                aws_secret_access_key=AWS_SECRET
-            )
-        elif transferType == XDT:
-            if XDTconfig is None:
-                log.fatal("Empty XDT config")
-            self.XDTconfig = XDTconfig
-
-    def handler_broker(self, event, context):
-        dataset = generate_dataset()
-        hyperparam_config = {
-            'model': 'RandomForestRegressor',
-            'params': {
-                'n_estimators': [5, 10, 20],
-                'min_samples_split': [2, 4],
-                'random_state': [42]
-            }
-        }
-        models_config = {
-            'models': [
-                {
-                    'model': 'RandomForestRegressor',
-                    'params': hyperparam
-                } for hyperparam in generate_hyperparam_sets(hyperparam_config['params'])
-            ]
-        }
-        key = storageBackend.put('dataset_key', pickle.dumps(dataset))
-        return {
-            'dataset_key': key,
-            'models_config': models_config
-        }
def __init__(self, XDTconfig=None):
self.driver = Driver(XDTconfig)

def train(self, arg: dict) -> dict:
log.info("Invoke Trainer")
@@ -179,52 +179,17 @@ def train(self, arg: dict) -> dict:
# Driver code below
def SayHello(self, request, context):
log.info("Driver received a request")

-        event = self.handler_broker({}, {})
-        models = event['models_config']['models']
-
-        while len(models) > 1:
-            sample_rate = 1 / len(models)
-            log.info(f"Running {len(models)} models on the dataset with sample rate {sample_rate}")
-            # Run different model configs on sampled dataset
-            training_responses = []
-            for count, model_config in enumerate(models):
-                training_responses.append(
-                    self.train({
-                        'dataset_key': event['dataset_key'],
-                        'model_config': model_config,
-                        'count': count,
-                        'sample_rate': sample_rate
-                    })
-                )
-
-            # Keep models with the best score
-            top_number = len(training_responses) // 2
-            sorted_responses = sorted(training_responses, key=lambda result: result['score'], reverse=True)
-            models = [resp['params'] for resp in sorted_responses[:top_number]]
-
-        log.info(f"Training final model {models[0]} on the full dataset")
-        final_response = self.train({
-            'dataset_key': event['dataset_key'],
-            'model_config': models[0],
-            'count': 0,
-            'sample_rate': 1.0
-        })
-
-        log.info(f"Final result: score {final_response['score']}, model {final_response['params']['model']}")
-        return helloworld_pb2.HelloReply(message=self.benchName)

self.driver.drive({'trainerfn': self.train})
return helloworld_pb2.HelloReply(message=self.driver.storageBackend.bucket)

def serve():
transferType = os.getenv('TRANSFER_TYPE', S3)
if transferType == S3:
-        global storageBackend
-        storageBackend = Storage(BUCKET_NAME)
log.info("Using inline or s3 transfers")
max_workers = int(os.getenv("MAX_SERVER_THREADS", 10))
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
helloworld_pb2_grpc.add_GreeterServicer_to_server(
-        GreeterServicer(transferType=transferType), server)
GreeterServicer(), server)
SERVICE_NAMES = (
helloworld_pb2.DESCRIPTOR.services_by_name['Greeter'].full_name,
reflection.SERVICE_NAME,
@@ -239,7 +239,6 @@ def serve():
else:
log.fatal("Invalid Transfer type")


if __name__ == '__main__':
log.basicConfig(level=log.INFO)
serve()
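
For reference, a hypothetical client-side invocation of the deployed driver service over gRPC (the address follows the nip.io pattern of the -tAddr default above and is an assumption, not part of this commit):

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

channel = grpc.insecure_channel('driver.default.127.0.0.1.nip.io:80')
stub = helloworld_pb2_grpc.GreeterStub(channel)
reply = stub.SayHello(helloworld_pb2.HelloRequest(name='driver'))
print(reply.message)  # echoes the storage bucket name on success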
