Skip to content

Commit

Permalink
#15 Fix for output paths when no timestamp extractor is provided
Browse files Browse the repository at this point in the history
  • Loading branch information
michal-harish committed Sep 7, 2016
1 parent 3b987ee commit de5372a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ public RecordWriter<MsgMetadataWritable, BytesWritable> getRecordWriter(TaskAtte
final SimpleDateFormat timeFormat = new SimpleDateFormat(pathFormat);
timeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
final DecimalFormat offsetFormat = new DecimalFormat("0000000000000000000");
final boolean hasTimeExtractor = HadoopJobMapper.isTimestampExtractorConfigured(conf);
final boolean hasTS = HadoopJobMapper.isTimestampExtractorConfigured(conf);

return new RecordWriter<MsgMetadataWritable, BytesWritable>() {
TreeMap<String, RecordWriter<Void, BytesWritable>> recordWriters = new TreeMap<>();

Path prefixPath = ((FileOutputCommitter) getOutputCommitter(taskContext)).getWorkPath();

public void write(MsgMetadataWritable key, BytesWritable value) throws IOException {
if (hasTimeExtractor && key.getTimestamp() == null) {
if (hasTS && key.getTimestamp() == null) {
//extractor didn't wish to throw exception so skipping this record
return;
}
String P = String.valueOf(key.getSplit().getPartition());
String T = key.getSplit().getTopic();
String suffixPath = hasTimeExtractor ? timeFormat.format(key.getTimestamp()) : pathFormat;
String suffixPath = hasTS ? timeFormat.format(key.getTimestamp()) : pathFormat.replaceAll("'", "");
suffixPath = suffixPath.replace("{T}", T);
suffixPath = suffixPath.replace("{P}", P);
suffixPath += "/" + T + "-"+ P + "-" + offsetFormat.format(key.getSplit().getStartOffset());
Expand Down
40 changes: 18 additions & 22 deletions src/test/java/io/amient/kafka/hadoop/SystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.notification.Failure;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -144,7 +146,8 @@ public void tearDown() throws Exception {
}


private Path runSimpleJob(String topic, String testOutputDir) throws InterruptedException, IOException, ClassNotFoundException {
private Path runSimpleJob(String topic, String testOutputDir)
throws InterruptedException, IOException, ClassNotFoundException {

//run hadoop loader job
Path outDir = new Path(new File(dfsBaseDir, testOutputDir).toString());
Expand All @@ -165,7 +168,7 @@ private Path runSimpleJob(String topic, String testOutputDir) throws Interrupted
MultiOutputFormat.setCompressOutput(job, false);

job.waitForCompletion(true);
assertTrue(job.isSuccessful());
if (!job.isSuccessful()) throw new Error("job failed - see logs for details");

fs.copyToLocalFile(outDir, outDir);
return outDir;
Expand Down Expand Up @@ -222,32 +225,13 @@ public void canUseTimestampInPartitions() throws IOException, ClassNotFoundExcep
//testing a null message - with timestamp extractor this means skip message
simpleProducer.send(new KeyedMessage<>("topic02", "1", (String)null));

//run the job
Path outDir = new Path(new File(dfsBaseDir, "canUseTimestampInPartitions").toString());
localFileSystem.delete(outDir, true);

//configure inputs, timestamp extractor and the output path format
KafkaInputFormat.configureKafkaTopics(conf, "topic02");
KafkaInputFormat.configureZkConnection(conf, zkConnect);
HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());
MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");

Job job = Job.getInstance(conf, "kafka.hadoop.loader");
job.setNumReduceTasks(0);

job.setInputFormatClass(KafkaInputFormat.class);
job.setMapperClass(HadoopJobMapper.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(MultiOutputFormat.class);

MultiOutputFormat.setOutputPath(job, outDir);
MultiOutputFormat.setCompressOutput(job, false);

job.waitForCompletion(true);

assertTrue(job.isSuccessful());

fs.copyToLocalFile(outDir, outDir);
Path outDir = runSimpleJob("topic02", "canUseTimestampInPartitions");

Path h18 = new Path(outDir, "t=topic02/d=2014-06-16/h=18/topic02-1-0000000000000000000");
assertTrue(localFileSystem.exists(h18));
Expand All @@ -260,7 +244,19 @@ public void canUseTimestampInPartitions() throws IOException, ClassNotFoundExcep
Path h20 = new Path(outDir, "t=topic02/d=2014-06-16/h=20/topic02-1-0000000000000000000");
assertTrue(localFileSystem.exists(h20));
assertEquals(String.format("%s%n", message6), readFullyAsString(h20, 100));
}

@Test(expected = java.lang.Error.class)
public void failsNormallyWithInvalidInput() throws IOException, ClassNotFoundException, InterruptedException {
    // Wire the job configuration: zookeeper connection, input topic,
    // output path format and the JSON timestamp extractor under test.
    KafkaInputFormat.configureZkConnection(conf, zkConnect);
    KafkaInputFormat.configureKafkaTopics(conf, "topic02");
    MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");
    HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());

    // Publish a message that is not valid JSON so the extractor fails,
    // which runSimpleJob surfaces as an Error — the expected outcome here.
    simpleProducer.send(new KeyedMessage<>("topic02", "1", "{invalid-json-should-fail-in-extractor"));
    runSimpleJob("topic02", "failsNormallyWithInvalidInput");
}

private String readFullyAsString(Path file, int maxSize) throws IOException {
Expand Down

0 comments on commit de5372a

Please sign in to comment.