Skip to content

Commit

Permalink
now that zmq long messages are fixed, no need to split shell messages
Browse files Browse the repository at this point in the history
  • Loading branch information
craigbarratt committed Aug 31, 2020
1 parent 283802f commit 9028b45
Showing 1 changed file with 170 additions and 183 deletions.
353 changes: 170 additions & 183 deletions custom_components/pyscript/jupyter_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def recv(self, multipart=False):
# _LOGGER.debug(f"recv: got cmd={cmd}, params={params}")
else:
parts.append(msg_body)
if cmd == 0x0 or cmd == 0x2:
if cmd in (0x0, 0x2):
# _LOGGER.debug(f"recv: got msg {parts}")
if not multipart:
return b''.join(parts)
Expand Down Expand Up @@ -311,218 +311,205 @@ def encode(msg):
# else:
# _LOGGER.debug("send skipping msg_type %s since socket is None", msg_type)

async def shell_handler(self, shell_socket, full_msg):
async def shell_handler(self, shell_socket, wire_msg):
"""Handle shell messages."""
#
# Jupyter extensions like black can send several execute requests back-to-back, so
# we need to handle multiple messages separated by DELIM
#
msg_list = None
for this_msg in full_msg:
if this_msg == DELIM:
if msg_list is None:
msg_list = [[this_msg]]
else:
msg_list.append([this_msg])
else:
msg_list[-1].append(this_msg) # pylint: disable=unsubscriptable-object
for this_msg in msg_list: # pylint: disable=too-many-nested-blocks
identities, msg = self.deserialize_wire_msg(this_msg)
# _LOGGER.debug("shell received %s: %s", msg.get('header', {}).get('msg_type', 'UNKNOWN'), msg)

content = {
'execution_state': "busy",
}
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])
identities, msg = self.deserialize_wire_msg(wire_msg)
# _LOGGER.debug("shell received %s: %s", msg.get('header', {}).get('msg_type', 'UNKNOWN'), msg)
self.parent_header = msg['header']

self.parent_header = msg['header']

# process request:
content = {
'execution_state': "busy",
}
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])

if msg['header']["msg_type"] == "execute_request":
if msg['header']["msg_type"] == "execute_request":

content = {
'execution_count': self.execution_count,
'code': msg['content']["code"],
}
await self.send(self.iopub_socket, 'execute_input', content, parent_header=msg['header'])
content = {
'execution_count': self.execution_count,
'code': msg['content']["code"],
}
await self.send(self.iopub_socket, 'execute_input', content, parent_header=msg['header'])

code = msg['content']["code"]
self.ast_ctx.parse(code)
code = msg['content']["code"]
self.ast_ctx.parse(code)
exc = self.ast_ctx.get_exception_obj()
if exc is None:
result = await self.ast_ctx.eval()
exc = self.ast_ctx.get_exception_obj()
if exc is None:
result = await self.ast_ctx.eval()
exc = self.ast_ctx.get_exception_obj()
if exc:
traceback_mesg = self.ast_ctx.get_exception_long().split("\n")

if msg['content'].get("store_history", True):
self.execution_count += 1

metadata = {
"dependencies_met": True,
"engine": self.engine_id,
"status": "error",
"started": datetime.datetime.now().isoformat(),
}
content = {
'execution_count': self.execution_count,
'status': 'error',
'ename': type(exc).__name__, # Exception name, as a string
'evalue': str(exc), # Exception value, as a string
'traceback': traceback_mesg,
}
_LOGGER.debug("Executing '%s' got exception: %s", code, content)
await self.send(shell_socket, 'execute_reply', content, metadata=metadata,
parent_header=msg['header'], identities=identities)
del content["execution_count"], content["status"]
await self.send(self.iopub_socket, 'error', content, parent_header=msg['header'])

content = {
'execution_state': "idle",
}
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])
return

# if True or isinstance(self.ast_ctx.ast, ast.Expr):
_LOGGER.debug("Executing: '%s' got result %s", code, result)
if result is not None:
content = {
'execution_count': self.execution_count,
'data': {"text/plain": repr(result)},
'metadata': {}
}
await self.send(self.iopub_socket, 'execute_result', content, parent_header=msg['header'])

if exc:
traceback_mesg = self.ast_ctx.get_exception_long().split("\n")

