Skip to content

Commit

Permalink
Send entire SinkRecord value serialized as json in Scalyr event messa…
Browse files Browse the repository at this point in the history
…ge field (#39)

* Add `JsonRecordToMessageMapping` to send entire SinkRecord value serialized as JSON in the `message` field
* Add `send_entire_record` config
  • Loading branch information
Oliver Hsu authored Nov 24, 2020
1 parent 429bf38 commit fc57855
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 35 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.scalyr</groupId>
<artifactId>scalyr-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class ScalyrSinkConnectorConfig extends AbstractConfig {
" Multiple custom application event mappings can be specified in a JSON list. Example config JSON:\n"
+ "[{\"matcher\": { \"attribute\": \"app.name\", \"value\": \"customApp\"},\n" +
" \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\"} }]";
public static final String SEND_ENTIRE_RECORD = "send_entire_record";
private static final String SEND_ENTIRE_RECORD_DOC = "If true, send the entire Kafka Connect record value serialized to JSON as the message field.";

public ScalyrSinkConnectorConfig(Map<String, String> parsedConfig) {
super(configDef(), parsedConfig);
Expand All @@ -85,7 +87,8 @@ public static ConfigDef configDef() {
.define(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, Type.INT, DEFAULT_ADD_EVENTS_RETRY_DELAY_MS, ConfigDef.Range.atLeast(100), Importance.LOW, ADD_EVENTS_RETRY_DELAY_MS_DOC)
.define(BATCH_SEND_SIZE_BYTES_CONFIG, Type.INT, DEFAULT_BATCH_SEND_SIZE_BYTES, ConfigDef.Range.between(500_000, 5_500_000), Importance.LOW, BATCH_SEND_SIZE_BYTES_DOC)
.define(BATCH_SEND_WAIT_MS_CONFIG, Type.INT, DEFAULT_BATCH_SEND_WAIT_MS, ConfigDef.Range.atLeast(1000), Importance.LOW, BATCH_SEND_WAIT_MS_DOC)
.define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC);
.define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC)
.define(SEND_ENTIRE_RECORD, Type.BOOLEAN, false, Importance.LOW, SEND_ENTIRE_RECORD_DOC);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public void start(Map<String, String> configProps) {

this.eventMapper = new EventMapper(
parseEnrichmentAttrs(sinkConfig.getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)),
parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG)));
parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG)),
sinkConfig.getBoolean(ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD));

log.info("Started ScalyrSinkTask with config {}", configProps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,31 @@
public class EventMapper {
private static final Logger log = LoggerFactory.getLogger(EventMapper.class);
private static final List<MessageMapper> DEFAULT_MAPPERS = ImmutableList.of(new FilebeatMessageMapper());
private static final List<MessageMapper> messageMappers = new ArrayList<>();
private final List<MessageMapper> messageMappers = new ArrayList<>();
private final Map<String, String> enrichmentAttrs;
private static final RateLimiter noEventMapperLogRateLimiter = RateLimiter.create(1.0/30); // 1 permit every 30 seconds to not log
@VisibleForTesting static final String DEFAULT_PARSER = "kafkaParser";

/**
* @param enrichmentAttrs Map<String, String> of enrichment key/value pairs
*/
public EventMapper(Map<String, String> enrichmentAttrs, List<CustomAppEventMapping> customAppEventMappings) {
public EventMapper(Map<String, String> enrichmentAttrs, List<CustomAppEventMapping> customAppEventMappings, boolean sendEntireRecord) {
this.enrichmentAttrs = enrichmentAttrs;
if (customAppEventMappings != null) {
log.info("Adding custom event mappers {}", customAppEventMappings);
customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add(new CustomAppMessageMapper(customAppEventMapping)));
customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add(
addJsonRecordToMessageMapperIfNeeeded(new CustomAppMessageMapper(customAppEventMapping), sendEntireRecord)));
}
messageMappers.addAll(DEFAULT_MAPPERS);
DEFAULT_MAPPERS.forEach(messageMapper -> messageMappers.add(addJsonRecordToMessageMapperIfNeeeded(messageMapper, sendEntireRecord)));
}

/**
* Add JsonRecordToMessageMapping decorator if sendEntireRecord is enabled.
* @return messageMapper decorated with JsonRecordToMessageMapping when sendEntireRecord=true.
* Otherwise, return original MessageMapper.
*/
private MessageMapper addJsonRecordToMessageMapperIfNeeeded(MessageMapper messageMapper, boolean sendEntireRecord) {
return sendEntireRecord ? new JsonRecordToMessageMapping(messageMapper) : messageMapper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2020 Scalyr Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.scalyr.integrations.kafka.mapping;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.ConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* Decorator for base MessageMapper which will map the entire Record value to the message field.
* This is used to send the entire Kafka record value serialized as JSON to Scalyr in the Scalyr event `message` field.
* The typical use case for this is when the Kafka record value is JSON and the entire JSON needs to be sent to Scalyr.
*/
public class JsonRecordToMessageMapping implements MessageMapper {
/** MessageMapper for the SinkRecord message format. */
private final MessageMapper baseMapper;
private final JsonConverter jsonConverter;

public JsonRecordToMessageMapping(MessageMapper baseMapper) {
this.baseMapper = baseMapper;
jsonConverter = new JsonConverter();
jsonConverter.configure(ImmutableMap.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, ConverterConfig.TYPE_CONFIG, "value"));
}

@Override
public String getServerHost(SinkRecord record) {
return baseMapper.getServerHost(record);
}

@Override
public String getLogfile(SinkRecord record) {
return baseMapper.getLogfile(record);
}

@Override
public String getParser(SinkRecord record) {
return baseMapper.getParser(record);
}

@Override
public String getMessage(SinkRecord record) {
return new String(jsonConverter.fromConnectData(
record.topic(),
record.valueSchema(),
record.value()),
StandardCharsets.UTF_8);
}

@Override
public Map<String, Object> getAdditionalAttrs(SinkRecord record) {
return baseMapper.getAdditionalAttrs(record);
}

@Override
public boolean matches(SinkRecord record) {
return baseMapper.matches(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static com.scalyr.integrations.kafka.ScalyrSinkConnectorConfig.*;
import static com.scalyr.integrations.kafka.TestUtils.fails;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -64,7 +65,8 @@ public void testConfig() {
EVENT_ENRICHMENT_CONFIG, TEST_EVENT_ENRICHMENT,
BATCH_SEND_SIZE_BYTES_CONFIG, TEST_BATCH_SEND_SIZE,
BATCH_SEND_WAIT_MS_CONFIG, TEST_BATCH_SEND_WAIT_MS,
CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON);
CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON,
SEND_ENTIRE_RECORD, "true");

ScalyrSinkConnectorConfig connectorConfig = new ScalyrSinkConnectorConfig(config);
assertEquals(TEST_SCALYR_SERVER, connectorConfig.getString(SCALYR_SERVER_CONFIG));
Expand All @@ -77,6 +79,7 @@ public void testConfig() {
assertEquals(Integer.valueOf(TEST_BATCH_SEND_SIZE), connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG));
assertEquals(Integer.valueOf(TEST_BATCH_SEND_WAIT_MS), connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG));
assertEquals(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON, connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG));
assertTrue(connectorConfig.getBoolean(SEND_ENTIRE_RECORD));
}

