Skip to content

Commit

Permalink
Merge pull request #43 from sakama/fix-incremental-load
Browse files Browse the repository at this point in the history
Fix for #34 last_record generating logic output empty last_record when no record is imported
  • Loading branch information
kamatama41 authored May 8, 2018
2 parents b2c7458 + 19dd6d5 commit 6e63c34
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/main/java/org/embulk/input/mongodb/MongodbInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,13 @@ public TaskReport run(TaskSource taskSource,
}

pageBuilder.finish();
return updateTaskReport(Exec.newTaskReport(), valueCodec, task);
}

TaskReport report = Exec.newTaskReport();

if (valueCodec.getLastRecord() != null) {
DataSource lastRecord = new DataSourceImpl(Exec.getInjector().getInstance(ModelManager.class));
private TaskReport updateTaskReport(TaskReport report, ValueCodec valueCodec, PluginTask task)
{
DataSource lastRecord = new DataSourceImpl(Exec.getInjector().getInstance(ModelManager.class));
if (valueCodec.getLastRecord() != null && valueCodec.getProcessedRecordCount() > 0) {
for (String k : valueCodec.getLastRecord().keySet()) {
String value = valueCodec.getLastRecord().get(k).toString();
Map<String, String> types = valueCodec.getLastRecordType();
Expand Down Expand Up @@ -199,8 +201,15 @@ public TaskReport run(TaskSource taskSource,
lastRecord.set(k, value);
}
}
report.setNested("last_record", lastRecord);
}
else if (task.getIncrementalField().isPresent() && task.getLastRecord().isPresent()) {
for (String field : task.getIncrementalField().get()) {
if (task.getLastRecord().get().containsKey(field)) {
lastRecord.set(field, task.getLastRecord().get().get(field));
}
}
}
report.setNested("last_record", lastRecord);
return report;
}

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/embulk/input/mongodb/ValueCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ValueCodec implements Codec<Value>
private final PluginTask task;
private final Optional<List<String>> incrementalField;
private Map<String, Object> lastRecord;
private long processedRecordCount = 0;
private Map<String, String> lastRecordType;

public ValueCodec(boolean stopOnInvalidRecord, PluginTask task)
Expand Down Expand Up @@ -84,6 +85,7 @@ public Value decode(final BsonReader reader, final DecoderContext decoderContext
log.warn(String.format("Skipped document because field '%s' contains unsupported object type [%s]",
fieldName, type));
}
this.processedRecordCount++;
}
reader.readEndDocument();

Expand Down Expand Up @@ -166,6 +168,11 @@ public Map<String, Object> getLastRecord()
return this.lastRecord;
}

public Long getProcessedRecordCount()
{
return this.processedRecordCount;
}

public Map<String, String> getLastRecordType()
{
return this.lastRecordType;
Expand Down
29 changes: 29 additions & 0 deletions src/test/java/org/embulk/input/mongodb/TestMongodbInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,35 @@ public void testRunWithLimitIncrementalLoad() throws Exception
assertEquals("true", lastRecord.get(String.class, "boolean_field"));
}

@Test
public void testRunWithLimitIncrementalLoadWithNoRecord() throws Exception
{
Map<String, Object> previousLastRecord = new HashMap<>();
previousLastRecord.put("int32_field", 1);
previousLastRecord.put("datetime_field", "{$date=2015-01-27T10:23:49.000Z}");
previousLastRecord.put("boolean_field", true);
ConfigSource config = Exec.newConfigSource()
.set("uri", MONGO_URI)
.set("collection", MONGO_COLLECTION)
.set("id_field_name", "int32_field")
.set("query", "{\"double_field\":{\"$gte\": 1.23}}")
.set("incremental_field", Optional.of(Arrays.asList("int32_field", "datetime_field", "boolean_field")))
.set("last_record", previousLastRecord);

PluginTask task = config.loadConfig(PluginTask.class);

dropCollection(task, MONGO_COLLECTION);
createCollection(task, MONGO_COLLECTION);
insertDocument(task, createValidDocuments());

ConfigDiff diff = plugin.transaction(config, new Control());
ConfigDiff lastRecord = diff.getNested("last_record");

assertEquals("1", lastRecord.get(String.class, "int32_field"));
assertEquals("{$date=2015-01-27T10:23:49.000Z}", lastRecord.get(String.class, "datetime_field"));
assertEquals("true", lastRecord.get(String.class, "boolean_field"));
}

@Test(expected = ConfigException.class)
public void testRunWithIncrementalLoadUnsupportedType() throws Exception
{
Expand Down

0 comments on commit 6e63c34

Please sign in to comment.