From 5bcb4ea17f689044e680add5cf4204e9337b9f6f Mon Sep 17 00:00:00 2001 From: Alan Nair Date: Fri, 2 Dec 2022 23:41:10 +0530 Subject: [PATCH] tuning-halving: Trainer supports multi-cloud Signed-off-by: Alan Nair --- benchmarks/tuning-halving/trainer/main.py | 168 +++++-------------- benchmarks/tuning-halving/trainer/trainer.py | 83 +++++++++ 2 files changed, 123 insertions(+), 128 deletions(-) create mode 100644 benchmarks/tuning-halving/trainer/trainer.py diff --git a/benchmarks/tuning-halving/trainer/main.py b/benchmarks/tuning-halving/trainer/main.py index bc6046744..e0b451999 100644 --- a/benchmarks/tuning-halving/trainer/main.py +++ b/benchmarks/tuning-halving/trainer/main.py @@ -20,49 +20,37 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from __future__ import print_function - -import sys import os -import grpc -import argparse -import boto3 -import logging as log -import socket - -import sklearn.datasets as datasets -from sklearn.ensemble import RandomForestRegressor -from sklearn.model_selection import cross_val_predict -from sklearn.metrics import roc_auc_score -import itertools -import numpy as np -import pickle -from sklearn.model_selection import StratifiedShuffleSplit +import sys -# adding python tracing sources to the system path -sys.path.insert(0, os.getcwd() + '/../proto/') -sys.path.insert(0, os.getcwd() + '/../../../../utils/tracing/python') -sys.path.insert(0, os.getcwd() + '/../../../../utils/storage/python') import tracing -from storage import Storage -import tuning_pb2_grpc -import tuning_pb2 -import destination as XDTdst -import source as XDTsrc -import utils as XDTutil +from trainer import Trainer +import logging as log +import pickle - -from concurrent import futures - -parser = argparse.ArgumentParser() -parser.add_argument("-dockerCompose", "--dockerCompose", dest="dockerCompose", default=False, help="Env docker compose") -parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port") -parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL", - default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans", - help="Zipkin endpoint url") - -args = parser.parse_args() +LAMBDA = os.environ.get('IS_LAMBDA', 'no').lower() in ['true', 'yes', '1'] + +if not LAMBDA: + import grpc + import argparse + import socket + import tuning_pb2_grpc + import tuning_pb2 + import destination as XDTdst + import source as XDTsrc + import utils as XDTutil + from concurrent import futures + + parser = argparse.ArgumentParser() + parser.add_argument("-dockerCompose", "--dockerCompose", dest="dockerCompose", + default=False, help="Env docker compose") + parser.add_argument("-sp", "--sp", dest="sp", default="80", help="serve port") + parser.add_argument("-zipkin", "--zipkin", dest="zipkinURL", + default="http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans", + help="Zipkin endpoint url") + + args = parser.parse_args() if tracing.IsTracingEnabled(): tracing.initTracer("trainer", url=args.zipkinURL) @@ -72,112 +60,36 @@ INLINE = "INLINE" S3 = "S3" XDT = "XDT" -storageBackend = None - -# set aws credentials: -AWS_ID = os.getenv('AWS_ACCESS_KEY', "") -AWS_SECRET = os.getenv('AWS_SECRET_KEY', "") -# set aws bucket name: -BUCKET_NAME = os.getenv('BUCKET_NAME','vhive-tuning') - -def get_self_ip(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # doesn't even have to be reachable - s.connect(('10.255.255.255', 1)) - IP = s.getsockname()[0] - except Exception: - IP = '127.0.0.1' - finally: - s.close() - return IP - - -def model_dispatcher(model_name): - if model_name=='LinearSVR': - return LinearSVR - elif model_name=='LinearRegression': - return LinearRegression - elif model_name=='RandomForestRegressor': - return RandomForestRegressor - elif model_name=='KNeighborsRegressor': - return KNeighborsRegressor - elif model_name=='LogisticRegression': - return LogisticRegression - else: - raise ValueError(f"Model {model_name} not found") - class TrainerServicer(tuning_pb2_grpc.TrainerServicer): - def __init__(self, transferType, XDTconfig=None): - - self.benchName = BUCKET_NAME - self.transferType = transferType - self.trainer_id = "" - if transferType == S3: - self.s3_client = boto3.resource( - service_name='s3', - region_name=os.getenv("AWS_REGION", 'us-west-1'), - aws_access_key_id=AWS_ID, - aws_secret_access_key=AWS_SECRET - ) - elif transferType == XDT: - if XDTconfig is None: - log.fatal("Empty XDT config") - self.XDTconfig = XDTconfig + def __init__(self, XDTconfig=None): + self.trainer = Trainer(XDTconfig) def Train(self, request, context): - # Read from S3 - dataset = pickle.loads(storageBackend.get(request.dataset_key)) - - with tracing.Span("Training a model"): - model_config = pickle.loads(request.model_config) - sample_rate = request.sample_rate - count = request.count - - # Init model - model_class = model_dispatcher(model_config['model']) - model = model_class(**model_config['params']) - - # Train model and get predictions - X = dataset['features'] - y = dataset['labels'] - if sample_rate==1.0: - X_sampled, y_sampled = X, y - else: - stratified_split = StratifiedShuffleSplit(n_splits=1, train_size=sample_rate, random_state=42) - sampled_index, _ = list(stratified_split.split(X, y))[0] - X_sampled, y_sampled = X[sampled_index], y[sampled_index] - y_pred = cross_val_predict(model, X_sampled, y_sampled, cv=5) - model.fit(X_sampled, y_sampled) - score = roc_auc_score(y_sampled, y_pred) - log.info(f"{model_config['model']}, params: {model_config['params']}, dataset size: {len(y_sampled)},score: {score}") - - # Write to S3 - model_key = f"model_{count}" - pred_key = f"pred_model_{count}" - model_key = storageBackend.put(model_key, pickle.dumps(model)) - pred_key = storageBackend.put(pred_key, pickle.dumps(y_pred)) - + trainerArgs = { + 'dataset_key': request.dataset_key, + 'model_config': pickle.loads(request.model_config), + 'sample_rate': request.sample_rate, + 'count': request.count + } + response = self.trainer.train(trainerArgs) return tuning_pb2.TrainReply( model=b'', - model_key=model_key, - pred_key=pred_key, - score=score, - params=pickle.dumps(model_config), + model_key=response['model_key'], + pred_key=response['pred_key'], + score=response['score'], + params=pickle.dumps(trainerArgs['model_config']), ) def serve(): transferType = os.getenv('TRANSFER_TYPE', S3) if transferType == S3: - global storageBackend - storageBackend = Storage(BUCKET_NAME) log.info("Using inline or s3 transfers") max_workers = int(os.getenv("MAX_SERVER_THREADS", 10)) server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) tuning_pb2_grpc.add_TrainerServicer_to_server( - TrainerServicer(transferType=transferType), server) + TrainerServicer(), server) server.add_insecure_port('[::]:' + args.sp) server.start() server.wait_for_termination() diff --git a/benchmarks/tuning-halving/trainer/trainer.py b/benchmarks/tuning-halving/trainer/trainer.py new file mode 100644 index 000000000..c513417d3 --- /dev/null +++ b/benchmarks/tuning-halving/trainer/trainer.py @@ -0,0 +1,83 @@ +# MIT License +# +# Copyright (c) 2022 Alan Nair and The vHive Ecosystem +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os + +from storage import Storage +import tracing + +import logging as log +import numpy as np +import pickle +import sklearn.datasets as datasets +from sklearn.ensemble import RandomForestRegressor +from sklearn.model_selection import cross_val_predict +from sklearn.metrics import roc_auc_score +from sklearn.model_selection import StratifiedShuffleSplit + +def model_dispatcher(model_name): + if model_name=='LinearSVR': + return LinearSVR + elif model_name=='LinearRegression': + return LinearRegression + elif model_name=='RandomForestRegressor': + return RandomForestRegressor + elif model_name=='KNeighborsRegressor': + return KNeighborsRegressor + elif model_name=='LogisticRegression': + return LogisticRegression + else: + raise ValueError(f"Model {model_name} not found") + +class Trainer: + def __init__(self, XDTconfig=None): + bucket = os.getenv('BUCKET_NAME', 'vhive-tuning') + self.storageBackend = Storage(bucket, XDTconfig) + + def train(self, args): + dataset = pickle.loads(self.storageBackend.get(args['dataset_key'])) + with tracing.Span("Training a model"): + # Init model + model_class = model_dispatcher(args['model_config']['model']) + model = model_class(**args['model_config']['params']) + + # Train model and get predictions + X = dataset['features'] + y = dataset['labels'] + if args['sample_rate'] == 1.0: + X_sampled, y_sampled = X, y + else: + strat_split = StratifiedShuffleSplit(n_splits=1, train_size=args['sample_rate'], random_state=42) + sampled_index, _ = list(strat_split.split(X, y))[0] + X_sampled, y_sampled = X[sampled_index], y[sampled_index] + + y_pred = cross_val_predict(model, X_sampled, y_sampled, cv=5) + model.fit(X_sampled, y_sampled) + score = roc_auc_score(y_sampled, y_pred) + log.info(f"{args['model_config']['model']}, params: {args['model_config']['params']}, dataset size: {len(y_sampled)},score: {score}") + + mkey = f"model_{args['count']}" + pkey = f"pred_model_{args['count']}" + model_key = self.storageBackend.put(mkey, pickle.dumps(model)) + pred_key = self.storageBackend.put(pkey, pickle.dumps(y_pred)) + + return {'model_key': model_key, 'pred_key': pred_key, 'score': score}