Skip to content

Commit

Permalink
Re-initialise Netty worker group on plugin restart
Browse files Browse the repository at this point in the history
This allows the plugin to actually recover from exceptions after a
restart. It also has the side effect of providing nicer error messages
and clearer stack traces to the end user.

Fixes #289
  • Loading branch information
praseodym authored and robbavey committed Jun 4, 2018
1 parent f7a2978 commit f853ce6
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private final int port;
private final NioEventLoopGroup workGroup;
private final String host;
private final int beatsHeandlerThreadCount;
private NioEventLoopGroup workGroup;
private IMessageListener messageListener = new MessageListener();
private SslSimpleBuilder sslBuilder;
private BeatsInitializer beatsInitializer;
Expand All @@ -35,16 +35,24 @@ public Server(String host, int p, int timeout, int threadCount) {
port = p;
clientInactivityTimeoutSeconds = timeout;
beatsHeandlerThreadCount = threadCount;
workGroup = new NioEventLoopGroup();
}

public void enableSSL(SslSimpleBuilder builder) {
sslBuilder = builder;
}

public Server listen() throws InterruptedException {
if (workGroup != null) {
try {
logger.debug("Shutting down existing worker group before starting");
workGroup.shutdownGracefully().sync();
} catch (Exception e) {
logger.error("Could not shut down worker group before starting", e);
}
}
workGroup = new NioEventLoopGroup();
try {
logger.info("Starting server on port: " + this.port);
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);

Expand All @@ -63,16 +71,20 @@ public Server listen() throws InterruptedException {
return this;
}

public void stop() throws InterruptedException {
public void stop() {
logger.debug("Server shutting down");
shutdown();
logger.debug("Server stopped");
}

private void shutdown(){
try {
workGroup.shutdownGracefully().sync();
beatsInitializer.shutdownEventExecutor();
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
if (beatsInitializer != null) {
beatsInitializer.shutdownEventExecutor();
}
} catch (InterruptedException e){
throw new IllegalStateException(e);
}
Expand Down

0 comments on commit f853ce6

Please sign in to comment.