-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add --node-ipc support #2015
base: main
Are you sure you want to change the base?
Add --node-ipc support #2015
Changes from 5 commits
f14e088
6ab3fca
ca34372
bdcd755
47fde0d
93157e7
e6ae057
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,9 @@ | |
from contextlib import closing | ||
from functools import partial | ||
from queue import Queue | ||
import http | ||
import http.client | ||
import json | ||
import multiprocessing.connection | ||
import os | ||
import shutil | ||
import socket | ||
|
@@ -48,18 +49,35 @@ def on_stderr_message(self, message: str) -> None: | |
|
||
class AbstractProcessor(Generic[T]): | ||
|
||
def write_data(self, writer: IO[bytes], data: T) -> None: | ||
def write_data(self, writer: Any, data: T) -> None: | ||
raise NotImplementedError() | ||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[T]: | ||
def read_data(self, reader: Any) -> Optional[T]: | ||
raise NotImplementedError() | ||
|
||
|
||
class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
def encode_payload(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
|
||
def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: | ||
try: | ||
return json.loads(message.decode('utf-8')) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
|
||
|
||
class StandardProcessor(AbstractProcessor[Dict[str, Any]]): | ||
def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: | ||
body = self._encode(data) | ||
body = encode_payload(data) | ||
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) | ||
writer.flush() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is potentially breaking for chokidar watcher. We need to add a flush there. I guess duplicate flush() might be ok to avoid adding too much logic there. |
||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: | ||
headers = http.client.parse_headers(reader) # type: ignore | ||
|
@@ -68,31 +86,38 @@ def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: | |
except TypeError: | ||
# Expected error on process stopping. Stop the read loop. | ||
raise StopLoopError() | ||
try: | ||
return self._decode(body) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
|
||
@staticmethod | ||
def _encode(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
sort_keys=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
@staticmethod | ||
def _decode(message: bytes) -> Dict[str, Any]: | ||
return json.loads(message.decode('utf-8')) | ||
return decode_payload(body) | ||
|
||
|
||
class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
_buf = bytearray() | ||
_lines = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't these two properties be part of the class instance, instead of the class? If I have LSP-dockerfile, LSP-stylelint, and LSP-typescript running with node IPC, wouldn't these overwrite each other in unpredictable ways? Maybe I'm not understanding something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it should work as expected (in most cases ;)) due to how Python works with those. When instance of the class reads those properties it looks them up first in the instance and then in the class. When writing it always writes to the instance property (creates it if necessary). The only issue would be if the instance would read a class property (for example So I guess it's still safer to set those on the instance. |
||
|
||
def write_data(self, connection: multiprocessing.connection._ConnectionBase, data: Dict[str, Any]) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there really no way to use public type here? Underscore denotes a private property. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _ConnectionBase was used because multiprocessing.Pipe() uses that.
I looked a bit at the
and found that maybe we can use the as Connection extends _ConnectionBase, and it doesn't add any public methods
|
||
body = encode_payload(data) + b"\n" | ||
while len(body): | ||
n = connection._write(connection.fileno(), body) # type: ignore | ||
body = body[n:] | ||
|
||
def read_data(self, connection: multiprocessing.connection._ConnectionBase) -> Optional[Dict[str, Any]]: | ||
while self._lines == 0: | ||
chunk = connection._read(connection.fileno(), 65536) # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 65536 looks like a rather large buffer (65KB), have you tried 4KB? What are the performance implications compared to the standard transport? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also private API... I'm not saying it's unacceptable but is there really no higher level API exposed for that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This also bothers me. Perhaps switching to some public mechanism also abstracts away the burden of choosing the buffer size? (Currently set to 65KB) |
||
if len(chunk) == 0: | ||
# EOF reached: https://docs.python.org/3/library/os.html#os.read | ||
raise StopLoopError() | ||
|
||
self._buf += chunk | ||
self._lines += chunk.count(b'\n') | ||
|
||
self._lines -= 1 | ||
message, _, self._buf = self._buf.partition(b'\n') | ||
return decode_payload(message) | ||
|
||
|
||
class ProcessTransport(Transport[T]): | ||
|
||
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], | ||
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], | ||
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: Any, | ||
writer: Any, stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be good that instead of any I used a better type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be solved with generics, I think, but it would again require downstream changes, since it would change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have some local changes to clean up the types but I'm yet to find a solution that is fully properly typed. It's complicated. In any case, that would be something for another PR. |
||
callback_object: TransportCallbacks[T]) -> None: | ||
self._closed = False | ||
self._process = process | ||
|
@@ -191,7 +216,6 @@ def _write_loop(self) -> None: | |
if d is None: | ||
break | ||
self._processor.write_data(self._writer, d) | ||
self._writer.flush() | ||
except (BrokenPipeError, AttributeError): | ||
pass | ||
except Exception as ex: | ||
|
@@ -220,27 +244,36 @@ def _stderr_loop(self) -> None: | |
|
||
|
||
# Can be a singleton since it doesn't hold any state. | ||
json_rpc_processor = JsonRpcProcessor() | ||
standard_processor = StandardProcessor() | ||
node_ipc_processor = NodeIpcProcessor() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, I'm unsure whether this can actually be a singleton. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, right. The standard processor wasn't meant to have any instance properties so NodeIpc shouldn't either. |
||
|
||
|
||
def create_transport(config: TransportConfig, cwd: Optional[str], | ||
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: | ||
stderr = subprocess.PIPE | ||
pass_fds = () # type: Union[Tuple[()], Tuple[int]] | ||
if config.tcp_port is not None: | ||
assert config.tcp_port is not None | ||
if config.tcp_port < 0: | ||
stdout = subprocess.PIPE | ||
else: | ||
stdout = subprocess.DEVNULL | ||
stdin = subprocess.DEVNULL | ||
elif config.node_ipc: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.DEVNULL | ||
stderr = subprocess.STDOUT | ||
pass_fds = (config.node_ipc.child_connection.fileno(),) | ||
else: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.PIPE | ||
|
||
startupinfo = _fixup_startup_args(config.command) | ||
sock = None # type: Optional[socket.socket] | ||
process = None # type: Optional[subprocess.Popen] | ||
|
||
def start_subprocess() -> subprocess.Popen: | ||
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) | ||
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds) | ||
|
||
if config.listener_socket: | ||
assert isinstance(config.tcp_port, int) and config.tcp_port > 0 | ||
|
@@ -256,14 +289,24 @@ def start_subprocess() -> subprocess.Popen: | |
sock = _connect_tcp(config.tcp_port) | ||
if sock is None: | ||
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) | ||
reader = sock.makefile('rwb') # type: ignore | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
elif config.node_ipc: | ||
reader = writer = config.node_ipc.parent_connection # type: ignore | ||
else: | ||
reader = process.stdout # type: ignore | ||
writer = process.stdin # type: ignore | ||
if not process.stdout or not process.stdin: | ||
predragnikolic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise RuntimeError( | ||
'Failed initializing transport: reader: {}, writer: {}' | ||
.format(process.stdout, process.stdin) | ||
) | ||
reader = process.stdout | ||
writer = process.stdin | ||
stderr_reader = process.stdout if config.node_ipc else process.stderr | ||
predragnikolic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
processor = node_ipc_processor if config.node_ipc else standard_processor | ||
|
||
if not reader or not writer: | ||
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) | ||
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, | ||
|
||
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, processor, | ||
callback_object) | ||
|
||
|
||
|
@@ -312,7 +355,8 @@ def _start_subprocess( | |
stderr: int, | ||
startupinfo: Any, | ||
env: Dict[str, str], | ||
cwd: Optional[str] | ||
cwd: Optional[str], | ||
pass_fds: Union[Tuple[()], Tuple[int]] | ||
) -> subprocess.Popen: | ||
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) | ||
process = subprocess.Popen( | ||
|
@@ -322,7 +366,8 @@ def _start_subprocess( | |
stderr=stderr, | ||
startupinfo=startupinfo, | ||
env=env, | ||
cwd=cwd) | ||
cwd=cwd, | ||
pass_fds=pass_fds) | ||
_subprocesses.add(process) | ||
return process | ||
|
||
|
@@ -356,8 +401,7 @@ def start_in_background(d: _SubprocessData) -> None: | |
# Await one client connection (blocking!) | ||
sock, _ = listener_socket.accept() | ||
thread.join() | ||
reader = sock.makefile('rwb') # type: IO[bytes] | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
assert data.process | ||
return data.process, sock, reader, writer | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
from wcmatch.glob import GLOBSTAR | ||
import contextlib | ||
import fnmatch | ||
import multiprocessing | ||
import os | ||
import posixpath | ||
import socket | ||
|
@@ -605,16 +606,24 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: | |
return _translate_path(uri, self._remote, self._local) | ||
|
||
|
||
class NodeIpcPipe(): | ||
def __init__(self) -> None: | ||
parent_connection, child_connection = multiprocessing.Pipe() | ||
self.parent_connection = parent_connection | ||
self.child_connection = child_connection | ||
|
||
|
||
class TransportConfig: | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket") | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
command: List[str], | ||
tcp_port: Optional[int], | ||
env: Dict[str, str], | ||
listener_socket: Optional[socket.socket] | ||
listener_socket: Optional[socket.socket], | ||
node_ipc: Optional[NodeIpcPipe] | ||
) -> None: | ||
if not command and not tcp_port: | ||
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') | ||
|
@@ -623,6 +632,7 @@ def __init__( | |
self.tcp_port = tcp_port | ||
self.env = env | ||
self.listener_socket = listener_socket | ||
self.node_ipc = node_ipc | ||
|
||
|
||
class ClientConfig: | ||
|
@@ -632,6 +642,7 @@ def __init__(self, | |
priority_selector: Optional[str] = None, | ||
schemes: Optional[List[str]] = None, | ||
command: Optional[List[str]] = None, | ||
use_node_ipc: bool = False, | ||
binary_args: Optional[List[str]] = None, # DEPRECATED | ||
tcp_port: Optional[int] = None, | ||
auto_complete_selector: Optional[str] = None, | ||
|
@@ -656,6 +667,7 @@ def __init__(self, | |
else: | ||
assert isinstance(binary_args, list) | ||
self.command = binary_args | ||
self.use_node_ipc = use_node_ipc | ||
self.tcp_port = tcp_port | ||
self.auto_complete_selector = auto_complete_selector | ||
self.enabled = enabled | ||
|
@@ -689,9 +701,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl | |
priority_selector=_read_priority_selector(s), | ||
schemes=s.get("schemes"), | ||
command=read_list_setting(s, "command", []), | ||
use_node_ipc=bool(s.get("use_node_ipc", False)), | ||
tcp_port=s.get("tcp_port"), | ||
auto_complete_selector=s.get("auto_complete_selector"), | ||
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "iff" is short for "if and only if" :) but keep the change, no one really cares. |
||
# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. | ||
enabled=bool(s.get("enabled", True)), | ||
init_options=init_options, | ||
settings=settings, | ||
|
@@ -719,6 +732,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig": | |
priority_selector=_read_priority_selector(d), | ||
schemes=schemes, | ||
command=d.get("command", []), | ||
use_node_ipc=d.get("use_node_ipc", False), | ||
tcp_port=d.get("tcp_port"), | ||
auto_complete_selector=d.get("auto_complete_selector"), | ||
enabled=d.get("enabled", False), | ||
|
@@ -746,6 +760,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C | |
priority_selector=_read_priority_selector(override) or src_config.priority_selector, | ||
schemes=override.get("schemes", src_config.schemes), | ||
command=override.get("command", src_config.command), | ||
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), | ||
tcp_port=override.get("tcp_port", src_config.tcp_port), | ||
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), | ||
enabled=override.get("enabled", src_config.enabled), | ||
|
@@ -790,7 +805,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig | |
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] | ||
else: | ||
env[key] = sublime.expand_variables(value, variables) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket) | ||
node_ipc = None | ||
if self.use_node_ipc: | ||
node_ipc = NodeIpcPipe() | ||
env["NODE_CHANNEL_FD"] = str(node_ipc.child_connection.fileno()) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) | ||
|
||
def set_view_status(self, view: sublime.View, message: str) -> None: | ||
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fits one line now :)