This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Treasury Yield Example

Luke Lovett edited this page May 4, 2015 · 4 revisions

Treasury Yield

This sample can be run as a MapReduce job from the root directory of this project with ./gradlew historicalYield. If you have just checked out the repository, you will need to run ./gradlew jar first.

Source code is in examples/treasury_yield. The data used in this example can be found in examples/treasury_yield/src/main/resources/yield_historical_in.json. After this JSON data is loaded into MongoDB, we should have a test MongoDB collection with documents that look like this:

{
  "_id": ISODate("1990-01-25T19:00:00-0500"),
  "dayOfWeek": "FRIDAY",
  "bc3Year": 8.38,
  "bc10Year": 8.49,
  …
}

Goal

The goal is to compute the average of the bc10Year field for each year represented in the data set (see the sample document above).

MapReduce with Java

First we define a Mapper, which is executed against each document in the collection. We extract the year from the _id field and use it as the output key, along with the value we want to use for averaging, bc10Year.

public class TreasuryYieldMapper extends Mapper&lt;Object, BSONObject, IntWritable, DoubleWritable&gt; {
    @Override
    public void map( final Object pKey, final BSONObject pValue, final Context pContext ) throws IOException, InterruptedException {
        // Date#getYear() returns years since 1900, so add the offset back.
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        final double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write( new IntWritable( year ), new DoubleWritable( bid10Year ) );
    }
}
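The mapper relies on the deprecated `Date#getYear()`, which returns years since 1900; that is why 1900 is added back. As a standalone sketch (not part of this project; the class and method names here are hypothetical), the same extraction can be done without the deprecated call by going through `java.util.Calendar`:

```java
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

public class YearExtraction {
    // Same arithmetic as the mapper: Date#getYear() is years since 1900.
    static int yearViaDeprecatedApi(Date d) {
        return d.getYear() + 1900;
    }

    // Equivalent extraction without the deprecated API.
    static int yearViaCalendar(Date d) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTime(d);
        return cal.get(Calendar.YEAR);
    }

    public static void main(String[] args) {
        // 633312000000L is the epoch-millis value of 1990-01-26T00:00:00Z,
        // the UTC instant of the sample document's _id.
        Date d = new Date(633312000000L);
        System.out.println(yearViaDeprecatedApi(d)); // prints 1990
        System.out.println(yearViaCalendar(d));      // prints 1990
    }
}
```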

Then we write a Reducer, a function which takes the values collected for each key (the year) and performs an aggregate computation over them to produce a result.

public class TreasuryYieldReducer
        extends Reducer&lt;IntWritable, DoubleWritable, IntWritable, BSONWritable&gt; {
    @Override
    public void reduce( final IntWritable pKey, final Iterable&lt;DoubleWritable&gt; pValues, final Context pContext )
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for ( final DoubleWritable value : pValues ) {
            sum += value.get();
            count++;
        }

        final double avg = sum / count;

        // Emit the average as a BSON document, keyed by year.
        BasicBSONObject output = new BasicBSONObject();
        output.put( "avg", avg );
        pContext.write( pKey, new BSONWritable( output ) );
    }
}
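To see the arithmetic the job performs without running Hadoop, here is a minimal in-memory simulation of the shuffle and reduce steps (a hypothetical stand-in, not code from this project): pairs are grouped by year, then each group is averaged, just as the reducer above does per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TreasuryYieldSimulation {
    // Stand-in for the shuffle phase: group each (year, bc10Year) pair by year,
    // then stand-in for the reduce phase: average the values in each group.
    static Map<Integer, Double> averageByYear(int[] years, double[] yields) {
        Map<Integer, List<Double>> grouped = new TreeMap<>();
        for (int i = 0; i < years.length; i++) {
            grouped.computeIfAbsent(years[i], k -> new ArrayList<>()).add(yields[i]);
        }
        Map<Integer, Double> averages = new TreeMap<>();
        for (Map.Entry<Integer, List<Double>> e : grouped.entrySet()) {
            double sum = 0;
            for (double v : e.getValue()) {
                sum += v;
            }
            averages.put(e.getKey(), sum / e.getValue().size());
        }
        return averages;
    }

    public static void main(String[] args) {
        int[] years = {1990, 1990, 1991};
        double[] yields = {8.49, 8.51, 7.80};
        System.out.println(averageByYear(years, yields));
    }
}
```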

Pig

We can also accomplish the same task with just a few lines of Pig. The script uses some external UDFs provided by the Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730

-- Provides the UnixToISO UDF; bundled with the Pig distribution
REGISTER /contrib/piggybank/java/piggybank.jar;
-- UDFs used for date parsing
REGISTER /tmp/piggybank-0.3-amzn.jar;
-- MongoDB Java driver
REGISTER /tmp/mongo-2.10.1.jar;
-- Core Mongo-Hadoop library
REGISTER ../core/target/mongo-hadoop-core_1.0.3-1.1.0-SNAPSHOT.jar;
-- Mongo-Hadoop Pig support
REGISTER ../pig/target/mongo-hadoop-pig_1.0.3-1.1.0-SNAPSHOT.jar;

raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' using com.mongodb.hadoop.pig.MongoLoader; 
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();

date_tenyear = FOREACH raw GENERATE UnixToISO($0#'_id'), $0#'bc10Year';
parsed_year = FOREACH date_tenyear GENERATE
    FLATTEN(EXTRACT($0, '(\\d{4})')) AS year, (double)$1 AS bc;

by_year = GROUP parsed_year BY (chararray)year;
year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) AS tenyear_avg;

-- Args to MongoInsertStorage are: schema for output doc, field to use as '_id'.
STORE year_10yearavg
 INTO 'mongodb://localhost:27017/demo.asfkjabfa'
 USING com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
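The `EXTRACT($0, '(\d{4})')` call pulls the first four-digit run, the year, out of the ISO date string produced by UnixToISO. A plain-Java sketch of that same regex step (the class and method names here are hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IsoYearExtract {
    private static final Pattern YEAR = Pattern.compile("(\\d{4})");

    // Mirrors EXTRACT($0, '(\\d{4})'): return the first four-digit run, or null.
    static String extractYear(String isoDate) {
        Matcher m = YEAR.matcher(isoDate);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractYear("1990-01-25T19:00:00.000Z")); // prints 1990
    }
}
```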