Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]read beats payload in javaheap chunked #491

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Moved reading of the compressed payload to the Java heap, accumulating it in chunks
andsel committed Jan 29, 2024
commit 279a071b40b4daf2b5e383d6f294ba9afbebcb97
59 changes: 52 additions & 7 deletions src/main/java/org/logstash/beats/BeatsParser.java
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
public class BeatsParser extends ByteToMessageDecoder {
private final static Logger logger = LogManager.getLogger(BeatsParser.class);

private static final int CHUNK_SIZE = 64 * 1024; // chunk size of compressed data to be read.

private Batch batch;

private enum States {
@@ -30,6 +32,7 @@ private enum States {
READ_JSON_HEADER(8),
READ_COMPRESSED_FRAME_HEADER(4),
READ_COMPRESSED_FRAME(-1), // -1 means the length to read is variable and defined in the frame itself.
READ_COMPRESSED_FRAME_JAVA_HEAP(-1), // -1 means the length to read is variable and defined in the frame itself.
READ_JSON(-1),
READ_DATA_FIELDS(-1);

@@ -45,6 +48,10 @@ private enum States {
private int requiredBytes = 0;
private int sequence = 0;
private boolean decodingCompressedBuffer = false;
private ByteBuf compressedAccumulator;
private int compressedReadBytes; // count of bytes actually read
private int compressedSize; // total size of compressed payload


@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws InvalidFrameProtocolException, IOException {
@@ -176,13 +183,18 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
// The 4-byte compressed-frame header carries the total size of the deflated
// payload that follows on the wire.
case READ_COMPRESSED_FRAME_HEADER: {
logger.trace("Running: READ_COMPRESSED_FRAME_HEADER");

// Pre-change path (removed by this commit): jump straight to READ_COMPRESSED_FRAME
// and require the whole compressed payload to be buffered at once.
transition(States.READ_COMPRESSED_FRAME, in.readInt());
compressedSize = in.readInt();
compressedReadBytes = 0;
// Pre-size a heap accumulator for the entire compressed payload; it is filled
// chunk by chunk in READ_COMPRESSED_FRAME_JAVA_HEAP.
compressedAccumulator = ctx.alloc().heapBuffer(compressedSize);
// read compressed payload at most in chunks of 64Kb and aggregate in Java heap
int bytesToRead = Math.min(compressedSize, CHUNK_SIZE);
transition(States.READ_COMPRESSED_FRAME_JAVA_HEAP, bytesToRead);
break;
}

case READ_COMPRESSED_FRAME: {
logger.trace("Running: READ_COMPRESSED_FRAME");
inflateCompressedFrame(ctx, in, (buffer) -> {
inflateCompressedFrame(ctx, in, requiredBytes, (buffer) -> {
transition(States.READ_HEADER);

decodingCompressedBuffer = true;
@@ -197,6 +209,39 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
});
break;
}
// Accumulates the deflated payload into the heap-backed compressedAccumulator in
// chunks of at most CHUNK_SIZE; once complete, inflates it and re-enters decode()
// on the inflated buffer.
case READ_COMPRESSED_FRAME_JAVA_HEAP: {
logger.trace("Running: READ_COMPRESSED_FRAME_JAVA_HEAP");
// accumulate in heap
// missedBytes = bytes of the compressed payload still to be copied from the wire.
int missedBytes = compressedSize - compressedReadBytes;
// Copy no more than what is readable and no more than what is still missing.
int readBytes = Math.min(in.readableBytes(), missedBytes);
in.readBytes(compressedAccumulator, readBytes);
compressedReadBytes += readBytes;

if (compressedReadBytes == compressedSize) {
logger.debug("Finished to accumulate");
// inflate compressedAccumulator in heap
// NOTE(review): if inflateCompressedFrame throws before this consumer runs
// (e.g. corrupt zlib stream), compressedAccumulator is never released — it is
// only freed inside the lambda. Consider a try/finally around the call. TODO confirm.
inflateCompressedFrame(ctx, compressedAccumulator, compressedSize, (buffer) -> {
transition(States.READ_HEADER);
// Reset bookkeeping; -1 marks "no frame in progress".
compressedSize = -1;
compressedReadBytes = -1;
// NOTE(review): the field still points at the released buffer afterwards;
// consider nulling compressedAccumulator to avoid use-after-release.
compressedAccumulator.release();

// Re-run the decoder over the inflated bytes; the flag prevents the nested
// decode() calls from treating them as a new compressed frame context.
decodingCompressedBuffer = true;
try {
while (buffer.readableBytes() > 0) {
decode(ctx, buffer, out);
}
} finally {
decodingCompressedBuffer = false;
transition(States.READ_HEADER);
}
});
} else {
logger.debug("Read next chunk");
// NOTE(review): this always requests a full CHUNK_SIZE, even when fewer than
// CHUNK_SIZE bytes remain in the frame (compressedSize - compressedReadBytes).
// If the state machine waits for requiredBytes to be readable, the last partial
// chunk could stall; Math.min(CHUNK_SIZE, compressedSize - compressedReadBytes)
// looks safer — TODO confirm against hasEnoughBytes/transition semantics.
transition(States.READ_COMPRESSED_FRAME_JAVA_HEAP, CHUNK_SIZE);
}
break;
}
case READ_JSON: {
logger.trace("Running: READ_JSON");
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
@@ -214,25 +259,25 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
}

private void inflateCompressedFrame(final ChannelHandlerContext ctx, final ByteBuf in, final CheckedConsumer<ByteBuf> fn)
private void inflateCompressedFrame(final ChannelHandlerContext ctx, final ByteBuf in, int deflatedSize, final CheckedConsumer<ByteBuf> fn)
throws IOException {
// Use the compressed size as the safe start for the buffer.
ByteBuf buffer = ctx.alloc().buffer(requiredBytes);
ByteBuf buffer = ctx.alloc().heapBuffer(deflatedSize);
try {
decompressImpl(in, buffer);
decompressImpl(in, buffer, deflatedSize);
fn.accept(buffer);
} finally {
buffer.release();
}
}

private void decompressImpl(final ByteBuf in, final ByteBuf out) throws IOException {
private void decompressImpl(final ByteBuf in, final ByteBuf out, int deflatedSize) throws IOException {
Inflater inflater = new Inflater();
try (
ByteBufOutputStream buffOutput = new ByteBufOutputStream(out);
InflaterOutputStream inflaterStream = new InflaterOutputStream(buffOutput, inflater)
) {
in.readBytes(inflaterStream, requiredBytes);
in.readBytes(inflaterStream, deflatedSize);
} finally {
inflater.end();
}
4 changes: 2 additions & 2 deletions src/test/java/org/logstash/beats/BeatsParserTest.java
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ public void setup() throws Exception{
this.v1Batch = new V1Batch();

for(int i = 1; i <= numberOfMessage; i++) {
Map map = new HashMap<String, String>();
Map<String, String> map = new HashMap<>();
map.put("line", "Another world");
map.put("from", "Little big Adventure");

@@ -50,7 +50,7 @@ public void setup() throws Exception{
this.byteBufBatch = new V2Batch();

for(int i = 1; i <= numberOfMessage; i++) {
Map map = new HashMap<String, String>();
Map<String, String> map = new HashMap<>();
map.put("line", "Another world");
map.put("from", "Little big Adventure");
ByteBuf bytebuf = Unpooled.wrappedBuffer(MAPPER.writeValueAsBytes(map));