This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Added Parquet output writer #249

Open
wants to merge 2 commits into base: master
5 changes: 5 additions & 0 deletions camus-etl-kafka/pom.xml
@@ -28,6 +28,11 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
73 changes: 73 additions & 0 deletions camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
@@ -0,0 +1,73 @@
package com.linkedin.camus.etl.kafka.common;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

/**
* Provides a RecordWriter that uses AvroParquetWriter to write
* Parquet records to HDFS. The compression codec is chosen via the
* ETL_OUTPUT_CODEC setting on EtlMultiOutputFormat: "snappy" selects
* Snappy, any other value falls back to Gzip, and the output is left
* uncompressed when output compression is disabled.
*/
public class ParquetRecordWriterProvider implements RecordWriterProvider {
public final static String EXT = ".parquet";

public ParquetRecordWriterProvider(TaskAttemptContext context) {
}

@Override
public String getFilenameExtension() {
return EXT;
}

@Override
public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,
CamusWrapper data, FileOutputCommitter committer) throws IOException, InterruptedException {

// Default to no compression rather than null, since AvroParquetWriter does not accept a null codec.
CompressionCodecName compressionCodecName = CompressionCodecName.UNCOMPRESSED;
int blockSize = 256 * 1024 * 1024; // Parquet row group size: 256 MB
int pageSize = 64 * 1024; // Parquet page size: 64 KB

if (FileOutputFormat.getCompressOutput(context)) {
if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
compressionCodecName = CompressionCodecName.SNAPPY;
} else {
compressionCodecName = CompressionCodecName.GZIP;
}
}

Path path = committer.getWorkPath();
path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
Schema avroSchema = ((GenericRecord) data.getRecord()).getSchema();
final ParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema,
compressionCodecName, blockSize, pageSize);

return new RecordWriter<IEtlKey, CamusWrapper>() {
@Override
public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
parquetWriter.write((GenericRecord) data.getRecord());
}

@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
parquetWriter.close();
}
};
}
}
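
For reviewers who want to try this out, here is a minimal sketch of how a Camus run could be pointed at the new writer. The property keys (etl.record.writer.provider.class, mapred.output.compress, etl.output.codec) are the standard Camus/Hadoop settings that EtlMultiOutputFormat and FileOutputFormat consult; the ParquetCamusLauncher class name and the ToolRunner/CamusJob launch path are only for illustration and should be adapted to however you normally submit the job (e.g. a -P camus.properties file).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import com.linkedin.camus.etl.kafka.CamusJob;

public class ParquetCamusLauncher {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Route Camus record writing through the Parquet provider added in this PR.
    conf.set("etl.record.writer.provider.class",
        "com.linkedin.camus.etl.kafka.common.ParquetRecordWriterProvider");

    // Enable output compression; "snappy" selects SNAPPY, any other value falls back to GZIP.
    conf.setBoolean("mapred.output.compress", true);
    conf.set("etl.output.codec", "snappy");

    // Hand off to the regular Camus driver (assumed launch path; the remaining
    // Camus settings still come from the usual properties file passed on the command line).
    System.exit(ToolRunner.run(conf, new CamusJob(), args));
  }
}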