metadata = {
"dependencies_met": True,
"engine": self.engine_id,
"status": "ok",
"status": "error",
"started": datetime.datetime.now().isoformat(),
}
content = {
"status": "ok",
"execution_count": self.execution_count,
"user_variables": {},
"payload": [],
"user_expressions": {},
'execution_count': self.execution_count,
'status': 'error',
'ename': type(exc).__name__, # Exception name, as a string
'evalue': str(exc), # Exception value, as a string
'traceback': traceback_mesg,
}
_LOGGER.debug("Executing '%s' got exception: %s", code, content)
await self.send(shell_socket, 'execute_reply', content, metadata=metadata,
parent_header=msg['header'], identities=identities)
if msg['content'].get("store_history", True):
self.execution_count += 1
elif msg['header']["msg_type"] == "kernel_info_request":
del content["execution_count"], content["status"]
await self.send(self.iopub_socket, 'error', content, parent_header=msg['header'])

content = {
"protocol_version": "5.3",
"ipython_version": [1, 1, 0, ""],
"language_version": [0, 0, 1],
"language": "python",
"implementation": "python",
"implementation_version": "3.7",
"language_info": {
"name": "python",
"version": "1.0",
'mimetype': "",
'file_extension': ".py",
#'pygments_lexer': "",
'codemirror_mode': "",
'nbconvert_exporter': "",
},
"banner": ""
'execution_state': "idle",
}
await self.send(shell_socket, 'kernel_info_reply', content, parent_header=msg['header'], identities=identities)
elif msg['header']["msg_type"] == "complete_request":
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])
if msg['content'].get("store_history", True):
self.execution_count += 1
return

code = msg["content"]["code"]
posn = msg["content"]["cursor_pos"]
match = self.completion_re.match(code[0:posn].lower())
if match:
root = match[1].lower()
words = self.ast_ctx.state.completions(root)
words = words.union(await self.ast_ctx.handler.service_completions(root))
words = words.union(await self.ast_ctx.handler.func_completions(root))
words = words.union(self.ast_ctx.completions(root))
else:
root = ""
words = set()
# _LOGGER.debug(f"complete_request code={code}, posn={posn}, root={root}, words={words}")
# if True or isinstance(self.ast_ctx.ast, ast.Expr):
_LOGGER.debug("Executing: '%s' got result %s", code, result)
if result is not None:
content = {
"status": "ok",
"matches": sorted(list(words)),
"cursor_start": msg["content"]["cursor_pos"] - len(root),
"cursor_end": msg["content"]["cursor_pos"],
"metadata": {},
'execution_count': self.execution_count,
'data': {"text/plain": repr(result)},
'metadata': {}
}
await self.send(shell_socket, 'complete_reply', content, parent_header=msg['header'], identities=identities)
await self.send(self.iopub_socket, 'execute_result', content, parent_header=msg['header'])

elif msg['header']["msg_type"] == "is_complete_request":
code = msg['content']["code"]
self.ast_ctx.parse(code)
exc = self.ast_ctx.get_exception_obj()
metadata = {
"dependencies_met": True,
"engine": self.engine_id,
"status": "ok",
"started": datetime.datetime.now().isoformat(),
}
content = {
"status": "ok",
"execution_count": self.execution_count,
"user_variables": {},
"payload": [],
"user_expressions": {},
}
await self.send(shell_socket, 'execute_reply', content, metadata=metadata,
parent_header=msg['header'], identities=identities)
if msg['content'].get("store_history", True):
self.execution_count += 1

elif msg['header']["msg_type"] == "kernel_info_request":
content = {
"protocol_version": "5.3",
"ipython_version": [1, 1, 0, ""],
"language_version": [0, 0, 1],
"language": "python",
"implementation": "python",
"implementation_version": "3.7",
"language_info": {
"name": "python",
"version": "1.0",
'mimetype': "",
'file_extension': ".py",
#'pygments_lexer': "",
'codemirror_mode': "",
'nbconvert_exporter': "",
},
"banner": ""
}
await self.send(shell_socket, 'kernel_info_reply', content, parent_header=msg['header'], identities=identities)

# determine indent of last line
indent = 0
i = code.rfind("\n")
if i >= 0:
while i + 1 < len(code) and code[i+1] == " ":
i += 1
indent += 1
if exc is None:
if indent == 0:
content = {
# One of 'complete', 'incomplete', 'invalid', 'unknown'
"status": 'complete',
# If status is 'incomplete', indent should contain the characters to use
# to indent the next line. This is only a hint: frontends may ignore it
# and use their own autoindentation rules. For other statuses, this
# field does not exist.
#"indent": str,
}
else:
content = {
"status": 'incomplete',
"indent": " " * indent,
}
elif msg['header']["msg_type"] == "complete_request":
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])

