diff --git a/cypress/e2e/api/SyncServiceProvider.spec.js b/cypress/e2e/api/SyncServiceProvider.spec.js index aae87849b4e..2d0e940fb8f 100644 --- a/cypress/e2e/api/SyncServiceProvider.spec.js +++ b/cypress/e2e/api/SyncServiceProvider.spec.js @@ -60,6 +60,7 @@ describe('Sync service provider', function() { * @param {object} ydoc Yjs document */ function createProvider(ydoc) { + const queue = [] const syncService = new SyncService({ serialize: () => 'Serialized', getDocumentState: () => null, @@ -70,6 +71,7 @@ describe('Sync service provider', function() { syncService, fileId, initialSession: null, + queue, disableBc: true, }) } diff --git a/src/components/Editor.vue b/src/components/Editor.vue index 407004d9992..2ccfcb1511a 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -329,6 +329,7 @@ export default { }, created() { this.$ydoc = new Doc() + this.$queue = [] // The following can be useful for debugging ydoc updates // this.$ydoc.on('update', function(update, origin, doc, tr) { // console.debug('ydoc update', update, origin, doc, tr) @@ -377,10 +378,13 @@ export default { this.listenSyncServiceEvents() + this.$providers.forEach(p => p?.destroy()) + this.$providers = [] const syncServiceProvider = createSyncServiceProvider({ ydoc: this.$ydoc, syncService: this.$syncService, fileId: this.fileId, + queue: this.$queue, initialSession: this.initialSession, }) this.$providers.push(syncServiceProvider) diff --git a/src/services/SyncServiceProvider.js b/src/services/SyncServiceProvider.js index dc015304f97..8dff59e4e6c 100644 --- a/src/services/SyncServiceProvider.js +++ b/src/services/SyncServiceProvider.js @@ -30,15 +30,16 @@ import { logger } from '../helpers/logger.js' * @param {object} options.ydoc - the Ydoc * @param {object} options.syncService - sync service to build upon * @param {number} options.fileId - file id of the file to open + * @param {number} options.queue - queue for outgoing steps * @param {object} options.initialSession - initialSession to start from * @param {boolean} options.disableBc - disable broadcast channel synchronization (default: disabled in debug mode, enabled otherwise) */ -export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, disableBc }) { +export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, queue, disableBc }) { if (!fileId) { // We need a file id as a unique identifier for y.js as otherwise state might leak between different files throw new Error('fileId is required') } - const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession) + const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession, queue) disableBc = disableBc ?? !!window?._oc_debug const websocketProvider = new WebsocketProvider( 'ws://localhost:1234', diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js index 8bb8a0c5fd5..9273027bf2e 100644 --- a/src/services/WebSocketPolyfill.js +++ b/src/services/WebSocketPolyfill.js @@ -28,8 +28,9 @@ import { encodeArrayBuffer, decodeArrayBuffer } from '../helpers/base64.js' * @param {object} syncService - the sync service to build upon * @param {number} fileId - id of the file to open * @param {object} initialSession - initial session to open + * @param {object[]} queue - queue for the outgoing steps */ -export default function initWebSocketPolyfill(syncService, fileId, initialSession) { +export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) { return class WebSocketPolyfill { #url @@ -41,11 +42,9 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio onclose onopen #handlers - #queue constructor(url) { this.url = url - this.#queue = [] logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession }) this.#registerHandlers({ opened: ({ version, session }) => { @@ -83,32 +82,34 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio // Useful for debugging what steps are sent and how they were initiated // data.forEach(logStep) - this.#queue.push(...data) + queue.push(...data) let outbox = [] return syncService.sendSteps(() => { - outbox = [...this.#queue] const data = { steps: this.#steps, awareness: this.#awareness, version: this.#version, } - this.#queue = [] + outbox = [...queue] logger.debug('sending steps ', data) return data - })?.catch(err => { - logger.error(err) - // try to send the steps again - this.#queue = [...outbox, ...this.#queue] - }) + })?.then(ret => { + // only keep the steps that were not send yet + queue.splice(0, + queue.length, + ...queue.filter(s => !outbox.includes(s)), + ) + return ret + }, err => logger.error(err)) } get #steps() { - return this.#queue.map(s => encodeArrayBuffer(s)) + return queue.map(s => encodeArrayBuffer(s)) .filter(s => s < 'AQ') } get #awareness() { - return this.#queue.map(s => encodeArrayBuffer(s)) + return queue.map(s => encodeArrayBuffer(s)) .findLast(s => s > 'AQ') || '' } @@ -124,19 +125,24 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio } #sendRemainingSteps() { - if (this.#queue.length) { + if (queue.length) { + let outbox = [] return syncService.sendStepsNow(() => { const data = { steps: this.#steps, awareness: this.#awareness, version: this.#version, } - this.#queue = [] + outbox = [...queue] logger.debug('sending final steps ', data) return data - })?.catch(err => { - logger.error(err) - }) + })?.then(() => { + // only keep the steps that were not send yet + queue.splice(0, + queue.length, + ...queue.filter(s => !outbox.includes(s)), + ) + }, err => logger.error(err)) } } diff --git a/src/tests/services/WebsocketPolyfill.spec.js b/src/tests/services/WebsocketPolyfill.spec.js new file mode 100644 index 00000000000..1536d6e6992 --- /dev/null +++ b/src/tests/services/WebsocketPolyfill.spec.js @@ -0,0 +1,114 @@ +import initWebSocketPolyfill from '../../services/WebSocketPolyfill.js' + +describe('Init function', () => { + + it('returns a websocket polyfill class', () => { + const syncService = { on: jest.fn(), open: jest.fn() } + const Polyfill = initWebSocketPolyfill(syncService) + const websocket = new Polyfill('url') + expect(websocket).toBeInstanceOf(Polyfill) + }) + + it('registers handlers', () => { + const syncService = { on: jest.fn(), open: jest.fn() } + const Polyfill = initWebSocketPolyfill(syncService) + const websocket = new Polyfill('url') + expect(syncService.on).toHaveBeenCalled() + }) + + it('opens sync service', () => { + const syncService = { on: jest.fn(), open: jest.fn() } + const fileId = 123 + const initialSession = { } + const Polyfill = initWebSocketPolyfill(syncService, fileId, initialSession) + const websocket = new Polyfill('url') + expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession }) + }) + + it('sends steps to sync service', async () => { + const syncService = { + on: jest.fn(), + open: jest.fn(), + sendSteps: async getData => getData(), + } + const queue = [ 'initial' ] + const data = { dummy: 'data' } + const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) + const websocket = new Polyfill('url') + const result = websocket.send(data) + expect(result).toBeInstanceOf(Promise) + expect(queue).toEqual([ 'initial' , data ]) + const dataSendOut = await result + expect(queue).toEqual([]) + expect(dataSendOut).toHaveProperty('awareness') + expect(dataSendOut).toHaveProperty('steps') + expect(dataSendOut).toHaveProperty('version') + }) + + it('handles early reject', async () => { + const syncService = { + on: jest.fn(), + open: jest.fn(), + sendSteps: jest.fn().mockRejectedValue('error before reading steps in sync service'), + } + const queue = [ 'initial' ] + const data = { dummy: 'data' } + const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) + const websocket = new Polyfill('url') + const result = websocket.send(data) + expect(queue).toEqual([ 'initial' , data ]) + expect(result).toBeInstanceOf(Promise) + const returned = await result + expect(returned).toBeUndefined() + expect(queue).toEqual([ 'initial' , data ]) + }) + + it('handles reject after reading data', async () => { + const syncService = { + on: jest.fn(), + open: jest.fn(), + sendSteps: jest.fn().mockImplementation( async getData => { + getData() + throw 'error when sending in sync service' + }), + } + const queue = [ 'initial' ] + const data = { dummy: 'data' } + const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) + const websocket = new Polyfill('url') + const result = websocket.send(data) + expect(queue).toEqual([ 'initial' , data ]) + expect(result).toBeInstanceOf(Promise) + const returned = await result + expect(returned).toBeUndefined() + expect(queue).toEqual([ 'initial' , data ]) + }) + + it('queue survives a close', async () => { + const syncService = { + on: jest.fn(), + open: jest.fn(), + sendSteps: jest.fn().mockImplementation( async getData => { + getData() + throw 'error when sending in sync service' + }), + sendStepsNow: jest.fn().mockImplementation( async getData => { + getData() + throw 'sendStepsNow error when sending' + }), + off: jest.fn(), + close: jest.fn( async data => data ), + } + const queue = [ 'initial' ] + const data = { dummy: 'data' } + const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) + const websocket = new Polyfill('url') + websocket.onclose = jest.fn() + await websocket.send(data) + const promise = websocket.close() + expect(queue).toEqual([ 'initial' , data ]) + await promise + expect(queue).toEqual([ 'initial' , data ]) + }) + +})