This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Mortar changes for mongo-hadoop connector #102

Open
wants to merge 9 commits into master
@@ -54,7 +54,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
* Get the record writer that points to the output collection.
*/
public RecordWriter<K, V> getRecordWriter(final TaskAttemptContext context) {
-return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context);
+return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context, updateKeys, multiUpdate);
}

public MongoOutputFormat() {
@@ -20,6 +20,7 @@
import com.mongodb.hadoop.mapred.output.MongoOutputCommitter;
import com.mongodb.hadoop.mapred.output.MongoRecordWriter;
import com.mongodb.hadoop.util.MongoConfigUtil;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
@@ -33,7 +34,17 @@

@SuppressWarnings("deprecation")
public class MongoOutputFormat<K, V> implements OutputFormat<K, V> {

private final String[] updateKeys;
private final boolean multiUpdate;

public MongoOutputFormat() {
this(null, false);
}

public MongoOutputFormat(String[] updateKeys, boolean multiUpdate) {
this.updateKeys = updateKeys;
this.multiUpdate = multiUpdate;
}

public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
@@ -50,7 +61,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {

public RecordWriter<K, V> getRecordWriter(final FileSystem ignored, final JobConf job, final String name,
final Progressable progress) {
-return new MongoRecordWriter<K, V>(MongoConfigUtil.getOutputCollections(job), job);
+return new MongoRecordWriter<K, V>(MongoConfigUtil.getOutputCollections(job), job, updateKeys, multiUpdate);
}

}
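For orientation, a minimal, hypothetical usage sketch (not part of this changeset): how a caller such as a Pig StoreFunc in the Mortar setup might construct the mapred output format with the new constructor. The class ExampleUsage, the type parameters, and the field name "userId" are illustrative assumptions only.

import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoOutputFormat;
import org.apache.hadoop.io.NullWritable;

public class ExampleUsage {
    // Returns an output format whose record writer upserts on "userId" via $set
    // instead of calling save(); multiUpdate = false updates at most one
    // matching document per written record.
    public static MongoOutputFormat<NullWritable, BSONWritable> updateOutputFormat() {
        String[] updateKeys = {"userId"};
        boolean multiUpdate = false;
        return new MongoOutputFormat<NullWritable, BSONWritable>(updateKeys, multiUpdate);
    }
}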
@@ -23,6 +23,7 @@
import com.mongodb.MongoException;
import com.mongodb.hadoop.MongoOutput;
import com.mongodb.hadoop.io.BSONWritable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
@@ -38,11 +39,21 @@ public class MongoRecordWriter<K, V> implements RecordWriter<K, V> {
private final List<DBCollection> collections;

private final JobConf configuration;


private final String[] updateKeys;
private final boolean multiUpdate;

public MongoRecordWriter(final List<DBCollection> c, final JobConf conf) {
this(c, conf, null, false);
}


public MongoRecordWriter(final List<DBCollection> c, final JobConf conf, String[] updateKeys, boolean multiUpdate) {
collections = c;
configuration = conf;
numberOfHosts = c.size();
this.updateKeys = updateKeys;
this.multiUpdate = multiUpdate;
}


@@ -75,7 +86,23 @@ public void write(final K key, final V value) throws IOException {

try {
DBCollection dbCollection = getDbCollectionByRoundRobin();
-dbCollection.save(o);
+
+if (updateKeys == null) {
+dbCollection.save(o);
+} else {
+// Form the query from the update key fields
+DBObject query = new BasicDBObject(updateKeys.length);
+for (String updateKey : updateKeys) {
+query.put(updateKey, o.get(updateKey));
+o.removeField(updateKey);
+}
+// If _id is null, remove it; we don't want to overwrite an existing _id with null
+if (o.get("_id") == null) {
+o.removeField("_id");
+}
+DBObject set = new BasicDBObject().append("$set", o);
+dbCollection.update(query, set, true, multiUpdate);
+}
} catch (final MongoException e) {
throw new IOException("can't write to mongo", e);
}
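To make the new update path concrete, a hypothetical illustration (not part of this changeset): with updateKeys = {"userId"} and a record equivalent to {userId: 5, name: "Ada"}, the block above moves the key field into the query and issues an upsert keyed on it, roughly equivalent to the following. The field names and the helper class are assumptions for illustration.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;

public class UpdateWriteIllustration {
    // Equivalent legacy-driver call produced for the example record: the upsert
    // flag is always true, and multi comes from the multiUpdate setting.
    static void writeAsUpdate(final DBCollection collection, final boolean multiUpdate) {
        DBObject query = new BasicDBObject("userId", 5);
        DBObject set = new BasicDBObject("$set", new BasicDBObject("name", "Ada"));
        collection.update(query, set, true /* upsert */, multiUpdate);
    }
}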
@@ -74,7 +74,7 @@ public MongoRecordWriter(final List<DBCollection> c, final TaskAttemptContext ct
collections = new ArrayList<DBCollection>(c);
context = ctx;
this.updateKeys = updateKeys;
-this.multiUpdate = false;
+this.multiUpdate = multi;
this.numberOfHosts = c.size();

//authenticate if necessary - but don't auth twice on same DB
@@ -122,8 +122,17 @@ public MongoCollectionSplitter(final Configuration conf) {

protected void init() {
MongoURI inputURI = MongoConfigUtil.getInputURI(conf);
-this.inputCollection = MongoConfigUtil.getCollection(inputURI);
-DB db = this.inputCollection.getDB();
+
+DB db;
+try {
+this.inputCollection = MongoConfigUtil.getCollection(inputURI);
+db = this.inputCollection.getDB();
+} catch (Exception e) {
+String message = e.getMessage() + "\n\nMongo connection strings are required to be of the form:\n" +
+" mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database.collection";
+throw new IllegalStateException(message, e);
+}

this.mongo = db.getMongo();
MongoURI authURI = MongoConfigUtil.getAuthURI(conf);
if (authURI != null) {
@@ -123,6 +123,20 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
numChunks++;
}

return createSplitList(numChunks, shardToSplits);
}

/**
* Round-robin the splits across shards. The splits are processed as map tasks in
* the same order as the returned list, so consecutive splits should come from
* different shards; that way, as map tasks complete, the remaining work stays
* spread evenly across the shard machines.
*
* @param numChunks total number of chunks (splits) expected in the result
* @param shardToSplits map of shard name to the list of splits on that shard
* @return the splits, interleaved round-robin across shards
*/
protected static List<InputSplit> createSplitList(int numChunks,
Map<String, LinkedList<InputSplit>> shardToSplits) {
final List<InputSplit> splits = new ArrayList<InputSplit>(numChunks);
int splitIndex = 0;
while (splitIndex < numChunks) {
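The rest of this loop is collapsed in the diff view. As a hedged sketch only, assuming numChunks equals the total number of queued splits, one way to complete createSplitList so that it interleaves splits round-robin across shards, consistent with the javadoc above and ShardChunkMongoSplitterTest below, is:

protected static List<InputSplit> createSplitList(int numChunks,
        Map<String, LinkedList<InputSplit>> shardToSplits) {
    final List<InputSplit> splits = new ArrayList<InputSplit>(numChunks);
    int splitIndex = 0;
    while (splitIndex < numChunks) {
        // Each pass takes at most one split from every shard, so consecutive
        // entries in the result come from different shards.
        for (LinkedList<InputSplit> shardSplits : shardToSplits.values()) {
            if (!shardSplits.isEmpty()) {
                splits.add(shardSplits.removeFirst());
                splitIndex++;
            }
        }
    }
    return splits;
}

The commit's actual loop body may differ; the sketch only restates the documented round-robin behavior.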
@@ -0,0 +1,78 @@
package com.mongodb.hadoop.splitter;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import com.mongodb.MongoURI;

@SuppressWarnings("deprecation")
public class ShardChunkMongoSplitterTest {
@Test
public void testCreateSplitList_oneShard() {
int numChunks = 2;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
LinkedList<InputSplit> shardSplits = new LinkedList<InputSplit>(Arrays.asList(split1, split2));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split2, splits.get(1));
}

@Test
public void testCreateSplitList_twoEvenShards() {
int numChunks = 4;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits1);
shardToSplits.put("shard2", shardSplits2);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split3, splits.get(1));
assertEquals(split2, splits.get(2));
assertEquals(split4, splits.get(3));
}

@Test
public void testCreateSplitList_twoUnevenShards() {
int numChunks = 6;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
InputSplit split5 = new TestMongoInputSplit(new MongoURI("mongodb://split5"));
InputSplit split6 = new TestMongoInputSplit(new MongoURI("mongodb://split6"));
LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4, split5, split6));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits1);
shardToSplits.put("shard2", shardSplits2);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split3, splits.get(1));
assertEquals(split2, splits.get(2));
assertEquals(split4, splits.get(3));
assertEquals(split5, splits.get(4));
assertEquals(split6, splits.get(5));
}
}
@@ -0,0 +1,11 @@
package com.mongodb.hadoop.splitter;

import com.mongodb.MongoURI;
import com.mongodb.hadoop.input.MongoInputSplit;

public class TestMongoInputSplit extends MongoInputSplit {

public TestMongoInputSplit(MongoURI inputURI) {
this.inputURI = inputURI;
}
}