#8 initial naive approach to transformation
michal-harish committed Sep 7, 2016
1 parent f7fcc8f commit 61d14c6
Showing 11 changed files with 208 additions and 56 deletions.
63 changes: 32 additions & 31 deletions src/main/java/io/amient/kafka/hadoop/HadoopJobMapper.java
@@ -19,7 +19,9 @@

package io.amient.kafka.hadoop;

import io.amient.kafka.hadoop.api.Extractor;
import io.amient.kafka.hadoop.api.TimestampExtractor;
import io.amient.kafka.hadoop.api.Transformation;
import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -33,46 +35,39 @@ public class HadoopJobMapper extends Mapper<MsgMetadataWritable, BytesWritable,

static Logger log = LoggerFactory.getLogger(HadoopJobMapper.class);

// private static final String CONFIG_SERDE_CLASS = "mapper.serde.class";
private static final String CONFIG_TIMESTAMP_EXTRACTOR_CLASS = "mapper.timestamp.extractor.class";
private static final String CONFIG_EXTRACTOR_CLASS = "mapper.timestamp.extractor.class";

private TimestampExtractor extractor;
// private Serde serde = null;
//TODO #8 instead of serde make the OUTVAL generic and configure Deserializer kafkaDeserializer;
//TODO #8 it should be possible to use different output format, e.g. ParquetOutputFormat in combination with deser.
private Extractor extractor;


// public interface Serde {
// public BytesWritable map(BytesWritable value) throws IOException;
// }


// public static void configureSerde(Configuration conf, String className) {
// conf.set(CONFIG_SERDE_CLASS, className);
// }

public static void configureTimestampExtractor(Configuration conf, String className) {
conf.set(CONFIG_TIMESTAMP_EXTRACTOR_CLASS, className);
/**
* Provide an Extractor implementation, e.g. a TimestampExtractor and/or a Transformation
* @param conf job configuration
* @param cls class implementing the Extractor interface
*/
public static void configureExtractor(Configuration conf, Class<? extends Extractor> cls) {
conf.set(CONFIG_EXTRACTOR_CLASS, cls.getName());
}

public static boolean isTimestampExtractorConfigured(Configuration conf) {
return !conf.get(CONFIG_TIMESTAMP_EXTRACTOR_CLASS, "").equals("");
public static boolean isTimestampExtractorConfigured(Configuration conf) throws IOException {
String extractorClassName = conf.get(CONFIG_EXTRACTOR_CLASS, null);
if (extractorClassName == null) return false; else {
try {
return TimestampExtractor.class.isAssignableFrom(Class.forName(extractorClassName));
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
}


@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
try {
// Class<?> serdeClass = conf.getClass(CONFIG_SERDE_CLASS, null);
// if (serdeClass != null) {
// serde = serdeClass.asSubclass(Serde.class).newInstance();
// log.info("Using Serde " + extractor);
// }
Class<?> extractorClass = conf.getClass(CONFIG_TIMESTAMP_EXTRACTOR_CLASS, null);
Class<?> extractorClass = conf.getClass(CONFIG_EXTRACTOR_CLASS, null);
if (extractorClass != null) {
extractor = extractorClass.asSubclass(TimestampExtractor.class).newInstance();
log.info("Using timestamp extractor " + extractor);
extractor = extractorClass.asSubclass(Extractor.class).newInstance();
log.info("Using extractor " + extractor);
}

} catch (Exception e) {
@@ -86,11 +81,17 @@ public void map(MsgMetadataWritable key, BytesWritable value, Context context) t
try {
if (key != null) {
MsgMetadataWritable outputKey = key;
BytesWritable outputValue = value;
if (extractor != null) {
Long timestamp = extractor.extract(key, value);
outputKey = new MsgMetadataWritable(key, timestamp);
Object any = extractor.deserialize(key, value);
if (extractor instanceof TimestampExtractor) {
Long timestamp = ((TimestampExtractor)extractor).extractTimestamp(any);
outputKey = new MsgMetadataWritable(key, timestamp);
}
if (extractor instanceof Transformation) {
outputValue = ((Transformation)extractor).transform(any);
}
}
BytesWritable outputValue = value; //(serde == null) ? value : serde.map(value);
context.write(outputKey, outputValue);
}
} catch (InterruptedException e) {
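For orientation, a minimal driver sketch of how the new configureExtractor hook could be wired into a job. It is not part of this commit: only the standard Hadoop Job API and classes appearing in this commit (HadoopJobMapper, KafkaInputFormat, MultiOutputFormat, MsgMetadataWritable) are used; the nested Utf8Extractor, topic name, ZooKeeper connect string and output path are placeholders, and treating MultiOutputFormat as a FileOutputFormat (so setOutputPath applies) is inferred from its getOutputPath usage further down in this commit.

// Hedged driver sketch, not part of this commit: wiring the configurable Extractor into a job.
import io.amient.kafka.hadoop.HadoopJobMapper;
import io.amient.kafka.hadoop.api.Extractor;
import io.amient.kafka.hadoop.io.KafkaInputFormat;
import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import io.amient.kafka.hadoop.io.MultiOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LoaderJobSketch {

    // Hypothetical extractor: decodes the payload as UTF-8 text, bounded by getLength().
    public static class Utf8Extractor implements Extractor {
        @Override
        public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
            return new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // inputs, extractor and output path format, mirroring the system tests in this commit
        KafkaInputFormat.configureKafkaTopics(conf, "topic01");
        KafkaInputFormat.configureZkConnection(conf, "localhost:2181");
        HadoopJobMapper.configureExtractor(conf, Utf8Extractor.class);
        MultiOutputFormat.configurePathFormat(conf, "'{T}'");

        Job job = Job.getInstance(conf, "kafka-hadoop-loader");
        job.setMapperClass(HadoopJobMapper.class);
        job.setInputFormatClass(KafkaInputFormat.class);
        job.setOutputFormatClass(MultiOutputFormat.class);
        job.setOutputKeyClass(MsgMetadataWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setNumReduceTasks(0); // map-only: HadoopJobMapper writes (metadata, bytes) pairs directly
        FileOutputFormat.setOutputPath(job, new Path("/tmp/kafka-hadoop-loader-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}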
19 changes: 19 additions & 0 deletions src/main/java/io/amient/kafka/hadoop/api/Extractor.java
@@ -0,0 +1,19 @@
package io.amient.kafka.hadoop.api;

import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import org.apache.hadoop.io.BytesWritable;

import java.io.IOException;

public interface Extractor {

/**
* TODO this could be used directly with configured Deserializer kafkaDeserializer;
* @param key metadata associated with the message
* @param value message payload byte buffer
* @return deserialized object that can be processed for extraction of values
* @throws IOException
*/
Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException;

}
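The TODO above suggests delegating to a configured Kafka Deserializer. A rough sketch of what that could look like, not part of this commit: it assumes the kafka-clients StringDeserializer is on the classpath, the class name is hypothetical, and the topic argument is passed as null because MsgMetadataWritable is not consulted here (StringDeserializer ignores it).

// Hedged sketch: delegating Extractor.deserialize to a Kafka Deserializer (here StringDeserializer).
import io.amient.kafka.hadoop.api.Extractor;
import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.util.Arrays;

public class KafkaDeserializerExtractor implements Extractor {

    private final StringDeserializer deserializer = new StringDeserializer();

    @Override
    public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
        // copy only the valid region: the backing buffer may be reused and longer than the message
        byte[] payload = Arrays.copyOfRange(value.getBytes(), 0, value.getLength());
        // the topic is not needed by StringDeserializer, so null is passed in this sketch
        return deserializer.deserialize(null, payload);
    }
}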
10 changes: 3 additions & 7 deletions src/main/java/io/amient/kafka/hadoop/api/TimestampExtractor.java
@@ -19,25 +19,21 @@

package io.amient.kafka.hadoop.api;

import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import org.apache.hadoop.io.BytesWritable;

import java.io.IOException;

public interface TimestampExtractor {
public interface TimestampExtractor extends Extractor {

/**
*
* Note: value.getBytes() returns a buffer which may be reused by the calling
* code and its length may be bigger than the current value so getLength()
* has to be used as the limit.
*
* @param key metadata associated with the message
* @param value message payload byte buffer
* @param any deserialized object which contains the timestamp
* @return UTC timestamp in milliseconds, or null if no value could be extracted
* without an exception.
* @throws IOException
*/
Long extract(MsgMetadataWritable key, BytesWritable value) throws IOException;
Long extractTimestamp(Object any) throws IOException;
}

31 changes: 31 additions & 0 deletions src/main/java/io/amient/kafka/hadoop/api/Transformation.java
@@ -0,0 +1,31 @@
/*
* Copyright 2014 Michal Harish, [email protected]
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.amient.kafka.hadoop.api;

import org.apache.hadoop.io.BytesWritable;

import java.io.IOException;

public interface Transformation {

//TODO #8 besides transformation make also the OUTVAL generic
BytesWritable transform(Object any) throws IOException;

}
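Because HadoopJobMapper dispatches on the single configured Extractor instance, a Transformation is only reached when the same class also implements Extractor. A hedged sketch of such a combined implementation, not part of this commit, using the same org.codehaus.jackson types as the new system test; the class name and the dropped field are hypothetical.

// Hedged sketch, not part of this commit: deserialize JSON, drop a field, re-serialize.
import io.amient.kafka.hadoop.api.Extractor;
import io.amient.kafka.hadoop.api.Transformation;
import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import org.apache.hadoop.io.BytesWritable;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

import java.io.IOException;

public class DropFieldTransformation implements Extractor, Transformation {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
        // parse only the valid region of the reusable buffer
        return mapper.readValue(value.getBytes(), 0, value.getLength(), ObjectNode.class);
    }

    @Override
    public BytesWritable transform(Object any) throws IOException {
        ObjectNode json = (ObjectNode) any;
        json.remove("internal_field"); // hypothetical field to strip before the message is written out
        return new BytesWritable(mapper.writeValueAsBytes(json));
    }
}

The TSVToJson class in the new TransformationSystemTest below follows the same pattern, additionally implementing TimestampExtractor.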
@@ -40,7 +40,6 @@

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Iterator;
@@ -60,7 +59,6 @@ public static void configurePathFormat(Configuration conf, String format) {
conf.set(CONFIG_PATH_FORMAT, format);
}


public void checkOutputSpecs(JobContext job) throws IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
@@ -35,7 +35,7 @@ public void failsNormallyWithInvalidInput() throws IOException, ClassNotFoundExc
//configure inputs, timestamp extractor and the output path format
KafkaInputFormat.configureKafkaTopics(conf, "topic02");
KafkaInputFormat.configureZkConnection(conf, zkConnect);
HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());
HadoopJobMapper.configureExtractor(conf, MyJsonTimestampExtractor.class);
MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");

//produce and run
@@ -51,7 +51,7 @@ public void canUseTimestampInPartitions() throws IOException, ClassNotFoundExcep
//configure inputs, timestamp extractor and the output path format
KafkaInputFormat.configureKafkaTopics(conf, "topic02");
KafkaInputFormat.configureZkConnection(conf, zkConnect);
HadoopJobMapper.configureTimestampExtractor(conf, MyJsonTimestampExtractor.class.getName());
HadoopJobMapper.configureExtractor(conf, MyJsonTimestampExtractor.class);
MultiOutputFormat.configurePathFormat(conf, "'t={T}/d='yyyy-MM-dd'/h='HH");

Path outDir = runSimpleJob("topic02", "canUseTimestampInPartitions");
@@ -34,7 +34,7 @@ public void testTextTimeExtractor() throws IOException {
mapDriver.withInput(inputKey, new BytesWritable(data.getBytes()));

MyTextTimestampExtractor extractor = new MyTextTimestampExtractor();
HadoopJobMapper.configureTimestampExtractor(mapDriver.getConfiguration(), extractor.getClass().getName());
HadoopJobMapper.configureExtractor(mapDriver.getConfiguration(), extractor.getClass());

final List<Pair<MsgMetadataWritable, BytesWritable>> result = mapDriver.run();
MsgMetadataWritable metadata = result.get(0).getFirst();
@@ -51,7 +51,7 @@ public void testJsonTimeExtractor() throws IOException {
mapDriver.withInput(inputKey, new BytesWritable(data.getBytes()));

TimestampExtractor extractor = new MyJsonTimestampExtractor();
HadoopJobMapper.configureTimestampExtractor(mapDriver.getConfiguration(), extractor.getClass().getName());
HadoopJobMapper.configureExtractor(mapDriver.getConfiguration(), extractor.getClass());

final List<Pair<MsgMetadataWritable, BytesWritable>> result = mapDriver.run();
assertEquals(new Long(1402944501425L), result.get(0).getFirst().getTimestamp());
96 changes: 96 additions & 0 deletions src/test/java/io/amient/kafka/hadoop/TransformationSystemTest.java
@@ -0,0 +1,96 @@
package io.amient.kafka.hadoop;

import io.amient.kafka.hadoop.api.TimestampExtractor;
import io.amient.kafka.hadoop.api.Transformation;
import io.amient.kafka.hadoop.io.KafkaInputFormat;
import io.amient.kafka.hadoop.io.MsgMetadataWritable;
import io.amient.kafka.hadoop.io.MultiOutputFormat;
import io.amient.kafka.hadoop.testutils.SystemTestBase;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TransformationSystemTest extends SystemTestBase {


static public class TSVToJson implements Transformation, TimestampExtractor {
private ObjectMapper mapper = new ObjectMapper();

@Override
public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
String[] tsv = new String(value.getBytes(), 0, value.getLength(), "UTF-8").split("\t");
ObjectNode json = mapper.createObjectNode();
json.put("category", tsv[0]);
json.put("quantity", Double.parseDouble(tsv[1]));
json.put("timestamp", Long.parseLong(tsv[2]));
return json;
}

@Override
public Long extractTimestamp(Object any) throws IOException {
return ((JsonNode) any).get("timestamp").getLongValue();
}

@Override
public BytesWritable transform(Object any) throws IOException {
JsonNode json = (JsonNode) any;
return new BytesWritable(mapper.writeValueAsBytes(json));
}

}

/**
* This test uses a custom schema transformation: the TSV input has a known field
* mapping, provided by the transformation implementation, and is converted to JSON.
*
* @throws Exception
*/
@Test
public void canTransformTSVInputToJSON() throws Exception {
//produce text data
simpleProducer.send(new KeyedMessage<>("topic01", "key1", "A\t1.0\t1473246194481"));
simpleProducer.send(new KeyedMessage<>("topic01", "key2", "B\t1.5\t1473246214844"));
simpleProducer.send(new KeyedMessage<>("topic01", "key1", "C\t2.0\t1473246220528"));

//configure inputs, timestamp extractor and the output path format
KafkaInputFormat.configureKafkaTopics(conf, "topic01");
KafkaInputFormat.configureZkConnection(conf, zkConnect);
HadoopJobMapper.configureExtractor(conf, TSVToJson.class);
MultiOutputFormat.configurePathFormat(conf, "'{T}'");

//run the first job
Path outDir = runSimpleJob("topic01", "canTransformTSVInputToJSON");

Path output1 = new Path(outDir, "topic01/topic01-0-0000000000000000000");
assertTrue(localFileSystem.exists(output1));
String json1 = readFullyAsString(output1, 1000);
assertEquals("{\"category\":\"A\",\"quantity\":1.0,\"timestamp\":1473246194481}\n" +
"{\"category\":\"C\",\"quantity\":2.0,\"timestamp\":1473246220528}\n", json1);

Path output2 = new Path(outDir, "topic01/topic01-1-0000000000000000000");
assertTrue(localFileSystem.exists(output2));
String json2 = readFullyAsString(output2, 1000);
assertEquals("{\"category\":\"B\",\"quantity\":1.5,\"timestamp\":1473246214844}\n", json2);

}

// /**
// * this test uses a custom schema transformation. the tsv file has
// * a pre-defined field mapping provided by the
// * @throws Exception
// */
// @Test
// public void canTransformJsonInputToParquet() throws Exception {
// //TODO
// }

}
@@ -32,14 +32,19 @@ public class MyJsonTimestampExtractor implements TimestampExtractor {
ObjectMapper jsonMapper = new ObjectMapper();

@Override
public Long extract(MsgMetadataWritable key, BytesWritable value) throws IOException {
public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
if (value.getLength() > 0) {
JsonNode json = jsonMapper.readValue(value.getBytes(), 0, value.getLength(), JsonNode.class);
if (json.has("timestamp")) {
return json.get("timestamp").getLongValue();
}
return jsonMapper.readValue(value.getBytes(), 0, value.getLength(), JsonNode.class);
}
return null;
}

@Override
public Long extractTimestamp(Object any) throws IOException {
JsonNode json = (JsonNode) any;
if (json != null && json.has("timestamp")) {
return json.get("timestamp").getLongValue();
}
return null;
}
}
@@ -27,21 +27,27 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

public class MyTextTimestampExtractor implements TimestampExtractor {
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public String format(long timestamp) {
return parser.format(timestamp);
}

@Override
public Long extract(MsgMetadataWritable key, BytesWritable value) throws IOException {
public Object deserialize(MsgMetadataWritable key, BytesWritable value) throws IOException {
return new String(Arrays.copyOfRange(value.getBytes(), 0, 19));
}

@Override
public Long extractTimestamp(Object any) throws IOException {
try {
String leadString = new String(Arrays.copyOfRange(value.getBytes(), 0, 19));
return parser.parse(leadString).getTime();
Date date = parser.parse((String)any);
return date.getTime();
} catch (ParseException e) {
return null;
}
}

public String format(long timestamp) {
return parser.format(timestamp);
}
}
