Raise inference failure exceptions in default handlers (deepjavalibrary#883)

* raise inference failure exceptions in default handlers
rohithkrn authored and KexinFeng committed Aug 16, 2023
1 parent 1619c6e commit c4f29a5
Showing 4 changed files with 244 additions and 262 deletions.
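
The common thread in the diffs below: each default handler's inference() body is no longer wrapped in a try/except that converts failures into Output().error(...), so exceptions now propagate to the caller. A minimal sketch of the before/after pattern, where run_body and output_cls are hypothetical stand-ins (not code from this commit) for a handler's real inference body and djl_python's Output class:

import logging

def inference_before(run_body, output_cls, inputs):
    # Old behavior: any failure is caught and turned into an error Output,
    # so the caller never sees the exception.
    try:
        outputs = run_body(inputs)
    except Exception as e:
        logging.exception("inference failed")
        outputs = output_cls().error(str(e))
    return outputs

def inference_after(run_body, inputs):
    # New behavior: the body runs unwrapped, so a failure raises out of
    # inference() and the serving layer decides how the error is reported.
    return run_body(inputs)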
165 changes: 81 additions & 84 deletions engines/python/setup/djl_python/deepspeed.py
@@ -279,96 +279,93 @@ def format_input_for_task(self, input_values):
         return batch_inputs
 
     def inference(self, inputs: Input):
-        try:
-            content_type = inputs.get_property("Content-Type")
-            input_data = []
-            input_size = []
-            model_kwargs = {}
-            batch = inputs.get_batches()
-            if content_type is not None and content_type.startswith(
-                    "application/json"):
-                first = True
-                for item in batch:
-                    json_input = item.get_as_json()
-                    if isinstance(json_input, dict):
-                        input_size.append(len(json_input.get("inputs")))
-                        input_data.extend(
-                            self.format_input_for_task(
-                                json_input.pop("inputs")))
-                        if first:
-                            model_kwargs = json_input.pop("parameters", {})
-                            first = False
-                        else:
-                            if model_kwargs != json_input.pop(
-                                    "parameters", {}):
-                                return Output().error(
-                                    "In order to enable dynamic batching, all input batches must have the same parameters"
-                                )
-                    else:
-                        input_size.append(len(json_input))
-                        input_data.extend(json_input)
-            else:
-                for item in batch:
-                    input_size.append(1)
-                    input_data.extend(item.get_as_string())
-
-            outputs = Output()
-            if self.enable_streaming:
-                outputs.add_property("content-type", "application/jsonlines")
-                if self.enable_streaming == "huggingface":
-                    outputs.add_stream_content(
-                        StreamingUtils.use_hf_default_streamer(
-                            self.model, self.tokenizer, input_data,
-                            self.device, **model_kwargs))
-                else:
-                    stream_generator = StreamingUtils.get_stream_generator(
-                        "DeepSpeed")
-                    outputs.add_stream_content(
-                        stream_generator(self.model, self.tokenizer,
-                                         input_data, self.device,
-                                         **model_kwargs))
-                return outputs
-            if self.task == "text-generation":
-                tokenized_inputs = self.tokenizer(input_data,
-                                                  padding=True,
-                                                  return_tensors="pt").to(
-                                                      self.device)
-                with torch.no_grad():
-                    output_tokens = self.model.generate(
-                        input_ids=tokenized_inputs.input_ids,
-                        attention_mask=tokenized_inputs.attention_mask,
-                        **model_kwargs)
-                generated_text = self.tokenizer.batch_decode(
-                    output_tokens, skip_special_tokens=True)
-                outputs.add_property("content-type", "application/json")
-                offset = 0
-                for i in range(inputs.get_batch_size()):
-                    result = [{
-                        "generated_text": s
-                    } for s in generated_text[offset:offset + input_size[i]]]
-                    outputs.add(result, key=inputs.get_content().key_at(i))
-                    offset += input_size[i]
-                return outputs
-
-            result = self.pipeline(input_data, **model_kwargs)
-            offset = 0
-            for i in range(inputs.get_batch_size()):
-                res = result[offset:offset + input_size[i]]
-                if self.task == "conversational":
-                    res = [{
-                        "generated_text": s.generated_responses[-1],
-                        "conversation": {
-                            "past_user_inputs": s.past_user_inputs,
-                            "generated_responses": s.generated_responses,
-                        },
-                    } for s in res]
-                outputs.add(res, key=inputs.get_content().key_at(i))
-                offset += input_size[i]
-
-            outputs.add_property("content-type", "application/json")
-        except Exception as e:
-            logging.exception("DeepSpeed inference failed")
-            outputs = Output().error((str(e)))
+        content_type = inputs.get_property("Content-Type")
+        input_data = []
+        input_size = []
+        model_kwargs = {}
+        batch = inputs.get_batches()
+        if content_type is not None and content_type.startswith(
+                "application/json"):
+            first = True
+            for item in batch:
+                json_input = item.get_as_json()
+                if isinstance(json_input, dict):
+                    input_size.append(len(json_input.get("inputs")))
+                    input_data.extend(
+                        self.format_input_for_task(
+                            json_input.pop("inputs")))
+                    if first:
+                        model_kwargs = json_input.pop("parameters", {})
+                        first = False
+                    else:
+                        if model_kwargs != json_input.pop(
+                                "parameters", {}):
+                            return Output().error(
+                                "In order to enable dynamic batching, all input batches must have the same parameters"
+                            )
+                else:
+                    input_size.append(len(json_input))
+                    input_data.extend(json_input)
+        else:
+            for item in batch:
+                input_size.append(1)
+                input_data.extend(item.get_as_string())
+
+        outputs = Output()
+        if self.enable_streaming:
+            outputs.add_property("content-type", "application/jsonlines")
+            if self.enable_streaming == "huggingface":
+                outputs.add_stream_content(
+                    StreamingUtils.use_hf_default_streamer(
+                        self.model, self.tokenizer, input_data,
+                        self.device, **model_kwargs))
+            else:
+                stream_generator = StreamingUtils.get_stream_generator(
+                    "DeepSpeed")
+                outputs.add_stream_content(
+                    stream_generator(self.model, self.tokenizer,
+                                     input_data, self.device,
+                                     **model_kwargs))
+            return outputs
+        if self.task == "text-generation":
+            tokenized_inputs = self.tokenizer(input_data,
+                                              padding=True,
+                                              return_tensors="pt").to(
+                                                  self.device)
+            with torch.no_grad():
+                output_tokens = self.model.generate(
+                    input_ids=tokenized_inputs.input_ids,
+                    attention_mask=tokenized_inputs.attention_mask,
+                    **model_kwargs)
+            generated_text = self.tokenizer.batch_decode(
+                output_tokens, skip_special_tokens=True)
+            outputs.add_property("content-type", "application/json")
+            offset = 0
+            for i in range(inputs.get_batch_size()):
+                result = [{
+                    "generated_text": s
+                } for s in generated_text[offset:offset + input_size[i]]]
+                outputs.add(result, key=inputs.get_content().key_at(i))
+                offset += input_size[i]
+            return outputs
+
+        result = self.pipeline(input_data, **model_kwargs)
+        offset = 0
+        for i in range(inputs.get_batch_size()):
+            res = result[offset:offset + input_size[i]]
+            if self.task == "conversational":
+                res = [{
+                    "generated_text": s.generated_responses[-1],
+                    "conversation": {
+                        "past_user_inputs": s.past_user_inputs,
+                        "generated_responses": s.generated_responses,
+                    },
+                } for s in res]
+            outputs.add(res, key=inputs.get_content().key_at(i))
+            offset += input_size[i]
+
+        outputs.add_property("content-type", "application/json")
+
         return outputs
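
For reference on the JSON branch above: each batch item is expected to carry an "inputs" field and an optional "parameters" dict, and dynamic batching only proceeds when every item's parameters match. An illustrative pair of payload lists (the values are invented for illustration, not taken from the commit):

same_params_batch = [
    {"inputs": "Hello", "parameters": {"max_new_tokens": 64}},
    {"inputs": "World", "parameters": {"max_new_tokens": 64}},  # identical parameters: batched together
]
mixed_params_batch = [
    {"inputs": "Hello", "parameters": {"max_new_tokens": 64}},
    {"inputs": "World", "parameters": {"max_new_tokens": 128}},  # differs: handler returns Output().error(...)
]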


122 changes: 59 additions & 63 deletions engines/python/setup/djl_python/fastertransformer.py
@@ -109,70 +109,66 @@ def param_mapper(parameters: dict):
         return parameters
 
     def inference(self, inputs: Input):
-        try:
-            # TODO: Add support for more content types
-            input_data = []
-            input_size = []
-            parameters = {}
-            batches = inputs.get_batches()
-            first = True
-            for item in batches:
-                input_map = item.get_as_json()
-                input_text = input_map.pop("inputs", input_map)
-                if isinstance(input_text, str):
-                    input_text = [input_text]
-                input_size.append(len(input_text))
-                input_data.extend(input_text)
-                if first:
-                    parameters = input_map.pop("parameters", {})
-                    first = False
-                else:
-                    if parameters != input_map.pop("parameters", {}):
-                        return Output().error(
-                            "In order to enable dynamic batching, all input batches must have the same parameters"
-                        )
-
-            parameters = self.param_mapper(parameters)
-            max_length = parameters.pop("max_length", 50)
-            output_len = parameters.pop("max_seq_len", max_length)
-            if self.use_triton:
-                output_length = [output_len] * len(input_data)
-                if self.enable_streaming:
-                    outputs = Output()
-                    outputs.add_property("content-type",
-                                         "application/jsonlines")
-                    outputs.add_stream_content(
-                        self.model.stream_generate(input_data, output_length,
-                                                   **parameters))
-                    return outputs
-                result = self.model.pipeline_generate(input_data,
-                                                      output_length,
-                                                      **parameters)
-            else:
-                if self.is_t5:
-                    result = self.model.pipeline_generate(
-                        input_data, **parameters)
-                else:
-                    beam_width = parameters.pop("beam_width", 1)
-                    # TODO: remove after fixes in FT python package
-                    result = self.model.pipeline_generate(
-                        input_data,
-                        batch_size=len(input_data),
-                        output_len=output_len,
-                        beam_width=beam_width,
-                        **parameters)
-
-            offset = 0
-            outputs = Output()
-            outputs.add_property("content-type", "application/json")
-            for i in range(inputs.get_batch_size()):
-                generated_text = [{
-                    "generated_text": s
-                } for s in result[offset:offset + input_size[i]]]
-                outputs.add(generated_text, key=inputs.get_content().key_at(i))
-        except Exception as e:
-            logging.exception("FasterTransformer inference failed")
-            outputs = Output().error((str(e)))
+        # TODO: Add support for more content types
+        input_data = []
+        input_size = []
+        parameters = {}
+        batches = inputs.get_batches()
+        first = True
+        for item in batches:
+            input_map = item.get_as_json()
+            input_text = input_map.pop("inputs", input_map)
+            if isinstance(input_text, str):
+                input_text = [input_text]
+            input_size.append(len(input_text))
+            input_data.extend(input_text)
+            if first:
+                parameters = input_map.pop("parameters", {})
+                first = False
+            else:
+                if parameters != input_map.pop("parameters", {}):
+                    return Output().error(
+                        "In order to enable dynamic batching, all input batches must have the same parameters"
+                    )
+
+        parameters = self.param_mapper(parameters)
+        max_length = parameters.pop("max_length", 50)
+        output_len = parameters.pop("max_seq_len", max_length)
+        if self.use_triton:
+            output_length = [output_len] * len(input_data)
+            if self.enable_streaming:
+                outputs = Output()
+                outputs.add_property("content-type",
+                                     "application/jsonlines")
+                outputs.add_stream_content(
+                    self.model.stream_generate(input_data, output_length,
+                                               **parameters))
+                return outputs
+            result = self.model.pipeline_generate(input_data,
+                                                  output_length,
+                                                  **parameters)
+        else:
+            if self.is_t5:
+                result = self.model.pipeline_generate(
+                    input_data, **parameters)
+            else:
+                beam_width = parameters.pop("beam_width", 1)
+                # TODO: remove after fixes in FT python package
+                result = self.model.pipeline_generate(
+                    input_data,
+                    batch_size=len(input_data),
+                    output_len=output_len,
+                    beam_width=beam_width,
+                    **parameters)
+
+        offset = 0
+        outputs = Output()
+        outputs.add_property("content-type", "application/json")
+        for i in range(inputs.get_batch_size()):
+            generated_text = [{
+                "generated_text": s
+            } for s in result[offset:offset + input_size[i]]]
+            outputs.add(generated_text, key=inputs.get_content().key_at(i))
 
         return outputs
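
One detail worth calling out in the hunk above: the generation length comes from "max_seq_len" when present, otherwise "max_length", otherwise a default of 50. A small standalone restatement of those two pop() calls (the helper name and example dicts are illustrative, not part of the handler):

def resolve_output_len(parameters):
    # Mirrors the handler above: "max_seq_len" wins if present, otherwise
    # "max_length", otherwise 50; both keys are removed from the dict.
    max_length = parameters.pop("max_length", 50)
    return parameters.pop("max_seq_len", max_length)

assert resolve_output_len({"max_length": 200}) == 200
assert resolve_output_len({"max_seq_len": 128, "max_length": 200}) == 128
assert resolve_output_len({}) == 50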