/**
Expand All @@ -97,6 +100,7 @@ public void testConfigDefaults() {
assertEquals(DEFAULT_BATCH_SEND_SIZE_BYTES, connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG).intValue());
assertEquals(DEFAULT_BATCH_SEND_WAIT_MS, connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG).intValue());
assertNull(connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG));
assertFalse(connectorConfig.getBoolean(SEND_ENTIRE_RECORD));
}

/**
Expand Down Expand Up @@ -175,6 +179,10 @@ public void testInvalidConfigValues() {
config.put(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, "1");
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
config.remove(ADD_EVENTS_RETRY_DELAY_MS_CONFIG);

config.put(SEND_ENTIRE_RECORD, "invalidBoolean");
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
config.remove(SEND_ENTIRE_RECORD);
}

/**
Expand Down Expand Up @@ -254,7 +262,7 @@ public void testConfigDef() {
final ImmutableSet<String> configs = ImmutableSet.of(SCALYR_SERVER_CONFIG, SCALYR_API_CONFIG,
COMPRESSION_TYPE_CONFIG, COMPRESSION_LEVEL_CONFIG, ADD_EVENTS_TIMEOUT_MS_CONFIG,
ADD_EVENTS_RETRY_DELAY_MS_CONFIG, EVENT_ENRICHMENT_CONFIG,
BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG);
BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG, SEND_ENTIRE_RECORD);

ConfigDef configDef = ScalyrSinkConnectorConfig.configDef();
assertEquals(configs.size(), configDef.names().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testTaskConfigs() {
scalyrSinkConnector.start(config);
List<Map<String, String>> taskConfigs = scalyrSinkConnector.taskConfigs(numTaskConfigs);
assertEquals(numTaskConfigs, taskConfigs.size());
taskConfigs.forEach(taskConfig -> TestUtils.verifyMap(config, taskConfig));
taskConfigs.forEach(taskConfig -> assertEquals(config, taskConfig));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.scalyr.integrations.kafka.AddEventsClientTest.EVENTS;
import static com.scalyr.integrations.kafka.TestUtils.fails;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

/**
* Test ScalyrSinkTask
Expand All @@ -58,6 +60,7 @@ public class ScalyrSinkTaskTest {

private ScalyrSinkTask scalyrSinkTask;
private final TriFunction<Integer, Integer, Integer, Object> recordValue;
private final boolean sendEntireRecord;

private static final String topic = "test-topic";
private static final int partition = 0;
Expand All @@ -67,15 +70,19 @@ public class ScalyrSinkTaskTest {
private static final int ADD_EVENTS_OVERHEAD_BYTES = (int)(TestValues.MIN_BATCH_SEND_SIZE_BYTES * 0.2); // 20% overhead

/**
* Create test parameters for each SinkRecordValueCreator type.
* Create test parameters for each SinkRecordValueCreator type and send_entire_record combination.
* Object[] = {TriFunction<Integer, Integer, Integer, Object> recordValue, send_entire_record boolean}
*/
@Parameterized.Parameters
public static Collection<Object[]> testParams() {
return TestUtils.multipleRecordValuesTestParams();
return TestUtils.multipleRecordValuesTestParams().stream()
.flatMap(recordValue -> Stream.of(new Object[] {recordValue[0], false}, new Object[] {recordValue[0], true}))
.collect(Collectors.toList());
}

