This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Commit

committing change to partitioner as well as other changes needed to build

kgoodhop committed Feb 1, 2014
1 parent 8eb5618 commit 637a92f
Showing 15 changed files with 1,524 additions and 62 deletions.
4 changes: 0 additions & 4 deletions .gitignore
@@ -26,7 +26,3 @@ camus-api/target/
 camus-etl-kafka/target/
 camus-example/target/
 camus-schema-registry/target/
-
-# auto-gen'd avro records
-camus-etl-kafka/src/main/java/com/linkedin/camus/events/records
-camus-example/src/main/java/com/linkedin/camus/example/records
1 change: 1 addition & 0 deletions camus-etl-kafka/build.gradle
@@ -5,6 +5,7 @@ dependencies {
     compile(project(":camus-external:camus-api"))
     compile(project(":camus-external:camus-schema-registry"))
     compile(project(":camus-external:camus-schema-registry-avro"))
+    compile(project(":camus-external:camus-kafka-coders"))
 
     compile spec.external.log4j
     compile spec.external.avro17
CamusJob.java
@@ -124,43 +124,41 @@ public static void setTime(String name) {
                 (timingMap.get(name) == null ? 0 : timingMap.get(name))
                         + System.currentTimeMillis());
     }
 
     private Job createJob(Properties props) throws IOException {
-
-        if(getConf() == null)
-        {
-            Configuration conf = new Configuration();
-            for(Object key : props.keySet())
-            {
-                conf.set(key.toString(), props.getProperty(key.toString()));
-            }
-            setConf(conf);
-        }
-
-        Job job = new Job(getConf());
-        job.setJarByClass(CamusJob.class);
-
-
-        // Set the default partitioner
-        job.getConfiguration().set(
-                EtlMultiOutputFormat.ETL_DEFAULT_PARTITIONER_CLASS,
-                "com.linkedin.camus.etl.kafka.coders.DefaultPartitioner");
+        Job job;
+        if(getConf() == null)
+        {
+            setConf(new Configuration());
+        }
+
+        populateConf(props, getConf());
+
+        job = new Job(getConf());
+        job.setJarByClass(CamusJob.class);
+
+        if(job.getConfiguration().get("camus.job.name") != null)
+        {
+            job.setJobName(job.getConfiguration().get("camus.job.name"));
+        }
+        else
+        {
+            job.setJobName("Camus Job");
+        }
+
+        return job;
+    }
 
-        for (Object key : props.keySet()) {
-            job.getConfiguration().set(key.toString(),
-                    props.getProperty(key.toString()));
-        }
-
-        job.setJobName("Camus Job");
-        if(job.getConfiguration().get("camus.job.name") != null)
-        {
-            job.setJobName(job.getConfiguration().get("camus.job.name"));
-        }
-
+    public static void populateConf(Properties props, Configuration conf) throws IOException {
+
-        FileSystem fs = FileSystem.get(job.getConfiguration());
+        for(Object key : props.keySet())
+        {
+            conf.set(key.toString(), props.getProperty(key.toString()));
+        }
 
+        FileSystem fs = FileSystem.get(conf);
 
-        String hadoopCacheJarDir = job.getConfiguration().get(
+        String hadoopCacheJarDir = conf.get(
                 "hdfs.default.classpath.dir", null);
         if (hadoopCacheJarDir != null) {
             FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir));
@@ -173,7 +171,7 @@ private Job createJob(Properties props) throws IOException {
 
                     DistributedCache
                             .addFileToClassPath(status[i].getPath(),
-                                    job.getConfiguration(), fs);
+                                    conf, fs);
                 }
             }
         } else {
@@ -183,18 +181,16 @@
         }
 
         // Adds External jars to hadoop classpath
-        String externalJarList = job.getConfiguration().get(
+        String externalJarList = conf.get(
                 "hadoop.external.jarFiles", null);
         if (externalJarList != null) {
             String[] jarFiles = externalJarList.split(",");
             for (String jarFile : jarFiles) {
                 log.info("Adding external jar File:" + jarFile);
                 DistributedCache.addFileToClassPath(new Path(jarFile),
-                        job.getConfiguration(), fs);
+                        conf, fs);
             }
         }
-
-        return job;
     }
 
     public void run() throws Exception {
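The hunk above moves configuration handling out of createJob() into a reusable public static populateConf(Properties, Configuration) helper and drops the hard-coded default-partitioner property. A minimal sketch (not part of this commit) of driving the new helper directly; the camus.properties path is illustrative:

import java.io.FileInputStream;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

import com.linkedin.camus.etl.kafka.CamusJob;

public class PopulateConfSketch {
    public static void main(String[] args) throws Exception {
        // Load job properties from a file; the path is illustrative.
        Properties props = new Properties();
        props.load(new FileInputStream("camus.properties"));

        // populateConf() copies every property into the Configuration and,
        // when hdfs.default.classpath.dir or hadoop.external.jarFiles are set,
        // adds those jars to the distributed cache, as createJob() now does.
        Configuration conf = new Configuration();
        CamusJob.populateConf(props, conf);

        System.out.println("camus.job.name = " + conf.get("camus.job.name"));
    }
}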
EtlCounts.java
@@ -23,9 +23,9 @@
 
 import com.linkedin.camus.coders.MessageEncoder;
 import com.linkedin.camus.etl.kafka.CamusJob;
-import com.linkedin.camus.events.records.EventHeader;
-import com.linkedin.camus.events.records.Guid;
-import com.linkedin.camus.events.records.TrackingMonitoringEvent;
+import com.linkedin.camus.events.EventHeader;
+import com.linkedin.camus.events.Guid;
+import com.linkedin.camus.events.TrackingMonitoringEvent;
 
 @JsonIgnoreProperties({"trackingCount", "lastKey", "eventCount", "RANDOM"})
 public class EtlCounts {
EtlMultiOutputFormat.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
@@ -199,29 +200,19 @@ public String getWorkingFileName(JobContext context, EtlKey key) throws IOExcept
         Partitioner partitioner = getPartitioner(context, key.getTopic());
         return "data." + key.getTopic().replaceAll("\\.", "_") + "." + key.getLeaderId() + "." + key.getPartition() + "." + partitioner.encodePartition(context, key);
     }
 
-    /* public static Partitioner getDefaultPartitioner(JobContext job) {
-        if(partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS) == null) {
-            //List<Partitioner> partitioners = job.getConfiguration().getInstances(ETL_DEFAULT_PARTITIONER_CLASS, com.linkedin.camus.coders.Partitioner.class);
-            List<Partitioner> partitioners = new ArrayList<Partitioner>();
-            partitioners.add(new DefaultPartitioner());
-            partitionersByTopic.put(ETL_DEFAULT_PARTITIONER_CLASS, partitioners.get(0));
-        }
-        return partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS);
-    }*/
-
+    public static void setDefaultPartitioner(JobContext job, Class<?> cls) {
+        job.getConfiguration().setClass(ETL_DEFAULT_PARTITIONER_CLASS, cls, Partitioner.class);
+    }
+
     public static Partitioner getDefaultPartitioner(JobContext job) {
-        if(partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS) == null) {
-            List<Partitioner> partitioners = job.getConfiguration().getInstances(ETL_DEFAULT_PARTITIONER_CLASS, com.linkedin.camus.coders.Partitioner.class);
-            partitionersByTopic.put(ETL_DEFAULT_PARTITIONER_CLASS, partitioners.get(0));
-        }
-        return partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS);
+        return ReflectionUtils.newInstance(job.getConfiguration().getClass(ETL_DEFAULT_PARTITIONER_CLASS, DefaultPartitioner.class, Partitioner.class), job.getConfiguration());
     }
 
     public static Partitioner getPartitioner(JobContext job, String topicName) throws IOException {
         String customPartitionerProperty = ETL_DEFAULT_PARTITIONER_CLASS + "." + topicName;
         if(partitionersByTopic.get(customPartitionerProperty) == null) {
-            List<Partitioner> partitioners = new ArrayList<Partitioner>();//job.getConfiguration().getInstances(customPartitionerProperty, com.linkedin.camus.coders.Partitioner.class);
+            List<Partitioner> partitioners = new ArrayList<Partitioner>();
             if(partitioners.isEmpty()) {
                 return getDefaultPartitioner(job);
             } else {
@@ -407,7 +398,7 @@ public String getPartitionedPath(JobContext context, String file, int count, lon
 
         return partitionedPath +
             "/" + topic + "." + leaderId + "." + partition +
-            "." + count +
+            "." + count+
             "." + offset +
             "." + encodedPartition +
             recordWriterProvider.getFilenameExtension();
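With the change above, the default partitioner is no longer cached in partitionersByTopic; getDefaultPartitioner() instantiates whatever class is stored under ETL_DEFAULT_PARTITIONER_CLASS via ReflectionUtils, falling back to DefaultPartitioner, and the new setDefaultPartitioner() hook lets job-setup code register an alternative. A rough usage sketch (not part of this commit; it assumes EtlMultiOutputFormat and DefaultPartitioner sit in their usual Camus packages, com.linkedin.camus.etl.kafka.mapred and com.linkedin.camus.etl.kafka.coders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import com.linkedin.camus.coders.Partitioner;
import com.linkedin.camus.etl.kafka.coders.DefaultPartitioner;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

public class PartitionerSetupSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "camus-partitioner-sketch");

        // Register the class used when no per-topic partitioner applies;
        // any implementation of com.linkedin.camus.coders.Partitioner works here.
        EtlMultiOutputFormat.setDefaultPartitioner(job, DefaultPartitioner.class);

        // getDefaultPartitioner() builds the instance reflectively,
        // falling back to DefaultPartitioner when nothing was registered.
        Partitioner partitioner = EtlMultiOutputFormat.getDefaultPartitioner(job);
        System.out.println("Default partitioner: " + partitioner.getClass().getName());
    }
}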
69 changes: 69 additions & 0 deletions camus-etl-kafka/src/main/java/com/linkedin/camus/events/EventHeader.java
@@ -0,0 +1,69 @@
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.linkedin.camus.events;

@SuppressWarnings("all")
/** The basic header for every tracking event. */
public class EventHeader extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
.parse("{\"type\":\"record\",\"name\":\"EventHeader\",\"namespace\":\"com.linkedin.events\",\"fields\":[{\"name\":\"memberId\",\"type\":\"int\",\"doc\":\"The member id of the user initiating the action\"},{\"name\":\"time\",\"type\":\"long\",\"doc\":\"The time of the event\"},{\"name\":\"server\",\"type\":\"string\",\"doc\":\"The name of the server\"},{\"name\":\"service\",\"type\":\"string\",\"doc\":\"The name of the service\"},{\"name\":\"guid\",\"type\":{\"type\":\"fixed\",\"name\":\"Guid\",\"size\":16},\"doc\":\"A unique identifier for the message\"}]}");
/** The member id of the user initiating the action */
public int memberId;
/** The time of the event */
public long time;
/** The name of the server */
public java.lang.CharSequence server;
/** The name of the service */
public java.lang.CharSequence service;
/** A unique identifier for the message */
public com.linkedin.camus.events.Guid guid;

public org.apache.avro.Schema getSchema() {
return SCHEMA$;
}

// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0:
return memberId;
case 1:
return time;
case 2:
return server;
case 3:
return service;
case 4:
return guid;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}

// Used by DatumReader. Applications should not call.
@SuppressWarnings(value = "unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0:
memberId = (java.lang.Integer) value$;
break;
case 1:
time = (java.lang.Long) value$;
break;
case 2:
server = (java.lang.CharSequence) value$;
break;
case 3:
service = (java.lang.CharSequence) value$;
break;
case 4:
guid = (com.linkedin.camus.events.Guid) value$;
break;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
}
11 changes: 11 additions & 0 deletions camus-etl-kafka/src/main/java/com/linkedin/camus/events/Guid.java
@@ -0,0 +1,11 @@
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.linkedin.camus.events;

@SuppressWarnings("all")
@org.apache.avro.specific.FixedSize(16)
public class Guid extends org.apache.avro.specific.SpecificFixed {
}
78 changes: 78 additions & 0 deletions camus-etl-kafka/src/main/java/com/linkedin/camus/events/TrackingMonitoringEvent.java
@@ -0,0 +1,78 @@
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.linkedin.camus.events;

@SuppressWarnings("all")
/** An event to monitor the tracking system itself */
public class TrackingMonitoringEvent extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
.parse("{\"type\":\"record\",\"name\":\"TrackingMonitoringEvent\",\"namespace\":\"com.linkedin.events\",\"fields\":[{\"name\":\"header\",\"type\":{\"type\":\"record\",\"name\":\"EventHeader\",\"fields\":[{\"name\":\"memberId\",\"type\":\"int\",\"doc\":\"The member id of the user initiating the action\"},{\"name\":\"time\",\"type\":\"long\",\"doc\":\"The time of the event\"},{\"name\":\"server\",\"type\":\"string\",\"doc\":\"The name of the server\"},{\"name\":\"service\",\"type\":\"string\",\"doc\":\"The name of the service\"},{\"name\":\"guid\",\"type\":{\"type\":\"fixed\",\"name\":\"Guid\",\"size\":16},\"doc\":\"A unique identifier for the message\"}]}},{\"name\":\"tier\",\"type\":\"string\",\"doc\":\"A name for the tier of servers to which this event belongs (client, broker, etc)\"},{\"name\":\"eventType\",\"type\":\"string\",\"doc\":\"The event type being monitored\"},{\"name\":\"count\",\"type\":\"long\",\"doc\":\"The number of events sent in this time period\"},{\"name\":\"beginTimestamp\",\"type\":\"long\",\"doc\":\"The timestamp at which this count began\"},{\"name\":\"endTimestamp\",\"type\":\"long\",\"doc\":\"The timestamp at which this count ended\"}]}");
public com.linkedin.camus.events.EventHeader header;
/**
* A name for the tier of servers to which this event belongs (client,
* broker, etc)
*/
public java.lang.CharSequence tier;
/** The event type being monitored */
public java.lang.CharSequence eventType;
/** The number of events sent in this time period */
public long count;
/** The timestamp at which this count began */
public long beginTimestamp;
/** The timestamp at which this count ended */
public long endTimestamp;

public org.apache.avro.Schema getSchema() {
return SCHEMA$;
}

// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0:
return header;
case 1:
return tier;
case 2:
return eventType;
case 3:
return count;
case 4:
return beginTimestamp;
case 5:
return endTimestamp;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}

// Used by DatumReader. Applications should not call.
@SuppressWarnings(value = "unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0:
header = (com.linkedin.camus.events.EventHeader) value$;
break;
case 1:
tier = (java.lang.CharSequence) value$;
break;
case 2:
eventType = (java.lang.CharSequence) value$;
break;
case 3:
count = (java.lang.Long) value$;
break;
case 4:
beginTimestamp = (java.lang.Long) value$;
break;
case 5:
endTimestamp = (java.lang.Long) value$;
break;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
}
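These Avro-generated classes are now checked in under com.linkedin.camus.events, matching the .gitignore change above that stops ignoring the old auto-generated records directories. A small sketch (not part of this commit) of populating the relocated records through their public fields:

import com.linkedin.camus.events.EventHeader;
import com.linkedin.camus.events.TrackingMonitoringEvent;

public class EventsSketch {
    public static void main(String[] args) {
        // Fill the header through the public fields generated by Avro;
        // guid (a 16-byte Avro fixed) is left unset in this sketch.
        EventHeader header = new EventHeader();
        header.memberId = 42;
        header.time = System.currentTimeMillis();
        header.server = "server-01";
        header.service = "camus";

        TrackingMonitoringEvent event = new TrackingMonitoringEvent();
        event.header = header;
        event.tier = "broker";
        event.eventType = "TrackingMonitoringEvent";
        event.count = 100L;
        event.beginTimestamp = header.time - 60000L;
        event.endTimestamp = header.time;

        // SCHEMA$ is embedded in the generated class.
        System.out.println(event.getSchema().toString(true));
    }
}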