From 2a6c082fabc05be0fc98ad309cce4000f3f2b5ce Mon Sep 17 00:00:00 2001 From: Dominic Farolino Date: Tue, 24 Sep 2024 17:17:35 +0000 Subject: [PATCH] Bug 1920020 [wpt PR 48280] - DOM: Implement abortable async iterable Observables, a=testonly Automatic update from web-platform-tests DOM: Implement abortable async iterable Observables The IteratorRecord#return() function exists as an optional method that sync and async iterator records can supply [1] [2]. They allow for the language, or any consumer of an iterable, to signal to the iterable that the consumer will stop consuming values prematurely (i.e., before exhaustion). This method must be invoked when the consumer aborts its subscription to an Observable that was derived from an iterable. The abort reason is supplied to the `return()` iterator function for completeness. This CL: 1. Adds tests for sync & async iterables 2. Implements this for async iterables A follow-up CL will implement this for sync iterables. The semantics are specified in https://github.com/WICG/observable/pull/160. [1]: https://tc39.es/ecma262/#table-iterator-interface-optional-properties [2]: https://tc39.es/ecma262/#table-async-iterator-optional Bug: 40282760 Change-Id: Ie1091b24b233afecdec572feadc129bcc8a2d4d3 Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5854985 Reviewed-by: Mason Freed Commit-Queue: Dominic Farolino Reviewed-by: Nate Chapin Cr-Commit-Position: refs/heads/main@{#1359083} -- wpt-commits: 83154d0455e572de16e84ebee72f56df73d2ceb3 wpt-pr: 48280 --- .../tentative/observable-from.any.js | 440 ++++++++++++++++++ 1 file changed, 440 insertions(+) diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js index e924f1f5e8406..d90104ffcd625 100644 --- a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js +++ b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js @@ -681,6 +681,36 @@ promise_test(async t => { '3-slow', ]); }, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); +// This test is like the above, ensuring that multiple subscriptions to the same +// sync-iterable-converted-Observable can exist at a time. Since sync iterables +// push all of their values to the Observable synchronously, the way to do this +// is subscribe to the sync iterable Observable *inside* the next handler of the +// same Observable. +test(() => { + const results = []; + + const array = [1, 2, 3, 4, 5]; + const source = Observable.from(array); + source.subscribe({ + next: v => { + results.push(v); + if (v === 3) { + // Pushes all 5 values to `results` right after the first instance of `3`. + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('inner complete'), + }); + } + }, + complete: () => results.push('outer complete'), + }); + + assert_array_equals(results, [ + 1, 2, 3, + 1, 2, 3, 4, 5, 'inner complete', + 4, 5, 'outer complete' + ]); +}, "from(): Sync iterable multiple in-flight subscriptions competing"); promise_test(async () => { const async_generator = async function*() { @@ -865,6 +895,126 @@ test(() => { ]); }, "from(): Aborting sync iterable midway through iteration both stops iteration " + "and invokes `IteratorRecord#return()"); +// Like the above test, but for async iterables. +promise_test(async t => { + const results = []; + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return(reason) { + results.push(`IteratorRecord#return() called with reason=${reason}`); + return {done: true}; + }, + }; + }, + }; + + const ac = new AbortController(); + await new Promise(resolve => { + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(`Aborting because v=${v}`); + resolve(); + } + }, {signal: ac.signal}); + }); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with reason=Aborting because v=3", + ]); +}, "from(): Aborting async iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + +test(() => { + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + // Not returning an Object results in a TypeError being thrown. + return(reason) {}, + }; + }, + }; + + let thrownError = null; + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + if (v === 3) { + try { + ac.abort(`Aborting because v=${v}`); + } catch (e) { + thrownError = e; + } + } + }, {signal: ac.signal}); + + assert_not_equals(thrownError, null, "abort() threw an Error"); + assert_true(thrownError instanceof TypeError); + assert_true(thrownError.message.includes('return()')); + assert_true(thrownError.message.includes('Object')); +}, "from(): Sync iterable: `Iterator#return()` must return an Object, or an " + + "error is thrown"); +// This test is just like the above but for async iterables. It asserts that a +// Promise is rejected when `return()` does not return an Object. +promise_test(async t => { + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + // Not returning an Object results in a rejected Promise. + return(reason) {}, + }; + }, + }; + + const unhandled_rejection_promise = new Promise((resolve, reject) => { + const unhandled_rejection_handler = e => resolve(e.reason); + self.addEventListener("unhandledrejection", unhandled_rejection_handler); + t.add_cleanup(() => + self.removeEventListener("unhandledrejection", unhandled_rejection_handler)); + + t.step_timeout(() => reject('Timeout'), 3000); + }); + + const ac = new AbortController(); + await new Promise(resolve => { + Observable.from(iterable).subscribe(v => { + if (v === 3) { + ac.abort(`Aborting because v=${v}`); + resolve(); + } + }, {signal: ac.signal}); + }); + + const reason = await unhandled_rejection_promise; + assert_true(reason instanceof TypeError); + assert_true(reason.message.includes('return()')); + assert_true(reason.message.includes('Object')); +}, "from(): Async iterable: `Iterator#return()` must return an Object, or a " + + "Promise rejects asynchronously"); // This test exercises the logic of `GetIterator()` async->sync fallback // logic. Specifically, we have an object that is an async iterable — that is, @@ -1116,3 +1266,293 @@ promise_test(async t => { assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); assert_true(generatorFinalized); }, "from(): Async generator finally block run when Observable errors"); + +// Test what happens when `return()` throws an error upon abort. +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push('next() called'); + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + return() { + results.push('return() about to throw an error'); + throw new Error('return() error'); + }, + }; + } + }; + + const ac = new AbortController(); + const source = Observable.from(iterable); + source.subscribe(v => { + if (v === 3) { + try { + ac.abort(); + } catch (e) { + results.push(`AbortController#abort() threw an error: ${e.message}`); + } + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + 'next() called', + 'next() called', + 'next() called', + 'next() called', + 'return() about to throw an error', + 'AbortController#abort() threw an error: return() error', + ]); +}, "from(): Sync iterable: error thrown from IteratorRecord#return() can be " + + "synchronously caught"); +promise_test(async t => { + const results = []; + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + results.push('next() called'); + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + return() { + results.push('return() about to throw an error'); + // For async iterables, errors thrown in `return()` end up in a + // returned rejected Promise, so no error appears on the stack + // immediately. See [1]. + // + // [1]: https://whatpr.org/webidl/1397.html#async-iterator-close. + throw new Error('return() error'); + }, + }; + } + }; + + const unhandled_rejection_promise = new Promise((resolve, reject) => { + const unhandled_rejection_handler = e => resolve(e.reason); + self.addEventListener("unhandledrejection", unhandled_rejection_handler); + t.add_cleanup(() => + self.removeEventListener("unhandledrejection", unhandled_rejection_handler)); + + t.step_timeout(() => reject('Timeout'), 1500); + }); + + const ac = new AbortController(); + const source = Observable.from(iterable); + await new Promise((resolve, reject) => { + source.subscribe(v => { + if (v === 3) { + try { + ac.abort(); + results.push('No error thrown synchronously'); + resolve('No error thrown synchronously'); + } catch (e) { + results.push(`AbortController#abort() threw an error: ${e.message}`); + reject(e); + } + } + }, {signal: ac.signal}); + }); + + assert_array_equals(results, [ + 'next() called', + 'next() called', + 'next() called', + 'next() called', + 'return() about to throw an error', + 'No error thrown synchronously', + ]); + + const reason = await unhandled_rejection_promise; + assert_true(reason instanceof Error); + assert_equals(reason.message, "return() error", + "Custom error text passed through rejected Promise"); +}, "from(): Async iterable: error thrown from IteratorRecord#return() is " + + "wrapped in rejected Promise"); + +test(() => { + const results = []; + const iterable = { + impl() { + return { + next() { + results.push('next() running'); + return {done: true}; + } + }; + } + }; + + iterable[Symbol.iterator] = iterable.impl; + { + const source = Observable.from(iterable); + source.subscribe({}, {signal: AbortSignal.abort()}); + assert_array_equals(results, []); + } + iterable[Symbol.iterator] = undefined; + iterable[Symbol.asyncIterator] = iterable.impl; + { + const source = Observable.from(iterable); + source.subscribe({}, {signal: AbortSignal.abort()}); + assert_array_equals(results, []); + } +}, "from(): Subscribing to an iterable Observable with an aborted signal " + + "does not call next()"); + +test(() => { + const results = []; + const ac = new AbortController(); + + const iterable = { + [Symbol.iterator]() { + ac.abort(); + return { + val: 0, + next() { + results.push('next() called'); + return {done: true}; + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, []); +}, "from(): When iterable conversion aborts the subscription, next() is " + + "never called"); +test(() => { + const results = []; + const ac = new AbortController(); + + const iterable = { + [Symbol.asyncIterator]() { + ac.abort(); + return { + val: 0, + next() { + results.push('next() called'); + return {done: true}; + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, []); +}, "from(): When async iterable conversion aborts the subscription, next() " + + "is never called"); + +// This test asserts some very subtle behavior with regard to async iterables +// and a mid-subscription signal abort. Specifically it detects that a signal +// abort ensures that the `next()` method is not called again on the iterator +// again, BUT detects that pending Promise from the *previous* `next()` call +// still has its IteratorResult object examined. I.e., the implementation +// inspecting the `done` attribute on the resolved IteratorResult is observable +// event after abort() takes place. +promise_test(async () => { + const results = []; + let resolveNext = null; + + const iterable = { + [Symbol.asyncIterator]() { + return { + next() { + results.push('next() called'); + return new Promise(resolve => { + resolveNext = resolve; + }); + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const ac = new AbortController(); + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "next() called", + ]); + + // First abort, ensuring `return()` is called. + ac.abort(); + + assert_array_equals(results, [ + "next() called", + "return() called", + ]); + + // Then resolve the pending `next()` Promise to an object whose `done` getter + // reports to the test whether it was accessed. We have to wait one microtask + // for the internal Observable implementation to finish "reacting" to said + // `next()` promise resolution, for it to grab the `done` attribute. + await new Promise(resolveOuter => { + resolveNext({ + get done() { + results.push('IteratorResult.done GETTER'); + resolveOuter(); + return true; + } + }); + }); + + assert_array_equals(results, [ + "next() called", + "return() called", + "IteratorResult.done GETTER", + // Note that "next() called" does not make another appearance. + ]); +}, "from(): Aborting an async iterable subscription stops subsequent next() " + + "calls, but old next() Promise reactions are web-observable"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 4 ? true : false}; + }, + return() { + results.push('return() called'); + }, + }; + } + }; + + const source = Observable.from(iterable); + const ac = new AbortController(); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + ac.abort(); // Must do nothing! + assert_array_equals(results, [0, 1, 2, 3, 'complete']); +}, "from(): Abort after complete does NOT call IteratorRecord#return()");