diff --git a/camus-etl-kafka/pom.xml b/camus-etl-kafka/pom.xml
index eec17c27e..a34b803d0 100644
--- a/camus-etl-kafka/pom.xml
+++ b/camus-etl-kafka/pom.xml
@@ -28,6 +28,11 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.5.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
new file mode 100644
index 000000000..4a852a03e
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
@@ -0,0 +1,86 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+import com.linkedin.camus.etl.RecordWriterProvider;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
+
+import parquet.avro.AvroParquetWriter;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * Provides a RecordWriter that uses AvroParquetWriter to write Parquet
+ * records to HDFS. Compression is controlled via the ETL_OUTPUT_CODEC
+ * setting; the Snappy and Gzip codecs are supported.
+ *
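+ * A minimal configuration sketch (the property names here are assumptions based on
+ * EtlMultiOutputFormat and Hadoop's FileOutputFormat; verify them against your Camus
+ * and Hadoop versions):
+ * <pre>
+ * etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.ParquetRecordWriterProvider
+ * mapred.output.compress=true
+ * etl.output.codec=snappy
+ * </pre>
+ *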
+ */
+public class ParquetRecordWriterProvider implements RecordWriterProvider {
+  public final static String EXT = ".parquet";
+
+  public ParquetRecordWriterProvider(TaskAttemptContext context) {
+  }
+
+  @Override
+  public String getFilenameExtension() {
+    return EXT;
+  }
+
+  @Override
+  public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,
+      CamusWrapper data, FileOutputCommitter committer) throws IOException, InterruptedException {
+
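+    // Row-group (block) size of 256 MB and page size of 64 KB: larger row groups
+    // generally improve columnar compression and scan efficiency at the cost of writer memory.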
+    CompressionCodecName compressionCodecName = CompressionCodecName.UNCOMPRESSED;
+    int blockSize = 256 * 1024 * 1024;
+    int pageSize = 64 * 1024;
+
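+    // Honor the job's output-compression settings: use Snappy when it is requested
+    // explicitly, otherwise fall back to Gzip.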
+    if (FileOutputFormat.getCompressOutput(context)) {
+      if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
+        compressionCodecName = CompressionCodecName.SNAPPY;
+      } else {
+        compressionCodecName = CompressionCodecName.GZIP;
+      }
+    }
+
+    Path path = committer.getWorkPath();
+    path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
+    // The Parquet schema is derived from the Avro schema of the record that triggers writer creation.
+    Schema avroSchema = ((GenericRecord) data.getRecord()).getSchema();
+    final ParquetWriter<GenericRecord> parquetWriter =
+        new AvroParquetWriter<GenericRecord>(path, avroSchema, compressionCodecName, blockSize, pageSize);
+
+    return new RecordWriter<IEtlKey, CamusWrapper>() {
+      @Override
+      public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
+        parquetWriter.write((GenericRecord) data.getRecord());
+      }
+
+      @Override
+      public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+        parquetWriter.close();
+      }
+    };
+  }
+}