diff --git a/Util/HowToRx/Rx.story.lua b/Util/HowToRx/Rx.story.lua new file mode 100644 index 0000000..39b5873 --- /dev/null +++ b/Util/HowToRx/Rx.story.lua @@ -0,0 +1,170 @@ +--[[ + This is an abbreviated implementation of the internals of Rx. + Use this to construct a mental model of what an observable is. + + Run the examples by choosing an example at the bottom of this file, + then click on this story file in Hoarcekat. +]] + +type CleanupFn = () -> () + +type Subscriber = { + Fire: (self: Subscriber, T) -> (), + -- In Real Rx, there is also Fail and Complete here +} + +type Observable = { + Subscribe: (self: Observable, onFire: (T) -> ()) -> CleanupFn, + -- Actually returns Observable if that's what the last transformer returns + Pipe: (self: Observable, transformers: {Transformer}) -> any +} + +-- It's really (Observable) -> Observable but luau doesn't like recursive types +type Transformer = (any) -> any + +--[[ Example + Suppose the following have types + src: Observable + t1: Observable -> Observable + t2: Observable -> Observable + + Then Pipe(src, {t1, t2}): Observable, and its equivalent to t2(t1(src)) +]] +local function Pipe(source: Observable, transformers: {Transformer}): Observable + local current = source + for _, transformer in transformers do + current = transformer(current) + end + return current +end + +local function observable(onSubscribe: (Subscriber) -> CleanupFn?): Observable + return { + Subscribe = function(self, onFire: (T) -> ()) + local cleanup = onSubscribe({ + Fire = function(_, value) + onFire(value) + end, + }) + return cleanup or function() end + end, + Pipe = (Pipe :: any), + } +end + +local observeStuff: Observable = observable(function(sub: Subscriber) + + local thread = task.spawn(function() + for i=1, 3 do + sub:Fire(i) + end + task.wait(1) + sub:Fire(4) + task.wait(1) + sub:Fire(5) + task.wait(3) + sub:Fire(6) + end) + + return function() + print("observeStuff cleanup") + task.cancel(thread) + end +end) + +local function map(project: (T) -> U): (Observable) -> Observable + -- return a transformer + return function(source: Observable) + -- return the transformed observable + return observable(function(sub: Subscriber) + -- return the cleanup function + return source:Subscribe(function(value: T) + sub:Fire(project(value)) + end) + end) + end +end + +local function switchMap(project: (T) -> Observable): (Observable) -> Observable + return function(source: Observable) + return observable(function(sub: Subscriber) + local innerCleanup + + local outerCleanup = source:Subscribe(function(tValue: T) + if innerCleanup then + innerCleanup() + innerCleanup = nil + end + innerCleanup = project(tValue):Subscribe(function(uValue: U) + sub:Fire(uValue) + end) + end) + return function() + if innerCleanup then + innerCleanup() + innerCleanup = nil + end + outerCleanup() + end + end) + end +end + +local function example1() + + local cleanup = observeStuff:Pipe { + map(function(value) + return value * 2 + end), + map(function(value) + return value + 1 + end), + }:Subscribe(function(value) + print(value) + end) + + print("After subscribe") + + return cleanup +end + +local function example2() + + local function project(value) + return observable(function(sub) + local thread = task.spawn(function() + for i = string.byte("a"), string.byte("h") do + sub:Fire(string.char(i)) + task.wait(0.5) + end + end) + + return function() + print("inner cleanup", value) + task.cancel(thread) + end + end) + end + + local cleanup = observeStuff:Pipe { + switchMap(function(value) + return project(value):Pipe { + map(function(innerValue) + return value..innerValue + end) + } + end) + }:Subscribe(print) + + return cleanup +end + +-- Hoarcekat calls this function when you click on Rx +return function() + + -- This runs the example and then gives the cleanup function to Hoarcekat + -- Pick one of them to uncomment + + -- return example1() + -- return example2() +end