From feebef0e6b221bc36049e34fc646840c77596745 Mon Sep 17 00:00:00 2001 From: Hiroyuki Matsuo Date: Thu, 6 Sep 2018 17:14:03 +0900 Subject: [PATCH 1/4] [update] add event when task is accessed firstly --- server/Job.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/Job.ts b/server/Job.ts index 9fa1bc5..6c41a27 100644 --- a/server/Job.ts +++ b/server/Job.ts @@ -15,12 +15,16 @@ class Job { private reducer: AbstractReducer; private shuffler: AbstractShuffler; private numMapCompleted: number; + private isFirstAccess: boolean; + private callbackWhenAccessedFirstly: () => void; private callbackWhenCompleted: (result?: any) => void; constructor(jobId: string) { this.jobId = jobId; this.dataController = new DataController(this); this.numMapCompleted = 0; + this.isFirstAccess = true; + this.callbackWhenAccessedFirstly = () => {}; // default: do nothing this.callbackWhenCompleted = () => {}; // default: do nothing } @@ -61,6 +65,10 @@ class Job { } getNextTask(): Task | null { + if (this.isFirstAccess) { + this.callbackWhenAccessedFirstly(); + this.isFirstAccess = false; + } let task: Task = null; if (!this.hasMapCompleted()) { const nextMapperInputData = this.dataController.getNextMapperInputData(); @@ -140,6 +148,10 @@ class Job { return this.dataController.getReducerResultPairs(); } + setCallbackWhenAccessedFirstly(callback: () => void): void { + this.callbackWhenAccessedFirstly = callback; + } + setCallbackWhenCompleted(callback: (result?: any) => void): void { this.callbackWhenCompleted = callback; } From ab4b314c7460703c490887b121974663bab0568e Mon Sep 17 00:00:00 2001 From: Hiroyuki Matsuo Date: Sun, 30 Sep 2018 15:37:38 +0900 Subject: [PATCH 2/4] [update] minor changes --- client/src/wasm-madoop.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/src/wasm-madoop.ts b/client/src/wasm-madoop.ts index bd2b641..e2958e0 100644 --- a/client/src/wasm-madoop.ts +++ b/client/src/wasm-madoop.ts @@ -81,9 +81,9 @@ let execEmit: Function = () => { const map = module.cwrap('map', null, ['number']); func = (inputDataString: string) => { // Pass the argument through the pointer which addresses Emscripten's heap - const bufferSize = module.lengthBytesUTF8(inputDataString); - const buffer = module._malloc(bufferSize + 1); - module.stringToUTF8(inputDataString, buffer, bufferSize + 1); + const bufferSize = module.lengthBytesUTF8(inputDataString) + 1; + const buffer = module._malloc(bufferSize); + module.stringToUTF8(inputDataString, buffer, bufferSize); map(buffer); module._free(buffer); }; @@ -101,9 +101,9 @@ let execEmit: Function = () => { const reduce = module.cwrap('reduce', null, ['string', 'number']); func = (keyString: string, valuesString: string) => { // Pass the argument through the pointer which addresses Emscripten's heap - const bufferSize = module.lengthBytesUTF8(valuesString); - const buffer = module._malloc(bufferSize + 1); - module.stringToUTF8(valuesString, buffer, bufferSize + 1); + const bufferSize = module.lengthBytesUTF8(valuesString) + 1; + const buffer = module._malloc(bufferSize); + module.stringToUTF8(valuesString, buffer, bufferSize); reduce(keyString, buffer); module._free(buffer); }; From 6a960f9892a98d3a7a0b3f746e02c3cc24faa36d Mon Sep 17 00:00:00 2001 From: Hiroyuki Matsuo Date: Sun, 30 Sep 2018 19:27:26 +0900 Subject: [PATCH 3/4] [update] refs #38 add preprocessing feature for map/reduce using WASM --- client/src/wasm-madoop.ts | 28 +++++++++++++++++++----- server/WasmMapper.ts | 8 +++++++ server/WasmReducer.ts | 9 ++++++++ server/webapi-extension/WasmWebServer.ts | 6 +++-- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/client/src/wasm-madoop.ts b/client/src/wasm-madoop.ts index e2958e0..e3ae6c5 100644 --- a/client/src/wasm-madoop.ts +++ b/client/src/wasm-madoop.ts @@ -10,6 +10,17 @@ let execEmit: Function = () => { throw new Error('[Madoop] function `execEmit` not defined.'); }; +const madoop = { + runtime: { + wasmMapPreprocess: async data => { + return data; + }, + wasmReducePreprocess: async ({key: key, value: value}) => { + return {key: key, value: value}; + } + } +}; + ((): void => { const __SERVER_ENDPOINT_URL = @@ -68,9 +79,11 @@ let execEmit: Function = () => { } const wasmData: { wasmJs: string, - wasmBinary: {type: string, data: Array} + wasmBinary: {type: string, data: Array}, + wasmPreprocessJs: string } = await ajaxGet(`${__SERVER_ENDPOINT_URL}/wasmData/${metaInfo.phase}`).then(res => res.json()); applyScript(wasmData.wasmJs); + if (wasmData.wasmPreprocessJs) { (new Function(wasmData.wasmPreprocessJs))(); } let func; if (metaInfo.phase === 'map') { await new Promise((resolve, reject) => { @@ -140,12 +153,17 @@ let execEmit: Function = () => { result.push(element); }; if (nextTask.metaInfo.phase === 'map') { - func(nextTask.inputData); // call above `execEmit` inside + const preprocessed = await madoop.runtime.wasmMapPreprocess(nextTask.inputData); + func(preprocessed); // call above `execEmit` inside } else if (nextTask.metaInfo.phase === 'reduce') { const inputDataObject: { key: any, values: any[] }[] = JSON.parse(nextTask.inputData); - inputDataObject.forEach(element => { - func(element.key, element.values.toString()); // call above `execEmit` inside - }); + for (const element of inputDataObject) { + const preprocessed = await madoop.runtime.wasmReducePreprocess({ + key: element.key, + value: element.values.toString() + }); + func(preprocessed.key, preprocessed.value); // call above `execEmit` inside + } } else { throw new Error(`[Madoop] invalid task id provided: ${nextTask.metaInfo.phase}`); } diff --git a/server/WasmMapper.ts b/server/WasmMapper.ts index 6a9cea7..981c4b1 100644 --- a/server/WasmMapper.ts +++ b/server/WasmMapper.ts @@ -5,6 +5,7 @@ class WasmMapper extends AbstractMapper { private wasmJs: string; private wasmBinary: Buffer; + private wasmPreprocessJs: string; map( inputData: any, @@ -21,6 +22,10 @@ class WasmMapper extends AbstractMapper { return this.wasmBinary; } + getWasmPreprocessJs(): string { + return this.wasmPreprocessJs; + } + setWasmJs(js: string) { this.wasmJs = js; } @@ -29,5 +34,8 @@ class WasmMapper extends AbstractMapper { this.wasmBinary = binary; } + setWasmPreprocessJs(js: string) { + this.wasmPreprocessJs = js; + } } diff --git a/server/WasmReducer.ts b/server/WasmReducer.ts index 27ca810..5fa6426 100644 --- a/server/WasmReducer.ts +++ b/server/WasmReducer.ts @@ -5,6 +5,7 @@ class WasmReducer extends AbstractReducer { private wasmJs: string; private wasmBinary: Buffer; + private wasmPreprocessJs: string; reduce( inputData: Map, @@ -21,6 +22,10 @@ class WasmReducer extends AbstractReducer { return this.wasmBinary; } + getWasmPreprocessJs(): string { + return this.wasmPreprocessJs; + } + setWasmJs(js: string) { this.wasmJs = js; } @@ -29,4 +34,8 @@ class WasmReducer extends AbstractReducer { this.wasmBinary = binary; } + setWasmPreprocessJs(js: string) { + this.wasmPreprocessJs = js; + } + } diff --git a/server/webapi-extension/WasmWebServer.ts b/server/webapi-extension/WasmWebServer.ts index 72d3a24..87f432a 100644 --- a/server/webapi-extension/WasmWebServer.ts +++ b/server/webapi-extension/WasmWebServer.ts @@ -75,7 +75,8 @@ class WasmWebServer { const mapper = this.job.getMapper(); const data = { wasmJs: mapper.getWasmJs(), - wasmBinary: mapper.getWasmBinary() + wasmBinary: mapper.getWasmBinary(), + wasmPreprocessJs: mapper.getWasmPreprocessJs() }; res.send(data); }); @@ -85,7 +86,8 @@ class WasmWebServer { const reducer = this.job.getReducer(); const data = { wasmJs: reducer.getWasmJs(), - wasmBinary: reducer.getWasmBinary() + wasmBinary: reducer.getWasmBinary(), + wasmPreprocessJs: reducer.getWasmPreprocessJs() }; res.send(data); }); From aa34b8557d74400e6290dca5242f77d841872cd6 Mon Sep 17 00:00:00 2001 From: Hiroyuki Matsuo Date: Sun, 30 Sep 2018 19:40:04 +0900 Subject: [PATCH 4/4] [add] sample code for preprocessing --- sample/preprocess/.gitignore | 3 ++ sample/preprocess/MyInputData.ts | 12 +++++++ sample/preprocess/MyShuffler.ts | 20 +++++++++++ sample/preprocess/app-bbvc-wasm.ts | 38 +++++++++++++++++++++ sample/preprocess/compile.sh | 27 +++++++++++++++ sample/preprocess/lib_emit_func.js | 5 +++ sample/preprocess/map.cpp | 14 ++++++++ sample/preprocess/reduce.cpp | 10 ++++++ sample/preprocess/wasm_map_preprocess.js | 4 +++ sample/preprocess/wasm_reduce_preprocess.js | 3 ++ 10 files changed, 136 insertions(+) create mode 100644 sample/preprocess/.gitignore create mode 100644 sample/preprocess/MyInputData.ts create mode 100644 sample/preprocess/MyShuffler.ts create mode 100644 sample/preprocess/app-bbvc-wasm.ts create mode 100755 sample/preprocess/compile.sh create mode 100644 sample/preprocess/lib_emit_func.js create mode 100644 sample/preprocess/map.cpp create mode 100644 sample/preprocess/reduce.cpp create mode 100644 sample/preprocess/wasm_map_preprocess.js create mode 100644 sample/preprocess/wasm_reduce_preprocess.js diff --git a/sample/preprocess/.gitignore b/sample/preprocess/.gitignore new file mode 100644 index 0000000..a47278e --- /dev/null +++ b/sample/preprocess/.gitignore @@ -0,0 +1,3 @@ +map.js +reduce.js +*.wasm diff --git a/sample/preprocess/MyInputData.ts b/sample/preprocess/MyInputData.ts new file mode 100644 index 0000000..c257b33 --- /dev/null +++ b/sample/preprocess/MyInputData.ts @@ -0,0 +1,12 @@ +import {AbstractInputData} from '../../'; + +export default +class MyInputData extends AbstractInputData { + + constructor() { + super(); + this.addInputData('https://raw.githubusercontent.com/h-matsuo/madoop/master/README.md'); + this.addInputData('https://raw.githubusercontent.com/h-matsuo/madoop/master/LICENSE'); + } + +} diff --git a/sample/preprocess/MyShuffler.ts b/sample/preprocess/MyShuffler.ts new file mode 100644 index 0000000..2413e64 --- /dev/null +++ b/sample/preprocess/MyShuffler.ts @@ -0,0 +1,20 @@ +import {AbstractShuffler} from '../../'; + +export default +class MyShuffler extends AbstractShuffler { + + setUpReducerInputData( + mapperResult: Map + ): Map[] { + + const reduceInputData = []; + const data = new Map(); + mapperResult.forEach((values, key) => { + data.set(key, values); + }); + reduceInputData.push(data); + return reduceInputData; + + } + +} diff --git a/sample/preprocess/app-bbvc-wasm.ts b/sample/preprocess/app-bbvc-wasm.ts new file mode 100644 index 0000000..ee7f84d --- /dev/null +++ b/sample/preprocess/app-bbvc-wasm.ts @@ -0,0 +1,38 @@ +import {Job, WasmMapper, WasmReducer, WasmWebServer} from '../../'; +import MyInputData from './MyInputData'; +import MyShuffler from './MyShuffler'; + +import * as fs from 'fs'; + +const job = new Job('preprocess'); +const inputData = new MyInputData(); +const mapper = new WasmMapper(); +const reducer = new WasmReducer(); +const shuffler = new MyShuffler(); +const server = new WasmWebServer(); + +const wasmJsMap = fs.readFileSync('./map.js', 'utf8'); +const wasmBinaryMap = fs.readFileSync('./map.wasm'); +const wasmPreprocessJsMap = fs.readFileSync('./wasm_map_preprocess.js', 'utf8'); +mapper.setWasmJs(wasmJsMap); +mapper.setWasmBinary(wasmBinaryMap); +mapper.setWasmPreprocessJs(wasmPreprocessJsMap); + +const wasmJsReducer = fs.readFileSync('./reduce.js', 'utf8'); +const wasmBinaryReducer = fs.readFileSync('./reduce.wasm'); +const wasmPreprocessJsReducer = fs.readFileSync('./wasm_reduce_preprocess.js', 'utf8'); +reducer.setWasmJs(wasmJsReducer); +reducer.setWasmBinary(wasmBinaryReducer); +reducer.setWasmPreprocessJs(wasmPreprocessJsReducer); + +job.setInputData(inputData); +job.setMapper(mapper); +job.setReducer(reducer); +job.setShuffler(shuffler); +job.setCallbackWhenCompleted(result => { + console.log(result); + process.exit(0); +}); +server.setJob(job); +server.setRoot('/madoop/wasm'); +server.run(); diff --git a/sample/preprocess/compile.sh b/sample/preprocess/compile.sh new file mode 100755 index 0000000..42f9c25 --- /dev/null +++ b/sample/preprocess/compile.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +EMSCRIPTEN_IMAGE_NAME="trzeci/emscripten-slim" +EMSCRIPTEN_IMAGE_TAG="sdk-tag-1.38.8-64bit" + +SCRIPT_DIR="$(cd $(dirname "${BASH_SOURCE:-$0}"); pwd)" + +run_emscripten () { + docker run --rm -t \ + -v "${SCRIPT_DIR}:/src" \ + "${EMSCRIPTEN_IMAGE_NAME}:${EMSCRIPTEN_IMAGE_TAG}" "$@" +} + +compile() { + run_emscripten emcc "${1}.cpp" \ + -std=c++11 \ + -s WASM=1 \ + -s MODULARIZE=1 \ + -s ALLOW_MEMORY_GROWTH=1 \ + -s "EXPORTED_FUNCTIONS=['_${1}']" \ + -s "EXTRA_EXPORTED_RUNTIME_METHODS=['cwrap', 'lengthBytesUTF8', 'stringToUTF8']" \ + --js-library lib_emit_func.js \ + -o "${1}.js" +} + +compile map +compile reduce diff --git a/sample/preprocess/lib_emit_func.js b/sample/preprocess/lib_emit_func.js new file mode 100644 index 0000000..bcf3e7b --- /dev/null +++ b/sample/preprocess/lib_emit_func.js @@ -0,0 +1,5 @@ +mergeInto(LibraryManager.library, { + emit_func: function (key, value) { + execEmit(Pointer_stringify(key), Pointer_stringify(value)); + } +}); diff --git a/sample/preprocess/map.cpp b/sample/preprocess/map.cpp new file mode 100644 index 0000000..cbfe9ff --- /dev/null +++ b/sample/preprocess/map.cpp @@ -0,0 +1,14 @@ +#include + +extern "C" { // REQUIRED to prevent C++ name mangling + + extern void emit_func(const char*, const char*); // REQUIRED to emit key-value pairs + + void map(const char* data) + { + std::cout << "received data (preprocessed by wasm_map_preprocess.js):" + << std::endl << data << std::endl; + emit_func("this-key-will-be-overwritten-by-wasm_reduce_preprocess.js", data); + } + +} diff --git a/sample/preprocess/reduce.cpp b/sample/preprocess/reduce.cpp new file mode 100644 index 0000000..eb7243a --- /dev/null +++ b/sample/preprocess/reduce.cpp @@ -0,0 +1,10 @@ +extern "C" { // REQUIRED to prevent C++ name mangling + + extern void emit_func(const char*, const char*); // REQUIRED to emit key-value pairs + + void reduce(const char* key, const char* values_str) + { + emit_func(key, values_str); + } + +} diff --git a/sample/preprocess/wasm_map_preprocess.js b/sample/preprocess/wasm_map_preprocess.js new file mode 100644 index 0000000..dd9c10f --- /dev/null +++ b/sample/preprocess/wasm_map_preprocess.js @@ -0,0 +1,4 @@ +madoop.runtime.wasmMapPreprocess = async url => { + const text = await fetch(url).then(res => res.text()); + return text; +}; diff --git a/sample/preprocess/wasm_reduce_preprocess.js b/sample/preprocess/wasm_reduce_preprocess.js new file mode 100644 index 0000000..e4ca028 --- /dev/null +++ b/sample/preprocess/wasm_reduce_preprocess.js @@ -0,0 +1,3 @@ +madoop.runtime.wasmReducePreprocess = async ({key: key, value: value}) => { + return {key: 'this-key-was-overwritten', value: value}; +};