Skip to content

Commit

Permalink
DOM: Implement some(), every(), find() Observable operators
Browse files Browse the repository at this point in the history
This CL implements the following Observable operators:
 - some()
 - every()
 - find()

Their inclusion is to be consistent with Iterator helpers [1] and Async
iterator helpers [2], and is discussed in
WICG/observable#126 and to a lesser extent
WICG/observable#106.

See WICG/observable#137 for the specification.

[1]: https://github.com/tc39/proposal-iterator-helpers
[2]: https://github.com/tc39/proposal-async-iterator-helpers

For WPTs:
Co-authored-by: [email protected]

Bug: 40282760
Change-Id: I0fcbb9d716d6ef1727b050fd82a6fa20f53dea4b
  • Loading branch information
domfarolino authored and chromium-wpt-export-bot committed Apr 24, 2024
1 parent 4ef549f commit 4e938da
Show file tree
Hide file tree
Showing 3 changed files with 514 additions and 0 deletions.
276 changes: 276 additions & 0 deletions dom/observable/tentative/observable-every.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.next("good");
subscriber.next("good");
subscriber.next("good");
subscriber.complete();
});

const result = await source.every((value) => value === "good");

assert_true(result, "Promise resolves with true if all values pass the predicate");
}, "every(): Promise resolves to true if all values pass the predicate");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.next("good");
subscriber.next("good");
subscriber.next("bad");
subscriber.complete();
});

const result = await source.every((value) => value === "good");

assert_false(result, "Promise resolves with false if any value fails the predicate");
}, "every(): Promise resolves to false if any value fails the predicate");

promise_test(async () => {
let tornDown = false;
let subscriberActiveAfterFailingPredicate = true;
const source = new Observable(subscriber => {
subscriber.addTeardown(() => tornDown = true);
subscriber.next("good");
subscriber.next("good");
subscriber.next("bad");
subscriberActiveAfterFailingPredicate = subscriber.active;
subscriber.next("good");
subscriber.complete();
});

const result = await source.every((value) => value === "good");

assert_false(result, "Promise resolves with false if any value fails the predicate");
assert_false(subscriberActiveAfterFailingPredicate,
"Subscriber becomes inactive because every() unsubscribed");
}, "every(): Abort the subscription to the source if the predicate does not pass");

promise_test(async () => {
const logs = [];

const source = createTestSubject({
onSubscribe: () => logs.push("subscribed to source"),
onTeardown: () => logs.push("teardown"),
});

const resultPromise = source.every((value, index) => {
logs.push(`Predicate called with ${value}, ${index}`);
return true;
});

let promiseResolved = false;

resultPromise.then(() => promiseResolved = true);

assert_array_equals(logs, ["subscribed to source"],
"calling every() subscribes to the source immediately");

source.next("a");
assert_array_equals(logs, [
"subscribed to source",
"Predicate called with a, 0"
], "Predicate called with the value and the index");

source.next("b");
assert_array_equals(logs, [
"subscribed to source",
"Predicate called with a, 0",
"Predicate called with b, 1",
], "Predicate called with the value and the index");

// wait a tick, just to prove that you have to wait for complete to be called.
await Promise.resolve();

assert_false(promiseResolved,
"Promise should not resolve until after the source completes");

source.complete();
assert_array_equals(logs, [
"subscribed to source",
"Predicate called with a, 0",
"Predicate called with b, 1",
"teardown",
], "Teardown function called immediately after the source completes");

const result = await resultPromise;

assert_true(result,
"Promise resolves with true if all values pass the predicate");
}, "every(): Lifecycle checks when all values pass the predicate");

promise_test(async () => {
const logs = [];

const source = createTestSubject({
onSubscribe: () => logs.push("subscribed to source"),
onTeardown: () => logs.push("teardown"),
});

const resultPromise = source.every((value, index) => {
logs.push(`Predicate called with ${value}, ${index}`);
return value === "good";
});

let promiseResolved = false;

resultPromise.then(() => promiseResolved = true);

assert_array_equals(logs, ["subscribed to source"],
"calling every() subscribes to the source immediately");

source.next("good");
source.next("good");
assert_array_equals(logs, [
"subscribed to source",
"Predicate called with good, 0",
"Predicate called with good, 1",
], "Predicate called with the value and the index");

assert_false(promiseResolved, "Promise should not resolve until after the predicate fails");

source.next("bad");
assert_array_equals(logs, [
"subscribed to source",
"Predicate called with good, 0",
"Predicate called with good, 1",
"Predicate called with bad, 2",
"teardown",
], "Predicate called with the value and the index, failing predicate immediately aborts subscription to source");

const result = await resultPromise;

assert_false(result, "Promise resolves with false if any value fails the predicate");
}, "every(): Lifecycle checks when any value fails the predicate");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.complete();
});

const result = await source.every(() => true);

assert_true(result,
"Promise resolves with true if the observable completes without " +
"emitting a value");
}, "every(): Resolves with true if the observable completes without " +
"emitting a value");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.error(new Error("error from source"));
});

let rejection = undefined;
try {
await source.every(() => true);
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof Error,
"Promise rejects with the error emitted from the source observable");

