Skip to content
This repository has been archived by the owner on Aug 29, 2019. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
h-matsuo committed Sep 30, 2018
2 parents 5a87c5c + aa34b85 commit e240099
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 13 deletions.
40 changes: 29 additions & 11 deletions client/src/wasm-madoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ let execEmit: Function = () => {
throw new Error('[Madoop] function `execEmit` not defined.');
};

/**
 * Runtime hook table for the WASM client.
 *
 * Both hooks are async and default to pass-through behaviour. The values they
 * resolve to are what gets handed to the WASM `map` / `reduce` wrappers.
 */
const madoop = {
  runtime: {
    // Map phase: resolve to the task input exactly as received.
    wasmMapPreprocess: async data => data,
    // Reduce phase: resolve to a fresh object carrying only `key` and `value`.
    wasmReducePreprocess: async ({key, value}) => ({key, value})
  }
};

((): void => {

const __SERVER_ENDPOINT_URL =
Expand Down Expand Up @@ -68,9 +79,11 @@ let execEmit: Function = () => {
}
const wasmData: {
wasmJs: string,
wasmBinary: {type: string, data: Array<number>}
wasmBinary: {type: string, data: Array<number>},
wasmPreprocessJs: string
} = await ajaxGet(`${__SERVER_ENDPOINT_URL}/wasmData/${metaInfo.phase}`).then(res => res.json());
applyScript(wasmData.wasmJs);
if (wasmData.wasmPreprocessJs) { (new Function(wasmData.wasmPreprocessJs))(); }
let func;
if (metaInfo.phase === 'map') {
await new Promise<void>((resolve, reject) => {
Expand All @@ -81,9 +94,9 @@ let execEmit: Function = () => {
const map = module.cwrap('map', null, ['number']);
func = (inputDataString: string) => {
// Pass the argument through the pointer which addresses Emscripten's heap
const bufferSize = module.lengthBytesUTF8(inputDataString);
const buffer = module._malloc(bufferSize + 1);
module.stringToUTF8(inputDataString, buffer, bufferSize + 1);
const bufferSize = module.lengthBytesUTF8(inputDataString) + 1;
const buffer = module._malloc(bufferSize);
module.stringToUTF8(inputDataString, buffer, bufferSize);
map(buffer);
module._free(buffer);
};
Expand All @@ -101,9 +114,9 @@ let execEmit: Function = () => {
const reduce = module.cwrap('reduce', null, ['string', 'number']);
func = (keyString: string, valuesString: string) => {
// Pass the argument through the pointer which addresses Emscripten's heap
const bufferSize = module.lengthBytesUTF8(valuesString);
const buffer = module._malloc(bufferSize + 1);
module.stringToUTF8(valuesString, buffer, bufferSize + 1);
const bufferSize = module.lengthBytesUTF8(valuesString) + 1;
const buffer = module._malloc(bufferSize);
module.stringToUTF8(valuesString, buffer, bufferSize);
reduce(keyString, buffer);
module._free(buffer);
};
Expand Down Expand Up @@ -140,12 +153,17 @@ let execEmit: Function = () => {
result.push(element);
};
if (nextTask.metaInfo.phase === 'map') {
func(nextTask.inputData); // call above `execEmit` inside
const preprocessed = await madoop.runtime.wasmMapPreprocess(nextTask.inputData);
func(preprocessed); // call above `execEmit` inside
} else if (nextTask.metaInfo.phase === 'reduce') {
const inputDataObject: { key: any, values: any[] }[] = JSON.parse(nextTask.inputData);
inputDataObject.forEach(element => {
func(element.key, element.values.toString()); // call above `execEmit` inside
});
for (const element of inputDataObject) {
const preprocessed = await madoop.runtime.wasmReducePreprocess({
key: element.key,
value: element.values.toString()
});
func(preprocessed.key, preprocessed.value); // call above `execEmit` inside
}
} else {
throw new Error(`[Madoop] invalid task id provided: ${nextTask.metaInfo.phase}`);
}
Expand Down
3 changes: 3 additions & 0 deletions sample/preprocess/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
map.js
reduce.js
*.wasm
12 changes: 12 additions & 0 deletions sample/preprocess/MyInputData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {AbstractInputData} from '../../';

export default
class MyInputData extends AbstractInputData {

  /**
   * Registers the sample's input entries. Each entry is a URL string that is
   * passed to `addInputData` in the order listed.
   */
  constructor() {
    super();
    const sourceUrls = [
      'https://raw.githubusercontent.com/h-matsuo/madoop/master/README.md',
      'https://raw.githubusercontent.com/h-matsuo/madoop/master/LICENSE'
    ];
    for (const sourceUrl of sourceUrls) {
      this.addInputData(sourceUrl);
    }
  }

}
20 changes: 20 additions & 0 deletions sample/preprocess/MyShuffler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {AbstractShuffler} from '../../';

export default
class MyShuffler extends AbstractShuffler {

  /**
   * Builds the reducer inputs from the mapper output.
   *
   * Every key/values entry is copied, in iteration order, into one new Map,
   * so the returned array always holds exactly one element. The value arrays
   * themselves are shared with `mapperResult`, not cloned.
   *
   * @param mapperResult key -> values pairs produced by the map phase
   * @returns a one-element array containing the copied Map
   */
  setUpReducerInputData(
    mapperResult: Map<any, any[]>
  ): Map<any, any[]>[] {
    const singleReducerInput = new Map<any, any[]>(mapperResult);
    return [singleReducerInput];
  }

}
38 changes: 38 additions & 0 deletions sample/preprocess/app-bbvc-wasm.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {Job, WasmMapper, WasmReducer, WasmWebServer} from '../../';
import MyInputData from './MyInputData';
import MyShuffler from './MyShuffler';

import * as fs from 'fs';

// Sample entry point: wires a WASM mapper/reducer (each with a preprocess
// script) into a job and serves it through WasmWebServer.
const job = new Job('preprocess');
const inputData = new MyInputData();
const mapper = new WasmMapper();
const reducer = new WasmReducer();
const shuffler = new MyShuffler();
const server = new WasmWebServer();

// Map-side artifacts. The relative paths are resolved by Node against the
// process's current working directory.
const mapAssets = {
  js: fs.readFileSync('./map.js', 'utf8'),
  binary: fs.readFileSync('./map.wasm'),
  preprocessJs: fs.readFileSync('./wasm_map_preprocess.js', 'utf8')
};
mapper.setWasmJs(mapAssets.js);
mapper.setWasmBinary(mapAssets.binary);
mapper.setWasmPreprocessJs(mapAssets.preprocessJs);

// Reduce-side artifacts, loaded the same way.
const reduceAssets = {
  js: fs.readFileSync('./reduce.js', 'utf8'),
  binary: fs.readFileSync('./reduce.wasm'),
  preprocessJs: fs.readFileSync('./wasm_reduce_preprocess.js', 'utf8')
};
reducer.setWasmJs(reduceAssets.js);
reducer.setWasmBinary(reduceAssets.binary);
reducer.setWasmPreprocessJs(reduceAssets.preprocessJs);

// Completion callback: print the result it is given, then stop the process with status 0.
function reportAndExit(result?: any): void {
  console.log(result);
  process.exit(0);
}

job.setInputData(inputData);
job.setMapper(mapper);
job.setReducer(reducer);
job.setShuffler(shuffler);
job.setCallbackWhenCompleted(reportAndExit);

server.setJob(job);
server.setRoot('/madoop/wasm');
server.run();
27 changes: 27 additions & 0 deletions sample/preprocess/compile.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

# Docker image (name and tag) that provides the Emscripten toolchain.
EMSCRIPTEN_IMAGE_NAME="trzeci/emscripten-slim"
EMSCRIPTEN_IMAGE_TAG="sdk-tag-1.38.8-64bit"

# Absolute path of the directory containing this script.
# The inner command substitution is quoted so that a checkout path containing
# spaces is not word-split, and `&&` keeps `pwd` from reporting the caller's
# directory when `cd` fails (SCRIPT_DIR is left empty in that case).
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE:-$0}")" && pwd)"

# Run the given command line inside a throwaway Emscripten container,
# with the script directory bind-mounted at /src.
run_emscripten () {
  local image="${EMSCRIPTEN_IMAGE_NAME}:${EMSCRIPTEN_IMAGE_TAG}"
  docker run --rm -t -v "${SCRIPT_DIR}:/src" "${image}" "$@"
}

# Compile <name>.cpp into <name>.js with emcc, exporting the C function
# `_<name>` and linking the emit_func JS library.
#   $1 - base name of the source file, also used as the exported function name
compile() {
  local target="${1}"
  local -a emcc_flags=(
    -std=c++11
    -s WASM=1
    -s MODULARIZE=1
    -s ALLOW_MEMORY_GROWTH=1
    -s "EXPORTED_FUNCTIONS=['_${target}']"
    -s "EXTRA_EXPORTED_RUNTIME_METHODS=['cwrap', 'lengthBytesUTF8', 'stringToUTF8']"
    --js-library lib_emit_func.js
  )
  run_emscripten emcc "${target}.cpp" "${emcc_flags[@]}" -o "${target}.js"
}

# Build both phases; the script's exit status is that of the last build.
for target in map reduce; do
  compile "${target}"
done
5 changes: 5 additions & 0 deletions sample/preprocess/lib_emit_func.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Emscripten JS library file: adds `emit_func` to the library object so that
// compiled code declaring `emit_func` as an external function can call it.
mergeInto(LibraryManager.library, {
// Receives two arguments from the compiled code, converts each with
// Pointer_stringify and forwards the resulting pair to `execEmit`.
// NOTE(review): `execEmit` is not defined in this file; it is presumably the
// global provided by the client runtime at execution time. Confirm.
emit_func: function (key, value) {
execEmit(Pointer_stringify(key), Pointer_stringify(value));
}
});
14 changes: 14 additions & 0 deletions sample/preprocess/map.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include <iostream>

extern "C" { // REQUIRED to prevent C++ name mangling

// REQUIRED to emit key-value pairs; the definition lives outside this file.
extern void emit_func(const char*, const char*);

// Map entry point: echoes the received text to stdout, then emits it as the
// value of a single pair under a fixed placeholder key.
void map(const char* data)
{
  std::cout << "received data (preprocessed by wasm_map_preprocess.js):" << std::endl;
  std::cout << data << std::endl;
  emit_func("this-key-will-be-overwritten-by-wasm_reduce_preprocess.js", data);
}

}
10 changes: 10 additions & 0 deletions sample/preprocess/reduce.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
extern "C" { // REQUIRED to prevent C++ name mangling

extern void emit_func(const char*, const char*); // REQUIRED to emit key-value pairs

// Reduce entry point: performs no aggregation; it emits the key and the
// values string it was given as a single pair, unchanged.
void reduce(const char* key, const char* values_str)
{
emit_func(key, values_str);
}

}
4 changes: 4 additions & 0 deletions sample/preprocess/wasm_map_preprocess.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
 * Map-phase preprocess hook: treats the task input as a URL, downloads it and
 * resolves to the response body as text.
 *
 * Rejects when the server answers with a non-2xx status, so an HTTP error
 * page is never handed on as if it were the requested document.
 *
 * @param url address of the document to download
 * @returns promise resolving to the document text
 */
madoop.runtime.wasmMapPreprocess = async url => {
  const res = await fetch(url);
  if (!res.ok) {
    throw new Error(`[Madoop] failed to fetch map input: ${url} (HTTP ${res.status})`);
  }
  return res.text();
};
3 changes: 3 additions & 0 deletions sample/preprocess/wasm_reduce_preprocess.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Reduce-phase preprocess hook: ignores the incoming key and resolves to a
// pair that keeps the incoming value under a fixed replacement key.
madoop.runtime.wasmReducePreprocess = async ({value}) => ({
  key: 'this-key-was-overwritten',
  value
});
12 changes: 12 additions & 0 deletions server/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ class Job {
private reducer: AbstractReducer;
private shuffler: AbstractShuffler;
private numMapCompleted: number;
private isFirstAccess: boolean;
private callbackWhenAccessedFirstly: () => void;
private callbackWhenCompleted: (result?: any) => void;

/**
 * Creates a job with the given identifier.
 *
 * Initial state: a new DataController bound to this job, zero completed map
 * tasks, the first-access flag set, and both callbacks set to no-ops.
 */
constructor(jobId: string) {
this.jobId = jobId;
this.dataController = new DataController(this);
this.numMapCompleted = 0;
this.isFirstAccess = true;
this.callbackWhenAccessedFirstly = () => {}; // default: do nothing
this.callbackWhenCompleted = () => {}; // default: do nothing
}

Expand Down Expand Up @@ -61,6 +65,10 @@ class Job {
}

getNextTask(): Task | null {
if (this.isFirstAccess) {
this.callbackWhenAccessedFirstly();
this.isFirstAccess = false;
}
let task: Task = null;
if (!this.hasMapCompleted()) {
const nextMapperInputData = this.dataController.getNextMapperInputData();
Expand Down Expand Up @@ -140,6 +148,10 @@ class Job {
return this.dataController.getReducerResultPairs();
}

/**
 * Replaces the callback that `getNextTask()` invokes while the first-access
 * flag is still set.
 */
setCallbackWhenAccessedFirstly(callback: () => void): void {
this.callbackWhenAccessedFirstly = callback;
}

/**
 * Replaces the stored completion callback, which may receive a result argument.
 * NOTE(review): the call site is outside this excerpt; presumably it runs once
 * the job has finished. Confirm.
 */
setCallbackWhenCompleted(callback: (result?: any) => void): void {
this.callbackWhenCompleted = callback;
}
Expand Down
8 changes: 8 additions & 0 deletions server/WasmMapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class WasmMapper extends AbstractMapper {

private wasmJs: string;
private wasmBinary: Buffer;
private wasmPreprocessJs: string;

map(
inputData: any,
Expand All @@ -21,6 +22,10 @@ class WasmMapper extends AbstractMapper {
return this.wasmBinary;
}

/** Returns the preprocess script source currently stored on this mapper. */
getWasmPreprocessJs(): string {
return this.wasmPreprocessJs;
}

/** Stores the given JavaScript source as this mapper's `wasmJs`. */
setWasmJs(js: string) {
this.wasmJs = js;
}
Expand All @@ -29,5 +34,8 @@ class WasmMapper extends AbstractMapper {
this.wasmBinary = binary;
}

/** Stores the given JavaScript source as this mapper's preprocess script. */
setWasmPreprocessJs(js: string) {
this.wasmPreprocessJs = js;
}

}
9 changes: 9 additions & 0 deletions server/WasmReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class WasmReducer extends AbstractReducer {

private wasmJs: string;
private wasmBinary: Buffer;
private wasmPreprocessJs: string;

reduce(
inputData: Map<any, any[]>,
Expand All @@ -21,6 +22,10 @@ class WasmReducer extends AbstractReducer {
return this.wasmBinary;
}

/** Returns the preprocess script source currently stored on this reducer. */
getWasmPreprocessJs(): string {
return this.wasmPreprocessJs;
}

/** Stores the given JavaScript source as this reducer's `wasmJs`. */
setWasmJs(js: string) {
this.wasmJs = js;
}
Expand All @@ -29,4 +34,8 @@ class WasmReducer extends AbstractReducer {
this.wasmBinary = binary;
}

/** Stores the given JavaScript source as this reducer's preprocess script. */
setWasmPreprocessJs(js: string) {
this.wasmPreprocessJs = js;
}

}
6 changes: 4 additions & 2 deletions server/webapi-extension/WasmWebServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class WasmWebServer {
const mapper = <WasmMapper>this.job.getMapper();
const data = {
wasmJs: mapper.getWasmJs(),
wasmBinary: mapper.getWasmBinary()
wasmBinary: mapper.getWasmBinary(),
wasmPreprocessJs: mapper.getWasmPreprocessJs()
};
res.send(data);
});
Expand All @@ -85,7 +86,8 @@ class WasmWebServer {
const reducer = <WasmReducer>this.job.getReducer();
const data = {
wasmJs: reducer.getWasmJs(),
wasmBinary: reducer.getWasmBinary()
wasmBinary: reducer.getWasmBinary(),
wasmPreprocessJs: reducer.getWasmPreprocessJs()
};
res.send(data);
});
Expand Down

0 comments on commit e240099

Please sign in to comment.