From fc57855f07d3cce15fe275de79d7bdd16dde6372 Mon Sep 17 00:00:00 2001 From: Oliver Hsu Date: Tue, 24 Nov 2020 13:54:35 -0800 Subject: [PATCH] Send entire SinkRecord value serialized as json in Scalyr event message field (#39) * Add `JsonRecordToMessageMapping` to send entire SinkRecord value serialized as JSON in the `message` field * Add `send_entire_record` config --- pom.xml | 5 + .../kafka/ScalyrSinkConnectorConfig.java | 5 +- .../integrations/kafka/ScalyrSinkTask.java | 3 +- .../kafka/mapping/EventMapper.java | 18 +++- .../mapping/JsonRecordToMessageMapping.java | 76 +++++++++++++++ .../kafka/ScalyrSinkConnectorConfigTest.java | 12 ++- .../kafka/ScalyrSinkConnectorTest.java | 2 +- .../kafka/ScalyrSinkTaskTest.java | 25 +++-- .../scalyr/integrations/kafka/TestUtils.java | 43 +++++++-- .../mapping/CustomAppMessageMapperTest.java | 3 +- .../kafka/mapping/EventMapperTest.java | 48 +++++++--- .../JsonRecordToMessageMappingTest.java | 95 +++++++++++++++++++ 12 files changed, 300 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMapping.java create mode 100644 src/test/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMappingTest.java diff --git a/pom.xml b/pom.xml index 5fe350e..50928ef 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,11 @@ ${kafka.version} provided + + org.apache.kafka + connect-json + ${kafka.version} + com.scalyr scalyr-client diff --git a/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java b/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java index 9c1eaad..5c3a7a9 100644 --- a/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java +++ b/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java @@ -68,6 +68,8 @@ public class ScalyrSinkConnectorConfig extends AbstractConfig { " Multiple custom application event mappings can be specified in a JSON list. 
Example config JSON:\n" + "[{\"matcher\": { \"attribute\": \"app.name\", \"value\": \"customApp\"},\n" + " \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\"} }]"; + public static final String SEND_ENTIRE_RECORD = "send_entire_record"; + private static final String SEND_ENTIRE_RECORD_DOC = "If true, send the entire Kafka Connect record value serialized to JSON as the message field."; public ScalyrSinkConnectorConfig(Map parsedConfig) { super(configDef(), parsedConfig); @@ -85,7 +87,8 @@ public static ConfigDef configDef() { .define(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, Type.INT, DEFAULT_ADD_EVENTS_RETRY_DELAY_MS, ConfigDef.Range.atLeast(100), Importance.LOW, ADD_EVENTS_RETRY_DELAY_MS_DOC) .define(BATCH_SEND_SIZE_BYTES_CONFIG, Type.INT, DEFAULT_BATCH_SEND_SIZE_BYTES, ConfigDef.Range.between(500_000, 5_500_000), Importance.LOW, BATCH_SEND_SIZE_BYTES_DOC) .define(BATCH_SEND_WAIT_MS_CONFIG, Type.INT, DEFAULT_BATCH_SEND_WAIT_MS, ConfigDef.Range.atLeast(1000), Importance.LOW, BATCH_SEND_WAIT_MS_DOC) - .define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC); + .define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC) + .define(SEND_ENTIRE_RECORD, Type.BOOLEAN, false, Importance.LOW, SEND_ENTIRE_RECORD_DOC); } diff --git a/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java b/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java index 608c21b..1e27ad3 100644 --- a/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java +++ b/src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java @@ -115,7 +115,8 @@ public void start(Map configProps) { this.eventMapper = new EventMapper( parseEnrichmentAttrs(sinkConfig.getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)), - parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG))); + parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG)), + sinkConfig.getBoolean(ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD)); log.info("Started ScalyrSinkTask with config {}", configProps); } diff --git a/src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java b/src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java index 7b61391..7e8b0a5 100644 --- a/src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java +++ b/src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java @@ -39,7 +39,7 @@ public class EventMapper { private static final Logger log = LoggerFactory.getLogger(EventMapper.class); private static final List DEFAULT_MAPPERS = ImmutableList.of(new FilebeatMessageMapper()); - private static final List messageMappers = new ArrayList<>(); + private final List messageMappers = new ArrayList<>(); private final Map enrichmentAttrs; private static final RateLimiter noEventMapperLogRateLimiter = RateLimiter.create(1.0/30); // 1 permit every 30 seconds to not log @VisibleForTesting static final String DEFAULT_PARSER = "kafkaParser"; @@ -47,13 +47,23 @@ public class EventMapper { /** * @param enrichmentAttrs Map of enrichment key/value pairs */ - public EventMapper(Map enrichmentAttrs, List customAppEventMappings) { + public EventMapper(Map enrichmentAttrs, List customAppEventMappings, boolean sendEntireRecord) { 
this.enrichmentAttrs = enrichmentAttrs; if (customAppEventMappings != null) { log.info("Adding custom event mappers {}", customAppEventMappings); - customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add(new CustomAppMessageMapper(customAppEventMapping))); + customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add( + addJsonRecordToMessageMapperIfNeeded(new CustomAppMessageMapper(customAppEventMapping), sendEntireRecord))); } - messageMappers.addAll(DEFAULT_MAPPERS); + DEFAULT_MAPPERS.forEach(messageMapper -> messageMappers.add(addJsonRecordToMessageMapperIfNeeded(messageMapper, sendEntireRecord))); + } + + /** + * Add JsonRecordToMessageMapping decorator if sendEntireRecord is enabled. + * @return messageMapper decorated with JsonRecordToMessageMapping when sendEntireRecord=true. + * Otherwise, return the original MessageMapper. + */ + private MessageMapper addJsonRecordToMessageMapperIfNeeded(MessageMapper messageMapper, boolean sendEntireRecord) { + return sendEntireRecord ? new JsonRecordToMessageMapping(messageMapper) : messageMapper; } /** diff --git a/src/main/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMapping.java b/src/main/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMapping.java new file mode 100644 index 0000000..630a7b1 --- /dev/null +++ b/src/main/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMapping.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Scalyr Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.scalyr.integrations.kafka.mapping; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.ConverterConfig; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * Decorator for a base MessageMapper which maps the entire record value to the message field. + * This is used to send the entire Kafka record value serialized as JSON to Scalyr in the Scalyr event `message` field. + * The typical use case is when the Kafka record value is JSON and the entire JSON needs to be sent to Scalyr. + */ +public class JsonRecordToMessageMapping implements MessageMapper { + /** MessageMapper for the SinkRecord message format. 
*/ + private final MessageMapper baseMapper; + private final JsonConverter jsonConverter; + + public JsonRecordToMessageMapping(MessageMapper baseMapper) { + this.baseMapper = baseMapper; + jsonConverter = new JsonConverter(); + jsonConverter.configure(ImmutableMap.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, ConverterConfig.TYPE_CONFIG, "value")); + } + + @Override + public String getServerHost(SinkRecord record) { + return baseMapper.getServerHost(record); + } + + @Override + public String getLogfile(SinkRecord record) { + return baseMapper.getLogfile(record); + } + + @Override + public String getParser(SinkRecord record) { + return baseMapper.getParser(record); + } + + @Override + public String getMessage(SinkRecord record) { + return new String(jsonConverter.fromConnectData( + record.topic(), + record.valueSchema(), + record.value()), + StandardCharsets.UTF_8); + } + + @Override + public Map getAdditionalAttrs(SinkRecord record) { + return baseMapper.getAdditionalAttrs(record); + } + + @Override + public boolean matches(SinkRecord record) { + return baseMapper.matches(record); + } +} diff --git a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java index 3d6e89d..2f4fed9 100644 --- a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java +++ b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java @@ -31,6 +31,7 @@ import static com.scalyr.integrations.kafka.ScalyrSinkConnectorConfig.*; import static com.scalyr.integrations.kafka.TestUtils.fails; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -64,7 +65,8 @@ public void testConfig() { EVENT_ENRICHMENT_CONFIG, TEST_EVENT_ENRICHMENT, BATCH_SEND_SIZE_BYTES_CONFIG, TEST_BATCH_SEND_SIZE, BATCH_SEND_WAIT_MS_CONFIG, TEST_BATCH_SEND_WAIT_MS, - CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON); + CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON, + SEND_ENTIRE_RECORD, "true"); ScalyrSinkConnectorConfig connectorConfig = new ScalyrSinkConnectorConfig(config); assertEquals(TEST_SCALYR_SERVER, connectorConfig.getString(SCALYR_SERVER_CONFIG)); @@ -77,6 +79,7 @@ public void testConfig() { assertEquals(Integer.valueOf(TEST_BATCH_SEND_SIZE), connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG)); assertEquals(Integer.valueOf(TEST_BATCH_SEND_WAIT_MS), connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG)); assertEquals(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON, connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG)); + assertTrue(connectorConfig.getBoolean(SEND_ENTIRE_RECORD)); } /** @@ -97,6 +100,7 @@ public void testConfigDefaults() { assertEquals(DEFAULT_BATCH_SEND_SIZE_BYTES, connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG).intValue()); assertEquals(DEFAULT_BATCH_SEND_WAIT_MS, connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG).intValue()); assertNull(connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG)); + assertFalse(connectorConfig.getBoolean(SEND_ENTIRE_RECORD)); } /** @@ -175,6 +179,10 @@ public void testInvalidConfigValues() { config.put(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, "1"); fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class); config.remove(ADD_EVENTS_RETRY_DELAY_MS_CONFIG); + + config.put(SEND_ENTIRE_RECORD, "invalidBoolean"); + fails(() -> new 
ScalyrSinkConnectorConfig(config), ConfigException.class); + config.remove(SEND_ENTIRE_RECORD); } /** @@ -254,7 +262,7 @@ public void testConfigDef() { final ImmutableSet configs = ImmutableSet.of(SCALYR_SERVER_CONFIG, SCALYR_API_CONFIG, COMPRESSION_TYPE_CONFIG, COMPRESSION_LEVEL_CONFIG, ADD_EVENTS_TIMEOUT_MS_CONFIG, ADD_EVENTS_RETRY_DELAY_MS_CONFIG, EVENT_ENRICHMENT_CONFIG, - BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG); + BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG, SEND_ENTIRE_RECORD); ConfigDef configDef = ScalyrSinkConnectorConfig.configDef(); assertEquals(configs.size(), configDef.names().size()); diff --git a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorTest.java b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorTest.java index cd719a1..a740dbf 100644 --- a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorTest.java +++ b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorTest.java @@ -79,7 +79,7 @@ public void testTaskConfigs() { scalyrSinkConnector.start(config); List> taskConfigs = scalyrSinkConnector.taskConfigs(numTaskConfigs); assertEquals(numTaskConfigs, taskConfigs.size()); - taskConfigs.forEach(taskConfig -> TestUtils.verifyMap(config, taskConfig)); + taskConfigs.forEach(taskConfig -> assertEquals(config, taskConfig)); } /** diff --git a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkTaskTest.java b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkTaskTest.java index bdccfb7..c762429 100644 --- a/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkTaskTest.java +++ b/src/test/java/com/scalyr/integrations/kafka/ScalyrSinkTaskTest.java @@ -43,12 +43,14 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static com.scalyr.integrations.kafka.AddEventsClientTest.EVENTS; import static com.scalyr.integrations.kafka.TestUtils.fails; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; /** * Test ScalyrSinkTask @@ -58,6 +60,7 @@ public class ScalyrSinkTaskTest { private ScalyrSinkTask scalyrSinkTask; private final TriFunction recordValue; + private final boolean sendEntireRecord; private static final String topic = "test-topic"; private static final int partition = 0; @@ -67,15 +70,19 @@ public class ScalyrSinkTaskTest { private static final int ADD_EVENTS_OVERHEAD_BYTES = (int)(TestValues.MIN_BATCH_SEND_SIZE_BYTES * 0.2); // 20% overhead /** - * Create test parameters for each SinkRecordValueCreator type. + * Create test parameters for each SinkRecordValueCreator type and send_entire_record combination. 
+ * Object[] = {TriFunction recordValue, send_entire_record boolean} */ @Parameterized.Parameters public static Collection testParams() { - return TestUtils.multipleRecordValuesTestParams(); + return TestUtils.multipleRecordValuesTestParams().stream() + .flatMap(recordValue -> Stream.of(new Object[] {recordValue[0], false}, new Object[] {recordValue[0], true})) + .collect(Collectors.toList()); } - public ScalyrSinkTaskTest(TriFunction recordValue) { + public ScalyrSinkTaskTest(TriFunction recordValue, boolean sendEntireRecord) { this.recordValue = recordValue; + this.sendEntireRecord = sendEntireRecord; // Print test params Object data = recordValue.apply(1, 1, 1); System.out.println("Executing test with " + (data instanceof Struct ? "schema" : "schemaless") + " recordValue: " + data); @@ -119,7 +126,7 @@ public void testPut() throws Exception { */ private void putAndVerifyRecords(MockWebServer server) throws InterruptedException, java.io.IOException { // Add multiple server responses for `put` batch exceeds `batch_send_size_bytes` - IntStream.range(0, 2).forEach(i -> + IntStream.range(0, 4).forEach(i -> server.enqueue(new MockResponse().setResponseCode(200).setBody(TestValues.ADD_EVENTS_RESPONSE_SUCCESS))); // put SinkRecords @@ -141,7 +148,7 @@ private void verifyRecords(MockWebServer server, List records) throw EventMapper eventMapper = new EventMapper( scalyrSinkTask.parseEnrichmentAttrs(new ScalyrSinkConnectorConfig(createConfig()).getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)), - CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON)); + CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON), sendEntireRecord); List origEvents = records.stream() .map(eventMapper::createEvent) @@ -190,6 +197,7 @@ public void testPutFlushCycles() throws Exception { */ @Test public void testPutErrorHandling() { + assumeFalse(sendEntireRecord); final int numRequests = 3; int requestCount = 0; TestUtils.MockSleep mockSleep = new TestUtils.MockSleep(); @@ -228,6 +236,7 @@ public void testPutErrorHandling() { */ @Test public void testIgnoreInputTooLongError() throws Exception { + assumeFalse(sendEntireRecord); TestUtils.MockSleep mockSleep = new TestUtils.MockSleep(); this.scalyrSinkTask = new ScalyrSinkTask(mockSleep.sleep); MockWebServer server = new MockWebServer(); @@ -258,6 +267,7 @@ public void testIgnoreInputTooLongError() throws Exception { */ @Test public void testPutEventBufferingSendSize() throws Exception { + assumeFalse(sendEntireRecord); final int numRecords = (TestValues.MIN_BATCH_EVENTS / 2) + 1; ScalyrUtil.setCustomTimeNs(0); // Set custom time and never advance so batchSendWaitMs will not be met @@ -366,6 +376,7 @@ public void testFlushSendsEventsInBuffer() throws Exception { */ @Test public void testSinglePutExceedsBatchBytesSize() throws Exception { + assumeFalse(sendEntireRecord); final int numExpectedSends = 4; MockWebServer server = new MockWebServer(); @@ -394,6 +405,7 @@ public void testSinglePutExceedsBatchBytesSize() throws Exception { */ @Test public void testLargeMsgMixedWithSmallMsgs() throws Exception { + assumeFalse(sendEntireRecord); final int numExpectedSends = 3; MockWebServer server = new MockWebServer(); @@ -526,6 +538,7 @@ private Map createConfig() { ScalyrSinkConnectorConfig.SCALYR_API_CONFIG, TestValues.API_KEY_VALUE, ScalyrSinkConnectorConfig.COMPRESSION_TYPE_CONFIG, CompressorFactory.NONE, ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG, TestValues.ENRICHMENT_VALUE, - 
ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON); + ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON, + ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD, Boolean.toString(sendEntireRecord)); } } diff --git a/src/test/java/com/scalyr/integrations/kafka/TestUtils.java b/src/test/java/com/scalyr/integrations/kafka/TestUtils.java index 4067bcc..1d607c0 100644 --- a/src/test/java/com/scalyr/integrations/kafka/TestUtils.java +++ b/src/test/java/com/scalyr/integrations/kafka/TestUtils.java @@ -16,6 +16,7 @@ package com.scalyr.integrations.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.scalyr.api.internal.ScalyrUtil; import com.scalyr.integrations.kafka.mapping.CustomAppMessageMapperTest; @@ -24,8 +25,12 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -42,6 +47,7 @@ import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -50,6 +56,8 @@ */ public class TestUtils { + private static final ObjectMapper objectMapper = new ObjectMapper(); + /** * Create a Map of of String[] key/value pairs * @param keyValuePairs key1, value1, key2, value2, ... @@ -66,12 +74,34 @@ public static Map makeMap(String... keyValuePairs) { } /** - * Verify two maps contain the same values. + * Verify serialized Map JSON contains expected values + */ + public static void verifyMap(Map expected, String serializedMapJson) throws IOException { + final Map mapValues = objectMapper.readValue(serializedMapJson, Map.class); + assertEquals(expected, mapValues); + } + + /** + * Verifies Struct serialized to JSON contains the correct Struct values. 
+ * @param structValues Original Struct that is serialized to JSON + * @param serializedStructJson Serialized Struct JSON */ - public static void verifyMap(Map expected, Map actual) { - assertEquals(expected.size(), actual.size()); - assertEquals(expected.keySet(), actual.keySet()); - expected.keySet().forEach(key -> assertEquals(expected.get(key), actual.get(key))); + public static void verifyStruct(Struct structValues, String serializedStructJson) throws IOException { + final Map mapValues = objectMapper.readValue(serializedStructJson, Map.class); + verifyStruct(structValues, mapValues); + } + + public static void verifyStruct(Struct structValues, Map mapValues) { + final List fields = structValues.schema().fields(); + assertFalse(fields.isEmpty()); + for (Field field : fields) { + assertTrue(mapValues.containsKey(field.name())); + if (field.schema().type() == Schema.Type.STRUCT) { + verifyStruct(structValues.getStruct(field.name()), (Map)mapValues.get(field.name())); + } else { + assertEquals(structValues.get(field.name()), mapValues.get(field.name())); + } + } } /** @@ -177,9 +207,10 @@ private static Stream> getMultipl public static List createRecords(String topic, int partition, int numRecords, Object recordValue) { AtomicInteger offset = new AtomicInteger(); + final Schema valueSchema = recordValue instanceof Struct ? ((Struct)recordValue).schema() : null; return IntStream.range(0, numRecords) .boxed() - .map(i -> new SinkRecord(topic, partition, null, null, null, recordValue, offset.getAndIncrement(), ScalyrUtil.currentTimeMillis(), TimestampType.CREATE_TIME)) + .map(i -> new SinkRecord(topic, partition, null, null, valueSchema, recordValue, offset.getAndIncrement(), ScalyrUtil.currentTimeMillis(), TimestampType.CREATE_TIME)) .collect(Collectors.toList()); } diff --git a/src/test/java/com/scalyr/integrations/kafka/mapping/CustomAppMessageMapperTest.java b/src/test/java/com/scalyr/integrations/kafka/mapping/CustomAppMessageMapperTest.java index 661e502..78e20f4 100644 --- a/src/test/java/com/scalyr/integrations/kafka/mapping/CustomAppMessageMapperTest.java +++ b/src/test/java/com/scalyr/integrations/kafka/mapping/CustomAppMessageMapperTest.java @@ -16,6 +16,7 @@ package com.scalyr.integrations.kafka.mapping; +import com.google.common.annotations.VisibleForTesting; import com.scalyr.integrations.kafka.TestUtils; import com.scalyr.integrations.kafka.TestValues; import org.apache.kafka.connect.data.Schema; @@ -60,7 +61,7 @@ public void setup() { * Typically this is parsed from JSON, but we construct it programmatically here. * See {@link CustomAppRecordValueCreator} for the app fields. 
*/ - private CustomAppEventMapping createCustomAppEventMapping() { + @VisibleForTesting static CustomAppEventMapping createCustomAppEventMapping() { CustomAppEventMapping customAppEventMapping = new CustomAppEventMapping(); customAppEventMapping.setEventMapping(TestUtils.makeMap( "id", "id", diff --git a/src/test/java/com/scalyr/integrations/kafka/mapping/EventMapperTest.java b/src/test/java/com/scalyr/integrations/kafka/mapping/EventMapperTest.java index b437779..4693b9f 100644 --- a/src/test/java/com/scalyr/integrations/kafka/mapping/EventMapperTest.java +++ b/src/test/java/com/scalyr/integrations/kafka/mapping/EventMapperTest.java @@ -21,6 +21,7 @@ import com.scalyr.integrations.kafka.TestUtils; import com.scalyr.integrations.kafka.TestValues; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Before; @@ -37,9 +38,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; /** * Test for EventMapper @@ -55,22 +58,25 @@ public class EventMapperTest { private final Supplier recordValue; private final Map enrichmentAttrs; + private final boolean sendEntireRecord; private EventMapper eventMapper; /** - * Create test parameters for each SinkRecordValueCreator type and testEnrichmentAttrs combination. - * Object[] = {Supplier recordValue, List enrichmentAttr} + * Create test parameters for each SinkRecordValueCreator type, testEnrichmentAttrs, and sendEntireRecord combination. 
+ * Object[] = {Supplier recordValue, List enrichmentAttr, sendEntireRecord boolean} */ @Parameterized.Parameters public static Collection testParams() { return TestUtils.singleRecordValueTestParams().stream() - .flatMap(recordValue -> testEnrichmentAttrs.stream().map(enrichmentAttrs -> new Object[] {recordValue[0], enrichmentAttrs})) + .flatMap(recordValue -> testEnrichmentAttrs.stream().flatMap(enrichmentAttrs -> + Stream.of(new Object[] {recordValue[0], enrichmentAttrs, false}, new Object[] {recordValue[0], enrichmentAttrs, true}))) .collect(Collectors.toList()); } - public EventMapperTest(Supplier recordValue, Map enrichmentAttrs) { + public EventMapperTest(Supplier recordValue, Map enrichmentAttrs, boolean sendEntireRecord) { this.recordValue = recordValue; this.enrichmentAttrs = enrichmentAttrs; + this.sendEntireRecord = sendEntireRecord; // Print test params Object data = recordValue.get(); @@ -80,20 +86,22 @@ public EventMapperTest(Supplier recordValue, Map enrichm @Before public void setup() throws IOException { - this.eventMapper = new EventMapper(enrichmentAttrs, CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON)); + this.eventMapper = new EventMapper(enrichmentAttrs, CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON), sendEntireRecord); } /** * Test EventMapper with no timestamp in SinkRecord */ @Test - public void createEventNoTimestampTest() { + public void createEventNoTimestampTest() throws Exception { // Without timestamp final long nsFromEpoch = ScalyrUtil.NANOS_PER_SECOND; // 1 second after epoch ScalyrUtil.setCustomTimeNs(nsFromEpoch); - SinkRecord sinkRecord = new SinkRecord(topic, partition, null, null, null, recordValue.get(), offset.getAndIncrement()); + Object value = recordValue.get(); + Schema valueSchema = value instanceof Struct ? ((Struct)value).schema() : null; + SinkRecord sinkRecord = new SinkRecord(topic, partition, null, null, valueSchema, value, offset.getAndIncrement()); Event event = eventMapper.createEvent(sinkRecord); - validateEvent(event); + validateEvent(event, value); assertEquals(nsFromEpoch, event.getTimestamp()); } @@ -101,12 +109,14 @@ public void createEventNoTimestampTest() { * Test EventMapper with timestamp in SinkRecord */ @Test - public void createEventWithTimestampTest() { + public void createEventWithTimestampTest() throws Exception { // With timestamp final long msSinceEpoch = 60 * 1000; // 1 minute after epoch - SinkRecord sinkRecord = new SinkRecord(topic, partition, null, null, null, recordValue.get(), offset.getAndIncrement(), msSinceEpoch, TimestampType.CREATE_TIME); + Object value = recordValue.get(); + Schema valueSchema = value instanceof Struct ? 
((Struct)value).schema() : null; + SinkRecord sinkRecord = new SinkRecord(topic, partition, null, null, valueSchema, value, offset.getAndIncrement(), msSinceEpoch, TimestampType.CREATE_TIME); Event event = eventMapper.createEvent(sinkRecord); - validateEvent(event); + validateEvent(event, value); assertEquals(msSinceEpoch * ScalyrUtil.NANOS_PER_MS, event.getTimestamp()); } @@ -139,13 +149,13 @@ public void defaultValuesTest() { /** * Validate Scalyr event matches SinkRecord */ - private void validateEvent(Event event) { + private void validateEvent(Event event, Object recordValue) throws Exception { assertEquals(TestValues.SERVER_VALUE + "0", event.getServerHost()); - assertEquals(TestValues.MESSAGE_VALUE, event.getMessage()); assertEquals(topic, event.getTopic()); assertEquals(partition, event.getPartition()); assertEquals(offset.get() - 1, event.getOffset()); assertEquals(enrichmentAttrs, event.getEnrichmentAttrs()); + validateMessage(event, recordValue); if (event.getLogfile().equals(TestValues.CUSTOM_APP_NAME)) { validateCustomAppFields(event); @@ -154,6 +164,18 @@ private void validateEvent(Event event) { } } + private void validateMessage(Event event, Object recordValue) throws Exception { + if (!sendEntireRecord) { + assertEquals(TestValues.MESSAGE_VALUE, event.getMessage()); + } else if (recordValue instanceof Map) { + TestUtils.verifyMap((Map)recordValue, event.getMessage()); + } else if (recordValue instanceof Struct) { + TestUtils.verifyStruct((Struct)recordValue, event.getMessage()); + } else { + fail("Invalid record value"); + } + } + private void validateFilebeatFields(Event event) { assertEquals(TestValues.LOGFILE_VALUE + "0", event.getLogfile()); assertEquals(TestValues.PARSER_VALUE + "0", event.getParser()); diff --git a/src/test/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMappingTest.java b/src/test/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMappingTest.java new file mode 100644 index 0000000..e758596 --- /dev/null +++ b/src/test/java/com/scalyr/integrations/kafka/mapping/JsonRecordToMessageMappingTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020 Scalyr Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.scalyr.integrations.kafka.mapping; + +import com.scalyr.integrations.kafka.TestUtils; +import com.scalyr.integrations.kafka.TestValues; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test for JsonRecordToMessageMapping + */ +public class JsonRecordToMessageMappingTest { + /** Simulated Kafka partition offset */ + private static final AtomicInteger kafkaOffset = new AtomicInteger(); + private MessageMapper messageMapper; + private SinkRecordValueCreator sinkRecordValueCreator; + + private static final String topic = "test-topic"; + private static final int partition = 0; + + @Before + public void setup() { + final CustomAppMessageMapper customAppMessageMapper = new CustomAppMessageMapper(CustomAppMessageMapperTest.createCustomAppEventMapping()); + messageMapper = new JsonRecordToMessageMapping(customAppMessageMapper); + sinkRecordValueCreator = new CustomAppMessageMapperTest.CustomAppRecordValueCreator(); + } + + /** + * Test mapper gets correct values for schemaless record value + */ + @Test + public void testCustomAppMessageMapperSchemaless() throws Exception { + SinkRecord record = new SinkRecord(topic, partition, null, null, null, sinkRecordValueCreator.createSchemalessRecordValue(1, 1, 1), kafkaOffset.getAndIncrement()); + verifySinkRecord(record); + } + + /** + * Test mapper gets correct values for schema record value + */ + @Test + public void testCustomAppMessageMapperSchema() throws Exception { + final Struct schemaRecordValue = sinkRecordValueCreator.createSchemaRecordValue(1, 1, 1); + SinkRecord record = new SinkRecord(topic, partition, null, null, schemaRecordValue.schema(), schemaRecordValue, kafkaOffset.getAndIncrement()); + verifySinkRecord(record); + } + + private void verifySinkRecord(SinkRecord record) throws Exception { + assertEquals(TestValues.CUSTOM_APP_NAME, messageMapper.getLogfile(record)); + assertEquals(TestValues.PARSER_VALUE, messageMapper.getParser(record)); + assertEquals(TestValues.SERVER_VALUE + "0", messageMapper.getServerHost(record)); + Map additionalAttrs = messageMapper.getAdditionalAttrs(record); + assertNotNull(additionalAttrs.get("id")); + assertEquals(TestValues.SEVERITY_VALUE, additionalAttrs.get("severity")); + assertEquals(TestValues.CUSTOM_APP_VERSION, additionalAttrs.get("version")); + assertEquals(TestValues.CUSTOM_APP_NAME, additionalAttrs.get("application")); + assertEquals(TestValues.ACTIVITY_TYPE_VALUE, additionalAttrs.get("activityType")); + + // Verify message field, which should contain the entire record value serialized to JSON + if (record.value() instanceof Map) { + // Schemaless record value + TestUtils.verifyMap((Map)record.value(), messageMapper.getMessage(record)); + } else if (record.value() instanceof Struct){ + // Schema record value + TestUtils.verifyStruct((Struct)record.value(), messageMapper.getMessage(record)); + } else { + fail("Invalid record value"); + } + + assertTrue(messageMapper.matches(record)); + } +}
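Usage sketch: with send_entire_record=true in the connector properties, each message mapper is wrapped in the JsonRecordToMessageMapping decorator added above, so the Scalyr event `message` field carries the whole record value serialized to JSON instead of only the mapped message attribute. The snippet below is a minimal, illustrative sketch of that serialization path for a schemaless record value; the class name SendEntireRecordSketch, the sample topic, and the field values are invented for illustration, while the JsonConverter setup mirrors the new mapping class.

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.ConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class SendEntireRecordSketch {
  public static void main(String[] args) {
    // Same converter setup as JsonRecordToMessageMapping: plain JSON output (no schema envelope),
    // converting record values rather than keys.
    JsonConverter jsonConverter = new JsonConverter();
    jsonConverter.configure(ImmutableMap.of(
        JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false,
        ConverterConfig.TYPE_CONFIG, "value"));

    // Hypothetical schemaless record value; field names and values are illustrative only.
    Map<String, Object> recordValue = ImmutableMap.of(
        "message", "app log line",
        "host", ImmutableMap.of("hostname", "server1"));
    SinkRecord record = new SinkRecord("test-topic", 0, null, null, null, recordValue, 0L);

    // With send_entire_record=true the Scalyr event `message` field would carry this JSON,
    // e.g. {"message":"app log line","host":{"hostname":"server1"}}
    String message = new String(
        jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()),
        StandardCharsets.UTF_8);
    System.out.println(message);
  }
}

For record values with a schema, the same fromConnectData call serializes the Struct fields to JSON, which is what the new TestUtils.verifyStruct assertions validate in the tests above.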