Skip to content

Commit

Permalink
Introduce synchronized signals
Browse files Browse the repository at this point in the history
  • Loading branch information
mbeckem committed Oct 4, 2024
1 parent ea12e2f commit 41faf0f
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 7 deletions.
3 changes: 2 additions & 1 deletion packages/reactivity-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

## v0.4.3 (Unreleased)

- Introduce `synchronized`, a new kind of signal designed to integrate foreign data sources.
- Introduce `subtleWatchDirty`, a function that allows one to watch for signal changes without triggering the re-evaluation of the signal.
- Add missing `forEach()` method to `ReactiveSet` and `ReactiveMap`.
- Add missing `forEach` method to `ReactiveSet` and `ReactiveMap`.
- Deprecate `syncEffectOnce` (use `subtleWatchDirty` instead).

## v0.4.2
Expand Down
165 changes: 164 additions & 1 deletion packages/reactivity-core/ReactiveImpl.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import { batch, computed, external, reactive } from "./ReactiveImpl";
import { batch, computed, external, reactive, synchronized } from "./ReactiveImpl";
import { syncEffect } from "./sync";

describe("reactive", () => {
Expand Down Expand Up @@ -295,3 +295,166 @@ describe("external", () => {
expect(count).toBe(2);
});
});

describe("synchronized", () => {
it("always accesses the data source when not watched", () => {
const getter = vi.fn().mockReturnValue(123);
const sync = synchronized(getter, () => {
throw new Error("not called");
});
expect(getter).toHaveBeenCalledTimes(0);

expect(sync.value).toBe(123);
expect(getter).toHaveBeenCalledTimes(1);

expect(sync.value).toBe(123);
expect(getter).toHaveBeenCalledTimes(2); // NOT cached
});

it("subscribes to the data source when watched", () => {
const getter = vi.fn().mockReturnValue(123);
const unsubscribe = vi.fn();
const subscribe = vi.fn().mockReturnValue(unsubscribe);

const sync = synchronized(getter, subscribe);
expect(getter).toHaveBeenCalledTimes(0);
expect(subscribe).toHaveBeenCalledTimes(0);

const { destroy } = syncEffect(() => sync.value);
expect(getter).toHaveBeenCalledTimes(1);
expect(subscribe).toHaveBeenCalledTimes(1);

const { destroy: destroy2 } = syncEffect(() => sync.value);
expect(getter).toHaveBeenCalledTimes(1); // cached
expect(subscribe).toHaveBeenCalledTimes(1); // not called again for second active effect

destroy2();
expect(unsubscribe).toHaveBeenCalledTimes(0);

destroy();
expect(unsubscribe).toHaveBeenCalledTimes(1);

sync.value;
expect(getter).toHaveBeenCalledTimes(2); // called again
});

it("notifies watchers when the data source changes", () => {
const ds = new DataSource(0);
const getter = vi.fn(() => ds.value);
const sync = synchronized(getter, (cb) => ds.subscribe(cb));

// not subscribed yet
expect(ds.listener).toBe(undefined);

// setup effect
const spy = vi.fn();
syncEffect(() => {
spy(sync.value);
});
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenLastCalledWith(0);
expect(ds.listener).toBeDefined();
expect(getter).toHaveBeenCalledTimes(1);

// data is cached
sync.value;
expect(getter).toHaveBeenCalledTimes(1);

// update data source
ds.value = 1;
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenLastCalledWith(1);
expect(getter).toHaveBeenCalledTimes(2);
});

// See this comment:
// https://github.com/tc39/proposal-signals/issues/237#issuecomment-2346834056
//
// The values here are different from the linked comment (some got lost).
// This is because preact-signals probes the first dependency of `result`
// and re-evaluates `sync` one additional time.
//
// This is probably not a show stopper because the signal just re-validates more often than necessary.
// It would be more of a problem if it would cache too aggressively.
it("has weird behavior if the data source has side effects", () => {
let count = 0;
const sync = synchronized(
() => count++,
() => () => {}
);
const dep1 = computed(() => sync.value);
const dep2 = computed(() => sync.value);
const result = computed(() => `${dep1.value},${dep1.value},${dep2.value},${dep2.value}`);
expect(result.value).toMatchInlineSnapshot(`"0,2,3,5"`);

const { destroy } = syncEffect(() => {
expect(result.value).toMatchInlineSnapshot(`"6,6,6,6"`);
});
destroy();

expect(result.value).toMatchInlineSnapshot(`"10,12,13,15"`);
});

it("does not cache computes across many levels", () => {
const getter = vi.fn().mockReturnValue(1);
const sync = synchronized(getter, () => {
throw new Error("not called");
});

const c1 = computed(() => sync.value);
const c2 = computed(() => c1.value);
const c3 = computed(() => c2.value);
const c4 = computed(() => c3.value + c2.value);
expect(c4.value).toBe(2);
expect(getter.mock.calls.length).toMatchInlineSnapshot(`2`);

// High number of re-computations probably due to re-validation of
// dependencies (see previous test).
// As long as the value is correct, this is not a major problem.
getter.mockReturnValue(2);
expect(c4.value).toBe(4);
expect(getter.mock.calls.length).toMatchInlineSnapshot(`8`);

getter.mockReturnValue(3);
expect(c4.value).toBe(6);
expect(getter.mock.calls.length).toMatchInlineSnapshot(`14`);
});
});

