From a0392f1e0349c889ffc3706872b135aec23b7f03 Mon Sep 17 00:00:00 2001 From: Dominic Farolino Date: Tue, 24 Sep 2024 16:33:41 +0000 Subject: [PATCH] Bug 1919940 [wpt PR 48214] - DOM: Implement async iterable conversion support for Observables, a=testonly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Automatic update from web-platform-tests DOM: Implement async iterable conversion support for Observables This CL implements async iterator support for Observables, and adds tons of WPTs exercising subtle functionality of iterable and async iterable Observable conversion semantics. It implements the spec text in https://github.com/WICG/observable/pull/160, and is a follow-up to https://crrev.com/c/5840672, which brings async iterable support to core bindings code. This CL amounts to a partial implementation of async iterable support; what's missing and what will be included as a follow-up is: 1. Support for calling the Async Iterator's `return()` method [1] when an Observable — when consuming an async iterable — aborts its subscription before iterable exhaustion. 3. A possible refactor to move some of the logic that handles the `ScriptIterator` into `ScriptIterator` itself, per discussion in [2]. [1]: https://tc39.es/ecma262/#sec-asynciterator-interface [2]: https://chromium-review.googlesource.com/c/chromium/src/+/5840672/comment/72df95a9_dd32d801/ R=masonf@chromium.org Bug: 40282760, 363015168 Change-Id: I5f31f4028613245025de71b8975fc92e9d1def0a Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5850509 Reviewed-by: Mason Freed Commit-Queue: Dominic Farolino Cr-Commit-Position: refs/heads/main@{#1356228} -- wpt-commits: 4b27fc79b6ce4edaf1d6a8c0072bd71fce62a6b2 wpt-pr: 48214 --- .../tentative/observable-from.any.js | 581 +++++++++++++++++- 1 file changed, 578 insertions(+), 3 deletions(-) diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js index cc4d3119d2170..e924f1f5e8406 100644 --- a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js +++ b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js @@ -538,6 +538,581 @@ test(() => { }, "from(): Throws 'callable' error when @@iterator property is a " + "non-callable primitive"); -// TODO(dom@chromium.org): Add another test like the above, but for -// `[Symbol.asyncIterator] = null` falling back to `[Symbol.iterator]` -// conversion. +// This test exercises the line of spec prose that says: +// +// "If |asyncIteratorMethodRecord|'s [[Value]] is undefined or null, then jump +// to the step labeled 'From iterable'." +test(() => { + const sync_iterable = { + [Symbol.asyncIterator]: null, + [Symbol.iterator]() { + return { + value: 0, + next() { + if (this.value === 2) + return {value: undefined, done: true}; + else + return {value: this.value++, done: false}; + } + } + }, + }; + + const results = []; + const source = Observable.from(sync_iterable).subscribe(v => results.push(v)); + assert_array_equals(results, [0, 1]); +}, "from(): Async iterable protocol null, converts as iterator"); + +promise_test(async t => { + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + return { + val: 0, + next() { + return new Promise(resolve => { + t.step_timeout(() => { + resolve({ + value: this.val, + done: this.val++ === 4 ? true : false, + }); + }, 400); + }); + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + await new Promise(resolve => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion"); + +// This test is a more chaotic version of the above. It ensures that a single +// Observable can handle multiple in-flight subscriptions to the same underlying +// async iterable without the two subscriptions competing. +// +// This test is added because it is easy to imagine an implementation whereby +// upon subscription, the Observable's internal subscribe callback takes the +// underlying async iterable object, and simply pulls the async iterator off of +// it (by invoking `@@asyncIterator`), and saves it alongside the underlying +// async iterable. This async iterator would be used to manage values as they +// are asynchronously emitted from the underlying object, but this value can get +// OVERWRITTEN by a brand new subscription that comes in before the first +// subscription has completed. In a broken implementation, this overwriting +// would prevent the first subscription from ever completing. +promise_test(async t => { + const async_iterable = { + slow: true, + [Symbol.asyncIterator]() { + // The first time @@asyncIterator is called, `shouldBeSlow` is true, and + // when the return object takes closure of it, all values are emitted + // SLOWLY asynchronously. The second time, `shouldBeSlow` is false, and + // all values are emitted FAST but still asynchronous. + const shouldBeSlow = this.slow; + this.slow = false; + + return { + val: 0, + next() { + // Returns a Promise that resolves in a random amount of time less + // than a second. + return new Promise(resolve => { + t.step_timeout(() => resolve({ + value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`, + done: this.val++ === 4 ? true : false, + }), shouldBeSlow ? 200 : 0); + }); + }, + }; + }, + }; + + const results = []; + const source = Observable.from(async_iterable); + + const subscribeFunction = function(resolve, reject) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + + // A broken implementation will rely on this timeout. + t.step_timeout(() => reject('TIMEOUT'), 3000); + } + + const slow_promise = new Promise(subscribeFunction); + const fast_promise = new Promise(subscribeFunction); + await Promise.all([slow_promise, fast_promise]); + assert_array_equals(results, [ + '0-fast', + '1-fast', + '2-fast', + '3-fast', + '0-slow', + '1-slow', + '2-slow', + '3-slow', + ]); +}, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); + +promise_test(async () => { + const async_generator = async function*() { + yield 1; + yield 2; + yield 3; + }; + + const results = []; + const source = Observable.from(async_generator()); + + const subscribeFunction = function(resolve) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + } + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); +}, "from(): Asynchronous generator conversion: can only be used once"); + +// The value returned by an async iterator object's `next()` method is supposed +// to be a Promise. But this requirement "isn't enforced": see [1]. Therefore, +// the Observable spec unconditionally wraps the return value in a resolved +// Promise, as is standard practice [2]. +// +// This test ensures that even if the object returned from an async iterator's +// `next()` method is a synchronously-available object with `done: true` +// (instead of a Promise), the `done` property is STILL not retrieved +// synchronously. In other words, we test that the Promise-wrapping is +// implemented. +// +// [1]: https://tc39.es/ecma262/#table-async-iterator-r +// [2]: https://matrixlogs.bakkot.com/WHATWG/2024-08-30#L30 +promise_test(async () => { + const results = []; + + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + return { + value: undefined, + get done() { + results.push('done() GETTER called'); + return true; + }, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + queueMicrotask(() => results.push('Microtask queued before subscription')); + source.subscribe(); + assert_array_equals(results, []); + + await Promise.resolve(); + assert_array_equals(results, [ + "Microtask queued before subscription", + "done() GETTER called", + ]); +}, "from(): Promise-wrapping semantics of IteratorResult interface"); + +// Errors thrown from [Symbol.asyncIterator] are propagated to the observer +// synchronously. This is because in language constructs (i.e., for-await of +// loops) that invoke [Symbol.asyncIterator]() that throw errors, the errors are +// synchronously propagated to script outside of the loop, and are catchable. +// Observables follow this precedent. +test(() => { + const error = new Error("[Symbol.asyncIterator] error"); + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + throw error; + } + }; + + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + error, + ]); +}, "from(): Errors thrown in Symbol.asyncIterator() are propagated synchronously"); + +// AsyncIterable: next() throws exception instead of return Promise. Any errors +// that occur during the the retrieval of `next()` always result in a rejected +// Promise. Therefore, the error makes it to the Observer with microtask timing. +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + get next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + // Wait one microtask since the error will be propagated through a rejected + // Promise managed by the async iterable conversion semantics. + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() GETTER are propagated " + + "in a microtask"); +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() are propagated in a microtask"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return() { + results.push(`IteratorRecord#return() called with this.val=${this.val}`); + }, + }; + }, + }; + + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(); + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with this.val=4", + ]); +}, "from(): Aborting sync iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + +// This test exercises the logic of `GetIterator()` async->sync fallback +// logic. Specifically, we have an object that is an async iterable — that is, +// it has a callback [Symbol.asyncIterator] implementation. Observable.from() +// detects this, and commits to converting the object from the async iterable +// protocol. Then, after conversion but before subscription, the object is +// modified such that it no longer implements the async iterable protocol. +// +// But since it still implements the *iterable* protocol, ECMAScript's +// `GetIterator()` abstract algorithm [1] is fully exercised, which is spec'd to +// fall-back to the synchronous iterable protocol if it exists, and create a +// fully async iterable out of the synchronous iterable. +// +// [1]: https://tc39.es/ecma262/#sec-getiterator +promise_test(async () => { + const results = []; + const async_iterable = { + asyncIteratorGotten: false, + get [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator] GETTER"); + if (this.asyncIteratorGotten) { + return null; // Both null and undefined work here. + } + + this.asyncIteratorGotten = true; + // The only requirement for `this` to be converted as an async + // iterable -> Observable is that the return value be callable (i.e., a function). + return function() {}; + }, + + [Symbol.iterator]() { + results.push('[Symbol.iterator]() invoked as fallback'); + return { + val: 0, + next() { + return { + value: this.val, + done: this.val++ === 4 ? true : false, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, [ + "[Symbol.asyncIterator] GETTER", + ]); + + await new Promise((resolve, reject) => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + error: e => reject(e), + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + // Old: + "[Symbol.asyncIterator] GETTER", + // New: + "[Symbol.asyncIterator] GETTER", + "[Symbol.iterator]() invoked as fallback", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion, with synchronous iterable fallback"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + const abortController = new AbortController(); + + observable.subscribe(n => { + results.push(n); + if (n === 3) { + abortController.abort(); + } + }, {signal: abortController.signal}); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Generator finally block runs when subscription is aborted"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } catch { + assert_unreached("generator should not be aborted"); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe((n) => { + results.push(n); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable completes"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + throw new Error('from the generator'); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe({ + next: n => results.push(n), + error: e => results.push(e.message) + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "from the generator"]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable errors"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + const abortController = new AbortController(); + + await new Promise((resolve) => { + observable.subscribe((n) => { + results.push(n); + if (n === 3) { + abortController.abort(); + resolve(); + } + }, {signal: abortController.signal}); + }); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when subscription is aborted"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise(resolve => { + observable.subscribe({ + next: n => results.push(n), + complete: () => resolve(), + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block runs when Observable completes"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + if (n === 4) { + throw new Error('from the async generator'); + } + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise((resolve) => { + observable.subscribe({ + next: (n) => results.push(n), + error: (e) => { + results.push(e.message); + resolve(); + } + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when Observable errors");