From 95d20b625c1e04fa18b16f7e3313765626271620 Mon Sep 17 00:00:00 2001 From: nailixing Date: Sun, 31 May 2020 22:54:32 +0800 Subject: [PATCH] Rename some field, and fix some bugs --- .../user/client-upload-pretrained-models.rst | 4 +- .../food_darknet_xception.py | 6 +-- .../food_darknet_xception1.py | 6 +-- .../food_darknet_xception2.py | 6 +-- scripts/docker_swarm/build_images.sh | 1 + singa_auto/admin/view/model.py | 18 ++++++--- singa_auto/client/client.py | 10 ++--- singa_auto/predictor/app.py | 23 ++++++++++-- singa_auto/predictor/ensemble.py | 1 - singa_auto/predictor/predictor.py | 6 ++- singa_auto/utils/service.py | 5 +-- singa_auto/worker/inference.py | 37 ++++++++++++------- 12 files changed, 80 insertions(+), 43 deletions(-) diff --git a/docs/src/user/client-upload-pretrained-models.rst b/docs/src/user/client-upload-pretrained-models.rst index 681ccfc4..7e2b2a7b 100644 --- a/docs/src/user/client-upload-pretrained-models.rst +++ b/docs/src/user/client-upload-pretrained-models.rst @@ -12,7 +12,7 @@ Example: task='IMAGE_CLASSIFICATION', model_file_path='./examples/models/image_classification/TfFeedForward.py', model_class='TfFeedForward', - model_pretrained_params_id="b42cde03-0bc3-4b15-a276-4d95f6c88fa8.model", + model_preload_file_path="b42cde03-0bc3-4b15-a276-4d95f6c88fa8.model", dependencies={ModelDependency.TENSORFLOW: '1.12.0'} ) @@ -53,7 +53,7 @@ FoodLg model upload task='IMAGE_CLASSIFICATION', model_file_path='./examples/models/image_object_detection/food_darknet_xception1.py', model_class='FoodDetection', - model_pretrained_params_id="model231.zip", + model_preload_file_path="model231.zip", dependencies={"keras": "2.2.4", "tensorflow": "1.12.0"} ) diff --git a/examples/models/image_object_detection/food_darknet_xception.py b/examples/models/image_object_detection/food_darknet_xception.py index 4fc5f799..a8dbb17e 100644 --- a/examples/models/image_object_detection/food_darknet_xception.py +++ 
b/examples/models/image_object_detection/food_darknet_xception.py @@ -93,15 +93,15 @@ def load_parameters(self, params): self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()} - h5_models_base64 = params['h5_model_base64'] + zip_file_base64 = params['zip_file_base64'] self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size) with tempfile.NamedTemporaryFile() as tmp: # Convert back to bytes & write to temp file - h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8')) + zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8')) with open(tmp.name, 'wb') as f: - f.write(h5_models_bytes) + f.write(zip_file_base64) with tempfile.TemporaryDirectory() as d: dataset_zipfile = zipfile.ZipFile(tmp.name, 'r') dataset_zipfile.extractall(path=d) diff --git a/examples/models/image_object_detection/food_darknet_xception1.py b/examples/models/image_object_detection/food_darknet_xception1.py index c6dd7d95..638e8a40 100644 --- a/examples/models/image_object_detection/food_darknet_xception1.py +++ b/examples/models/image_object_detection/food_darknet_xception1.py @@ -93,15 +93,15 @@ def load_parameters(self, params): self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()} - h5_models_base64 = params['h5_model_base64'] + zip_file_base64 = params['zip_file_base64'] self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size) with tempfile.NamedTemporaryFile() as tmp: # Convert back to bytes & write to temp file - h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8')) + zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8')) with open(tmp.name, 'wb') as f: - f.write(h5_models_bytes) + f.write(zip_file_base64) with tempfile.TemporaryDirectory() as d: dataset_zipfile = zipfile.ZipFile(tmp.name, 'r') dataset_zipfile.extractall(path=d) diff --git a/examples/models/image_object_detection/food_darknet_xception2.py 
b/examples/models/image_object_detection/food_darknet_xception2.py index 1725c74c..9faa0c96 100644 --- a/examples/models/image_object_detection/food_darknet_xception2.py +++ b/examples/models/image_object_detection/food_darknet_xception2.py @@ -93,15 +93,15 @@ def load_parameters(self, params): self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()} - h5_models_base64 = params['h5_model_base64'] + zip_file_base64 = params['zip_file_base64'] self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size) with tempfile.NamedTemporaryFile() as tmp: # Convert back to bytes & write to temp file - h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8')) + zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8')) with open(tmp.name, 'wb') as f: - f.write(h5_models_bytes) + f.write(zip_file_base64) with tempfile.TemporaryDirectory() as d: dataset_zipfile = zipfile.ZipFile(tmp.name, 'r') dataset_zipfile.extractall(path=d) diff --git a/scripts/docker_swarm/build_images.sh b/scripts/docker_swarm/build_images.sh index 698b166f..45b4a135 100644 --- a/scripts/docker_swarm/build_images.sh +++ b/scripts/docker_swarm/build_images.sh @@ -31,6 +31,7 @@ title "Using docker swarm" echo "using $APP_MODE docker files" if [[ $APP_MODE = "DEV" ]] +then title "Building SINGA-Auto Admin's image..." 
docker build -t $SINGA_AUTO_IMAGE_ADMIN:$SINGA_AUTO_VERSION -f ./dockerfiles/dev_dockerfiles/admin.Dockerfile \ --build-arg DOCKER_WORKDIR_PATH=$DOCKER_WORKDIR_PATH \ diff --git a/singa_auto/admin/view/model.py b/singa_auto/admin/view/model.py index 0affdac5..ef9ee287 100644 --- a/singa_auto/admin/view/model.py +++ b/singa_auto/admin/view/model.py @@ -61,12 +61,14 @@ def create_model(auth, params): if 'checkpoint_id' in params and params['checkpoint_id'] is not None: # if the checkpoint is not .model file, serialize it first - if params['checkpoint_id'].filename.split(".")[-1] != 'model': - h5_model_bytes = params['checkpoint_id'].read() - checkpoint_id = FileParamStore().save({'h5_model_base64': base64.b64encode(h5_model_bytes).decode('utf-8')}) + if params['checkpoint_id'].filename.split(".")[-1] == 'zip': + zip_file_base64 = params['checkpoint_id'].read() + checkpoint_id = FileParamStore().save({'zip_file_base64': base64.b64encode(zip_file_base64).decode('utf-8')}) feed_params['checkpoint_id'] = checkpoint_id - # if the model is trained with singa_auto, copy it to params files - else: + + # if the model is trained with singa_auto (the model name ends with 'model'), copy it to params files + # no need to encode it with b64 as it is already encoded in singa-auto after training + elif params['checkpoint_id'].filename.split(".")[-1] == 'model': with tempfile.NamedTemporaryFile() as f: file_storage = params['checkpoint_id'] file_storage.save(f.name) @@ -77,6 +79,12 @@ def create_model(auth, params): checkpoint_id) shutil.copyfile(f.name, dest_file_path) feed_params['checkpoint_id'] = checkpoint_id + else: + + # if the checkpoint name is not zip or model, return error message + return jsonify({'ErrorMsg': 'model preload file should be ended with "zip" or "model", ' 'if it is a "*.model" file,' 'it should be the model_file saved after training by using singa-auto'}), 400 with admin: return jsonify(admin.create_model(**feed_params)) diff --git
a/singa_auto/client/client.py b/singa_auto/client/client.py index 51d4ac44..ae57e31e 100644 --- a/singa_auto/client/client.py +++ b/singa_auto/client/client.py @@ -230,7 +230,7 @@ def create_model(self, task: str, model_file_path: str, model_class: str, - model_pretrained_params_id: str = None, + model_preload_file_path: str = None, dependencies: ModelDependencies = None, access_right: ModelAccessRight = ModelAccessRight.PRIVATE, docker_image: str = None) -> Dict[str, Any]: @@ -245,7 +245,7 @@ def create_model(self, :param model_class: The name of the model class inside the Python file. This class should implement :class:`singa_auto.model.BaseModel` :param dependencies: List of Python dependencies & their versions :param access_right: Model access right - :param model_pretrained_params_id: pretrained mdoel file + :param model_preload_file_path: pretrained model file :param docker_image: A custom Docker image that extends ``singa_auto/singa_auto_worker``, publicly available on Docker Hub.
:returns: Created model as dictionary @@ -270,10 +270,10 @@ def create_model(self, } pretrained_files = {} - if model_pretrained_params_id is not None: + if model_preload_file_path is not None: pretrained_files = {'checkpoint_id': ( - model_pretrained_params_id, - open(model_pretrained_params_id, 'rb'), + model_preload_file_path, + open(model_preload_file_path, 'rb'), 'application/octet-stream')} files = {**model_files, **pretrained_files} diff --git a/singa_auto/predictor/app.py b/singa_auto/predictor/app.py index 1a9f80f2..ee592aba 100644 --- a/singa_auto/predictor/app.py +++ b/singa_auto/predictor/app.py @@ -23,6 +23,8 @@ from .predictor import Predictor from singa_auto.model import utils import traceback +import json + service_id = os.environ['SINGA_AUTO_SERVICE_ID'] logger = logging.getLogger(__name__) @@ -56,19 +58,32 @@ def predict(): img for img in [img_store.read() for img_store in img_stores] if img ] print("img_stores", img_stores) - print("img_bytes", img_bytes) if not img_bytes: return jsonify({'ErrorMsg': 'No image provided'}), 400 + print("img_bytes_first 10 bytes", img_bytes[0][:10]) + queries = utils.dataset.load_images_from_bytes(img_bytes).tolist() + print("queries_sizes", len(queries)) + elif request.get_json(): + data = request.get_json() + queries = [data] + elif request.data: + data = json.loads(request.data) + print(data) + queries = [data] else: - return jsonify({'ErrorMsg': 'No image provided'}), 400 + return jsonify({'ErrorMsg': 'data should be either at files or json payload'}), 400 try: predictor = get_predictor() - queries = utils.dataset.load_images_from_bytes(img_bytes).tolist() + # this queries is type of List[Any] predictions = predictor.predict(queries) - + print(type(predictions)) if isinstance(predictions[0], list): # this is only for pandavgg demo as the frontend only accept the dictionary. 
return jsonify(predictions[0][0]), 200 + elif isinstance(predictions, list) and isinstance(predictions[0], str): + # this is only match qa model, + print("this is only match qa model") + return predictions[0], 200 else: return jsonify(predictions), 200 except: diff --git a/singa_auto/predictor/ensemble.py b/singa_auto/predictor/ensemble.py index 39202588..fa6ca28d 100644 --- a/singa_auto/predictor/ensemble.py +++ b/singa_auto/predictor/ensemble.py @@ -52,7 +52,6 @@ def ensemble_probabilities(predictions: List[Any]) -> Any: def ensemble(predictions: List[Any]) -> Any: if len(predictions) == 0: return None - print("predictions is (in ensemble)", predictions) # Return some worker's predictions index = 0 prediction = predictions[index] diff --git a/singa_auto/predictor/predictor.py b/singa_auto/predictor/predictor.py index 5de9d267..b1fd756b 100644 --- a/singa_auto/predictor/predictor.py +++ b/singa_auto/predictor/predictor.py @@ -68,6 +68,7 @@ def start(self): def predict(self, queries): worker_predictions_list = self._get_predictions_from_workers(queries) + print("Getting prediction list") predictions = self._combine_worker_predictions(worker_predictions_list) return predictions @@ -124,8 +125,11 @@ def _get_predictions_from_workers( # Wait for at least 1 free worker worker_ids = [] + while len(worker_ids) == 0: + print("Getting free worker from redis...") worker_ids = self._redis_cache.get_workers() + time.sleep(0.5) # For each worker, send queries to worker pending_queries = set() # {(query_id, worker_id)} @@ -150,7 +154,7 @@ def _get_predictions_from_workers( # Record prediction & mark as not pending query_id_to_predictions[query_id].append(prediction) pending_queries.remove((query_id, worker_id)) - + print("Getting prediction result from kafka...") time.sleep(PREDICT_LOOP_SLEEP_SECS) # Reorganize predictions diff --git a/singa_auto/utils/service.py b/singa_auto/utils/service.py index 90594343..e8a55085 100644 --- a/singa_auto/utils/service.py +++ 
b/singa_auto/utils/service.py @@ -31,13 +31,12 @@ curr_time = datetime.now().strftime("%Y-%m-%d_%I.%M.%S.%p") - def run_worker(meta_store, start_worker, stop_worker): service_id = os.environ['SINGA_AUTO_SERVICE_ID'] service_type = os.environ['SINGA_AUTO_SERVICE_TYPE'] container_id = os.environ.get('HOSTNAME', 'localhost') - configure_logging('{}-SvcID-{}' - .format(curr_time, service_id)) + configure_logging('{}-SvcID-{}-ContainerID-{}' + .format(curr_time, service_id,container_id)) def _sigterm_handler(_signo, _stack_frame): logger.warn("Terminal signal received: %s, %s" % (_signo, _stack_frame)) diff --git a/singa_auto/worker/inference.py b/singa_auto/worker/inference.py index 3f1f453e..4b59adda 100644 --- a/singa_auto/worker/inference.py +++ b/singa_auto/worker/inference.py @@ -124,25 +124,35 @@ def _pull_job_info(self): inference_job = self._meta_store.get_inference_job( worker.inference_job_id) + if inference_job is None: raise InvalidWorkerError( 'No such inference job with ID "{}"'.format( worker.inference_job_id)) - if inference_job.model_id: - model = self._meta_store.get_model(inference_job.model_id) - logger.info(f'Using checkpoint of the model "{model.name}"...') - - self._proposal = Proposal.from_jsonable({ - "trial_no": 1, - "knobs": {} - }) - self._store_params_id = model.checkpoint_id - else: - trial = self._meta_store.get_trial(worker.trial_id) - if trial is None or trial.store_params_id is None: # Must have model saved + trial = self._meta_store.get_trial(worker.trial_id) + + # check if there is a trained model saved + if trial is None or trial.store_params_id is None: + + # if there is no train job, then check if there is a checkpoint uploaded + if inference_job.model_id: + model = self._meta_store.get_model(inference_job.model_id) + logger.info(f'Using checkpoint of the model "{model.name}"...') + + self._proposal = Proposal.from_jsonable({ + "trial_no": 1, + "knobs": {} + }) + self._store_params_id = model.checkpoint_id + else: + + # if there is
no checkpoint id and no trained model saved raise InvalidTrialError( - 'No saved trial with ID "{}"'.format(worker.trial_id)) + 'No saved trial with ID "{}" and no checkpoint uploaded'.format(worker.trial_id)) + else: + + # create inference with trained parameters first logger.info(f'Using trial "{trial.id}"...') model = self._meta_store.get_model(trial.model_id) @@ -183,6 +193,7 @@ def _predict(self, queries: List[Query]) -> List[Prediction]: try: predictions = self._model_inst.predict([x.query for x in queries]) except: + print('Error while making predictions:') logger.error('Error while making predictions:') logger.error(traceback.format_exc()) predictions = [None for x in range(len(queries))]