class DataSource {
#listener: (() => void) | undefined;
#value: number;

constructor(value = 0) {
this.#value = value;
}

get listener() {
return this.#listener;
}

get value() {
return this.#value;
}

set value(value: number) {
if (value !== this.#value) {
this.#value = value;
this.#listener?.();
}
}

subscribe(listener: () => void) {
if (this.#listener) {
throw new Error("Already subscribed");
}

this.#listener = listener;
return () => {
if (this.#listener !== listener) {
throw new Error("Invalid unsubscribe call");
}
this.#listener = undefined;
};
}
}
159 changes: 155 additions & 4 deletions packages/reactivity-core/ReactiveImpl.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Signal,
ReadonlySignal as RawReadonlySignal,
Signal as RawSignal,
batch as rawBatch,
computed as rawComputed,
signal as rawSignal,
Expand All @@ -9,9 +10,10 @@ import {
AddBrand,
AddWritableBrand,
ExternalReactive,
Reactive,
ReadonlyReactive,
RemoveBrand,
Reactive
SubscribeFunc
} from "./types";

/**
Expand Down Expand Up @@ -163,6 +165,67 @@ export function external<T>(compute: () => T, options?: ReactiveOptions<T>): Ext
return externalReactive as ExternalReactive<T>;
}

/**
* Creates a signal that synchronizes with a foreign data source.
*
* This kind of signal is useful when integrating another library (DOM APIs, etc.) that does not
* use the reactivity system provided by this library.
* The major advantage of this API is that it will automatically subscribe to the foreign data source while the signal is actually being used.
*
* Principles:
* - The `getter` function should return the current value from the foreign data source.
* It should be cheap to call and should not have side effects.
* Ideally it performs some caching on its own, but this is not strictly needed.
* - The `subscribe` function should implement whatever logic is necessary to listen for changes.
* It receives a callback function that should be called whenever the value changes.
* `subscribe` should return a cleanup function to unsubscribe - it will be called automatically when appropriate.
* - When the signal is not watched in some way, accesses to the `getter` are not cached.
* - When the signal is being watched (e.g. by an effect), the signal will automatically subscribe to the foreign data source
* and cache the current value until it is informed about a change.
*
* Example:
*
* ```ts
* import { synchronized, watchValue } from "@conterra/reactivity-core";
*
* const abortController = new AbortController();
* const abortSignal = abortController.signal;
* const aborted = synchronized(
* () => abortSignal.aborted,
* (callback) => {
* abortSignal.addEventListener("abort", callback);
* return () => {
* abortSignal.removeEventListener("abort", callback);
* };
* }
* );
*
* watchValue(
* () => aborted.value,
* (aborted) => {
* console.log("Aborted:", aborted);
* },
* {
* immediate: true
* }
* );
*
* setTimeout(() => {
* abortController.abort();
* }, 1000);
*
* // Prints:
* // Aborted: false
* // Aborted: true
* ```
*
* @group Primitives
*/
export function synchronized<T>(getter: () => T, subscribe: SubscribeFunc): ReadonlyReactive<T> {
const impl = new SynchronizedReactiveImpl(getter, subscribe);
return impl as AddBrand<typeof impl>;
}

