tuning-halving: Trainer supports multi-cloud
Signed-off-by: Alan Nair <[email protected]>
alannair committed Dec 2, 2022
1 parent 277b823 commit 5bcb4ea
Showing 2 changed files with 123 additions and 128 deletions.
168 changes: 40 additions & 128 deletions benchmarks/tuning-halving/trainer/main.py
@@ -20,49 +20,37 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

-from __future__ import print_function
-
-import sys
-import os
-import grpc
-import argparse
-import boto3
-import logging as log
-import socket
-
-import sklearn.datasets as datasets
-from sklearn.ensemble import RandomForestRegressor
-from sklearn.model_selection import cross_val_predict
-from sklearn.metrics import roc_auc_score
-import itertools
-import numpy as np
-import pickle
-from sklearn.model_selection import StratifiedShuffleSplit
-import sys
+import os
+import sys

# adding python tracing sources to the system path
sys.path.insert(0, os.getcwd() + '/../proto/')
sys.path.insert(0, os.getcwd() + '/../../../../utils/tracing/python')
sys.path.insert(0, os.getcwd() + '/../../../../utils/storage/python')
import tracing
-from storage import Storage
-import tuning_pb2_grpc
-import tuning_pb2
-import destination as XDTdst
-import source as XDTsrc
-import utils as XDTutil
+from trainer import Trainer

+import logging as log
+import pickle


-from concurrent import futures

-parser = argparse.ArgumentParser()
-parser.add_argument("-dockerCompose", "--dockerCompose", dest="dockerCompose", default=False, help="Env docker compose")
-parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port")
-parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL",
-                    default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
-                    help="Zipkin endpoint url")
-
-args = parser.parse_args()
+LAMBDA = os.environ.get('IS_LAMBDA', 'no').lower() in ['true', 'yes', '1']
+
+if not LAMBDA:
+    import grpc
+    import argparse
+    import socket
+    import tuning_pb2_grpc
+    import tuning_pb2
+    import destination as XDTdst
+    import source as XDTsrc
+    import utils as XDTutil
+    from concurrent import futures
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-dockerCompose", "--dockerCompose", dest="dockerCompose",
+                        default=False, help="Env docker compose")
+    parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port")
+    parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL",
+                        default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
+                        help="Zipkin endpoint url")
+
+    args = parser.parse_args()

if tracing.IsTracingEnabled():
    tracing.initTracer("trainer", url=args.zipkinURL)
@@ -72,112 +60,36 @@
INLINE = "INLINE"
S3 = "S3"
XDT = "XDT"
-storageBackend = None

-# set aws credentials:
-AWS_ID = os.getenv('AWS_ACCESS_KEY', "")
-AWS_SECRET = os.getenv('AWS_SECRET_KEY', "")
-# set aws bucket name:
-BUCKET_NAME = os.getenv('BUCKET_NAME','vhive-tuning')

def get_self_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP


-def model_dispatcher(model_name):
-    if model_name=='LinearSVR':
-        return LinearSVR
-    elif model_name=='LinearRegression':
-        return LinearRegression
-    elif model_name=='RandomForestRegressor':
-        return RandomForestRegressor
-    elif model_name=='KNeighborsRegressor':
-        return KNeighborsRegressor
-    elif model_name=='LogisticRegression':
-        return LogisticRegression
-    else:
-        raise ValueError(f"Model {model_name} not found")


class TrainerServicer(tuning_pb2_grpc.TrainerServicer):
-    def __init__(self, transferType, XDTconfig=None):
-
-        self.benchName = BUCKET_NAME
-        self.transferType = transferType
-        self.trainer_id = ""
-        if transferType == S3:
-            self.s3_client = boto3.resource(
-                service_name='s3',
-                region_name=os.getenv("AWS_REGION", 'us-west-1'),
-                aws_access_key_id=AWS_ID,
-                aws_secret_access_key=AWS_SECRET
-            )
-        elif transferType == XDT:
-            if XDTconfig is None:
-                log.fatal("Empty XDT config")
-            self.XDTconfig = XDTconfig
+    def __init__(self, XDTconfig=None):
+        self.trainer = Trainer(XDTconfig)

    def Train(self, request, context):
-        # Read from S3
-        dataset = pickle.loads(storageBackend.get(request.dataset_key))
-
-        with tracing.Span("Training a model"):
-            model_config = pickle.loads(request.model_config)
-            sample_rate = request.sample_rate
-            count = request.count
-
-            # Init model
-            model_class = model_dispatcher(model_config['model'])
-            model = model_class(**model_config['params'])
-
-            # Train model and get predictions
-            X = dataset['features']
-            y = dataset['labels']
-            if sample_rate==1.0:
-                X_sampled, y_sampled = X, y
-            else:
-                stratified_split = StratifiedShuffleSplit(n_splits=1, train_size=sample_rate, random_state=42)
-                sampled_index, _ = list(stratified_split.split(X, y))[0]
-                X_sampled, y_sampled = X[sampled_index], y[sampled_index]
-            y_pred = cross_val_predict(model, X_sampled, y_sampled, cv=5)
-            model.fit(X_sampled, y_sampled)
-            score = roc_auc_score(y_sampled, y_pred)
-            log.info(f"{model_config['model']}, params: {model_config['params']}, dataset size: {len(y_sampled)},score: {score}")
-
-        # Write to S3
-        model_key = f"model_{count}"
-        pred_key = f"pred_model_{count}"
-        model_key = storageBackend.put(model_key, pickle.dumps(model))
-        pred_key = storageBackend.put(pred_key, pickle.dumps(y_pred))
-
+        trainerArgs = {
+            'dataset_key': request.dataset_key,
+            'model_config': pickle.loads(request.model_config),
+            'sample_rate': request.sample_rate,
+            'count': request.count
+        }
+        response = self.trainer.train(trainerArgs)
        return tuning_pb2.TrainReply(
            model=b'',
-            model_key=model_key,
-            pred_key=pred_key,
-            score=score,
-            params=pickle.dumps(model_config),
+            model_key=response['model_key'],
+            pred_key=response['pred_key'],
+            score=response['score'],
+            params=pickle.dumps(trainerArgs['model_config']),
        )


def serve():
    transferType = os.getenv('TRANSFER_TYPE', S3)
    if transferType == S3:
-        global storageBackend
-        storageBackend = Storage(BUCKET_NAME)
        log.info("Using inline or s3 transfers")
    max_workers = int(os.getenv("MAX_SERVER_THREADS", 10))
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    tuning_pb2_grpc.add_TrainerServicer_to_server(
-        TrainerServicer(transferType=transferType), server)
+        TrainerServicer(), server)
    server.add_insecure_port('[::]:' + args.sp)
    server.start()
    server.wait_for_termination()
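For readers tracing the new control flow, here is a minimal client sketch against this servicer. The request/reply field names mirror the ones used above; the TrainRequest message name, the TrainerStub class, and the localhost:80 endpoint are assumptions (port 80 is the -sp default), not part of this commit.

# Hypothetical client sketch; TrainRequest/TrainerStub and the endpoint are
# assumptions inferred from the proto usage above, not taken from this commit.
import pickle

import grpc

import tuning_pb2
import tuning_pb2_grpc

def run_train(count=0):
    channel = grpc.insecure_channel('localhost:80')  # -sp default port
    stub = tuning_pb2_grpc.TrainerStub(channel)
    reply = stub.Train(tuning_pb2.TrainRequest(
        dataset_key='dataset_0',  # a pickled {'features': ..., 'labels': ...} already in the bucket
        model_config=pickle.dumps({'model': 'RandomForestRegressor', 'params': {}}),
        sample_rate=0.5,          # train on a stratified 50% sample
        count=count,              # suffix for the model_<count> / pred_model_<count> keys
    ))
    return reply.model_key, reply.pred_key, reply.score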
83 changes: 83 additions & 0 deletions benchmarks/tuning-halving/trainer/trainer.py
@@ -0,0 +1,83 @@
# MIT License
#
# Copyright (c) 2022 Alan Nair and The vHive Ecosystem
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os

from storage import Storage
import tracing

import logging as log
import numpy as np
import pickle
import sklearn.datasets as datasets
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedShuffleSplit
# imported so model_dispatcher below can return these classes
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import LinearSVR

def model_dispatcher(model_name):
    if model_name=='LinearSVR':
        return LinearSVR
    elif model_name=='LinearRegression':
        return LinearRegression
    elif model_name=='RandomForestRegressor':
        return RandomForestRegressor
    elif model_name=='KNeighborsRegressor':
        return KNeighborsRegressor
    elif model_name=='LogisticRegression':
        return LogisticRegression
    else:
        raise ValueError(f"Model {model_name} not found")

class Trainer:
    def __init__(self, XDTconfig=None):
        bucket = os.getenv('BUCKET_NAME', 'vhive-tuning')
        self.storageBackend = Storage(bucket, XDTconfig)

    def train(self, args):
        dataset = pickle.loads(self.storageBackend.get(args['dataset_key']))
        with tracing.Span("Training a model"):
            # Init model
            model_class = model_dispatcher(args['model_config']['model'])
            model = model_class(**args['model_config']['params'])

            # Train model and get predictions
            X = dataset['features']
            y = dataset['labels']
            if args['sample_rate'] == 1.0:
                X_sampled, y_sampled = X, y
            else:
                strat_split = StratifiedShuffleSplit(n_splits=1, train_size=args['sample_rate'], random_state=42)
                sampled_index, _ = list(strat_split.split(X, y))[0]
                X_sampled, y_sampled = X[sampled_index], y[sampled_index]

            y_pred = cross_val_predict(model, X_sampled, y_sampled, cv=5)
            model.fit(X_sampled, y_sampled)
            score = roc_auc_score(y_sampled, y_pred)
            log.info(f"{args['model_config']['model']}, params: {args['model_config']['params']}, dataset size: {len(y_sampled)}, score: {score}")

        mkey = f"model_{args['count']}"
        pkey = f"pred_model_{args['count']}"
        model_key = self.storageBackend.put(mkey, pickle.dumps(model))
        pred_key = self.storageBackend.put(pkey, pickle.dumps(y_pred))

        return {'model_key': model_key, 'pred_key': pred_key, 'score': score}
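Because the storage details now live behind the Storage wrapper, the Trainer can be exercised directly, independent of gRPC. A minimal sketch, assuming BUCKET_NAME names a reachable bucket that already holds the dataset pickle under 'dataset_0'; the key and parameter choices are placeholders:

# Sketch only; 'dataset_0' and the model parameters are placeholders.
from trainer import Trainer

trainer = Trainer()  # default S3-style backend; pass an XDT config dict for XDT
result = trainer.train({
    'dataset_key': 'dataset_0',
    'model_config': {'model': 'RandomForestRegressor', 'params': {'n_estimators': 10}},
    'sample_rate': 1.0,  # no subsampling
    'count': 0,          # stored as model_0 and pred_model_0
})
print(result['score'], result['model_key'], result['pred_key'])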
