diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index ff791866..bfaec368 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -21,9 +21,9 @@ public class Server { private final static Logger logger = LogManager.getLogger(Server.class); private final int port; - private final NioEventLoopGroup workGroup; private final String host; private final int beatsHeandlerThreadCount; + private NioEventLoopGroup workGroup; private IMessageListener messageListener = new MessageListener(); private SslSimpleBuilder sslBuilder; private BeatsInitializer beatsInitializer; @@ -35,7 +35,6 @@ public Server(String host, int p, int timeout, int threadCount) { port = p; clientInactivityTimeoutSeconds = timeout; beatsHeandlerThreadCount = threadCount; - workGroup = new NioEventLoopGroup(); } public void enableSSL(SslSimpleBuilder builder) { @@ -43,8 +42,17 @@ public void enableSSL(SslSimpleBuilder builder) { } public Server listen() throws InterruptedException { + if (workGroup != null) { + try { + logger.debug("Shutting down existing worker group before starting"); + workGroup.shutdownGracefully().sync(); + } catch (Exception e) { + logger.error("Could not shut down worker group before starting", e); + } + } + workGroup = new NioEventLoopGroup(); try { - logger.info("Starting server on port: " + this.port); + logger.info("Starting server on port: {}", this.port); beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount); @@ -63,7 +71,7 @@ public Server listen() throws InterruptedException { return this; } - public void stop() throws InterruptedException { + public void stop() { logger.debug("Server shutting down"); shutdown(); logger.debug("Server stopped"); @@ -71,8 +79,12 @@ public void stop() throws InterruptedException { private void shutdown(){ try { - workGroup.shutdownGracefully().sync(); - beatsInitializer.shutdownEventExecutor(); + if (workGroup != null) { + workGroup.shutdownGracefully().sync(); + } + if (beatsInitializer != null) { + beatsInitializer.shutdownEventExecutor(); + } } catch (InterruptedException e){ throw new IllegalStateException(e); }