/**
* Executes a set of reactive updates implemented in `callback`.
* Effects are delayed until the batch has completed.
Expand Down Expand Up @@ -268,9 +331,9 @@ const CUSTOM_EQUALS = Symbol("equals");
abstract class ReactiveImpl<T>
implements RemoveBrand<ReadonlyReactive<T> & Reactive<T> & ExternalReactive<T>>
{
private [REACTIVE_SIGNAL]: Signal<T>;
private [REACTIVE_SIGNAL]: RawSignal<T>;

constructor(signal: Signal<T>) {
constructor(signal: RawSignal<T>) {
this[REACTIVE_SIGNAL] = signal;
}

Expand Down Expand Up @@ -330,6 +393,94 @@ class WritableReactiveImpl<T> extends ReactiveImpl<T> {
}
}

const INVALIDATE_SIGNAL = Symbol("invalidate_signal");
const IS_WATCHED = Symbol("is_watched");
const HAS_SCHEDULED_INVALIDATE = Symbol("has_scheduled_invalidate");

/**
* Custom signal implementation for "synchronized" values, i.e. values from a foreign source.
* The signal automatically subscribes to the foreign source once it becomes watched (it also unsubscribes automatically).
*
* Although the implementation is based on a `computed` signal, there may not be any caching involved, depending on the state of the signal.
*
* 1. The signal is not watched: the "computed" is always out of date.
* This is achieved by immediately invalidating the signal itself from within its own body.
* To the raw signal, this looks like a dependency cycle (which is allowed in this case).
* 2. The signal is watched: the value is cached until an update event is received; at which point the foreign data source is accessed again.
*
* See also https://github.com/tc39/proposal-signals/issues/237
*/
class SynchronizedReactiveImpl<T> extends ReactiveImpl<T> {
[INVALIDATE_SIGNAL] = rawSignal(false);
[IS_WATCHED] = false;
[HAS_SCHEDULED_INVALIDATE] = false;

constructor(getter: () => T, subscribe: SubscribeFunc) {
const rawSignal = rawComputedWithSubscriptionHook(
() => {
this[INVALIDATE_SIGNAL].value;
if (!this[IS_WATCHED]) {
this.#invalidate();
}
return rawUntracked(() => getter());
},
() => {
this[IS_WATCHED] = true;
const unsubscribe = subscribe(this.#invalidate);
return () => {
this[IS_WATCHED] = false;
unsubscribe();
this.#invalidate();
};
}
);
super(rawSignal);
}

#invalidate = () => {
this[INVALIDATE_SIGNAL].value = !this[INVALIDATE_SIGNAL].peek();
};
}

// Mangled member names. See https://github.com/preactjs/signals/blob/main/mangle.json.
const _SUBSCRIBE = "S";
const _UNSUBSCRIBE = "U";

type RawSignalInternals<T> = RawReadonlySignal<T> & {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[_SUBSCRIBE](node: any): void;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
[_UNSUBSCRIBE](node: any): void;
};

// Overrides the subscribe/unsubscribe methods of a signal to allow for custom subscription hooks.
function rawComputedWithSubscriptionHook<T>(
compute: () => T,
subscribe: () => () => void
): RawReadonlySignal<T> {
const signal = rawComputed(compute) as RawSignalInternals<T>;
const origSubscribe = signal[_SUBSCRIBE];
const origUnsubscribe = signal[_UNSUBSCRIBE];

let subscriptions = 0;
let cleanup: (() => void) | undefined;
signal[_SUBSCRIBE] = function patchedSubscribe(node: unknown) {
origSubscribe.call(this, node);
if (subscriptions++ === 0) {
cleanup = subscribe();
}
};
signal[_UNSUBSCRIBE] = function patchedUnsubscribe(node: unknown) {
origUnsubscribe.call(this, node);
if (--subscriptions === 0) {
cleanup?.();
cleanup = undefined;
}
};
return signal;
}

function computeWithEquals<T>(compute: () => T, equals: EqualsFunc<T>) {
let firstExecution = true;
let currentValue: T;
Expand Down
1 change: 1 addition & 0 deletions packages/reactivity-core/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export function effect(callback: EffectCallback): CleanupHandle {
};
}

// TODO: Replace useEffectOnce with subtleWatchDirty (or a similar approach using raw effects).
class AsyncEffect {
private callback: EffectCallback;
private cleanup: CleanupFunc | void | undefined;
Expand Down
Loading

0 comments on commit 41faf0f

Please sign in to comment.