assert_equals(rejection.message, "error from source",
"Promise rejects with the error emitted from the source observable");
}, "every(): Rejects with any error emitted from the source observable");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

let rejection = undefined;
try {
await source.every((value) => {
if (value <= 2) return true;
throw new Error("bad value");
});
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof Error,
"Promise rejects with any error thrown from the predicate");

assert_equals(rejection.message, "bad value",
"Promise rejects with any error thrown from the predicate");
}, "every(): Rejects with any error thrown from the predicate");

promise_test(async () => {
const indices = [];

const source = new Observable(subscriber => {
subscriber.next("a");
subscriber.next("b");
subscriber.next("c");
subscriber.complete();
});

const value = await source.every((value, index) => {
indices.push(index);
return true;
});

assert_array_equals(indices, [0, 1, 2]);

assert_true(value,
"Promise resolves with true if all values pass the predicate");
}, "every(): Index is passed into the predicate");

promise_test(async () => {
const source = new Observable(subscriber => {});

const controller = new AbortController();
const promise = source.every(() => true, { signal: controller.signal });
controller.abort();

let rejection = undefined;
try {
await promise;
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof DOMException,
"Promise rejects with a DOMException if the source Observable is aborted");

assert_equals(rejection.name, "AbortError",
"Promise rejects with a DOMException if the source Observable is aborted");
}, "every(): Rejects with a DOMException if the source Observable is aborted");

function createTestSubject(options) {
const onTeardown = options?.onTeardown;

const subscribers = new Set();
const subject = new Observable(subscriber => {
options?.onSubscribe?.();
subscribers.add(subscriber);
subscriber.addTeardown(() => subscribers.delete(subscriber));
if (onTeardown) {
subscriber.addTeardown(onTeardown);
}
});

subject.next = (value) => {
for (const subscriber of Array.from(subscribers)) {
subscriber.next(value);
}
};
subject.error = (error) => {
for (const subscriber of Array.from(subscribers)) {
subscriber.error(error);
}
};
subject.complete = () => {
for (const subscriber of Array.from(subscribers)) {
subscriber.complete();
}
};
subject.subscriberCount = () => {
return subscribers.size;
};

return subject;
}
110 changes: 110 additions & 0 deletions dom/observable/tentative/observable-find.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
promise_test(async () => {
let inactiveAfterB = false;
const source = new Observable(subscriber => {
subscriber.next("a");
subscriber.next("b");
inactiveAfterB = !subscriber.active;
subscriber.next("c");
subscriber.complete();
});

const value = await source.find((value) => value === "b");

assert_equals(value, "b", "Promise resolves with the first value that passes the predicate");

assert_true(inactiveAfterB, "subscriber is inactive after the first value that passes the predicate");
}, "find(): Promise resolves with the first value that passes the predicate");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.next("a");
subscriber.next("b");
subscriber.next("c");
subscriber.complete();
});

const value = await source.find(() => false);

assert_equals(value, undefined, "Promise resolves with undefined if no value passes the predicate");
}, "find(): Promise resolves with undefined if no value passes the predicate");

promise_test(async () => {
const error = new Error("error from source");
const source = new Observable(subscriber => {
subscriber.error(error);
});

let rejection = undefined;
try {
await source.find(() => true);
} catch (e) {
rejection = e;
}

assert_equals(rejection, error, "Promise rejects with the error emitted from the source Observable");

assert_equals(rejection.message, "error from source", "Promise rejects with the error emitted from the source Observable");
}, "find(): Promise rejects with the error emitted from the source Observable");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.next("ignored");
});

let rejection = undefined;
try {
await source.find(() => {
throw new Error("thrown from predicate");
});
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof Error, "Promise rejects with any error thrown from the predicate");

assert_equals(rejection.message, "thrown from predicate", "Promise rejects with any error thrown from the predicate");
}, "find(): Promise rejects with any error thrown from the predicate");

promise_test(async () => {
let indices = [];

const source = new Observable(subscriber => {
subscriber.next("a");
subscriber.next("b");
subscriber.next("c");
subscriber.complete();
});

const value = await source.find((value, index) => {
indices.push(index);
return false;
});

assert_equals(value, undefined, "Promise resolves with undefined if no value passes the predicate");

assert_array_equals(indices, [0, 1, 2], "find(): Passes the index of the value to the predicate");
}, "find(): Passes the index of the value to the predicate");

promise_test(async () => {
const controller = new AbortController();
const source = new Observable(subscriber => {
subscriber.next("a");
subscriber.next("b");
subscriber.next("c");
subscriber.complete();
});

controller.abort();
const promise = source.find(() => true, { signal: controller.signal });

let rejection = undefined;
try {
await promise;
} catch (e) {
rejection = e;
}

assert_not_equals(rejection, undefined, 'rejection is assigned when Promise rejects');
assert_true(rejection instanceof DOMException, "Promise rejects with DOMException when the signal is aborted");
assert_equals(rejection.name, "AbortError", "Promise rejects with DOMException when the signal is aborted");
}, "find(): Rejects with DOMException when the signal is aborted");
Loading

0 comments on commit 4e938da

Please sign in to comment.