public ScalyrSinkTaskTest(TriFunction<Integer, Integer, Integer, Object> recordValue) {
public ScalyrSinkTaskTest(TriFunction<Integer, Integer, Integer, Object> recordValue, boolean sendEntireRecord) {
this.recordValue = recordValue;
this.sendEntireRecord = sendEntireRecord;
// Print test params
Object data = recordValue.apply(1, 1, 1);
System.out.println("Executing test with " + (data instanceof Struct ? "schema" : "schemaless") + " recordValue: " + data);
Expand Down Expand Up @@ -119,7 +126,7 @@ public void testPut() throws Exception {
*/
private void putAndVerifyRecords(MockWebServer server) throws InterruptedException, java.io.IOException {
// Add multiple server responses for `put` batch exceeds `batch_send_size_bytes`
IntStream.range(0, 2).forEach(i ->
IntStream.range(0, 4).forEach(i ->
server.enqueue(new MockResponse().setResponseCode(200).setBody(TestValues.ADD_EVENTS_RESPONSE_SUCCESS)));

// put SinkRecords
Expand All @@ -141,7 +148,7 @@ private void verifyRecords(MockWebServer server, List<SinkRecord> records) throw

EventMapper eventMapper = new EventMapper(
scalyrSinkTask.parseEnrichmentAttrs(new ScalyrSinkConnectorConfig(createConfig()).getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)),
CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON));
CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON), sendEntireRecord);

List<Event> origEvents = records.stream()
.map(eventMapper::createEvent)
Expand Down Expand Up @@ -190,6 +197,7 @@ public void testPutFlushCycles() throws Exception {
*/
@Test
public void testPutErrorHandling() {
assumeFalse(sendEntireRecord);
final int numRequests = 3;
int requestCount = 0;
TestUtils.MockSleep mockSleep = new TestUtils.MockSleep();
Expand Down Expand Up @@ -228,6 +236,7 @@ public void testPutErrorHandling() {
*/
@Test
public void testIgnoreInputTooLongError() throws Exception {
assumeFalse(sendEntireRecord);
TestUtils.MockSleep mockSleep = new TestUtils.MockSleep();
this.scalyrSinkTask = new ScalyrSinkTask(mockSleep.sleep);
MockWebServer server = new MockWebServer();
Expand Down Expand Up @@ -258,6 +267,7 @@ public void testIgnoreInputTooLongError() throws Exception {
*/
@Test
public void testPutEventBufferingSendSize() throws Exception {
assumeFalse(sendEntireRecord);
final int numRecords = (TestValues.MIN_BATCH_EVENTS / 2) + 1;
ScalyrUtil.setCustomTimeNs(0); // Set custom time and never advance so batchSendWaitMs will not be met

Expand Down Expand Up @@ -366,6 +376,7 @@ public void testFlushSendsEventsInBuffer() throws Exception {
*/
@Test
public void testSinglePutExceedsBatchBytesSize() throws Exception {
assumeFalse(sendEntireRecord);
final int numExpectedSends = 4;

MockWebServer server = new MockWebServer();
Expand Down Expand Up @@ -394,6 +405,7 @@ public void testSinglePutExceedsBatchBytesSize() throws Exception {
*/
@Test
public void testLargeMsgMixedWithSmallMsgs() throws Exception {
assumeFalse(sendEntireRecord);
final int numExpectedSends = 3;

MockWebServer server = new MockWebServer();
Expand Down Expand Up @@ -526,6 +538,7 @@ private Map<String, String> createConfig() {
ScalyrSinkConnectorConfig.SCALYR_API_CONFIG, TestValues.API_KEY_VALUE,
ScalyrSinkConnectorConfig.COMPRESSION_TYPE_CONFIG, CompressorFactory.NONE,
ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG, TestValues.ENRICHMENT_VALUE,
ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON);
ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON,
ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD, Boolean.toString(sendEntireRecord));
}
}
Loading

0 comments on commit fc57855

Please sign in to comment.