Treasury Yield Example
This sample can be run as a MapReduce job from the root directory of this project with ./gradlew historicalYield. If you have just checked out the repository, you will need to run ./gradlew jar first.

Source code is in examples/treasury_yield. The data used in this example can be found in examples/treasury_yield/src/main/resources/yield_historical_in.json. After this JSON data is loaded into MongoDB, we should have a test collection with documents that look like this:
{
  "_id": ISODate("1990-01-25T19:00:00-0500"),
  "dayOfWeek": "FRIDAY",
  "bc3Year": 8.38,
  "bc10Year": 8.49,
  …
}
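If you want to load the sample data yourself (for example, to run the Pig script below against it), a mongoimport invocation along these lines should work; the demo database and yield_historical.in collection names here match the URIs used in the Pig example and are otherwise just illustrative:

mongoimport --db demo --collection yield_historical.in \
    --file examples/treasury_yield/src/main/resources/yield_historical_in.json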
The goal is to find the average of the bc10Year field across each year that exists in the data set (see the document above for an example).

First we define a Mapper, which is executed against each document in the collection. We extract the year from the _id field and use it as the output key, along with the value we want to average, bc10Year.
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class TreasuryYieldMapper extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {
    @Override
    public void map(final Object pKey, final BSONObject pValue, final Context pContext) throws IOException, InterruptedException {
        // Date.getYear() returns years since 1900, hence the correction.
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        final double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}
Then we write a Reducer, which takes the values collected for each key (the year) and performs an aggregate computation over them to produce a result.
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import com.mongodb.hadoop.io.BSONWritable;

public class TreasuryYieldReducer
    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {
    @Override
    public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)
        throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        // Sum the bc10Year values collected for this year...
        for (final DoubleWritable value : pValues) {
            sum += value.get();
            count++;
        }
        // ...and emit the mean, wrapped in a BSON document.
        final double avg = sum / count;
        final BasicBSONObject output = new BasicBSONObject();
        output.put("avg", avg);
        pContext.write(pKey, new BSONWritable(output));
    }
}
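For reference, here is a minimal sketch of how such a job could be wired up, assuming Hadoop 2.x and the mongo-hadoop classes MongoConfigUtil, MongoInputFormat, MongoOutputFormat, and BSONWritable; the historicalYield Gradle task takes care of this configuration for you, and the URIs below are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class TreasuryYieldJob {
    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // Read from the input collection and write results back to MongoDB.
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/demo.yield_historical.in");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/demo.yield_historical.out");

        final Job job = Job.getInstance(conf, "treasury yield");
        job.setJarByClass(TreasuryYieldJob.class);
        job.setMapperClass(TreasuryYieldMapper.class);
        job.setReducerClass(TreasuryYieldReducer.class);
        // Key/value types emitted by the mapper...
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // ...and by the reducer.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}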
We can also accomplish the same task with just a few lines of Pig. The script below uses some external UDFs provided by the Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730
-- Provides the UnixToISO UDF, shipped with the Pig distribution
REGISTER /contrib/piggybank/java/piggybank.jar;
-- UDFs used for date parsing
REGISTER /tmp/piggybank-0.3-amzn.jar;
-- MongoDB Java driver
REGISTER /tmp/mongo-2.10.1.jar;
-- Core mongo-hadoop library
REGISTER ../core/target/mongo-hadoop-core_1.0.3-1.1.0-SNAPSHOT.jar;
-- mongo-hadoop Pig support
REGISTER ../pig/target/mongo-hadoop-pig_1.0.3-1.1.0-SNAPSHOT.jar;
raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' USING com.mongodb.hadoop.pig.MongoLoader;
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();

date_tenyear = FOREACH raw GENERATE UnixToISO($0#'_id'), $0#'bc10Year';
parsed_year = FOREACH date_tenyear GENERATE
    FLATTEN(EXTRACT($0, '(\\d{4})')) AS year, (double)$1 AS bc;

by_year = GROUP parsed_year BY (chararray)year;
year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) AS tenyear_avg;

-- Args to MongoInsertStorage are: schema for the output doc, field to use as '_id'.
STORE year_10yearavg
    INTO 'mongodb://localhost:27017/demo.asfkjabfa'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
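To try the script out, save it to a file and run it with Pig; for a quick local (non-cluster) run, something like the following should work, where treasury_yield.pig is just an illustrative file name:

pig -x local treasury_yield.pig

The per-year averages then end up in the output collection named in the STORE statement.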