From 65e412968db64630825bf5fd9e36c0ee8d28e016 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sat, 17 Feb 2024 14:58:53 +0100 Subject: [PATCH 1/8] allow passing kwargs to sftp client --- sshfs/pools/base.py | 4 +++- sshfs/spec.py | 25 ++++++++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/sshfs/pools/base.py b/sshfs/pools/base.py index 698f2e7..478d2a1 100644 --- a/sshfs/pools/base.py +++ b/sshfs/pools/base.py @@ -38,6 +38,8 @@ def __init__( self.unsafe_terminate = unsafe_terminate self._stack = AsyncExitStack() + self.other_init_params = kwargs or {} + async def _maybe_new_channel(self): # If there is no hard limit or the limit is not hit yet # try to create a new channel @@ -47,7 +49,7 @@ async def _maybe_new_channel(self): ): try: return await self._stack.enter_async_context( - self.client.start_sftp_client() + self.client.start_sftp_client(**self.other_init_params) ) except ChannelOpenError: # If we can't create any more channels, then change diff --git a/sshfs/spec.py b/sshfs/spec.py index 6dba100..5210959 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -45,7 +45,9 @@ def __init__( SSH host to connect. **kwargs: Any Any option that will be passed to either the top level - `AsyncFileSystem` or the `asyncssh.connect`. + `AsyncFileSystem` (e.g. timeout) + or `asyncssh.SSHClientConnection.start_sftp_client` (e.g. env, send_env, path_encoding, path_errors, sftp_version) + or the `asyncssh.connect`. pool_type: sshfs.pools.base.BaseSFTPChannelPool Pool manager to use (when doing concurrent operations together, pool managers offer the flexibility of prioritizing channels @@ -54,6 +56,17 @@ def __init__( super().__init__(self, **kwargs) + _sftp_client_args = { + k: kwargs.pop(k) for k in kwargs.copy().keys() + if k in { + "env", + "send_env", + "path_encoding", + "path_errors", + "sftp_version", + } + } + _timeout = kwargs.pop("timeout", None) max_sessions = kwargs.pop("max_sessions", _DEFAULT_MAX_SESSIONS) if max_sessions <= _SHELL_CHANNELS: raise ValueError( @@ -68,7 +81,9 @@ def __init__( host, pool_type, max_sftp_channels=max_sessions - _SHELL_CHANNELS, - **_client_args, + timeout=_timeout, # goes to sync_wrapper + connect_args=_client_args, # for asyncssh.connect + sftp_client_args=_sftp_client_args, # for asyncssh.SSHClientConnection.start_sftp_client ) weakref.finalize( self, sync, self.loop, self._finalize, self._pool, self._stack @@ -89,13 +104,13 @@ def _get_kwargs_from_urls(urlpath): @wrap_exceptions async def _connect( - self, host, pool_type, max_sftp_channels, **client_args + self, host, pool_type, max_sftp_channels, connect_args, sftp_client_args ): self._client_lock = asyncio.Semaphore(_SHELL_CHANNELS) - _raw_client = asyncssh.connect(host, **client_args) + _raw_client = asyncssh.connect(host, **connect_args) client = await self._stack.enter_async_context(_raw_client) - pool = pool_type(client, max_channels=max_sftp_channels) + pool = pool_type(client, max_channels=max_sftp_channels, **sftp_client_args) return client, pool connect = sync_wrapper(_connect) From 62c16a199336782677fcf35e9ef869ffd6875131 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sat, 17 Feb 2024 15:06:34 +0100 Subject: [PATCH 2/8] fixed formatting --- sshfs/spec.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index 5210959..e5be54d 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -57,8 +57,10 @@ def __init__( super().__init__(self, **kwargs) _sftp_client_args = { - k: kwargs.pop(k) for k in kwargs.copy().keys() - if k in { + k: kwargs.pop(k) + for k in kwargs.copy().keys() + if k + in { "env", "send_env", "path_encoding", @@ -104,13 +106,20 @@ def _get_kwargs_from_urls(urlpath): @wrap_exceptions async def _connect( - self, host, pool_type, max_sftp_channels, connect_args, sftp_client_args + self, + host, + pool_type, + max_sftp_channels, + connect_args, + sftp_client_args, ): self._client_lock = asyncio.Semaphore(_SHELL_CHANNELS) _raw_client = asyncssh.connect(host, **connect_args) client = await self._stack.enter_async_context(_raw_client) - pool = pool_type(client, max_channels=max_sftp_channels, **sftp_client_args) + pool = pool_type( + client, max_channels=max_sftp_channels, **sftp_client_args + ) return client, pool connect = sync_wrapper(_connect) From 0a8a32daba21401ccb9101eb59b2b9de3e22df25 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sat, 17 Feb 2024 15:18:24 +0100 Subject: [PATCH 3/8] make sure kwargs that are passed to sftp_client_kwargs are acceptable --- sshfs/pools/base.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sshfs/pools/base.py b/sshfs/pools/base.py index 478d2a1..a36fd08 100644 --- a/sshfs/pools/base.py +++ b/sshfs/pools/base.py @@ -38,7 +38,18 @@ def __init__( self.unsafe_terminate = unsafe_terminate self._stack = AsyncExitStack() - self.other_init_params = kwargs or {} + self.sftp_client_kwargs = { + k: v + for k, v in kwargs.items() + if k + in { + "env", + "send_env", + "path_encoding", + "path_errors", + "sftp_version", + } + } async def _maybe_new_channel(self): # If there is no hard limit or the limit is not hit yet @@ -49,7 +60,7 @@ async def _maybe_new_channel(self): ): try: return await self._stack.enter_async_context( - self.client.start_sftp_client(**self.other_init_params) + self.client.start_sftp_client(**self.sftp_client_kwargs) ) except ChannelOpenError: # If we can't create any more channels, then change From 11024e1a23a3dfb52b8b0df73d93ff7661efcfc7 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sun, 16 Jun 2024 19:30:50 +0200 Subject: [PATCH 4/8] create a separate parameter for SSHClientConnection options --- sshfs/spec.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index e5be54d..451b008 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -2,6 +2,7 @@ import posixpath import shlex import stat +from typing import Optional import weakref from contextlib import AsyncExitStack, suppress from datetime import datetime @@ -34,6 +35,7 @@ def __init__( host, *, pool_type=SFTPSoftChannelPool, + sftp_client_args: Optional[dict] = None, **kwargs, ): """ @@ -46,28 +48,18 @@ def __init__( **kwargs: Any Any option that will be passed to either the top level `AsyncFileSystem` (e.g. timeout) - or `asyncssh.SSHClientConnection.start_sftp_client` (e.g. env, send_env, path_encoding, path_errors, sftp_version) or the `asyncssh.connect`. pool_type: sshfs.pools.base.BaseSFTPChannelPool Pool manager to use (when doing concurrent operations together, pool managers offer the flexibility of prioritizing channels and deciding which to use). + sftp_client_args: Optional[dict] + Parameters to pass to asyncssh.SSHClientConnection.start_sftp_client method + (e.g. env, send_env, path_encoding, path_errors, sftp_version). """ super().__init__(self, **kwargs) - _sftp_client_args = { - k: kwargs.pop(k) - for k in kwargs.copy().keys() - if k - in { - "env", - "send_env", - "path_encoding", - "path_errors", - "sftp_version", - } - } _timeout = kwargs.pop("timeout", None) max_sessions = kwargs.pop("max_sessions", _DEFAULT_MAX_SESSIONS) if max_sessions <= _SHELL_CHANNELS: @@ -85,7 +77,7 @@ def __init__( max_sftp_channels=max_sessions - _SHELL_CHANNELS, timeout=_timeout, # goes to sync_wrapper connect_args=_client_args, # for asyncssh.connect - sftp_client_args=_sftp_client_args, # for asyncssh.SSHClientConnection.start_sftp_client + sftp_client_args=sftp_client_args or {}, # for asyncssh.SSHClientConnection.start_sftp_client ) weakref.finalize( self, sync, self.loop, self._finalize, self._pool, self._stack From 3187663927ef5df46e494d233643f782a273c752 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sun, 16 Jun 2024 19:33:01 +0200 Subject: [PATCH 5/8] renamed args to kwargs --- sshfs/spec.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index 451b008..8e2caee 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -35,7 +35,7 @@ def __init__( host, *, pool_type=SFTPSoftChannelPool, - sftp_client_args: Optional[dict] = None, + sftp_client_kwargs: Optional[dict] = None, **kwargs, ): """ @@ -53,7 +53,7 @@ def __init__( Pool manager to use (when doing concurrent operations together, pool managers offer the flexibility of prioritizing channels and deciding which to use). - sftp_client_args: Optional[dict] + sftp_client_kwargs: Optional[dict] Parameters to pass to asyncssh.SSHClientConnection.start_sftp_client method (e.g. env, send_env, path_encoding, path_errors, sftp_version). """ @@ -77,7 +77,7 @@ def __init__( max_sftp_channels=max_sessions - _SHELL_CHANNELS, timeout=_timeout, # goes to sync_wrapper connect_args=_client_args, # for asyncssh.connect - sftp_client_args=sftp_client_args or {}, # for asyncssh.SSHClientConnection.start_sftp_client + sftp_client_args=sftp_client_kwargs or {}, # for asyncssh.SSHClientConnection.start_sftp_client ) weakref.finalize( self, sync, self.loop, self._finalize, self._pool, self._stack @@ -103,14 +103,14 @@ async def _connect( pool_type, max_sftp_channels, connect_args, - sftp_client_args, + sftp_client_kwargs, ): self._client_lock = asyncio.Semaphore(_SHELL_CHANNELS) _raw_client = asyncssh.connect(host, **connect_args) client = await self._stack.enter_async_context(_raw_client) pool = pool_type( - client, max_channels=max_sftp_channels, **sftp_client_args + client, max_channels=max_sftp_channels, **sftp_client_kwargs ) return client, pool From 932ccf63de1895b2a093077f930f3b0e934058fe Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sun, 16 Jun 2024 19:44:39 +0200 Subject: [PATCH 6/8] fixed rename of parameter --- sshfs/spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index 8e2caee..c8be3ae 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -77,7 +77,7 @@ def __init__( max_sftp_channels=max_sessions - _SHELL_CHANNELS, timeout=_timeout, # goes to sync_wrapper connect_args=_client_args, # for asyncssh.connect - sftp_client_args=sftp_client_kwargs or {}, # for asyncssh.SSHClientConnection.start_sftp_client + sftp_client_kwargs=sftp_client_kwargs or {}, # for asyncssh.SSHClientConnection.start_sftp_client ) weakref.finalize( self, sync, self.loop, self._finalize, self._pool, self._stack From b97199996a31156d3b4472d614a403c9862d42c6 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sun, 16 Jun 2024 19:49:04 +0200 Subject: [PATCH 7/8] fixed formatting with pre-commit --- sshfs/spec.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index c8be3ae..95b723a 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -2,10 +2,10 @@ import posixpath import shlex import stat -from typing import Optional import weakref from contextlib import AsyncExitStack, suppress from datetime import datetime +from typing import Optional import asyncssh from asyncssh.sftp import SFTPOpUnsupported @@ -54,7 +54,7 @@ def __init__( pool managers offer the flexibility of prioritizing channels and deciding which to use). sftp_client_kwargs: Optional[dict] - Parameters to pass to asyncssh.SSHClientConnection.start_sftp_client method + Parameters to pass to asyncssh.SSHClientConnection.start_sftp_client method (e.g. env, send_env, path_encoding, path_errors, sftp_version). """ @@ -68,6 +68,7 @@ def __init__( ) _client_args = kwargs.copy() _client_args.setdefault("known_hosts", None) + sftp_client_kwargs = sftp_client_kwargs or {} self._stack = AsyncExitStack() self.active_executors = 0 @@ -77,7 +78,7 @@ def __init__( max_sftp_channels=max_sessions - _SHELL_CHANNELS, timeout=_timeout, # goes to sync_wrapper connect_args=_client_args, # for asyncssh.connect - sftp_client_kwargs=sftp_client_kwargs or {}, # for asyncssh.SSHClientConnection.start_sftp_client + sftp_client_kwargs=sftp_client_kwargs, # for asyncssh.SSHClientConnection.start_sftp_client ) weakref.finalize( self, sync, self.loop, self._finalize, self._pool, self._stack From 2292727dc6221bc469f1dfb8c893fb68449c58c6 Mon Sep 17 00:00:00 2001 From: tomasz t Date: Sun, 16 Jun 2024 20:00:09 +0200 Subject: [PATCH 8/8] added sftp_client_kwargs parameter to pool classes --- sshfs/pools/base.py | 21 ++++++--------------- sshfs/spec.py | 4 +++- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/sshfs/pools/base.py b/sshfs/pools/base.py index a36fd08..89ced8c 100644 --- a/sshfs/pools/base.py +++ b/sshfs/pools/base.py @@ -1,5 +1,6 @@ import asyncio from contextlib import AsyncExitStack, suppress +from typing import Optional from asyncssh.misc import ChannelOpenError @@ -15,9 +16,10 @@ def __init__( self, client, *, - max_channels=None, - timeout=MAX_TIMEOUT, - unsafe_terminate=True, + max_channels: Optional[int] = None, + timeout: int = MAX_TIMEOUT, + unsafe_terminate: bool = True, + sftp_client_kwargs: Optional[dict] = None, **kwargs, ): self.client = client @@ -38,18 +40,7 @@ def __init__( self.unsafe_terminate = unsafe_terminate self._stack = AsyncExitStack() - self.sftp_client_kwargs = { - k: v - for k, v in kwargs.items() - if k - in { - "env", - "send_env", - "path_encoding", - "path_errors", - "sftp_version", - } - } + self.sftp_client_kwargs = sftp_client_kwargs or {} async def _maybe_new_channel(self): # If there is no hard limit or the limit is not hit yet diff --git a/sshfs/spec.py b/sshfs/spec.py index 95b723a..d5791a9 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -111,7 +111,9 @@ async def _connect( _raw_client = asyncssh.connect(host, **connect_args) client = await self._stack.enter_async_context(_raw_client) pool = pool_type( - client, max_channels=max_sftp_channels, **sftp_client_kwargs + client, + max_channels=max_sftp_channels, + sftp_client_kwargs=sftp_client_kwargs, ) return client, pool