Skip to content

Commit

Permalink
Rename some field, and fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
nailixing committed May 31, 2020
1 parent 19c3df6 commit 95d20b6
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 43 deletions.
4 changes: 2 additions & 2 deletions docs/src/user/client-upload-pretrained-models.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Example:
task='IMAGE_CLASSIFICATION',
model_file_path='./examples/models/image_classification/TfFeedForward.py',
model_class='TfFeedForward',
model_pretrained_params_id="b42cde03-0bc3-4b15-a276-4d95f6c88fa8.model",
model_preload_file_path="b42cde03-0bc3-4b15-a276-4d95f6c88fa8.model",
dependencies={ModelDependency.TENSORFLOW: '1.12.0'}
)
Expand Down Expand Up @@ -53,7 +53,7 @@ FoodLg model upload
task='IMAGE_CLASSIFICATION',
model_file_path='./examples/models/image_object_detection/food_darknet_xception1.py',
model_class='FoodDetection',
model_pretrained_params_id="model231.zip",
model_preload_file_path="model231.zip",
dependencies={"keras": "2.2.4", "tensorflow": "1.12.0"}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def load_parameters(self, params):

self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()}

h5_models_base64 = params['h5_model_base64']
zip_file_base64 = params['zip_file_base64']

self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size)

with tempfile.NamedTemporaryFile() as tmp:
# Convert back to bytes & write to temp file
h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8'))
zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8'))
with open(tmp.name, 'wb') as f:
f.write(h5_models_bytes)
f.write(zip_file_base64)
with tempfile.TemporaryDirectory() as d:
dataset_zipfile = zipfile.ZipFile(tmp.name, 'r')
dataset_zipfile.extractall(path=d)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def load_parameters(self, params):

self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()}

h5_models_base64 = params['h5_model_base64']
zip_file_base64 = params['zip_file_base64']

self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size)

with tempfile.NamedTemporaryFile() as tmp:
# Convert back to bytes & write to temp file
h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8'))
zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8'))
with open(tmp.name, 'wb') as f:
f.write(h5_models_bytes)
f.write(zip_file_base64)
with tempfile.TemporaryDirectory() as d:
dataset_zipfile = zipfile.ZipFile(tmp.name, 'r')
dataset_zipfile.extractall(path=d)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def load_parameters(self, params):

self.class_dict = {v: k for k, v in np.load(self.npy_index)[()].items()}

h5_models_base64 = params['h5_model_base64']
zip_file_base64 = params['zip_file_base64']

self.xception_model = self._build_model(classes=self.classes, image_size=self.image_size)

with tempfile.NamedTemporaryFile() as tmp:
# Convert back to bytes & write to temp file
h5_models_bytes = base64.b64decode(h5_models_base64.encode('utf-8'))
zip_file_base64 = base64.b64decode(zip_file_base64.encode('utf-8'))
with open(tmp.name, 'wb') as f:
f.write(h5_models_bytes)
f.write(zip_file_base64)
with tempfile.TemporaryDirectory() as d:
dataset_zipfile = zipfile.ZipFile(tmp.name, 'r')
dataset_zipfile.extractall(path=d)
Expand Down
1 change: 1 addition & 0 deletions scripts/docker_swarm/build_images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ title "Using docker swarm"

echo "using $APP_MODE docker files"
if [[ $APP_MODE = "DEV" ]]
then
title "Building SINGA-Auto Admin's image..."
docker build -t $SINGA_AUTO_IMAGE_ADMIN:$SINGA_AUTO_VERSION -f ./dockerfiles/dev_dockerfiles/admin.Dockerfile \
--build-arg DOCKER_WORKDIR_PATH=$DOCKER_WORKDIR_PATH \
Expand Down
18 changes: 13 additions & 5 deletions singa_auto/admin/view/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ def create_model(auth, params):
if 'checkpoint_id' in params and params['checkpoint_id'] is not None:

# if the checkpoint is not .model file, serialize it first
if params['checkpoint_id'].filename.split(".")[-1] != 'model':
h5_model_bytes = params['checkpoint_id'].read()
checkpoint_id = FileParamStore().save({'h5_model_base64': base64.b64encode(h5_model_bytes).decode('utf-8')})
if params['checkpoint_id'].filename.split(".")[-1] == 'zip':
zip_file_base64 = params['checkpoint_id'].read()
checkpoint_id = FileParamStore().save({'zip_file_base64': base64.b64encode(zip_file_base64).decode('utf-8')})
feed_params['checkpoint_id'] = checkpoint_id
# if the model is trained with singa_auto, copy it to params files
else:

# if the model is trained with singa_auto (the model name ends with 'model'), copy it to params files
# no need to encode it with b64 as it is already encoded in singa-auto after training
elif params['checkpoint_id'].filename.split(".")[-1] == 'model':
with tempfile.NamedTemporaryFile() as f:
file_storage = params['checkpoint_id']
file_storage.save(f.name)
Expand All @@ -77,6 +79,12 @@ def create_model(auth, params):
checkpoint_id)
shutil.copyfile(f.name, dest_file_path)
feed_params['checkpoint_id'] = checkpoint_id
else:

# if the checkpoint filename does not end with "zip" or "model", return an error message
return jsonify({'ErrorMsg': 'model preload file should be ended with "zip" or "model", '
'if it is a "*.model" file,'
'it should be the model_file saved after training by using singa-auto'}), 400
with admin:
return jsonify(admin.create_model(**feed_params))

Expand Down
10 changes: 5 additions & 5 deletions singa_auto/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def create_model(self,
task: str,
model_file_path: str,
model_class: str,
model_pretrained_params_id: str = None,
model_preload_file_path: str = None,
dependencies: ModelDependencies = None,
access_right: ModelAccessRight = ModelAccessRight.PRIVATE,
docker_image: str = None) -> Dict[str, Any]:
Expand All @@ -245,7 +245,7 @@ def create_model(self,
:param model_class: The name of the model class inside the Python file. This class should implement :class:`singa_auto.model.BaseModel`
:param dependencies: List of Python dependencies & their versions
:param access_right: Model access right
:param model_pretrained_params_id: pretrained mdoel file
:param model_preload_file_path: pretrained model file
:param docker_image: A custom Docker image that extends ``singa_auto/singa_auto_worker``, publicly available on Docker Hub.
:returns: Created model as dictionary
Expand All @@ -270,10 +270,10 @@ def create_model(self,
}
pretrained_files = {}

if model_pretrained_params_id is not None:
if model_preload_file_path is not None:
pretrained_files = {'checkpoint_id': (
model_pretrained_params_id,
open(model_pretrained_params_id, 'rb'),
model_preload_file_path,
open(model_preload_file_path, 'rb'),
'application/octet-stream')}

files = {**model_files, **pretrained_files}
Expand Down
23 changes: 19 additions & 4 deletions singa_auto/predictor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from .predictor import Predictor
from singa_auto.model import utils
import traceback
import json

service_id = os.environ['SINGA_AUTO_SERVICE_ID']

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,19 +58,32 @@ def predict():
img for img in [img_store.read() for img_store in img_stores] if img
]
print("img_stores", img_stores)
print("img_bytes", img_bytes)
if not img_bytes:
return jsonify({'ErrorMsg': 'No image provided'}), 400
print("img_bytes_first 10 bytes", img_bytes[0][:10])
queries = utils.dataset.load_images_from_bytes(img_bytes).tolist()
print("queries_sizes", len(queries))
elif request.get_json():
data = request.get_json()
queries = [data]
elif request.data:
data = json.loads(request.data)
print(data)
queries = [data]
else:
return jsonify({'ErrorMsg': 'No image provided'}), 400
return jsonify({'ErrorMsg': 'data should be either at files or json payload'}), 400
try:
predictor = get_predictor()
queries = utils.dataset.load_images_from_bytes(img_bytes).tolist()
# this queries is type of List[Any]
predictions = predictor.predict(queries)

print(type(predictions))
if isinstance(predictions[0], list):
# this is only for the pandavgg demo, as the frontend only accepts a dictionary.
return jsonify(predictions[0][0]), 200
elif isinstance(predictions, list) and isinstance(predictions[0], str):
# this only matches the QA model,
print("this is only match qa model")
return predictions[0], 200
else:
return jsonify(predictions), 200
except:
Expand Down
1 change: 0 additions & 1 deletion singa_auto/predictor/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def ensemble_probabilities(predictions: List[Any]) -> Any:
def ensemble(predictions: List[Any]) -> Any:
if len(predictions) == 0:
return None
print("predictions is (in ensemble)", predictions)
# Return some worker's predictions
index = 0
prediction = predictions[index]
Expand Down
6 changes: 5 additions & 1 deletion singa_auto/predictor/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def start(self):

