Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string #16968

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@

package org.logstash.common;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.*;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.logstash.RubyUtil;

import java.nio.charset.Charset;

@JRubyClass(name = "BufferedTokenizer")
public class BufferedTokenizerExt extends RubyObject {

Expand All @@ -40,10 +39,13 @@ public class BufferedTokenizerExt extends RubyObject {
freeze(RubyUtil.RUBY.getCurrentContext());

private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
private StringBuilder headToken = new StringBuilder();
private RubyString delimiter = NEW_LINE;
private int sizeLimit;
private boolean hasSizeLimit;
private int inputSize;
private boolean bufferFullErrorNotified = false;
private String encodingName;

public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand Down Expand Up @@ -80,23 +82,69 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
@JRubyMethod
@SuppressWarnings("rawtypes")
public RubyArray extract(final ThreadContext context, IRubyObject data) {
RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context);
encodingName = encoding.getEncoding().getCharsetName();
final RubyArray entities = data.convertToString().split(delimiter, -1);
if (!bufferFullErrorNotified) {
input.clear();
input.concat(entities);
} else {
// after a full buffer signal
if (input.isEmpty()) {
// after a buffer full error, the remaining part of the line, till next delimiter,
// has to be consumed, unless the input buffer doesn't still contain fragments of
// subsequent tokens.
entities.shift(context);
input.concat(entities);
} else {
// merge last of the input with first of incoming data segment
if (!entities.isEmpty()) {
RubyString last = ((RubyString) input.pop(context));
RubyString nextFirst = ((RubyString) entities.shift(context));
entities.unshift(last.concat(nextFirst));
input.concat(entities);
}
}
}

if (hasSizeLimit) {
final int entitiesSize = ((RubyString) entities.first()).size();
if (bufferFullErrorNotified) {
bufferFullErrorNotified = false;
if (input.isEmpty()) {
return RubyUtil.RUBY.newArray();
}
}
final int entitiesSize = ((RubyString) input.first()).size();
if (inputSize + entitiesSize > sizeLimit) {
bufferFullErrorNotified = true;
headToken = new StringBuilder();
inputSize = 0;
input.shift(context); // consume the token fragment that generates the buffer full
throw new IllegalStateException("input buffer full");
}
this.inputSize = inputSize + entitiesSize;
}
input.append(entities.shift(context));
if (entities.isEmpty()) {

if (input.getLength() < 2) {
// this is a specialization case which avoid adding and removing from input accumulator
// when it contains just one element
headToken.append(input.shift(context)); // remove head
return RubyUtil.RUBY.newArray();
}
entities.unshift(input.join(context));
input.clear();
input.append(entities.pop(context));
inputSize = ((RubyString) input.first()).size();
return entities;

if (headToken.length() > 0) {
// if there is a pending token part, merge it with the first token segment present
// in the accumulator, and clean the pending token part.
headToken.append(input.shift(context)); // append buffer to first element and
// create new RubyString with the data specified encoding
RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(headToken.toString().getBytes(Charset.forName(encodingName))));
encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
input.unshift(encodedHeadToken); // reinsert it into the array
headToken = new StringBuilder();
}
headToken.append(input.pop(context)); // put the leftovers in headToken for later
inputSize = headToken.length();
return input;
}

/**
Expand All @@ -108,15 +156,20 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
*/
@JRubyMethod
public IRubyObject flush(final ThreadContext context) {
final IRubyObject buffer = input.join(context);
input.clear();
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
headToken = new StringBuilder();
inputSize = 0;
return buffer;

// create new RubyString with the last data specified encoding
RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(buffer.toString().getBytes(Charset.forName(encodingName))));
encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName));

return encodedHeadToken;
}

@JRubyMethod(name = "empty?")
public IRubyObject isEmpty(final ThreadContext context) {
return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0));
return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.common;

import org.jruby.RubyArray;
import org.jruby.RubyEncoding;
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Before;
import org.junit.Test;
import org.logstash.RubyTestBase;
import org.logstash.RubyUtil;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.logstash.RubyUtil.RUBY;

@SuppressWarnings("unchecked")
public final class BufferedTokenizerExtTest extends RubyTestBase {

private BufferedTokenizerExt sut;
private ThreadContext context;

@Before
public void setUp() {
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
context = RUBY.getCurrentContext();
IRubyObject[] args = {};
sut.init(context, args);
}

@Test
public void shouldTokenizeASingleToken() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));

assertEquals(List.of("foo"), tokens);
}

@Test
public void shouldMergeMultipleToken() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo"));
assertTrue(tokens.isEmpty());

tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
assertEquals(List.of("foobar"), tokens);
}

@Test
public void shouldTokenizeMultipleToken() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));

assertEquals(List.of("foo", "bar"), tokens);
}

@Test
public void shouldIgnoreEmptyPayload() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
assertTrue(tokens.isEmpty());

tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"));
assertEquals(List.of("foo"), tokens);
}

@Test
public void shouldTokenizeEmptyPayloadWithNewline() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
assertEquals(List.of(""), tokens);

tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
assertEquals(List.of("", "", ""), tokens);
}

@Test
public void shouldNotChangeEncodingOfTokensAfterPartitioning() {
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput);

// read the first token, the £ string
IRubyObject firstToken = tokens.shift(context);
assertEquals("£", firstToken.toString());

// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
assertEquals("ISO-8859-1", encoding.toString());
}

@Test
public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() {
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
sut.extract(context, rubyInput);
IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41})
.force_encoding(context, RUBY.newString("ISO8859-1"));
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, capitalAInLatin1);
assertTrue(tokens.isEmpty());

tokens = (RubyArray<RubyString>)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A}));

// read the first token, the £ string
IRubyObject firstToken = tokens.shift(context);
assertEquals("£A", firstToken.toString());

// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
assertEquals("ISO-8859-1", encoding.toString());
}

@Test
public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() {
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput);

// read the first token, the £ string
IRubyObject firstToken = tokens.shift(context);
assertEquals("£", firstToken.toString());

// flush and check that the remaining A is still encoded in ISO8859-1
IRubyObject lastToken = sut.flush(context);
assertEquals("A", lastToken.toString());

// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
assertEquals("ISO-8859-1", encoding.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.common;

import org.jruby.RubyArray;
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Before;
import org.junit.Test;
import org.logstash.RubyTestBase;
import org.logstash.RubyUtil;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.logstash.RubyUtil.RUBY;

@SuppressWarnings("unchecked")
public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase {

private BufferedTokenizerExt sut;
private ThreadContext context;

@Before
public void setUp() {
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
context = RUBY.getCurrentContext();
IRubyObject[] args = {RubyUtil.RUBY.newString("||")};
sut.init(context, args);
}

@Test
public void shouldTokenizeMultipleToken() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||"));

assertEquals(List.of("foo", "b|r"), tokens);
}

@Test
public void shouldIgnoreEmptyPayload() {
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
assertTrue(tokens.isEmpty());

tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar"));
assertEquals(List.of("foo"), tokens);
}
}
Loading