Skip to content

Commit

Permalink
fix: resolve conflict (server bind address)
Browse files Browse the repository at this point in the history
  • Loading branch information
l1101100 committed Jan 28, 2016
1 parent 2653823 commit fb82730
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 92 deletions.
56 changes: 11 additions & 45 deletions src/main/java/com/cloudhopper/smpp/SmppServerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@

/**
* Configuration of an SMPP server.
*
*
* @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
*/
public class SmppServerConfiguration extends SmppConnectionConfiguration {

private String name;
<<<<<<< HEAD
private String bindAddress;
private int port;
=======
>>>>>>> 756f0e3608188dcc0d37110ff802eaf5407cab8d
// SSL
private boolean useSsl = false;
private SslConfiguration sslConfiguration;
// length of time to wait for a bind request
private long bindTimeout;
private long bindTimeout;
private String systemId;
// if true, <= 3.3 for interface version normalizes to version 3.3
// if true, >= 3.4 for interface version normalizes to version 3.4 and
Expand All @@ -49,7 +44,7 @@ public class SmppServerConfiguration extends SmppConnectionConfiguration {
// smpp version the server supports
private byte interfaceVersion;
// max number of connections/sessions this server will expect to handle
// this number corresponds to the number of worker threads handling reading
// this number corrosponds to the number of worker threads handling reading
// data from sockets and the thread things will be processed under
private int maxConnectionSize;
private boolean nonBlockingSocketsEnabled;
Expand All @@ -67,11 +62,6 @@ public class SmppServerConfiguration extends SmppConnectionConfiguration {
public SmppServerConfiguration() {
super("0.0.0.0", 2775, 5000l);
this.name = "SmppServer";
<<<<<<< HEAD
this.bindAddress = "0.0.0.0";
this.port = 2775;
=======
>>>>>>> 756f0e3608188dcc0d37110ff802eaf5407cab8d
this.bindTimeout = 5000;
this.systemId = "cloudhopper";
this.autoNegotiateInterfaceVersion = true;
Expand Down Expand Up @@ -102,7 +92,7 @@ public boolean isJmxEnabled() {
public void setJmxEnabled(boolean jmxEnabled) {
this.jmxEnabled = jmxEnabled;
}

public boolean isReuseAddress() {
return reuseAddress;
}
Expand All @@ -119,7 +109,7 @@ public boolean isNonBlockingSocketsEnabled() {
* Sets if non-blocking (NIO) is used for this server. If true, then the
* NIO server socket is used for Netty, otherwise the normal blocking
* server socket will be used.
* @param nonBlockingSocketsEnabled True if enabled, otherwise false
* @param nonBlockingSocketsEnabled True if enabled, otherwise false
*/
public void setNonBlockingSocketsEnabled(boolean nonBlockingSocketsEnabled) {
this.nonBlockingSocketsEnabled = nonBlockingSocketsEnabled;
Expand Down Expand Up @@ -154,45 +144,21 @@ public String getName() {
return this.name;
}

<<<<<<< HEAD
/**
* The address you specify in bind tells server where to listen. The default address is 0.0.0.0 which tells server
* to bind to every available network interface.
* @param value
*/
public void setBindAddress(String value) {
this.bindAddress = value;
}

public String getBindAddress() {
return this.bindAddress;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

=======
>>>>>>> 756f0e3608188dcc0d37110ff802eaf5407cab8d
public void setUseSsl(boolean value) {
this.useSsl = value;
this.useSsl = value;
}

public boolean isUseSsl() {
return this.useSsl;
public boolean isUseSsl() {
return this.useSsl;
}

public void setSslConfiguration(SslConfiguration value) {
this.sslConfiguration = value;
setUseSsl(true);
this.sslConfiguration = value;
setUseSsl(true);
}

public SslConfiguration getSslConfiguration() {
return this.sslConfiguration;
return this.sslConfiguration;
}

/**
Expand Down
85 changes: 38 additions & 47 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
import com.cloudhopper.smpp.type.SmppProcessingException;
import com.cloudhopper.smpp.util.DaemonExecutors;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -66,7 +64,7 @@

/**
* Default implementation of an SmppServer that supports SMPP version 3.3 and 3.4.
*
*
* @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
*/
public class DefaultSmppServer implements SmppServer, DefaultSmppServerMXBean {
Expand All @@ -80,17 +78,17 @@ public class DefaultSmppServer implements SmppServer, DefaultSmppServerMXBean {
private ExecutorService bossThreadPool;
private ChannelFactory channelFactory;
private ServerBootstrap serverBootstrap;
private Channel serverChannel;
private Channel serverChannel;
// shared instance of a timer for session writeTimeout timing
private final org.jboss.netty.util.Timer writeTimeoutTimer;
// shared instance of a timer background thread to close unbound channels
private final Timer bindTimer;
// shared instance of a session id generator (an atomic long)
// shared instance of a session id generator (an atomic long)
private final AtomicLong sessionIdSequence;
// shared instance for monitor executors
private final ScheduledExecutorService monitorExecutor;
private DefaultSmppServerCounters counters;

/**
* Creates a new default SmppServer. Window monitoring and automatic
* expiration of requests will be disabled with no monitorExecutors.
Expand All @@ -102,7 +100,7 @@ public class DefaultSmppServer implements SmppServer, DefaultSmppServerMXBean {
public DefaultSmppServer(SmppServerConfiguration configuration, SmppServerHandler serverHandler) {
this(configuration, serverHandler, DaemonExecutors.newCachedDaemonThreadPool());
}

/**
* Creates a new default SmppServer. Window monitoring and automatic
* expiration of requests will be disabled with no monitorExecutors.
Expand Down Expand Up @@ -138,37 +136,37 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, SmppServer
this.serverHandler = serverHandler;
// we'll put the "boss" worker for a server in its own pool
this.bossThreadPool = Executors.newCachedThreadPool();

// a factory for creating channels (connections)
if (configuration.isNonBlockingSocketsEnabled()) {
this.channelFactory = new NioServerSocketChannelFactory(this.bossThreadPool, executor, configuration.getMaxConnectionSize());
} else {
this.channelFactory = new OioServerSocketChannelFactory(this.bossThreadPool, executor);
}

// tie the server bootstrap to this server socket channel factory
this.serverBootstrap = new ServerBootstrap(this.channelFactory);

// set options for the server socket that are useful
this.serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());

// we use the same default pipeline for all new channels - no need for a factory
this.serverConnector = new SmppServerConnector(channels, this);
this.serverBootstrap.getPipeline().addLast(SmppChannelConstants.PIPELINE_SERVER_CONNECTOR_NAME, this.serverConnector);
// a shared instance of a timer for session writeTimeout timing
this.writeTimeoutTimer = new org.jboss.netty.util.HashedWheelTimer();
// a shared instance of a timer for session writeTimeout timing
this.writeTimeoutTimer = new org.jboss.netty.util.HashedWheelTimer();
// a shared timer used to make sure new channels are bound within X milliseconds
this.bindTimer = new Timer(configuration.getName() + "-BindTimer0", true);
// NOTE: this would permit us to customize the "transcoding" context for a server if needed
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext());
this.sessionIdSequence = new AtomicLong(0);
this.sessionIdSequence = new AtomicLong(0);
this.monitorExecutor = monitorExecutor;
this.counters = new DefaultSmppServerCounters();
if (configuration.isJmxEnabled()) {
registerMBean();
}
}

private void registerMBean() {
if (configuration == null) {
return;
Expand All @@ -184,7 +182,7 @@ private void registerMBean() {
}
}
}

private void unregisterMBean() {
if (configuration == null) {
return;
Expand Down Expand Up @@ -213,7 +211,7 @@ public ChannelGroup getChannels() {
public SmppServerConfiguration getConfiguration() {
return this.configuration;
}

@Override
public DefaultSmppServerCounters getCounters() {
return this.counters;
Expand All @@ -222,7 +220,7 @@ public DefaultSmppServerCounters getCounters() {
public Timer getBindTimer() {
return this.bindTimer;
}

@Override
public boolean isStarted() {
return (this.serverChannel != null && this.serverChannel.isBound());
Expand All @@ -237,24 +235,17 @@ public boolean isStopped() {
public boolean isDestroyed() {
return (this.serverBootstrap == null);
}

@Override
public void start() throws SmppChannelException {
if (isDestroyed()) {
throw new SmppChannelException("Unable to start: server is destroyed");
}
try {
<<<<<<< HEAD
serverChannel = this.serverBootstrap.bind(new InetSocketAddress(InetAddress.getByName(configuration.getBindAddress()), configuration.getPort()));
logger.info("{} started on SMPP port [{}] and socket address [{}]", configuration.getName(), configuration.getPort(), configuration.getBindAddress());
=======
serverChannel = this.serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
logger.info("{} started at {}:{}", configuration.getName(), configuration.getHost(), configuration.getPort());
>>>>>>> 756f0e3608188dcc0d37110ff802eaf5407cab8d
} catch (ChannelException e) {
throw new SmppChannelException(e.getMessage(), e);
} catch (UnknownHostException e) {
throw new SmppChannelException(e.getMessage(), e);
}
}

Expand All @@ -272,14 +263,14 @@ public void stop() {
}
logger.info("{} stopped at {}:{}", configuration.getName(), configuration.getHost(), configuration.getPort());
}

@Override
public void destroy() {
this.bindTimer.cancel();
stop();
this.serverBootstrap.releaseExternalResources();
this.serverBootstrap = null;
this.writeTimeoutTimer.stop();
this.writeTimeoutTimer.stop();
unregisterMBean();
logger.info("{} destroyed on SMPP port [{}]", configuration.getName(), configuration.getPort());
}
Expand Down Expand Up @@ -314,7 +305,7 @@ protected BaseBindResp createBindResponse(BaseBind bindRequest, int statusCode)
}

return bindResponse;
}
}

protected void bindRequested(Long sessionId, SmppSessionConfiguration config, BaseBind bindRequest) throws SmppProcessingException {
counters.incrementBindRequestedAndGet();
Expand Down Expand Up @@ -349,28 +340,28 @@ protected void createSession(Long sessionId, Channel channel, SmppSessionConfigu
SmppSessionLogger loggingHandler = new SmppSessionLogger(DefaultSmppSession.class.getCanonicalName(), config.getLoggingOptions());
channel.getPipeline().addAfter(SmppChannelConstants.PIPELINE_SESSION_THREAD_RENAMER_NAME, SmppChannelConstants.PIPELINE_SESSION_LOGGER_NAME, loggingHandler);

// add a writeTimeout handler after the logger
if (config.getWriteTimeout() > 0) {
WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutTimer, config.getWriteTimeout(), TimeUnit.MILLISECONDS);
channel.getPipeline().addAfter(SmppChannelConstants.PIPELINE_SESSION_LOGGER_NAME, SmppChannelConstants.PIPELINE_SESSION_WRITE_TIMEOUT_NAME, writeTimeoutHandler);
}
// add a writeTimeout handler after the logger
if (config.getWriteTimeout() > 0) {
WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutTimer, config.getWriteTimeout(), TimeUnit.MILLISECONDS);
channel.getPipeline().addAfter(SmppChannelConstants.PIPELINE_SESSION_LOGGER_NAME, SmppChannelConstants.PIPELINE_SESSION_WRITE_TIMEOUT_NAME, writeTimeoutHandler);
}

// decoder in pipeline is ok (keep it)

// create a new wrapper around a session to pass the pdu up the chain
channel.getPipeline().remove(SmppChannelConstants.PIPELINE_SESSION_WRAPPER_NAME);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_WRAPPER_NAME, new SmppSessionWrapper(session));

// check if the # of channels exceeds maxConnections
if (this.channels.size() > this.configuration.getMaxConnectionSize()) {
logger.warn("The current connection size [{}] exceeds the configured max connection size [{}]", this.channels.size(), this.configuration.getMaxConnectionSize());
}

// session created, now pass it upstream
counters.incrementSessionCreatedAndGet();
incrementSessionSizeCounters(session);
this.serverHandler.sessionCreated(sessionId, session, preparedBindResponse);

// register this session as an mbean
if (configuration.isJmxEnabled()) {
session.registerMBean(configuration.getJmxDomain() + ":type=" + configuration.getName() + "Sessions,name=" + sessionId);
Expand All @@ -383,13 +374,13 @@ protected void destroySession(Long sessionId, DefaultSmppSession session) {
counters.incrementSessionDestroyedAndGet();
decrementSessionSizeCounters(session);
serverHandler.sessionDestroyed(sessionId, session);

// unregister this session as an mbean
if (configuration.isJmxEnabled()) {
session.unregisterMBean(configuration.getJmxDomain() + ":type=" + configuration.getName() + "Sessions,name=" + sessionId);
}
}

private void incrementSessionSizeCounters(DefaultSmppSession session) {
this.counters.incrementSessionSizeAndGet();
switch (session.getBindType()) {
Expand All @@ -404,7 +395,7 @@ private void incrementSessionSizeCounters(DefaultSmppSession session) {
break;
}
}

private void decrementSessionSizeCounters(DefaultSmppSession session) {
this.counters.decrementSessionSizeAndGet();
switch (session.getBindType()) {
Expand All @@ -421,32 +412,32 @@ private void decrementSessionSizeCounters(DefaultSmppSession session) {
}

// mainly for exposing via JMX

@Override
public void resetCounters() {
this.counters.reset();
}

@Override
public int getSessionSize() {
return this.counters.getSessionSize();
}

@Override
public int getTransceiverSessionSize() {
return this.counters.getTransceiverSessionSize();
}

@Override
public int getTransmitterSessionSize() {
return this.counters.getTransmitterSessionSize();
}

@Override
public int getReceiverSessionSize() {
return this.counters.getReceiverSessionSize();
}

@Override
public int getMaxConnectionSize() {
return this.configuration.getMaxConnectionSize();
Expand All @@ -466,7 +457,7 @@ public long getBindTimeout() {
public boolean isNonBlockingSocketsEnabled() {
return this.configuration.isNonBlockingSocketsEnabled();
}

@Override
public boolean isReuseAddress() {
return this.configuration.isReuseAddress();
Expand Down

0 comments on commit fb82730

Please sign in to comment.