diff --git a/dom/observable/tentative/observable-from.any.js b/dom/observable/tentative/observable-from.any.js index cc4d3119d2170c..e924f1f5e84061 100644 --- a/dom/observable/tentative/observable-from.any.js +++ b/dom/observable/tentative/observable-from.any.js @@ -538,6 +538,581 @@ test(() => { }, "from(): Throws 'callable' error when @@iterator property is a " + "non-callable primitive"); -// TODO(dom@chromium.org): Add another test like the above, but for -// `[Symbol.asyncIterator] = null` falling back to `[Symbol.iterator]` -// conversion. +// This test exercises the line of spec prose that says: +// +// "If |asyncIteratorMethodRecord|'s [[Value]] is undefined or null, then jump +// to the step labeled 'From iterable'." +test(() => { + const sync_iterable = { + [Symbol.asyncIterator]: null, + [Symbol.iterator]() { + return { + value: 0, + next() { + if (this.value === 2) + return {value: undefined, done: true}; + else + return {value: this.value++, done: false}; + } + } + }, + }; + + const results = []; + const source = Observable.from(sync_iterable).subscribe(v => results.push(v)); + assert_array_equals(results, [0, 1]); +}, "from(): Async iterable protocol null, converts as iterator"); + +promise_test(async t => { + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + return { + val: 0, + next() { + return new Promise(resolve => { + t.step_timeout(() => { + resolve({ + value: this.val, + done: this.val++ === 4 ? true : false, + }); + }, 400); + }); + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + await new Promise(resolve => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion"); + +// This test is a more chaotic version of the above. It ensures that a single +// Observable can handle multiple in-flight subscriptions to the same underlying +// async iterable without the two subscriptions competing. +// +// This test is added because it is easy to imagine an implementation whereby +// upon subscription, the Observable's internal subscribe callback takes the +// underlying async iterable object, and simply pulls the async iterator off of +// it (by invoking `@@asyncIterator`), and saves it alongside the underlying +// async iterable. This async iterator would be used to manage values as they +// are asynchronously emitted from the underlying object, but this value can get +// OVERWRITTEN by a brand new subscription that comes in before the first +// subscription has completed. In a broken implementation, this overwriting +// would prevent the first subscription from ever completing. +promise_test(async t => { + const async_iterable = { + slow: true, + [Symbol.asyncIterator]() { + // The first time @@asyncIterator is called, `shouldBeSlow` is true, and + // when the return object takes closure of it, all values are emitted + // SLOWLY asynchronously. The second time, `shouldBeSlow` is false, and + // all values are emitted FAST but still asynchronous. + const shouldBeSlow = this.slow; + this.slow = false; + + return { + val: 0, + next() { + // Returns a Promise that resolves in a random amount of time less + // than a second. + return new Promise(resolve => { + t.step_timeout(() => resolve({ + value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`, + done: this.val++ === 4 ? true : false, + }), shouldBeSlow ? 200 : 0); + }); + }, + }; + }, + }; + + const results = []; + const source = Observable.from(async_iterable); + + const subscribeFunction = function(resolve, reject) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + + // A broken implementation will rely on this timeout. + t.step_timeout(() => reject('TIMEOUT'), 3000); + } + + const slow_promise = new Promise(subscribeFunction); + const fast_promise = new Promise(subscribeFunction); + await Promise.all([slow_promise, fast_promise]); + assert_array_equals(results, [ + '0-fast', + '1-fast', + '2-fast', + '3-fast', + '0-slow', + '1-slow', + '2-slow', + '3-slow', + ]); +}, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); + +promise_test(async () => { + const async_generator = async function*() { + yield 1; + yield 2; + yield 3; + }; + + const results = []; + const source = Observable.from(async_generator()); + + const subscribeFunction = function(resolve) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + } + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); +}, "from(): Asynchronous generator conversion: can only be used once"); + +// The value returned by an async iterator object's `next()` method is supposed +// to be a Promise. But this requirement "isn't enforced": see [1]. Therefore, +// the Observable spec unconditionally wraps the return value in a resolved +// Promise, as is standard practice [2]. +// +// This test ensures that even if the object returned from an async iterator's +// `next()` method is a synchronously-available object with `done: true` +// (instead of a Promise), the `done` property is STILL not retrieved +// synchronously. In other words, we test that the Promise-wrapping is +// implemented. +// +// [1]: https://tc39.es/ecma262/#table-async-iterator-r +// [2]: https://matrixlogs.bakkot.com/WHATWG/2024-08-30#L30 +promise_test(async () => { + const results = []; + + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + return { + value: undefined, + get done() { + results.push('done() GETTER called'); + return true; + }, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + queueMicrotask(() => results.push('Microtask queued before subscription')); + source.subscribe(); + assert_array_equals(results, []); + + await Promise.resolve(); + assert_array_equals(results, [ + "Microtask queued before subscription", + "done() GETTER called", + ]); +}, "from(): Promise-wrapping semantics of IteratorResult interface"); + +// Errors thrown from [Symbol.asyncIterator] are propagated to the observer +// synchronously. This is because in language constructs (i.e., for-await of +// loops) that invoke [Symbol.asyncIterator]() that throw errors, the errors are +// synchronously propagated to script outside of the loop, and are catchable. +// Observables follow this precedent. +test(() => { + const error = new Error("[Symbol.asyncIterator] error"); + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + throw error; + } + }; + + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + error, + ]); +}, "from(): Errors thrown in Symbol.asyncIterator() are propagated synchronously"); + +// AsyncIterable: next() throws exception instead of return Promise. Any errors +// that occur during the the retrieval of `next()` always result in a rejected +// Promise. Therefore, the error makes it to the Observer with microtask timing. +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + get next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + // Wait one microtask since the error will be propagated through a rejected + // Promise managed by the async iterable conversion semantics. + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() GETTER are propagated " + + "in a microtask"); +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() are propagated in a microtask"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return() { + results.push(`IteratorRecord#return() called with this.val=${this.val}`); + }, + }; + }, + }; + + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(); + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with this.val=4", + ]); +}, "from(): Aborting sync iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + +// This test exercises the logic of `GetIterator()` async->sync fallback +// logic. Specifically, we have an object that is an async iterable — that is, +// it has a callback [Symbol.asyncIterator] implementation. Observable.from() +// detects this, and commits to converting the object from the async iterable +// protocol. Then, after conversion but before subscription, the object is +// modified such that it no longer implements the async iterable protocol. +// +// But since it still implements the *iterable* protocol, ECMAScript's +// `GetIterator()` abstract algorithm [1] is fully exercised, which is spec'd to +// fall-back to the synchronous iterable protocol if it exists, and create a +// fully async iterable out of the synchronous iterable. +// +// [1]: https://tc39.es/ecma262/#sec-getiterator +promise_test(async () => { + const results = []; + const async_iterable = { + asyncIteratorGotten: false, + get [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator] GETTER"); + if (this.asyncIteratorGotten) { + return null; // Both null and undefined work here. + } + + this.asyncIteratorGotten = true; + // The only requirement for `this` to be converted as an async + // iterable -> Observable is that the return value be callable (i.e., a function). + return function() {}; + }, + + [Symbol.iterator]() { + results.push('[Symbol.iterator]() invoked as fallback'); + return { + val: 0, + next() { + return { + value: this.val, + done: this.val++ === 4 ? true : false, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, [ + "[Symbol.asyncIterator] GETTER", + ]); + + await new Promise((resolve, reject) => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + error: e => reject(e), + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + // Old: + "[Symbol.asyncIterator] GETTER", + // New: + "[Symbol.asyncIterator] GETTER", + "[Symbol.iterator]() invoked as fallback", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion, with synchronous iterable fallback"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + const abortController = new AbortController(); + + observable.subscribe(n => { + results.push(n); + if (n === 3) { + abortController.abort(); + } + }, {signal: abortController.signal}); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Generator finally block runs when subscription is aborted"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } catch { + assert_unreached("generator should not be aborted"); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe((n) => { + results.push(n); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable completes"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + throw new Error('from the generator'); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe({ + next: n => results.push(n), + error: e => results.push(e.message) + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "from the generator"]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable errors"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + const abortController = new AbortController(); + + await new Promise((resolve) => { + observable.subscribe((n) => { + results.push(n); + if (n === 3) { + abortController.abort(); + resolve(); + } + }, {signal: abortController.signal}); + }); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when subscription is aborted"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise(resolve => { + observable.subscribe({ + next: n => results.push(n), + complete: () => resolve(), + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block runs when Observable completes"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + if (n === 4) { + throw new Error('from the async generator'); + } + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise((resolve) => { + observable.subscribe({ + next: (n) => results.push(n), + error: (e) => { + results.push(e.message); + resolve(); + } + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when Observable errors");