def predict(self, queries):
worker_predictions_list = self._get_predictions_from_workers(queries)
print("Getting prediction list")
predictions = self._combine_worker_predictions(worker_predictions_list)
return predictions

Expand Down Expand Up @@ -124,8 +125,11 @@ def _get_predictions_from_workers(

# Wait for at least 1 free worker
worker_ids = []

while len(worker_ids) == 0:
print("Getting free worker from redis...")
worker_ids = self._redis_cache.get_workers()
time.sleep(0.5)

# For each worker, send queries to worker
pending_queries = set() # {(query_id, worker_id)}
Expand All @@ -150,7 +154,7 @@ def _get_predictions_from_workers(
# Record prediction & mark as not pending
query_id_to_predictions[query_id].append(prediction)
pending_queries.remove((query_id, worker_id))

print("Getting prediction result from kafka...")
time.sleep(PREDICT_LOOP_SLEEP_SECS)

# Reorganize predictions
Expand Down
5 changes: 2 additions & 3 deletions singa_auto/utils/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@
curr_time = datetime.now().strftime("%Y-%m-%d_%I.%M.%S.%p")



def run_worker(meta_store, start_worker, stop_worker):
service_id = os.environ['SINGA_AUTO_SERVICE_ID']
service_type = os.environ['SINGA_AUTO_SERVICE_TYPE']
container_id = os.environ.get('HOSTNAME', 'localhost')
configure_logging('{}-SvcID-{}'
.format(curr_time, service_id))
configure_logging('{}-SvcID-{}-ContainerID-{}'
.format(curr_time, service_id,container_id))

def _sigterm_handler(_signo, _stack_frame):
logger.warn("Terminal signal received: %s, %s" % (_signo, _stack_frame))
Expand Down
37 changes: 24 additions & 13 deletions singa_auto/worker/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,35 @@ def _pull_job_info(self):

inference_job = self._meta_store.get_inference_job(
worker.inference_job_id)

if inference_job is None:
raise InvalidWorkerError(
'No such inference job with ID "{}"'.format(
worker.inference_job_id))
if inference_job.model_id:
model = self._meta_store.get_model(inference_job.model_id)
logger.info(f'Using checkpoint of the model "{model.name}"...')

self._proposal = Proposal.from_jsonable({
"trial_no": 1,
"knobs": {}
})
self._store_params_id = model.checkpoint_id
else:

trial = self._meta_store.get_trial(worker.trial_id)
if trial is None or trial.store_params_id is None: # Must have model saved
trial = self._meta_store.get_trial(worker.trial_id)

# check if there is a trained model saved
if trial is None or trial.store_params_id is None:

# if there is no train job, then check if there is a checkpoint uploaded
if inference_job.model_id:
model = self._meta_store.get_model(inference_job.model_id)
logger.info(f'Using checkpoint of the model "{model.name}"...')

self._proposal = Proposal.from_jsonable({
"trial_no": 1,
"knobs": {}
})
self._store_params_id = model.checkpoint_id
else:

# if there is no checkpoint id and no trained model saved
raise InvalidTrialError(
'No saved trial with ID "{}"'.format(worker.trial_id))
'No saved trial with ID "{}" and no checkpoint uploaded'.format(worker.trial_id))
else:

# create inference with trained parameters first
logger.info(f'Using trial "{trial.id}"...')

model = self._meta_store.get_model(trial.model_id)
Expand Down Expand Up @@ -183,6 +193,7 @@ def _predict(self, queries: List[Query]) -> List[Prediction]:
try:
predictions = self._model_inst.predict([x.query for x in queries])
except:
print('Error while making predictions:')
logger.error('Error while making predictions:')
logger.error(traceback.format_exc())
predictions = [None for x in range(len(queries))]
Expand Down

0 comments on commit 95d20b6

Please sign in to comment.