Skip to content

Commit f7fcc8f

Browse files
committed
refactor of systemtest base in preparation for #8
1 parent de5372a commit f7fcc8f

File tree

5 files changed

+213
-114
lines changed

5 files changed

+213
-114
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2014 Michal Harish, [email protected]
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.amient.kafka.hadoop;
21+
22+
import io.amient.kafka.hadoop.testutils.SystemTestBase;
23+
import kafka.producer.KeyedMessage;
24+
import org.apache.hadoop.fs.Path;
25+
import org.junit.Test;
26+
27+
import java.io.IOException;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertTrue;
31+
32+
public class DefaultSystemTest extends SystemTestBase {
33+
@Test
34+
public void canFollowKafkaPartitionsIncrementally()
35+
throws IOException, ClassNotFoundException, InterruptedException {
36+
37+
//produce text data
38+
simpleProducer.send(new KeyedMessage<>("topic01", "key1", "payloadA"));
39+
simpleProducer.send(new KeyedMessage<>("topic01", "key2", "payloadB"));
40+
simpleProducer.send(new KeyedMessage<>("topic01", "key1", "payloadC"));
41+
42+
//run the first job
43+
runSimpleJob("topic01", "canFollowKafkaPartitions");
44+
45+
//produce more data
46+
simpleProducer.send(new KeyedMessage<>("topic01", "key1", "payloadD"));
47+
simpleProducer.send(new KeyedMessage<>("topic01", "key2", "payloadE"));
48+
49+
//run the second job
50+
Path result = runSimpleJob("topic01", "canFollowKafkaPartitions");
51+
52+
//check results
53+
Path part0offset0 = new Path(result, "topic01/0/topic01-0-0000000000000000000");
54+
assertTrue(localFileSystem.exists(part0offset0));
55+
assertEquals(String.format("payloadA%npayloadC%n"), readFullyAsString(part0offset0, 20));
56+
57+
Path part0offset2 = new Path(result, "topic01/0/topic01-0-0000000000000000002");
58+
assertTrue(localFileSystem.exists(part0offset2));
59+
assertEquals(String.format("payloadD%n"), readFullyAsString(part0offset2, 20));
60+
61+
Path part1offset0 = new Path(result, "topic01/1/topic01-1-0000000000000000000");
62+
assertTrue(localFileSystem.exists(part1offset0));
63+
assertEquals(String.format("payloadB%n"), readFullyAsString(part1offset0, 20));
64+
65+
Path part1offset1 = new Path(result, "topic01/1/topic01-1-0000000000000000001");
66+
assertTrue(localFileSystem.exists(part1offset1));
67+
assertEquals(String.format("payloadE%n"), readFullyAsString(part1offset1, 20));
68+
69+
}
70+
71+
72+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2014 Michal Harish, [email protected]
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.amient.kafka.hadoop;
21+
22+
import io.amient.kafka.hadoop.io.KafkaInputFormat;
23+
import io.amient.kafka.hadoop.io.MultiOutputFormat;
24+
import io.amient.kafka.hadoop.testutils.MyJsonTimestampExtractor;
25+
import io.amient.kafka.hadoop.testutils.SystemTestBase;
26+
import kafka.producer.KeyedMessage;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
31+
public class InvalidInputSystemTest extends SystemTestBase {
32+
33+
@Test(expected = java.lang.Error.class)
34+
public void failsNormallyWithInvalidInput() throws IOException, ClassNotFoundException, InterruptedException {
35+
//configure inputs, timestamp extractor and the output path format
36+
KafkaInputFormat.configureKafkaTopics(conf, "topic02");
37+
KafkaInputFormat.configureZkConnection(conf, zkConnect);
38+
HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());
39+
MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");
40+
41+
//produce and run
42+
simpleProducer.send(new KeyedMessage<>("topic02", "1", "{invalid-json-should-fail-in-extractor"));
43+
runSimpleJob("topic02", "failsNormallyWithInvalidInput");
44+
}
45+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2014 Michal Harish, [email protected]
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.amient.kafka.hadoop;
21+
22+
import io.amient.kafka.hadoop.io.KafkaInputFormat;
23+
import io.amient.kafka.hadoop.io.MultiOutputFormat;
24+
import io.amient.kafka.hadoop.testutils.MyJsonTimestampExtractor;
25+
import io.amient.kafka.hadoop.testutils.SystemTestBase;
26+
import kafka.producer.KeyedMessage;
27+
import org.apache.hadoop.fs.Path;
28+
import org.junit.Test;
29+
30+
import java.io.IOException;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertTrue;
34+
35+
public class TimestampExtractorSystemTest extends SystemTestBase {
36+
37+
38+
@Test
39+
public void canUseTimestampInPartitions() throws IOException, ClassNotFoundException, InterruptedException {
40+
41+
//produce some json data
42+
String message5 = "{\"version\":5,\"timestamp\":1402944501425,\"id\": 1}";
43+
simpleProducer.send(new KeyedMessage<>("topic02", "1", message5));
44+
String message1 = "{\"version\":1,\"timestamp\":1402945801425,\"id\": 2}";
45+
simpleProducer.send(new KeyedMessage<>("topic02", "2", message1));
46+
String message6 = "{\"version\":6,\"timestamp\":1402948801425,\"id\": 1}";
47+
simpleProducer.send(new KeyedMessage<>("topic02", "1", message6));
48+
//testing a null message - with timestamp extractor this means skip message
49+
simpleProducer.send(new KeyedMessage<>("topic02", "1", (String)null));
50+
51+
//configure inputs, timestamp extractor and the output path format
52+
KafkaInputFormat.configureKafkaTopics(conf, "topic02");
53+
KafkaInputFormat.configureZkConnection(conf, zkConnect);
54+
HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());
55+
MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");
56+
57+
Path outDir = runSimpleJob("topic02", "canUseTimestampInPartitions");
58+
59+
Path h18 = new Path(outDir, "t=topic02/d=2014-06-16/h=18/topic02-1-0000000000000000000");
60+
assertTrue(localFileSystem.exists(h18));
61+
assertEquals(String.format("%s%n", message5), readFullyAsString(h18, 100));
62+
63+
Path h19 = new Path(outDir, "t=topic02/d=2014-06-16/h=19/topic02-0-0000000000000000000");
64+
assertTrue(localFileSystem.exists(h19));
65+
assertEquals(String.format("%s%n", message1), readFullyAsString(h19, 100));
66+
67+
Path h20 = new Path(outDir, "t=topic02/d=2014-06-16/h=20/topic02-1-0000000000000000000");
68+
assertTrue(localFileSystem.exists(h20));
69+
assertEquals(String.format("%s%n", message6), readFullyAsString(h20, 100));
70+
}
71+
72+
73+
}

src/test/java/io/amient/kafka/hadoop/TimestampExtractorTest.java renamed to src/test/java/io/amient/kafka/hadoop/TimestampExtractorUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import static org.junit.Assert.assertEquals;
1818

19-
public class TimestampExtractorTest {
19+
public class TimestampExtractorUnitTest {
2020

2121
private MapDriver<MsgMetadataWritable, BytesWritable, MsgMetadataWritable, BytesWritable> mapDriver;
2222

0 commit comments

Comments
 (0)