From 2382feec5165b6c13fe89ef31f99d26b91dbea04 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 23 Jan 2025 10:58:18 +0100 Subject: [PATCH 01/11] Re-established existing tests with the addition of the encoding case transformation --- .../common/BufferedTokenizerExtTest.java | 104 ++++++++++++++++ ...BufferedTokenizerExtWithDelimiterTest.java | 66 +++++++++++ ...BufferedTokenizerExtWithSizeLimitTest.java | 111 ++++++++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..91e8f6d45c6 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeASingleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n")); + 
assertEquals(List.of(""), tokens); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioning() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); +// RubyArray tokens = (RubyArray) sut.extract(context, rubyInput); + sut.extract(context, rubyInput); + + IRubyObject token = sut.flush(context); + +// assertEquals(List.of("£"), tokens); + assertEquals(rubyInput, token); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java new file mode 100644 index 00000000000..19872e66c3c --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + + assertEquals(List.of("foo", "b|r"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + assertEquals(List.of("foo"), tokens); + } +} diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java new file mode 100644 index 00000000000..9a07242369d --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. 
licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.*; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; + sut.init(context, args); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, 
RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); + assertEquals(List.of("bbbb"), tokens); + } + + @Test + public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + //first buffer full on 13 "a" letters + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + // second buffer full on 11 "b" letters + Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); + }); + 
assertThat(secondThrownException.getMessage(), containsString("input buffer full")); + + // now should resemble processing on c and d + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); + assertEquals(List.of("ccccc", "ddd"), tokens); + } +} \ No newline at end of file From 339d6455aeebbb172f2255cb14acae7c8d2af1eb Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 23 Jan 2025 17:13:58 +0100 Subject: [PATCH 02/11] Fixed the test to compare Java strings byte rappresentation so that has the same encoding --- .../java/org/logstash/common/BufferedTokenizerExtTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index 91e8f6d45c6..51d10dba88b 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -93,12 +93,10 @@ public void shouldTokenizeEmptyPayloadWithNewline() { public void shouldNotChangeEncodingOfTokensAfterPartitioning() { RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); -// RubyArray tokens = (RubyArray) sut.extract(context, rubyInput); sut.extract(context, rubyInput); IRubyObject token = sut.flush(context); -// assertEquals(List.of("£"), tokens); - assertEquals(rubyInput, token); + assertEquals((byte) 0xA3, token.asJavaString().getBytes()[0]); } } \ No newline at end of file From 0a910a26698a0325697630261f73aac1d522fa68 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 23 Jan 2025 17:17:55 +0100 Subject: [PATCH 03/11] Re-imported previously reverted code --- .../logstash/common/BufferedTokenizerExt.java | 65 +++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git 
a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index be1c64d2356..0d285f936ca 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -40,10 +40,12 @@ public class BufferedTokenizerExt extends RubyObject { freeze(RubyUtil.RUBY.getCurrentContext()); private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); + private StringBuilder headToken = new StringBuilder(); private RubyString delimiter = NEW_LINE; private int sizeLimit; private boolean hasSizeLimit; private int inputSize; + private boolean bufferFullErrorNotified = false; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -81,22 +83,63 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @SuppressWarnings("rawtypes") public RubyArray extract(final ThreadContext context, IRubyObject data) { final RubyArray entities = data.convertToString().split(delimiter, -1); + if (!bufferFullErrorNotified) { + input.clear(); + input.addAll(entities); + } else { + // after a full buffer signal + if (input.isEmpty()) { + // after a buffer full error, the remaining part of the line, till next delimiter, + // has to be consumed, unless the input buffer doesn't still contain fragments of + // subsequent tokens. 
+ entities.shift(context); + input.addAll(entities); + } else { + // merge last of the input with first of incoming data segment + if (!entities.isEmpty()) { + RubyString last = ((RubyString) input.pop(context)); + RubyString nextFirst = ((RubyString) entities.shift(context)); + entities.unshift(last.concat(nextFirst)); + input.addAll(entities); + } + } + } + if (hasSizeLimit) { - final int entitiesSize = ((RubyString) entities.first()).size(); + if (bufferFullErrorNotified) { + bufferFullErrorNotified = false; + if (input.isEmpty()) { + return RubyUtil.RUBY.newArray(); + } + } + final int entitiesSize = ((RubyString) input.first()).size(); if (inputSize + entitiesSize > sizeLimit) { + bufferFullErrorNotified = true; + headToken = new StringBuilder(); + inputSize = 0; + input.shift(context); // consume the token fragment that generates the buffer full throw new IllegalStateException("input buffer full"); } this.inputSize = inputSize + entitiesSize; } - input.append(entities.shift(context)); - if (entities.isEmpty()) { + + if (input.getLength() < 2) { + // this is a specialization case which avoid adding and removing from input accumulator + // when it contains just one element + headToken.append(input.shift(context)); // remove head return RubyUtil.RUBY.newArray(); } - entities.unshift(input.join(context)); - input.clear(); - input.append(entities.pop(context)); - inputSize = ((RubyString) input.first()).size(); - return entities; + + if (headToken.length() > 0) { + // if there is a pending token part, merge it with the first token segment present + // in the accumulator, and clean the pending token part. 
+ headToken.append(input.shift(context)); // append buffer to first element and + input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array + headToken = new StringBuilder(); + } + headToken.append(input.pop(context)); // put the leftovers in headToken for later + inputSize = headToken.length(); + return input; } /** @@ -108,15 +151,15 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { */ @JRubyMethod public IRubyObject flush(final ThreadContext context) { - final IRubyObject buffer = input.join(context); - input.clear(); + final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); + headToken = new StringBuilder(); inputSize = 0; return buffer; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0)); + return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); } } From 33dc6b19c1b1c8b07f588c1fb2c3204c8051c88a Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 28 Jan 2025 09:48:31 +0100 Subject: [PATCH 04/11] Try to make the test reporduce the encoding problem --- .../logstash/common/BufferedTokenizerExtTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index 51d10dba88b..c62f1b49ab9 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -91,12 +91,16 @@ public void shouldTokenizeEmptyPayloadWithNewline() { @Test public void shouldNotChangeEncodingOfTokensAfterPartitioning() { - RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character +// RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ 
character + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); - sut.extract(context, rubyInput); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + IRubyObject firstToken = tokens.shift(context); - IRubyObject token = sut.flush(context); +// IRubyObject token = sut.flush(context); - assertEquals((byte) 0xA3, token.asJavaString().getBytes()[0]); +// assertEquals((byte) 0xA3, token.asJavaString().getBytes()[0]); +// assertEquals("£", token.asJavaString()); + assertEquals("£", firstToken.toString()); } } \ No newline at end of file From 674d377da31cdf59997645d53c9a23df5355aa5a Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 28 Jan 2025 12:18:11 +0100 Subject: [PATCH 05/11] Fixed test to verify the case of encoding preservation --- .../logstash/common/BufferedTokenizerExtTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index c62f1b49ab9..3cbde7ddc3f 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -20,6 +20,7 @@ package org.logstash.common; import org.jruby.RubyArray; +import org.jruby.RubyEncoding; import org.jruby.RubyString; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; @@ -91,16 +92,16 @@ public void shouldTokenizeEmptyPayloadWithNewline() { @Test public void shouldNotChangeEncodingOfTokensAfterPartitioning() { -// RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A IRubyObject 
rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); - IRubyObject firstToken = tokens.shift(context); - -// IRubyObject token = sut.flush(context); -// assertEquals((byte) 0xA3, token.asJavaString().getBytes()[0]); -// assertEquals("£", token.asJavaString()); + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); assertEquals("£", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO8859-1", encoding.toString()); } } \ No newline at end of file From 5af3a6243c926d3e78580d50fa042abf00500d78 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 28 Jan 2025 13:10:23 +0100 Subject: [PATCH 06/11] Switch from RubyArray addAll to concat method to preserve the encoding and avoid implicit deconding in addAll iterator --- .../main/java/org/logstash/common/BufferedTokenizerExt.java | 6 +++--- .../java/org/logstash/common/BufferedTokenizerExtTest.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 0d285f936ca..0b21d60389b 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -85,7 +85,7 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { final RubyArray entities = data.convertToString().split(delimiter, -1); if (!bufferFullErrorNotified) { input.clear(); - input.addAll(entities); + input.concat(entities); } else { // after a full buffer signal if (input.isEmpty()) { @@ -93,14 +93,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { // has to be consumed, unless 
the input buffer doesn't still contain fragments of // subsequent tokens. entities.shift(context); - input.addAll(entities); + input.concat(entities); } else { // merge last of the input with first of incoming data segment if (!entities.isEmpty()) { RubyString last = ((RubyString) input.pop(context)); RubyString nextFirst = ((RubyString) entities.shift(context)); entities.unshift(last.concat(nextFirst)); - input.addAll(entities); + input.concat(entities); } } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index 3cbde7ddc3f..218cf769689 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -102,6 +102,6 @@ public void shouldNotChangeEncodingOfTokensAfterPartitioning() { // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); - assertEquals("ISO8859-1", encoding.toString()); + assertEquals("ISO-8859-1", encoding.toString()); } } \ No newline at end of file From 0f76bd02beee1e4942e4ce6c3e31896c9d654526 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 29 Jan 2025 18:14:21 +0100 Subject: [PATCH 07/11] Covered with more use cases and verifies encoding of the returned Ruby strings --- .../common/BufferedTokenizerExtTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index 218cf769689..3fc767ace60 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -104,4 +104,44 @@ public void 
shouldNotChangeEncodingOfTokensAfterPartitioning() { RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); assertEquals("ISO-8859-1", encoding.toString()); } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + sut.extract(context, rubyInput); + IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) + .force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£A", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("A", lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + 
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } } \ No newline at end of file From a5d6bef689006e74e80ecc19fdedae55877cde42 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 29 Jan 2025 19:14:20 +0100 Subject: [PATCH 08/11] Updates the point of return Java String to return the one encoded with data input encoding, to do not change encoding --- .../logstash/common/BufferedTokenizerExt.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 0b21d60389b..e1c465f0995 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -20,17 +20,17 @@ package org.logstash.common; -import org.jruby.Ruby; -import org.jruby.RubyArray; -import org.jruby.RubyClass; -import org.jruby.RubyObject; -import org.jruby.RubyString; +import org.jruby.*; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.util.ByteList; import org.logstash.RubyUtil; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + @JRubyClass(name = "BufferedTokenizer") public class BufferedTokenizerExt extends RubyObject { @@ -46,6 +46,7 @@ public class BufferedTokenizerExt extends RubyObject { private boolean hasSizeLimit; private int inputSize; private boolean bufferFullErrorNotified = false; + private String encodingName; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -82,6 +83,8 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @JRubyMethod @SuppressWarnings("rawtypes") public RubyArray extract(final 
ThreadContext context, IRubyObject data) { + RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); + encodingName = encoding.getEncoding().getCharsetName(); final RubyArray entities = data.convertToString().split(delimiter, -1); if (!bufferFullErrorNotified) { input.clear(); @@ -134,7 +137,10 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { // if there is a pending token part, merge it with the first token segment present // in the accumulator, and clean the pending token part. headToken.append(input.shift(context)); // append buffer to first element and - input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array + // create new RubyString with the data specified encoding + RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(headToken.toString().getBytes(Charset.forName(encodingName)))); + encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + input.unshift(encodedHeadToken); // reinsert it into the array headToken = new StringBuilder(); } headToken.append(input.pop(context)); // put the leftovers in headToken for later @@ -154,7 +160,12 @@ public IRubyObject flush(final ThreadContext context) { final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); headToken = new StringBuilder(); inputSize = 0; - return buffer; + + // create new RubyString with the last data specified encoding + RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(buffer.toString().getBytes(Charset.forName(encodingName)))); + encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + + return encodedHeadToken; } @JRubyMethod(name = "empty?") From b42ca05e325cfed5ebc335e42ec23d74c4daa654 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 31 Jan 2025 12:46:43 +0100 Subject: [PATCH 09/11] Minor, removed unused import --- .../src/main/java/org/logstash/common/BufferedTokenizerExt.java | 1 - 1 file changed, 1 deletion(-) 
diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index e1c465f0995..41e448f7d84 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -29,7 +29,6 @@ import org.logstash.RubyUtil; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; @JRubyClass(name = "BufferedTokenizer") public class BufferedTokenizerExt extends RubyObject { From 015ba432154e0ad2994690a23d6da555a00bf642 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 3 Feb 2025 15:52:01 +0100 Subject: [PATCH 10/11] Force UTF8 for empty string when flush is invoked without a previous extract call --- .../logstash/common/BufferedTokenizerExt.java | 17 ++++++++++++++--- .../common/BufferedTokenizerExtTest.java | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 41e448f7d84..63aa3c2300b 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -160,9 +160,20 @@ public IRubyObject flush(final ThreadContext context) { headToken = new StringBuilder(); inputSize = 0; - // create new RubyString with the last data specified encoding - RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(buffer.toString().getBytes(Charset.forName(encodingName)))); - encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + // create new RubyString with the last data specified encoding, if exists + RubyString encodedHeadToken; + if (encodingName != null) { + encodedHeadToken = RubyUtil.RUBY.newString(new 
ByteList(buffer.toString().getBytes(Charset.forName(encodingName)))); + encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + } else { + // When used with TCP input it could be that on socket connection the flush method + // is invoked while no invocation of extract, leaving the encoding name unassigned. + // In such case also the headToken must be empty + if (!buffer.toString().isEmpty()) { + throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); + } + encodedHeadToken = (RubyString) buffer; + } return encodedHeadToken; } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java index 3fc767ace60..524abb36ed5 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -144,4 +144,18 @@ public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlus RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); assertEquals("ISO-8859-1", encoding.toString()); } + + @Test + public void givenDirectFlushInvocationUTF8EncodingIsApplied() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + + // flush without any preceding extract call and check that an empty token is returned + IRubyObject lastToken = sut.flush(context); + assertEquals("", lastToken.toString()); + + // verify the default UTF-8 encoding is applied when flush happens before any extract + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("UTF-8", encoding.toString()); + } } \ No newline at end of file From 727e3b1b073aa090c8430b6e87a85affb6155f68 Mon Sep 17 00:00:00 2001 From: 
andsel Date: Tue, 4 Feb 2025 10:47:20 +0100 Subject: [PATCH 11/11] Addressed some concerns raised in PR review - extracted common code used in string encoding - avoid full package import - better exception message with details on limit exceeded --- .../logstash/common/BufferedTokenizerExt.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 63aa3c2300b..e2c476520c1 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -20,7 +20,12 @@ package org.logstash.common; -import org.jruby.*; +import org.jruby.Ruby; +import org.jruby.RubyArray; +import org.jruby.RubyClass; +import org.jruby.RubyEncoding; +import org.jruby.RubyObject; +import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; @@ -118,9 +123,10 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { if (inputSize + entitiesSize > sizeLimit) { bufferFullErrorNotified = true; headToken = new StringBuilder(); + String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); inputSize = 0; input.shift(context); // consume the token fragment that generates the buffer full - throw new IllegalStateException("input buffer full"); + throw new IllegalStateException(errorMessage); } this.inputSize = inputSize + entitiesSize; } @@ -137,8 +143,7 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { // if there is a pending token part, merge it with the first token segment present // in the accumulator, and clean the pending token part. 
headToken.append(input.shift(context)); // append buffer to first element and // create new RubyString with the data specified encoding - RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(headToken.toString().getBytes(Charset.forName(encodingName)))); - encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); input.unshift(encodedHeadToken); // reinsert it into the array headToken = new StringBuilder(); } @@ -147,6 +152,13 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { return input; } + private RubyString toEncodedRubyString(ThreadContext context, String input) { + // Depends on the encodingName being set by the extract method, could potentially raise if not set. + RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName)))); + result.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + return result; + } + /** * Flush the contents of the input buffer, i.e. return the input buffer even though * a token has not yet been encountered @@ -163,8 +175,7 @@ public IRubyObject flush(final ThreadContext context) { // create new RubyString with the last data specified encoding, if exists RubyString encodedHeadToken; if (encodingName != null) { - encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(buffer.toString().getBytes(Charset.forName(encodingName)))); - encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + encodedHeadToken = toEncodedRubyString(context, buffer.toString()); } else { // When used with TCP input it could be that on socket connection the flush method // is invoked while no invocation of extract, leaving the encoding name unassigned.