-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
89 lines (62 loc) · 2.45 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import abc
from typing import Union, overload
import runloop
from runloop import FunctionCompleteFulfillmentResult, FunctionOutput, FunctionInvocation, Latch, LatchResultType, \
FunctionCompleteFulfillment, TimeFulfillment, ApiFulfillment, LatchType, TimeFulfillmentResult, ApiFulfillmentResult
@runloop.function
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``.

    NOTE(review): the function is wrapped by ``runloop.function``; elsewhere in
    this file the result of calling such a function is handed to
    ``Scheduler.launch`` or has ``.invoke()`` called on it, so callers appear
    to receive an invocation object rather than the bare integer.
    """
    total = a + b
    return total
@runloop.function
def subtract(a: int, b: int) -> int:
    """Return ``a`` minus ``b``.

    NOTE(review): the function is wrapped by ``runloop.function``; elsewhere in
    this file the result of calling such a function is handed to
    ``Scheduler.launch`` or has ``.invoke()`` called on it, so callers appear
    to receive an invocation object rather than the bare integer.
    """
    difference = a - b
    return difference
@runloop.function
def append_to_dict(my_dict: dict[str, list[str]]) -> list[str]:
    """Append ``"hello"`` to ``my_dict["inner"]`` and return that list.

    The list stored under ``"inner"`` is mutated in place (no copy is made),
    and the same list object is returned. A dictionary without an ``"inner"``
    key raises ``KeyError``.
    """
    inner = my_dict["inner"]
    inner.append("hello")
    return inner
class TestLatch(Latch):
    """Latch whose result is already known at construction time."""

    def __init__(self, fulfillment: FunctionCompleteFulfillmentResult):
        """Remember ``fulfillment`` so ``await_result`` can hand it back.

        NOTE(review): the base ``Latch`` initializer is not invoked here;
        confirm against ``runloop.Latch`` that this is intentional.
        """
        self._fulfillment = fulfillment

    def await_result(self) -> LatchResultType:
        """Return the stored fulfillment immediately; nothing is awaited."""
        stored = self._fulfillment
        return stored
class TestScheduler(runloop.Scheduler):
    """In-process ``runloop.Scheduler`` for running functions locally.

    Only ``launch`` is functional: it invokes the function synchronously and
    returns a ``TestLatch`` that already holds the result. ``create_latch``
    and ``schedule_at_time`` are unsupported and raise ``NotImplementedError``.
    """

    def __init__(self):
        super().__init__()

    # The ``@overload`` signatures below exist only for type checkers. They
    # must be followed by a non-overload implementation; without one the
    # class attribute is typing's overload placeholder. ``abc.abstractmethod``
    # is intentionally not applied: this is a concrete class, and the marker
    # is discarded by ``@overload`` at runtime anyway.
    @overload
    def create_latch(self, latch_name: str, fulfillment: TimeFulfillment) -> Latch[TimeFulfillmentResult]:
        ...

    @overload
    def create_latch(
        self, latch_name: str, fulfillment: FunctionCompleteFulfillment[FunctionOutput]
    ) -> Latch[FunctionCompleteFulfillmentResult[FunctionOutput]]:
        ...

    @overload
    def create_latch(
        self, latch_name: str, fulfillment: ApiFulfillment[LatchType]
    ) -> Latch[ApiFulfillmentResult[LatchType]]:
        ...

    def create_latch(
        self,
        latch_name: str,
        fulfillment: Union[
            TimeFulfillment,
            FunctionCompleteFulfillment[FunctionOutput],
            ApiFulfillment[LatchType],
        ],
    ) -> Latch:
        """Reject latch creation; this scheduler does not support it.

        Args:
            latch_name: Name of the latch that was requested.
            fulfillment: Fulfillment condition that was requested.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            f"TestScheduler does not support create_latch (latch_name={latch_name!r})"
        )

    def schedule_at_time(self, function_invocation: FunctionInvocation, scheduled_time_ms: int) -> str:
        """Reject timed scheduling; this scheduler does not support it.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TestScheduler does not support schedule_at_time")

    def launch(
        self,
        function_invocation: FunctionInvocation[FunctionOutput]
    ) -> Latch[FunctionCompleteFulfillmentResult[FunctionOutput]]:
        """Invoke ``function_invocation`` right away and wrap its output.

        The invocation runs synchronously inside this call, so the returned
        ``TestLatch`` is already fulfilled. Any exception raised by
        ``invoke()`` propagates to the caller.

        Args:
            function_invocation: The invocation to execute.

        Returns:
            A ``TestLatch`` holding a ``FunctionCompleteFulfillmentResult``
            whose ``output`` is the value returned by ``invoke()``.
        """
        output = function_invocation.invoke()
        return TestLatch(
            fulfillment=FunctionCompleteFulfillmentResult(output=output)
        )
@runloop.function
def schedule_calculations(scheduler: runloop.Scheduler, a: int, b: int) -> str:
    """Launch ``add`` and ``subtract`` on ``scheduler`` and print their outputs.

    Both invocations are launched before either latch is awaited. The outputs
    are printed in launch order (addition first, then subtraction).

    Returns:
        The literal string ``"hello"``.
    """
    invocations = (add(a, b), subtract(a, b))

    # Launch everything first ...
    pending = []
    for invocation in invocations:
        pending.append(scheduler.launch(invocation))

    # ... then collect each result in the same order.
    results = []
    for latch in pending:
        results.append(latch.await_result())

    print(f"results: {[x.output for x in results]}")
    return "hello"
if __name__ == "__main__":
    # Script entry point: greet, then run the demo calculation (2 and 1)
    # against the in-process test scheduler.
    print("Hello, World!")
    scheduler = TestScheduler()
    invocation = schedule_calculations(scheduler, 2, 1)
    invocation.invoke()