From 709c4757584799c354bf8d1caaf5d1891c2b762c Mon Sep 17 00:00:00 2001 From: Marco Castelluccio Date: Thu, 26 Sep 2024 01:10:41 +0000 Subject: [PATCH] Bug 1919940 [wpt PR 48214] - DOM: Implement async iterable conversion support for Observables, a=testonly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Automatic update from web-platform-tests DOM: Implement async iterable conversion support for Observables This CL implements async iterator support for Observables, and adds tons of WPTs exercising subtle functionality of iterable and async iterable Observable conversion semantics. It implements the spec text in https://github.com/WICG/observable/pull/160, and is a follow-up to https://crrev.com/c/5840672, which brings async iterable support to core bindings code. This CL amounts to a partial implementation of async iterable support; what's missing and what will be included as a follow-up is: 1. Support for calling the Async Iterator's `return()` method [1] when an Observable — when consuming an async iterable — aborts its subscription before iterable exhaustion. 3. A possible refactor to move some of the logic that handles the `ScriptIterator` into `ScriptIterator` itself, per discussion in [2]. [1]: https://tc39.es/ecma262/#sec-asynciterator-interface [2]: https://chromium-review.googlesource.com/c/chromium/src/+/5840672/comment/72df95a9_dd32d801/ R=masonfchromium.org Bug: 40282760, 363015168 Change-Id: I5f31f4028613245025de71b8975fc92e9d1def0a Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5850509 Reviewed-by: Mason Freed Commit-Queue: Dominic Farolino Cr-Commit-Position: refs/heads/main{#1356228} -- wpt-commits: 4b27fc79b6ce4edaf1d6a8c0072bd71fce62a6b2 wpt-pr: 48214 UltraBlame original commit: 2a63fcd3013866bf2a31144f3dd0fa10b96130ac --- .../tentative/observable-from.any.js | 575 ++++++++++++++++++ 1 file changed, 575 insertions(+) diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js index e47685541931c..b7b9163799602 100644 --- a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js +++ b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js @@ -541,3 +541,578 @@ test(() => { + +test(() => { + const sync_iterable = { + [Symbol.asyncIterator]: null, + [Symbol.iterator]() { + return { + value: 0, + next() { + if (this.value === 2) + return {value: undefined, done: true}; + else + return {value: this.value++, done: false}; + } + } + }, + }; + + const results = []; + const source = Observable.from(sync_iterable).subscribe(v => results.push(v)); + assert_array_equals(results, [0, 1]); +}, "from(): Async iterable protocol null, converts as iterator"); + +promise_test(async t => { + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + return { + val: 0, + next() { + return new Promise(resolve => { + t.step_timeout(() => { + resolve({ + value: this.val, + done: this.val++ === 4 ? true : false, + }); + }, 400); + }); + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + await new Promise(resolve => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion"); + + + + + + + + + + + + + + +promise_test(async t => { + const async_iterable = { + slow: true, + [Symbol.asyncIterator]() { + + + + + const shouldBeSlow = this.slow; + this.slow = false; + + return { + val: 0, + next() { + + + return new Promise(resolve => { + t.step_timeout(() => resolve({ + value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`, + done: this.val++ === 4 ? true : false, + }), shouldBeSlow ? 200 : 0); + }); + }, + }; + }, + }; + + const results = []; + const source = Observable.from(async_iterable); + + const subscribeFunction = function(resolve, reject) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + + + t.step_timeout(() => reject('TIMEOUT'), 3000); + } + + const slow_promise = new Promise(subscribeFunction); + const fast_promise = new Promise(subscribeFunction); + await Promise.all([slow_promise, fast_promise]); + assert_array_equals(results, [ + '0-fast', + '1-fast', + '2-fast', + '3-fast', + '0-slow', + '1-slow', + '2-slow', + '3-slow', + ]); +}, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); + +promise_test(async () => { + const async_generator = async function*() { + yield 1; + yield 2; + yield 3; + }; + + const results = []; + const source = Observable.from(async_generator()); + + const subscribeFunction = function(resolve) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + } + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); +}, "from(): Asynchronous generator conversion: can only be used once"); + + + + + + + + + + + + + + +promise_test(async () => { + const results = []; + + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + return { + value: undefined, + get done() { + results.push('done() GETTER called'); + return true; + }, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + queueMicrotask(() => results.push('Microtask queued before subscription')); + source.subscribe(); + assert_array_equals(results, []); + + await Promise.resolve(); + assert_array_equals(results, [ + "Microtask queued before subscription", + "done() GETTER called", + ]); +}, "from(): Promise-wrapping semantics of IteratorResult interface"); + + + + + + +test(() => { + const error = new Error("[Symbol.asyncIterator] error"); + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + throw error; + } + }; + + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + error, + ]); +}, "from(): Errors thrown in Symbol.asyncIterator() are propagated synchronously"); + + + + +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + get next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + + + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() GETTER are propagated " + + "in a microtask"); +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() are propagated in a microtask"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return() { + results.push(`IteratorRecord#return() called with this.val=${this.val}`); + }, + }; + }, + }; + + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(); + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with this.val=4", + ]); +}, "from(): Aborting sync iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + + + + + + + + + + + + + + +promise_test(async () => { + const results = []; + const async_iterable = { + asyncIteratorGotten: false, + get [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator] GETTER"); + if (this.asyncIteratorGotten) { + return null; + } + + this.asyncIteratorGotten = true; + + + return function() {}; + }, + + [Symbol.iterator]() { + results.push('[Symbol.iterator]() invoked as fallback'); + return { + val: 0, + next() { + return { + value: this.val, + done: this.val++ === 4 ? true : false, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, [ + "[Symbol.asyncIterator] GETTER", + ]); + + await new Promise((resolve, reject) => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + error: e => reject(e), + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + + "[Symbol.asyncIterator] GETTER", + + "[Symbol.asyncIterator] GETTER", + "[Symbol.iterator]() invoked as fallback", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion, with synchronous iterable fallback"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + const abortController = new AbortController(); + + observable.subscribe(n => { + results.push(n); + if (n === 3) { + abortController.abort(); + } + }, {signal: abortController.signal}); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Generator finally block runs when subscription is aborted"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } catch { + assert_unreached("generator should not be aborted"); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe((n) => { + results.push(n); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable completes"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + throw new Error('from the generator'); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe({ + next: n => results.push(n), + error: e => results.push(e.message) + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "from the generator"]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable errors"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + const abortController = new AbortController(); + + await new Promise((resolve) => { + observable.subscribe((n) => { + results.push(n); + if (n === 3) { + abortController.abort(); + resolve(); + } + }, {signal: abortController.signal}); + }); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when subscription is aborted"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise(resolve => { + observable.subscribe({ + next: n => results.push(n), + complete: () => resolve(), + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block runs when Observable completes"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + if (n === 4) { + throw new Error('from the async generator'); + } + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise((resolve) => { + observable.subscribe({ + next: (n) => results.push(n), + error: (e) => { + results.push(e.message); + resolve(); + } + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when Observable errors");