diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 21595bc5b..4db7faa7a 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -7,6 +7,7 @@ from queue import Queue import http import json +import multiprocessing.connection import os import shutil import socket @@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T) -> None: + def write_data(self, writer: IO[bytes], data: T, is_node_ipc = False) -> None: raise NotImplementedError() - def read_data(self, reader: IO[bytes]) -> Optional[T]: + def read_data(self, reader: IO[bytes], is_node_ipc = False) -> Optional[T]: raise NotImplementedError() class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): - def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: + def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc = False) -> None: body = self._encode(data) - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + if not is_node_ipc: + writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + else: + writer.write(body + b"\n") + + def read_data(self, reader: IO[bytes], is_node_ipc = False) -> Optional[Dict[str, Any]]: + if not is_node_ipc: + headers = http.client.parse_headers(reader) # type: ignore + try: + body = reader.read(int(headers.get("Content-Length"))) + except TypeError: + # Expected error on process stopping. Stop the read loop. + raise StopLoopError() + else: + body = reader.readline() - def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: - headers = http.client.parse_headers(reader) # type: ignore - try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - # Expected error on process stopping. Stop the read loop. - raise StopLoopError() try: return self._decode(body) except Exception as ex: @@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes: return json.dumps( data, ensure_ascii=False, - sort_keys=False, check_circular=False, separators=(',', ':') ).encode('utf-8') @@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]): def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], - callback_object: TransportCallbacks[T]) -> None: + callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: self._closed = False self._process = process self._socket = socket @@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name)) self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name)) self._callback_object = weakref.ref(callback_object) + self._is_node_ipc = is_node_ipc self._send_queue = Queue(0) # type: Queue[Union[T, None]] self._reader_thread.start() self._writer_thread.start() @@ -137,7 +145,7 @@ def __del__(self) -> None: def _read_loop(self) -> None: try: while self._reader: - payload = self._processor.read_data(self._reader) + payload = self._processor.read_data(self._reader, self._is_node_ipc) if payload is None: continue @@ -190,8 +198,9 @@ def _write_loop(self) -> None: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d) - self._writer.flush() + self._processor.write_data(self._writer, d, self._is_node_ipc) + if not self._is_node_ipc: + self._writer.flush() except (BrokenPipeError, AttributeError): pass except Exception as ex: @@ -223,8 +232,36 @@ def _stderr_loop(self) -> None: json_rpc_processor = JsonRpcProcessor() +class NodeIpcIO(): + _buf = bytearray() + _lines = 0 + + def __init__(self, conn: multiprocessing.connection._ConnectionBase): + self._conn = conn + + # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 + def readline(self): + while self._lines == 0: + chunk = self._conn._read(self._conn.fileno(), 65536) # type: bytes + self._buf += chunk + self._lines += chunk.count(b'\n') + + self._lines -= 1 + foo, _, self._buf = self._buf.partition(b'\n') + print('READLINE: ' + str(foo)) + return foo + + # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 + def write(self, data: bytes): + while len(data): + n = self._conn._write(self._conn.fileno(), data) + data = data[n:] + + def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: + stderr = subprocess.PIPE + pass_fds = () if config.tcp_port is not None: assert config.tcp_port is not None if config.tcp_port < 0: @@ -232,15 +269,21 @@ def create_transport(config: TransportConfig, cwd: Optional[str], else: stdout = subprocess.DEVNULL stdin = subprocess.DEVNULL - else: + elif not config.node_ipc: stdout = subprocess.PIPE stdin = subprocess.PIPE + else: + stdout = subprocess.PIPE + stdin = subprocess.DEVNULL + stderr = subprocess.STDOUT + pass_fds = (config.node_ipc.child_conn.fileno(),) + startupinfo = _fixup_startup_args(config.command) sock = None # type: Optional[socket.socket] process = None # type: Optional[subprocess.Popen] def start_subprocess() -> subprocess.Popen: - return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) + return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds) if config.listener_socket: assert isinstance(config.tcp_port, int) and config.tcp_port > 0 @@ -258,13 +301,16 @@ def start_subprocess() -> subprocess.Popen: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) reader = sock.makefile('rwb') # type: ignore writer = reader - else: + elif not config.node_ipc: reader = process.stdout # type: ignore writer = process.stdin # type: ignore + else: + reader = writer = NodeIpcIO(config.node_ipc.parent_conn) if not reader or not writer: raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) - return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, - callback_object) + stderr_reader = process.stdout if config.node_ipc else process.stderr + return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, + callback_object, bool(config.node_ipc)) _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] @@ -312,7 +358,8 @@ def _start_subprocess( stderr: int, startupinfo: Any, env: Dict[str, str], - cwd: Optional[str] + cwd: Optional[str], + pass_fds: Union[Tuple[()], Tuple[int]] ) -> subprocess.Popen: debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) process = subprocess.Popen( @@ -322,7 +369,8 @@ def _start_subprocess( stderr=stderr, startupinfo=startupinfo, env=env, - cwd=cwd) + cwd=cwd, + pass_fds=pass_fds) _subprocesses.add(process) return process diff --git a/plugin/core/types.py b/plugin/core/types.py index 85d352167..86d6beccc 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -10,8 +10,11 @@ from wcmatch.glob import BRACE from wcmatch.glob import globmatch from wcmatch.glob import GLOBSTAR +import collections import contextlib import fnmatch +import multiprocessing +import multiprocessing.connection import os import posixpath import socket @@ -605,8 +608,12 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: return _translate_path(uri, self._remote, self._local) +NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn') +NodeIpc.__annotations__ = {'parent_conn': multiprocessing.connection._ConnectionBase, 'child_conn': multiprocessing.connection._ConnectionBase} + + class TransportConfig: - __slots__ = ("name", "command", "tcp_port", "env", "listener_socket") + __slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") def __init__( self, @@ -614,15 +621,19 @@ def __init__( command: List[str], tcp_port: Optional[int], env: Dict[str, str], - listener_socket: Optional[socket.socket] + listener_socket: Optional[socket.socket], + node_ipc: Optional[NodeIpc] ) -> None: if not command and not tcp_port: raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') + if node_ipc and (tcp_port or listener_socket): + raise ValueError('"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; cannot start a language server') self.name = name self.command = command self.tcp_port = tcp_port self.env = env self.listener_socket = listener_socket + self.node_ipc = node_ipc class ClientConfig: @@ -790,7 +801,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] else: env[key] = sublime.expand_variables(value, variables) - return TransportConfig(self.name, command, tcp_port, env, listener_socket) + node_ipc = None + if '--node-ipc' in command: + node_ipc = NodeIpc(*multiprocessing.Pipe()) + env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) + return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) def set_view_status(self, view: sublime.View, message: str) -> None: if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):