-
-
Notifications
You must be signed in to change notification settings - Fork 29
/
custom_save_fn.py
59 lines (49 loc) · 1.48 KB
/
custom_save_fn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# web imports
from flask import Flask
from flask_executor import Executor
from flask_executor.futures import Future
from flask_shell2http import Shell2HTTP
# The Flask application that serves the shell2http endpoints.
app = Flask(__name__)

# Executor bound to the app; Shell2HTTP is given both the app and the executor.
executor = Executor(app=app)
shell2http = Shell2HTTP(app=app, executor=executor)

# Used both as the URL endpoint and as the shell command name below.
ENDPOINT = "echo"
def intercept_result(context, future: Future):
    """
    Will be invoked on every process completion.

    Rewrites the result dictionary held by ``future`` in place.

    Args:
        context: Mapping of caller-supplied options. Keys read here:
            ``read_result_from`` -- path of a file whose text replaces the
            result's ``report``; when the key is absent (or ``None``) the
            existing report is left untouched.
            ``force_success`` -- when truthy, the result's ``status`` is
            overwritten with ``"success"``.
        future: Future whose ``result()`` is the dictionary to update.

    Raises:
        OSError: If the file named by ``read_result_from`` cannot be opened.
    """
    fname = context.get("read_result_from")
    data = None
    # open(None) raises TypeError, so only read when a path was supplied.
    if fname is not None and future.done():
        with open(fname) as f:
            data = f.read()
    # 1. get current result object
    res = future.result()  # this returns a dictionary
    # 2. update the report variable,
    #    you may update only these: report,error,status
    if fname is not None:
        res["report"] = data
    if context.get("force_success", False):
        res["status"] = "success"
    # 3. set new result
    #    (writes the private attribute directly because the future is
    #    already resolved at this point)
    future._result = res
# Expose the command at /<ENDPOINT> and run intercept_result as its callback.
shell2http.register_command(
    endpoint=ENDPOINT,
    command_name=ENDPOINT,
    callback_fn=intercept_result,
)
# Test Runner
if __name__ == "__main__":
    app.testing = True
    client = app.test_client()

    # Start a new process; callback_context carries the options that
    # intercept_result reads.
    payload = {
        "args": ["hello", "world"],
        "callback_context": {
            "read_result_from": "/path/to/saved/file",
            "force_success": True,
        },
    }
    post_response = client.post(f"/{ENDPOINT}", json=payload)

    # Fetch the result from the URL returned by the POST and print it.
    result_url = post_response.get_json()["result_url"]
    print(client.get(result_url).get_json())