Skip to content

Commit

Permalink
[GH-539] Implement no-flow-control extension (fixes #539)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Jul 26, 2024
1 parent e58d2f1 commit 5435c4c
Show file tree
Hide file tree
Showing 16 changed files with 503 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
## New Features

* New utility methods `SftpClient.put(Path localFile, String remoteFileName)` and `SftpClient.put(InputStream in, String remoteFileName)` facilitate SFTP file uploading.
* [GH-539](https://github.com/apache/mina-sshd/issues/539) Implement no-flow-control extension

## Potential compatibility issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand All @@ -42,12 +43,14 @@
import org.apache.sshd.client.auth.AuthenticationIdentitiesProvider;
import org.apache.sshd.client.config.hosts.HostConfigEntry;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ReflectionUtils;
import org.apache.sshd.common.util.io.input.NoCloseInputStream;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.scp.client.ScpClient;
import org.apache.sshd.scp.client.ScpClient.Option;
import org.apache.sshd.scp.client.ScpClientCreator;
Expand All @@ -61,6 +64,8 @@
import org.apache.sshd.scp.common.helpers.ScpTimestampCommandDetails;
import org.slf4j.Logger;

import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;

/**
* @see <A HREF="https://man7.org/linux/man-pages/man1/scp.1.html">SCP(1) - manual page</A>
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
Expand Down Expand Up @@ -151,7 +156,7 @@ public static String[] normalizeCommandArguments(PrintStream stdout, PrintStream
return null;
}

return effective.toArray(new String[effective.size()]);
return effective.toArray(new String[0]);
}

/* -------------------------------------------------------------------------------- */
Expand Down Expand Up @@ -248,11 +253,11 @@ public static void showUsageMessage(PrintStream stderr) {
public static void xferLocalToRemote(
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
ScpLocation source, ScpLocation target, Collection<Option> options,
OutputStream logStream, Level level, boolean quiet)
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
throws Exception {
ScpClientCreator creator = resolveScpClientCreator(stderr, args);
ClientSession session = ((logStream == null) || (creator == null) || GenericUtils.isEmpty(args))
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (session == null) {
showUsageMessage(stderr);
System.exit(-1);
Expand Down Expand Up @@ -330,10 +335,10 @@ private void logEvent(
public static void xferRemoteToRemote(
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
ScpLocation source, ScpLocation target, Collection<Option> options,
OutputStream logStream, Level level, boolean quiet)
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
throws Exception {
ClientSession srcSession = ((logStream == null) || GenericUtils.isEmpty(args))
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (srcSession == null) {
showUsageMessage(stderr);
System.exit(-1);
Expand Down Expand Up @@ -444,6 +449,10 @@ public static void main(String[] args) throws Exception {
new InputStreamReader(new NoCloseInputStream(System.in), Charset.defaultCharset()))) {
args = normalizeCommandArguments(stdout, stderr, args);

PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);

Level level = Level.SEVERE;
int numArgs = GenericUtils.length(args);
// see the way normalizeCommandArguments works...
Expand Down Expand Up @@ -472,9 +481,11 @@ public static void main(String[] args) throws Exception {
}

if (threeWay) {
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
defaultOptions);
} else {
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
defaultOptions);
}
} finally {
if ((logStream != stdout) && (logStream != stderr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
Expand All @@ -47,6 +48,7 @@
import org.apache.sshd.client.ClientFactoryManager;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.ServiceFactory;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.cipher.CipherFactory;
Expand All @@ -71,6 +73,7 @@
import org.apache.sshd.common.util.io.output.LineLevelAppenderStream;
import org.apache.sshd.common.util.io.output.NullOutputStream;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.config.SshServerConfigFileReader;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.Attributes;
Expand All @@ -91,6 +94,8 @@
import org.apache.sshd.sftp.common.extensions.openssh.StatVfsExtensionParser;
import org.slf4j.Logger;

import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;

/**
* TODO Add javadoc
*
Expand Down Expand Up @@ -366,9 +371,13 @@ public static void main(String[] args) throws Exception {
setupLogging(level, stdout, stderr, logStream);
}

PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);

ClientSession session = (logStream == null)
? null
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args);
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (session == null) {
System.err.println("usage: sftp [-v[v][v]] [-E logoutput] [-i identity] [-io nio2|mina|netty]"
+ " [-J proxyJump] [-l login] [" + SFTP_PORT_OPTION + " port] [-o option=value]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ public static boolean isArgumentedOption(String portOption, String argName) {
// CHECKSTYLE:OFF
public static ClientSession setupClientSession(
String portOption, BufferedReader stdin, Level level,
PrintStream stdout, PrintStream stderr, String... args)
PrintStream stdout, PrintStream stderr, String[] args)
throws Exception {
return setupClientSession(portOption, stdin, level, stdout, stderr, args, null);
}

public static ClientSession setupClientSession(
String portOption, BufferedReader stdin, Level level,
PrintStream stdout, PrintStream stderr, String[] args,
PropertyResolver defaultOptions)
throws Exception {
int port = -1;
String host = null;
Expand Down Expand Up @@ -240,7 +248,8 @@ public static ClientSession setupClientSession(
return null;
}

PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options);
PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options,
defaultOptions);
SshClient client = setupClient(
resolver, ciphers, macs, compressions, identities,
stdin, stdout, stderr, level, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
public class NoFlowControl extends AbstractKexExtensionParser<String> {
public static final String NAME = "no-flow-control";

public static final String SUPPORTED = "s";
public static final String PREFERRED = "p";

public static final NoFlowControl INSTANCE = new NoFlowControl();

public NoFlowControl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public void consume(long len) throws IOException {
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
checkInitialized("consume");

if (noFlowControl) {
// flow control is disabled, so just bail out
return;
}

long remainLen;
synchronized (lock) {
remainLen = getSize() - len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public void consume(long len) {
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
checkInitialized("consume");

if (noFlowControl) {
// flow control is disabled, so just bail out
return;
}

long remainLen;
synchronized (lock) {
remainLen = getSize() - len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Predicate;

import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
Expand All @@ -46,6 +47,8 @@ public abstract class Window extends AbstractLoggingBean implements ChannelHolde

protected final Object lock = new Object();

protected boolean noFlowControl;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final Channel channelInstance;
Expand Down Expand Up @@ -94,6 +97,8 @@ protected void init(long size, long packetSize, PropertyResolver resolver) {
}

synchronized (lock) {
Session session = channelInstance.getSession(); // this should only be null during tests
this.noFlowControl = session != null && session.isNoFlowControl();
this.maxSize = size;
this.packetSize = packetSize;
updateSize(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;

import org.apache.sshd.common.AttributeRepository.AttributeKey;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.kex.extension.parser.HostBoundPubkeyAuthentication;
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractSession;
import org.apache.sshd.common.signature.Signature;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
import org.apache.sshd.core.CoreModuleProperties;

/**
* Detects if the server sends a
Expand Down Expand Up @@ -92,6 +97,15 @@ public boolean handleKexExtensionRequest(
} else {
session.setAttribute(HOSTBOUND_AUTHENTICATION, version);
}
} else if (NoFlowControl.NAME.equals(name)) {
String o = NoFlowControl.INSTANCE.parseExtension(data);
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
AbstractSession abstractSession
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
abstractSession.activateNoFlowControl();
}
}
return true;
}
Expand Down Expand Up @@ -157,13 +171,19 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
* <p>
* This default implementation does not marshal any extension.
* This default implementation marshals a {@link NoFlowControl} extension}.
* </p>
*
* @param session {@link Session} to send the KEX extension information for
* @param phase {@link KexPhase} of the SSH protocol
* @param marshaller {@link BiConsumer} writing the extensions into an SSH message
*/
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
// no-flow-control
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
if (nfc == null || nfc) {
marshaller.accept(NoFlowControl.NAME,
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,29 @@

package org.apache.sshd.common.kex.extension;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

import org.apache.sshd.common.AttributeRepository.AttributeKey;
import org.apache.sshd.common.kex.KexProposalOption;
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractSession;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
import org.apache.sshd.core.CoreModuleProperties;

/**
* A basic default implementation of a server-side {@link KexExtensionHandler} handling the
* {@link ServerSignatureAlgorithms} KEX extension.
* {@link ServerSignatureAlgorithms} KEX extension along with the {@link NoFlowControl} extension.
*
* @see <a href="https://tools.ietf.org/html/rfc8308">RFC 8308</a>
*/
Expand Down Expand Up @@ -130,6 +136,23 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
}
}

@Override
public boolean handleKexExtensionRequest(
Session session, int index, int count, String name, byte[] data)
throws IOException {
if (NoFlowControl.NAME.equals(name)) {
String o = NoFlowControl.INSTANCE.parseExtension(data);
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
AbstractSession abstractSession
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
abstractSession.activateNoFlowControl();
}
}
return true;
}

/**
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
Expand All @@ -144,6 +167,7 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
*/
@SuppressWarnings("javadoc")
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
// server-sig-algs
if (phase == KexPhase.NEWKEYS) {
Collection<String> algorithms = session.getSignatureFactoriesNames();
if (!GenericUtils.isEmpty(algorithms)) {
Expand All @@ -157,5 +181,11 @@ public void collectExtensions(Session session, KexPhase phase, BiConsumer<String
ServerSignatureAlgorithms.NAME);
}
}
// no-flow-control
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
if (nfc != Boolean.FALSE) {
marshaller.accept(NoFlowControl.NAME,
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,10 @@ static <T> T resolveAttribute(Session session, AttributeRepository.AttributeKey<
T value = session.getAttribute(key);
return (value != null) ? value : FactoryManager.resolveAttribute(session.getFactoryManager(), key);
}

/**
* Check if the no-flow-control KEX extension has been activated.
*/
boolean isNoFlowControl();

}
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,13 @@ protected void channelOpen(Buffer buffer) throws Exception {
return;
}

if (getSession().isNoFlowControl() && !channels.isEmpty()) {
// TODO add language tag configurable control
sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED,
"Only a single channel can be opened when using no-flow-control", "");
return;
}

Session session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Channel channel = ChannelFactory.createChannel(session, manager.getChannelFactories(), type);
Expand Down
Loading

0 comments on commit 5435c4c

Please sign in to comment.