code = msg["content"]["code"]
posn = msg["content"]["cursor_pos"]
match = self.completion_re.match(code[0:posn].lower())
if match:
root = match[1].lower()
words = self.ast_ctx.state.completions(root)
words = words.union(await self.ast_ctx.handler.service_completions(root))
words = words.union(await self.ast_ctx.handler.func_completions(root))
words = words.union(self.ast_ctx.completions(root))
else:
root = ""
words = set()
# _LOGGER.debug(f"complete_request code={code}, posn={posn}, root={root}, words={words}")
content = {
"status": "ok",
"matches": sorted(list(words)),
"cursor_start": msg["content"]["cursor_pos"] - len(root),
"cursor_end": msg["content"]["cursor_pos"],
"metadata": {},
}
await self.send(shell_socket, 'complete_reply', content, parent_header=msg['header'], identities=identities)

elif msg['header']["msg_type"] == "is_complete_request":
code = msg['content']["code"]
self.ast_ctx.parse(code)
exc = self.ast_ctx.get_exception_obj()

# determine indent of last line
indent = 0
i = code.rfind("\n")
if i >= 0:
while i + 1 < len(code) and code[i+1] == " ":
i += 1
indent += 1
if exc is None:
if indent == 0:
content = {
# One of 'complete', 'incomplete', 'invalid', 'unknown'
"status": 'complete',
# If status is 'incomplete', indent should contain the characters to use
# to indent the next line. This is only a hint: frontends may ignore it
# and use their own autoindentation rules. For other statuses, this
# field does not exist.
#"indent": str,
}
else:
#
# if the syntax error is right at the end, then we label it incomplete,
# otherwise it's invalid
#
if str(exc).find("EOF while") >= 0:
# if error is at ":" then increase indent
if hasattr(exc, "lineno"):
line = code.split("\n")[exc.lineno-1]
if self.colon_end_re.match(line):
indent += 4
content = {
"status": 'incomplete',
"indent": " " * indent,
}
else:
content = {
"status": 'invalid',
}
# _LOGGER.debug(f"is_complete_request code={code}, exc={exc}, content={content}")
await self.send(shell_socket, 'is_complete_reply', content, parent_header=msg['header'], identities=identities)
elif msg['header']["msg_type"] == "comm_info_request":
content = {
"comms": {}
}
await self.send(shell_socket, 'comm_info_reply', content, parent_header=msg['header'], identities=identities)
elif msg['header']["msg_type"] == "history_request":
content = {
"history": []
}
await self.send(shell_socket, 'history_reply', content, parent_header=msg['header'], identities=identities)
content = {
"status": 'incomplete',
"indent": " " * indent,
}
else:
_LOGGER.error("unknown msg_type: %s", msg['header']["msg_type"])
#
# if the syntax error is right at the end, then we label it incomplete,
# otherwise it's invalid
#
if str(exc).find("EOF while") >= 0:
# if error is at ":" then increase indent
if hasattr(exc, "lineno"):
line = code.split("\n")[exc.lineno-1]
if self.colon_end_re.match(line):
indent += 4
content = {
"status": 'incomplete',
"indent": " " * indent,
}
else:
content = {
"status": 'invalid',
}
# _LOGGER.debug(f"is_complete_request code={code}, exc={exc}, content={content}")
await self.send(shell_socket, 'is_complete_reply', content, parent_header=msg['header'], identities=identities)

elif msg['header']["msg_type"] == "comm_info_request":
content = {
'execution_state': "idle",
"comms": {}
}
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])
await self.send(shell_socket, 'comm_info_reply', content, parent_header=msg['header'], identities=identities)

elif msg['header']["msg_type"] == "history_request":
content = {
"history": []
}
await self.send(shell_socket, 'history_reply', content, parent_header=msg['header'], identities=identities)

else:
_LOGGER.error("unknown msg_type: %s", msg['header']["msg_type"])

content = {
'execution_state': "idle",
}
await self.send(self.iopub_socket, 'status', content, parent_header=msg['header'])

async def control_listen(self, reader, writer):
"""Task that listens to control messages."""
Expand Down

0 comments on commit 9028b45

Please sign in to comment.