Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increased use of log4j2 readability. #329

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions src/main/java/org/logstash/beats/BeatsHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.logstash.beats;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
Expand All @@ -10,7 +14,8 @@
import javax.net.ssl.SSLHandshakeException;

public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
private final static Logger logger = LogManager.getLogger(BeatsHandler.class);

private final static Logger logger = LogManager.getLogger();
private final IMessageListener messageListener;
private ChannelHandlerContext context;

Expand All @@ -22,33 +27,25 @@ public BeatsHandler(IMessageListener listener) {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
context = ctx;
if (logger.isTraceEnabled()){
logger.trace(format("Channel Active"));
}
logger.trace("{}", () -> format("Channel Active"));
super.channelActive(ctx);
messageListener.onNewConnection(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (logger.isTraceEnabled()){
logger.trace(format("Channel Inactive"));
}
logger.trace("{}", () -> format("Channel Inactive"));
messageListener.onConnectionClose(ctx);
}


@Override
public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception {
if(logger.isDebugEnabled()) {
logger.debug(format("Received a new payload"));
}
logger.debug("{}", () -> format("Received a new payload"));
try {
for (Message message : batch) {
if (logger.isDebugEnabled()) {
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
}
logger.debug("{}", () -> format("Sending a new message for the listener, sequence: " + message.getSequence()));
messageListener.onNewMessage(ctx, message);

if (needAck(message)) {
Expand All @@ -58,9 +55,9 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio
}finally{
//this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive
ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().set(false);
if (logger.isDebugEnabled()) {
logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get());
}
logger.debug("{}: batches pending: {}",
() -> ctx.channel().id().asShortText(),
() -> ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get());
batch.release();
ctx.flush();
}
Expand All @@ -83,11 +80,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage();

if (logger.isDebugEnabled()){
logger.debug(format("Handling exception: " + causeMessage), cause);
}
logger.info(format("Handling exception: " + causeMessage));
} finally{
logger.info("{}", () -> format("Handling exception: " + causeMessage));
logger.catching(Level.DEBUG, cause);
} finally {
super.exceptionCaught(ctx, cause);
ctx.flush();
ctx.close();
Expand All @@ -99,9 +94,7 @@ private boolean needAck(Message message) {
}

private void ack(ChannelHandlerContext ctx, Message message) {
if (logger.isTraceEnabled()){
logger.trace(format("Acking message number " + message.getSequence()));
}
logger.trace("{}", () -> format("Acking message number " + message.getSequence()));
writeAck(ctx, message.getBatch().getProtocol(), message.getSequence());
}

Expand Down
11 changes: 4 additions & 7 deletions src/main/java/org/logstash/beats/BeatsParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@


public class BeatsParser extends ByteToMessageDecoder {
private final static Logger logger = LogManager.getLogger(BeatsParser.class);

private static final Logger logger = LogManager.getLogger();

private Batch batch;

Expand Down Expand Up @@ -195,9 +196,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
logger.trace("Running: READ_JSON");
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
if(batch.isComplete()) {
if(logger.isTraceEnabled()) {
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
}
logger.trace("Sending batch size: {}, windowSize: {}, seq: {}", () -> batch.size(), () -> batch.getBatchSize() , () -> sequence);
out.add(batch);
batchComplete();
}
Expand All @@ -217,9 +216,7 @@ private void transition(States next) {
}

private void transition(States nextState, int requiredBytes) {
if(logger.isTraceEnabled()) {
logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes");
}
logger.trace("{}", () -> "Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes");
this.currentState = nextState;
this.requiredBytes = requiredBytes;
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/logstash/beats/ConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
* Manages the connection state to the beats client.
*/
public class ConnectionHandler extends ChannelDuplexHandler {
private final static Logger logger = LogManager.getLogger(ConnectionHandler.class);

private final static Logger logger = LogManager.getLogger();

public static AttributeKey<AtomicBoolean> CHANNEL_SEND_KEEP_ALIVE = AttributeKey.valueOf("channel-send-keep-alive");

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).set(new AtomicBoolean(false));
if (logger.isTraceEnabled()) {
logger.trace("{}: channel activated", ctx.channel().id().asShortText());
}
logger.trace("{}: channel activated", () -> ctx.channel().id().asShortText());
super.channelActive(ctx);
}

Expand All @@ -37,9 +36,9 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().set(true);
if (logger.isDebugEnabled()) {
logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().get());
}
logger.debug("{}: batches pending: {}",
() -> ctx.channel().id().asShortText(),
() -> ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().get());
super.channelRead(ctx, msg);
}

Expand Down Expand Up @@ -80,7 +79,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}
} else if (e.state() == IdleState.ALL_IDLE) {
logger.debug("{}: reader and writer are idle, closing remote connection", ctx.channel().id().asShortText());
logger.debug("{}: reader and writer are idle, closing remote connection",
() -> ctx.channel().id().asShortText());
ctx.flush();
ChannelFuture f = ctx.close();
if (logger.isTraceEnabled()) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/logstash/beats/MessageListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
*/
// This need to be implemented in Ruby
public class MessageListener implements IMessageListener {
private final static Logger logger = LogManager.getLogger(MessageListener.class);

private static final Logger logger = LogManager.getLogger();


/**
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/org/logstash/beats/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
public class Runner {
private static final int DEFAULT_PORT = 5044;

private final static Logger logger = LogManager.getLogger(Runner.class);


private static final Logger logger = LogManager.getLogger();

static public void main(String[] args) throws Exception {
logger.info("Starting Beats Bulk");
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.security.cert.CertificateException;

public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private static final Logger logger = LogManager.getLogger();

private final int port;
private final String host;
Expand Down Expand Up @@ -52,7 +53,7 @@ public Server listen() throws InterruptedException {
}
workGroup = new NioEventLoopGroup();
try {
logger.info("Starting server on port: {}", this.port);
logger.info("Starting server on port: {}", port);

beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);

Expand Down
20 changes: 8 additions & 12 deletions src/main/java/org/logstash/netty/SslSimpleBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static enum SslClientVerifyMode {
VERIFY_PEER,
FORCE_PEER,
}
private final static Logger logger = LogManager.getLogger(SslSimpleBuilder.class);
private static final Logger logger = LogManager.getLogger();


private File sslKeyFile;
Expand Down Expand Up @@ -77,7 +77,7 @@ public SslSimpleBuilder setCipherSuites(String[] ciphersSuite) throws IllegalArg
if(!OpenSsl.isCipherSuiteAvailable(cipher)) {
throw new IllegalArgumentException("Cipher `" + cipher + "` is not available");
} else {
logger.debug("Cipher is supported: " + cipher);
logger.debug("Cipher is supported: {}", cipher);
}
}

Expand Down Expand Up @@ -111,25 +111,21 @@ public File getSslCertificateFile() {
public SslHandler build(ByteBufAllocator bufferAllocator) throws IOException, NoSuchAlgorithmException, CertificateException {
SslContextBuilder builder = SslContextBuilder.forServer(sslCertificateFile, sslKeyFile, passPhrase);

if(logger.isDebugEnabled())
logger.debug("Available ciphers:" + Arrays.toString(OpenSsl.availableOpenSslCipherSuites().toArray()));
logger.debug("Ciphers: " + Arrays.toString(ciphers));
logger.debug("Available ciphers: {}", () ->Arrays.toString(OpenSsl.availableOpenSslCipherSuites().toArray()));
logger.debug("Ciphers: {}", () -> Arrays.toString(ciphers));


builder.ciphers(Arrays.asList(ciphers));

if(requireClientAuth()) {
if (logger.isDebugEnabled())
logger.debug("Certificate Authorities: " + Arrays.toString(certificateAuthorities));

if (requireClientAuth()) {
logger.debug("Certificate Authorities: {}", () -> Arrays.toString(certificateAuthorities));
builder.trustManager(loadCertificateCollection(certificateAuthorities));
}

SslContext context = builder.build();
SslHandler sslHandler = context.newHandler(bufferAllocator);

if(logger.isDebugEnabled())
logger.debug("TLS: " + Arrays.toString(protocols));
logger.debug("TLS: {}", () -> Arrays.toString(protocols));

SSLEngine engine = sslHandler.engine();
engine.setEnabledProtocols(protocols);
Expand Down Expand Up @@ -162,7 +158,7 @@ private X509Certificate[] loadCertificateCollection(String[] certificates) throw
for(int i = 0; i < certificates.length; i++) {
String certificate = certificates[i];

logger.debug("Loading certificates from file " + certificate);
logger.debug("Loading certificates from file {}", ()-> certificate);

try(InputStream in = new FileInputStream(certificate)) {
List<X509Certificate> certificatesChains = (List<X509Certificate>) certificateFactory.generateCertificates(in);
Expand Down