From d796bb57e671e1132aa6947f9b7ca48ff10225e3 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Fri, 18 Oct 2024 19:05:04 +0700 Subject: [PATCH 1/9] wip: cloud collaborator Signed-off-by: Alexander Onnikov --- common/config/rush/pnpm-lock.yaml | 45 +++ dev/docker-compose.yaml | 6 +- dev/prod/config.json | 5 +- dev/prod/public/config-dev.json | 5 +- dev/prod/public/config.json | 5 +- rush.json | 5 + workers/collaborator/.eslintrc.js | 7 + workers/collaborator/jest.config.js | 7 + workers/collaborator/package.json | 46 +++ workers/collaborator/src/collaborator.ts | 335 ++++++++++++++++++ workers/collaborator/src/document.ts | 66 ++++ workers/collaborator/src/env.ts | 22 ++ workers/collaborator/src/index.ts | 71 ++++ workers/collaborator/src/protocol.ts | 73 ++++ workers/collaborator/src/types.ts | 42 +++ workers/collaborator/src/utils.ts | 30 ++ workers/collaborator/src/ydoc.ts | 89 +++++ workers/collaborator/tsconfig copy.json | 12 + workers/collaborator/tsconfig.json | 12 + .../collaborator/worker-configuration.d.ts | 6 + workers/collaborator/wrangler.toml | 32 ++ 21 files changed, 915 insertions(+), 6 deletions(-) create mode 100644 workers/collaborator/.eslintrc.js create mode 100644 workers/collaborator/jest.config.js create mode 100644 workers/collaborator/package.json create mode 100644 workers/collaborator/src/collaborator.ts create mode 100644 workers/collaborator/src/document.ts create mode 100644 workers/collaborator/src/env.ts create mode 100644 workers/collaborator/src/index.ts create mode 100644 workers/collaborator/src/protocol.ts create mode 100644 workers/collaborator/src/types.ts create mode 100644 workers/collaborator/src/utils.ts create mode 100644 workers/collaborator/src/ydoc.ts create mode 100644 workers/collaborator/tsconfig copy.json create mode 100644 workers/collaborator/tsconfig.json create mode 100644 workers/collaborator/worker-configuration.d.ts create mode 100644 workers/collaborator/wrangler.toml diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 39acd3418e5..c46843cf616 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -158,6 +158,9 @@ dependencies: '@rush-temp/cloud-branding': specifier: file:./projects/cloud-branding.tgz version: file:projects/cloud-branding.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) + '@rush-temp/cloud-collaborator': + specifier: file:./projects/cloud-collaborator.tgz + version: file:projects/cloud-collaborator.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) '@rush-temp/cloud-datalake': specifier: file:./projects/cloud-datalake.tgz version: file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) @@ -23205,6 +23208,48 @@ packages: - utf-8-validate dev: false + file:projects/cloud-collaborator.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): + resolution: {integrity: sha512-QNx9L+csvt4b90kZlVN5Nl0XFCynmmohJSObAh+NeLx0C4yDLPAfAhiIB7fzraoEfOZ2yzomUYirYHTApx10fA==, tarball: file:projects/cloud-collaborator.tgz} + id: file:projects/cloud-collaborator.tgz + name: '@rush-temp/cloud-collaborator' + version: 0.0.0 + dependencies: + '@cloudflare/workers-types': 4.20241004.0 + '@types/jest': 29.5.12 + '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.6.2) + '@typescript-eslint/parser': 
6.21.0(eslint@8.56.0)(typescript@5.6.2) + eslint: 8.56.0 + eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.6.2) + eslint-plugin-import: 2.29.1(eslint@8.56.0) + eslint-plugin-n: 15.7.0(eslint@8.56.0) + eslint-plugin-promise: 6.1.1(eslint@8.56.0) + itty-router: 5.0.18 + jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2) + lib0: 0.2.89 + prettier: 3.2.5 + ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2) + typescript: 5.6.2 + wrangler: 3.80.2(@cloudflare/workers-types@4.20241004.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4) + y-prosemirror: 1.2.12(y-protocols@1.0.6)(yjs@13.6.19) + y-protocols: 1.0.6(yjs@13.6.19) + yjs: 13.6.19 + transitivePeerDependencies: + - '@babel/core' + - '@jest/types' + - '@types/node' + - babel-jest + - babel-plugin-macros + - bufferutil + - esbuild + - node-notifier + - prosemirror-model + - prosemirror-state + - prosemirror-view + - supports-color + - ts-node + - utf-8-validate + dev: false + file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): resolution: {integrity: sha512-pOjup2gTfDH4Qq9r4LTeVnf+nDQvlTSUOxLtT+bKnrHM0jQQ705v6wYtmkpWD7CtEwwCvS9CH+iPZwB5ZNpzqQ==, tarball: file:projects/cloud-datalake.tgz} id: file:projects/cloud-datalake.tgz diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index d3f260117c0..11bdd463554 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -94,7 +94,7 @@ services: # - REGION_INFO=cockroach|CockroachDB - TRANSACTOR_URL=ws://host.docker.internal:3333,ws://host.docker.internal:3331;;pg,ws://host.docker.internal:3332;;cockroach, - SES_URL= - - STORAGE_CONFIG=${STORAGE_CONFIG} + - STORAGE_CONFIG=datalake|https://huly.net - FRONT_URL=http://host.docker.internal:8087 - RESERVED_DB_NAMES=telegram,gmail,github - MODEL_ENABLED=* @@ -159,7 +159,7 @@ services: - STATS_URL=http://host.docker.internal:4900 - SES_URL= - REGION=pg - - STORAGE_CONFIG=${STORAGE_CONFIG} + - STORAGE_CONFIG=datalake|https://huly.net - FRONT_URL=http://host.docker.internal:8087 - RESERVED_DB_NAMES=telegram,gmail,github - MODEL_ENABLED=* @@ -240,7 +240,7 @@ services: - TELEGRAM_URL=http://host.docker.internal:8086 - REKONI_URL=http://host.docker.internal:4004 - COLLABORATOR_URL=ws://host.docker.internal:3078 - - STORAGE_CONFIG=${STORAGE_CONFIG} + - STORAGE_CONFIG=datalake|https://huly.net - GITHUB_URL=http://host.docker.internal:3500 - PRINT_URL=http://host.docker.internal:4005 - SIGN_URL=http://host.docker.internal:4006 diff --git a/dev/prod/config.json b/dev/prod/config.json index f03fb9b3338..9839d5195b7 100644 --- a/dev/prod/config.json +++ b/dev/prod/config.json @@ -1,7 +1,10 @@ { "ACCOUNTS_URL":"http://localhost:3000", "COLLABORATOR_URL": "ws://localhost:3078", - "UPLOAD_URL":"/files", + "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace", + "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId", + "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId", + "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId", "REKONI_URL": "http://localhost:4004", "PRINT_URL": "http://localhost:4005", "SIGN_URL": "http://localhost:4006", diff --git a/dev/prod/public/config-dev.json b/dev/prod/public/config-dev.json index 8f0b1e22f26..cbede7e92f1 100644 --- a/dev/prod/public/config-dev.json +++ b/dev/prod/public/config-dev.json @@ 
-1,6 +1,9 @@ { "ACCOUNTS_URL":"https://account.hc.engineering", - "UPLOAD_URL":"/files", + "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace", + "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId", + "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId", + "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId", "MODEL_VERSION": null, "TELEGRAM_URL": "https://telegram.hc.engineering", "GMAIL_URL": "https://gmail.hc.engineering", diff --git a/dev/prod/public/config.json b/dev/prod/public/config.json index d2f1c4d3a0a..617a1d09160 100644 --- a/dev/prod/public/config.json +++ b/dev/prod/public/config.json @@ -1,7 +1,10 @@ { "ACCOUNTS_URL":"/account", "COLLABORATOR_URL": "ws://localhost:3078", - "UPLOAD_URL":"/files", + "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace", + "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId", + "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId", + "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId", "TELEGRAM_URL": "http://localhost:8086", "GMAIL_URL": "http://localhost:8088", "CALENDAR_URL": "http://localhost:8095", diff --git a/rush.json b/rush.json index e36f8d94032..04672db766b 100644 --- a/rush.json +++ b/rush.json @@ -2215,6 +2215,11 @@ "packageName": "@hcengineering/cloud-transactor", "projectFolder": "workers/transactor", "shouldPublish": false + }, + { + "packageName": "@hcengineering/cloud-collaborator", + "projectFolder": "workers/collaborator", + "shouldPublish": false } ] } diff --git a/workers/collaborator/.eslintrc.js b/workers/collaborator/.eslintrc.js new file mode 100644 index 00000000000..ce90fb9646f --- /dev/null +++ b/workers/collaborator/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + extends: ['./node_modules/@hcengineering/platform-rig/profiles/node/eslint.config.json'], + parserOptions: { + tsconfigRootDir: __dirname, + project: './tsconfig.json' + } +} diff --git a/workers/collaborator/jest.config.js b/workers/collaborator/jest.config.js new file mode 100644 index 00000000000..2cfd408b679 --- /dev/null +++ b/workers/collaborator/jest.config.js @@ -0,0 +1,7 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'], + roots: ["./src"], + coverageReporters: ["text-summary", "html"] +} diff --git a/workers/collaborator/package.json b/workers/collaborator/package.json new file mode 100644 index 00000000000..7cb77ed9783 --- /dev/null +++ b/workers/collaborator/package.json @@ -0,0 +1,46 @@ +{ + "name": "@hcengineering/cloud-collaborator", + "version": "0.6.0", + "main": "lib/index.js", + "types": "types/index.d.ts", + "template": "@hcengineering/cloud-package", + "scripts": { + "deploy": "wrangler deploy", + "dev": "wrangler dev --port 4021", + "start": "wrangler dev --port 4021", + "cf-typegen": "wrangler types", + "build": "compile", + "build:watch": "compile", + "test": "jest --passWithNoTests --silent --forceExit", + "format": "format src", + "_phase:build": "compile transpile src", + "_phase:test": "jest --passWithNoTests --silent --forceExit", + "_phase:format": "format src", + "_phase:validate": "compile validate" + }, + "devDependencies": { + "@hcengineering/platform-rig": "^0.6.0", + "@cloudflare/workers-types": "^4.20240729.0", + "typescript": "^5.3.3", + "wrangler": "^3.80.1", + "jest": "^29.7.0", + "prettier": "^3.1.0", + "ts-jest": "^29.1.1", + 
"@typescript-eslint/eslint-plugin": "^6.11.0", + "@typescript-eslint/parser": "^6.11.0", + "eslint-config-standard-with-typescript": "^40.0.0", + "eslint-plugin-import": "^2.26.0", + "eslint-plugin-n": "^15.4.0", + "eslint-plugin-promise": "^6.1.1", + "eslint": "^8.54.0", + "@types/jest": "^29.5.5", + "@hcengineering/cloud-datalake": "^0.6.0" + }, + "dependencies": { + "itty-router": "^5.0.18", + "lib0": "^0.2.88", + "yjs": "^13.6.19", + "y-protocols": "^1.0.6", + "y-prosemirror": "^1.2.12" + } +} diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts new file mode 100644 index 00000000000..b2440ab61b5 --- /dev/null +++ b/workers/collaborator/src/collaborator.ts @@ -0,0 +1,335 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { DurableObject } from 'cloudflare:workers' +import { type RouterType, Router, error } from 'itty-router' +import * as encoding from 'lib0/encoding' +import { applyUpdate, encodeStateAsUpdate } from 'yjs' + +import { Document } from './document' +import { type Env } from './env' +import * as protocol from './protocol' +import type { + AwarenessUpdate, + DocumentRequest, + RpcGetContentRequest, + RpcRequest, + RpcUpdateContentRequest +} from './types' +import { jsonToYDoc, yDocToJSON } from './ydoc' +import { parseDocumentName } from './utils' + +export const PREFERRED_SAVE_SIZE = 500 +export const PREFERRED_SAVE_INTERVAL = 30 * 1000 + +/** + * YDoc state is stored as a series of updates in the Durable Object KV storage. 
+ * + * Durable Object KV + * - documentId: string, document Id as provided by the client + * - versionId: string, latest document version Id + * - version-*: string, maps version Id to datalake blob Id + * - updates: Uint8Array[]: list of pending updates to be saved + */ +export class Collaborator extends DurableObject { + private readonly router: RouterType + private readonly doc: Document + private updates: Uint8Array[] + private documentId: string = '' + private hydrated: boolean = false + + constructor (ctx: DurableObjectState, env: Env) { + super(ctx, env) + + this.router = Router() + .get('/:id', async ({ documentId, headers }) => { + if (headers.get('Upgrade') !== 'websocket') { + return new Response('Expected header Upgrade: websocket', { status: 426 }) + } + + const { 0: client, 1: server } = new WebSocketPair() + this.ctx.acceptWebSocket(server) + await this.handleSession(server, documentId) + + return new Response(null, { status: 101, webSocket: client }) + }) + .post('/rpc/:id', async (request) => { + const rpc = await request.json() + switch (rpc.method) { + case 'getContent': + return this.handleRpcGetContent(rpc) + case 'updateContent': + return this.handleRpcUpdateContent(rpc) + default: + return Response.json({ error: 'Bad request' }, { status: 400 }) + } + }) + .all('*', () => error(404)) + + this.doc = new Document() + this.updates = [] + + void this.hydrate() + } + + handleRpcGetContent (request: RpcGetContentRequest): Response { + const content: Record = {} + for (const field of this.doc.share.keys()) { + content[field] = JSON.stringify(yDocToJSON(this.doc, field)) + } + return Response.json({ content }, { status: 200 }) + } + + handleRpcUpdateContent (request: RpcUpdateContentRequest): Response { + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + jsonToYDoc(JSON.parse(value), this.doc, field) + }) + }) + + return Response.json({}, { status: 200 }) + } + + async fetch (request: Request): Promise { + return await this.router.fetch(request).catch(error) + } + + async webSocketMessage (ws: WebSocket, message: ArrayBuffer | string): Promise { + if (typeof message === 'string') { + console.warn('Unexpected message type:', message) + return + } + + try { + const encoder = protocol.handleMessage(this.doc, new Uint8Array(message), ws) + if (encoding.length(encoder) > 1) { + ws.send(encoding.toUint8Array(encoder)) + } + } catch (error) { + console.error('WebSocket message error:', error) + } + } + + async webSocketClose (ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { + console.log('WebSocket closed:', code, reason, wasClean) + await this.handleClose(ws, 1000) + } + + async webSocketError (ws: WebSocket, error: unknown): Promise { + console.error('WebSocket error:', error) + await this.handleClose(ws, 1011, 'error') + } + + async alarm (): Promise { + await this.hydrate() + await this.writeDocument() + } + + async hydrate (): Promise { + if (this.hydrated) { + return + } + + await this.ctx.blockConcurrencyWhile(async () => { + const documentId = (await this.ctx.storage.get('documentId')) ?? 
'' + + if (documentId === '') { + // name is not set, hydrate later + return + } + + this.documentId = documentId + + // restore document state + await this.readDocument() + + this.ctx.getWebSockets().forEach((ws: WebSocket) => { + this.doc.addConnection(ws) + }) + + // enable update listeners only after the document is restored + // eslint-disable-next-line @typescript-eslint/no-misused-promises + this.doc.on('update', this.handleDocUpdate.bind(this)) + this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) + + this.hydrated = true + }) + } + + async handleSession (ws: WebSocket, documentId: string): Promise { + if (this.documentId === '') { + this.documentId = documentId + await this.ctx.storage.put('documentId', documentId) + } + + await this.hydrate() + + this.doc.addConnection(ws) + + const encoder = protocol.forceSyncMessage(this.doc) + ws.send(encoding.toUint8Array(encoder)) + + const clients = Array.from(this.doc.awareness.states.keys()) + if (clients.length > 0) { + const encoder = protocol.awarenessMessage(this.doc, clients) + ws.send(encoding.toUint8Array(encoder)) + } + } + + async handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): Promise { + // broadcast awareness state + const clients = [...added, ...updated, ...removed] + const encoder = protocol.awarenessMessage(this.doc, clients) + await this.broadcastMessage(encoding.toUint8Array(encoder)) + + // persist awareness state + const state = this.doc.awareness.getLocalState() + await this.ctx.storage.put('awareness', state) + } + + async handleDocUpdate (update: Uint8Array, origin: any): Promise { + // save update + this.updates.push(update) + await this.ctx.storage.put('updates', [...this.updates]) + + await this.ctx.storage.setAlarm(Date.now() + PREFERRED_SAVE_INTERVAL) + if (this.updates.length > PREFERRED_SAVE_SIZE) { + void this.writeDocument() + } + + // broadcast update + const encoder = protocol.updateMessage(update, origin) + await this.broadcastMessage(encoding.toUint8Array(encoder), origin) + } + + async broadcastMessage (message: Uint8Array, origin?: any): Promise { + const wss = this.ctx + .getWebSockets() + .filter((ws) => ws !== origin) + .filter((ws) => ws.readyState === WebSocket.OPEN) + const promises = wss.map(async (ws) => { + await this.sendMessage(ws, message) + }) + await Promise.all(promises) + } + + async sendMessage (ws: WebSocket, message: Uint8Array): Promise { + try { + ws.send(message) + } catch (error) { + console.error('Failed to send message:', error) + await this.handleClose(ws, 1011, 'error') + } + } + + async handleClose (ws: WebSocket, code: number, reason?: string): Promise { + const clients = this.ctx.getWebSockets().length + + try { + ws.close(code, reason) + } catch (err) { + console.error('Failed to close WebSocket:', err) + } + + this.doc.removeConnection(ws) + + // last client disconnected, write document + if (clients === 1) { + await this.writeDocument() + } + } + + async readDocument (): Promise { + console.log('reading document from storage') + + // restore document state from storage or datalake + const { workspaceId, documentId } = parseDocumentName(this.documentId) + + // find the blob id containing last version + const versions = await this.ctx.storage.list({ prefix: 'version-', reverse: true, limit: 1 }) + const blobId = versions.values().next().value ?? 
documentId
+
+    try {
+      console.log('loading from datalake', workspaceId, documentId, blobId)
+      const buffer = await this.env.DATALAKE.getBlob(workspaceId, blobId)
+      applyUpdate(this.doc, new Uint8Array(buffer))
+
+      console.log('loaded from datalake', workspaceId, documentId, blobId)
+    } catch (err) {
+      console.error('loading from datalake error', workspaceId, documentId, blobId, err)
+      // the blob might be missing, ignore errors
+    }
+
+    // restore cached updates
+    const updates = await this.ctx.storage.get<Array<Uint8Array>>('updates')
+    if (updates !== undefined && updates.length > 0) {
+      console.log('- restore updates', updates.length)
+      this.doc.transact(() => {
+        updates.forEach((update) => {
+          applyUpdate(this.doc, update)
+          this.updates.push(update)
+        })
+      })
+    }
+
+    // restore awareness state
+    const awareness = await this.ctx.storage.get<Record<string, unknown>>('awareness')
+    if (awareness !== undefined) {
+      console.log('- restore awareness', awareness)
+      this.doc.awareness.setLocalState(awareness)
+    }
+  }
+
+  async writeDocument (): Promise<void> {
+    await this.ctx.storage.deleteAlarm()
+
+    console.log('saving document to storage')
+
+    const updates = this.updates
+    this.updates = []
+
+    if (updates.length === 0) {
+      console.log('no document updates to save')
+      return
+    }
+
+    try {
+      const { workspaceId, documentId } = parseDocumentName(this.documentId)
+      const versionId = nextVersionId()
+      const blobId = datalakeBlobId(documentId, versionId)
+
+      const update = encodeStateAsUpdate(this.doc)
+      await this.env.DATALAKE.putBlob(workspaceId, blobId, new Uint8Array(update), 'application/ydoc')
+
+      void this.ctx.storage.put('updates', [])
+      void this.ctx.storage.put('version-' + versionId, blobId)
+      void this.ctx.storage.put('versionId', versionId)
+
+      console.log('saved document', documentId, versionId, blobId)
+    } catch (error) {
+      // save failed, restore updates
+      console.error('Failed to save document:', error)
+      this.updates.push(...updates)
+    }
+  }
+}
+
+function nextVersionId (): number {
+  return Date.now()
+}
+
+function datalakeBlobId (documentId: string, version: number): string {
+  return `${documentId}-${version}`
+}
diff --git a/workers/collaborator/src/document.ts b/workers/collaborator/src/document.ts
new file mode 100644
index 00000000000..665534092f1
--- /dev/null
+++ b/workers/collaborator/src/document.ts
@@ -0,0 +1,66 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { Doc as YDoc } from 'yjs'
+import { Awareness, removeAwarenessStates } from 'y-protocols/awareness'
+import { type AwarenessUpdate } from './types'
+
+interface SessionState {
+  clients: Set<number>
+}
+
+export class Document extends YDoc {
+  awareness: Awareness
+  sessions: Map<WebSocket, SessionState>
+
+  constructor () {
+    super({ gc: false })
+
+    this.sessions = new Map()
+
+    this.awareness = new Awareness(this)
+    this.awareness.setLocalState(null)
+
+    this.awareness.on('update', this.handleAwarenessUpdate.bind(this))
+  }
+
+  addConnection (ws: WebSocket): void {
+    const state = ws.deserializeAttachment() ?? { clients: new Set() }
+    this.sessions.set(ws, state)
+  }
+
+  removeConnection (ws: WebSocket): void {
+    const state = this.sessions.get(ws)
+    if (state !== undefined && state.clients.size > 0) {
+      removeAwarenessStates(this.awareness, Array.from(state.clients), null)
+    }
+
+    this.sessions.delete(ws)
+  }
+
+  private handleAwarenessUpdate ({ added, removed }: AwarenessUpdate, origin: any): void {
+    if (origin == null || !(origin instanceof WebSocket)) return
+
+    if (added.length > 0 || removed.length > 0) {
+      const state = this.sessions.get(origin)
+      if (state !== undefined) {
+        added.forEach((client) => state.clients.add(client))
+        removed.forEach((client) => state.clients.delete(client))
+
+        origin.serializeAttachment(state)
+      }
+    }
+  }
+}
diff --git a/workers/collaborator/src/env.ts b/workers/collaborator/src/env.ts
new file mode 100644
index 00000000000..4d81e0f7ed5
--- /dev/null
+++ b/workers/collaborator/src/env.ts
@@ -0,0 +1,22 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import type DatalakeWorker from '@hcengineering/cloud-datalake'
+import { type Collaborator } from './collaborator'
+
+export interface Env {
+  COLLABORATOR: DurableObjectNamespace<Collaborator>
+  DATALAKE: Service<DatalakeWorker>
+}
diff --git a/workers/collaborator/src/index.ts b/workers/collaborator/src/index.ts
new file mode 100644
index 00000000000..cef1f9b6ad0
--- /dev/null
+++ b/workers/collaborator/src/index.ts
@@ -0,0 +1,71 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type IRequestStrict, type RequestHandler, Router, cors, error, html } from 'itty-router'
+import { type Env } from './env'
+import { type DocumentRequest } from './types'
+
+export { Collaborator } from './collaborator'
+
+const { preflight, corsify } = cors({ maxAge: 86400 })
+
+const withDocumentId: RequestHandler<DocumentRequest> = (request) => {
+  if (request.params.id === undefined || request.params.id === '') {
+    return error(400, 'Missing document id')
+  }
+  request.documentId = decodeURIComponent(request.params.id)
+}
+
+const router = Router()
+  .options('*', preflight)
+  .get('/:id', withDocumentId, (request, env) => {
+    const { documentId, headers } = request
+    if (headers.get('Upgrade') !== 'websocket') {
+      return new Response('Expected header Upgrade: websocket', { status: 426 })
+    }
+
+    const id = env.COLLABORATOR.idFromName(documentId)
+    const stub = env.COLLABORATOR.get(id)
+
+    return stub.fetch(request)
+  })
+  .post('/rpc/:id', withDocumentId, async (request, env) => {
+    const { documentId } = request
+
+    const id = env.COLLABORATOR.idFromName(documentId)
+    const stub = env.COLLABORATOR.get(id)
+
+    return stub.fetch(request)
+  })
+  .all('/', () =>
+    html(
+      `Huly® Collaborator™ https://huly.io
+      © 2024 Huly Labs`
+    )
+  )
+  .all('*', () => new Response('Not found', { status: 404 }))
+
+export default {
+  async fetch (request: Request, env: Env): Promise<Response> {
+    return await router
+      .fetch(request, env)
+      .catch(error)
+      .then((response) => {
+        // workaround for "Can't modify immutable headers" error
+        // see https://github.com/kwhitley/itty-router/issues/242
+        return corsify(new Response(response.body, response))
+      })
+  }
+} satisfies ExportedHandler<Env>
diff --git a/workers/collaborator/src/protocol.ts b/workers/collaborator/src/protocol.ts
new file mode 100644
index 00000000000..91ab33c3539
--- /dev/null
+++ b/workers/collaborator/src/protocol.ts
@@ -0,0 +1,73 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import * as decoding from 'lib0/decoding'
+import * as encoding from 'lib0/encoding'
+import * as awarenessProtocol from 'y-protocols/awareness'
+import * as syncProtocol from 'y-protocols/sync'
+import { type Document } from './document'
+
+export enum MessageType {
+  Sync = 0,
+  Awareness = 1
+}
+
+export function forceSyncMessage (doc: Document): encoding.Encoder {
+  const encoder = encoding.createEncoder()
+
+  encoding.writeVarUint(encoder, MessageType.Sync)
+  syncProtocol.writeSyncStep1(encoder, doc)
+
+  return encoder
+}
+
+export function updateMessage (update: Uint8Array, origin: any): encoding.Encoder {
+  const encoder = encoding.createEncoder()
+
+  encoding.writeVarUint(encoder, MessageType.Sync)
+  syncProtocol.writeUpdate(encoder, update)
+
+  return encoder
+}
+
+export function awarenessMessage (doc: Document, clients: Array<number>): encoding.Encoder {
+  const encoder = encoding.createEncoder()
+
+  encoding.writeVarUint(encoder, MessageType.Awareness)
+  encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, clients))
+
+  return encoder
+}
+
+export function handleMessage (doc: Document, message: Uint8Array, origin: any): encoding.Encoder {
+  const encoder = encoding.createEncoder()
+  const decoder = decoding.createDecoder(message)
+  const messageType = decoding.readVarUint(decoder)
+
+  switch (messageType) {
+    case MessageType.Sync:
+      encoding.writeVarUint(encoder, MessageType.Sync)
+      syncProtocol.readSyncMessage(decoder, encoder, doc, origin)
+      break
+
+    case MessageType.Awareness:
+      awarenessProtocol.applyAwarenessUpdate(doc.awareness, decoding.readVarUint8Array(decoder), origin)
+      break
+
+    default:
+      throw new Error('Unknown message type')
+  }
+  return encoder
+}
diff --git a/workers/collaborator/src/types.ts b/workers/collaborator/src/types.ts
new file mode 100644
index 00000000000..6085d5a8f13
--- /dev/null
+++ b/workers/collaborator/src/types.ts
@@ -0,0 +1,42 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type IRequestStrict } from 'itty-router'
+
+export type DocumentRequest = {
+  documentId: string
+} & IRequestStrict
+
+// https://github.com/yjs/y-protocols/blob/master/awareness.js#L134
+export interface AwarenessUpdate {
+  added: Array<number>
+  updated: Array<number>
+  removed: Array<number>
+}
+
+export type RpcRequest = RpcGetContentRequest | RpcUpdateContentRequest
+
+export interface RpcGetContentRequest {
+  method: 'getContent'
+}
+
+export interface RpcUpdateContentRequest {
+  method: 'updateContent'
+  payload: RpcUpdateContentPayload
+}
+
+export interface RpcUpdateContentPayload {
+  content: Record<string, string>
+}
diff --git a/workers/collaborator/src/utils.ts b/workers/collaborator/src/utils.ts
new file mode 100644
index 00000000000..5a6c9d0c0d8
--- /dev/null
+++ b/workers/collaborator/src/utils.ts
@@ -0,0 +1,30 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+/**
+ * For the sake of consistency and backwards compatibility, document name has the same format
+ * as in Huly collaborator: "workspaceId://documentId:HEAD".
+ */
+export function parseDocumentName (name: string): { workspaceId: string, documentId: string, versionId: string } {
+  const parts = name.split('://')
+  if (parts.length !== 2) {
+    throw new Error('Malformed document id')
+  }
+
+  const workspaceId = parts[0]
+  const [documentId, versionId] = parts[1].split(':', 2)
+
+  return { workspaceId, documentId, versionId }
+}
diff --git a/workers/collaborator/src/ydoc.ts b/workers/collaborator/src/ydoc.ts
new file mode 100644
index 00000000000..c9c210fe8cb
--- /dev/null
+++ b/workers/collaborator/src/ydoc.ts
@@ -0,0 +1,89 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type Doc as YDoc, XmlElement as YXmlElement, XmlText as YXmlText } from 'yjs'
+import { yDocToProsemirrorJSON } from 'y-prosemirror'
+
+/** ProseMirror Mark JSON representation */
+export interface JMark {
+  type: string
+  attrs?: Record<string, any>
+}
+
+/** ProseMirror Node JSON representation */
+interface JNode {
+  type: string
+  content?: JNode[]
+  marks?: JMark[]
+  attrs?: Record<string, any>
+  text?: string
+}
+
+/**
+ * Converts YDoc to ProseMirror JSON object
+ * @param ydoc YDoc
+ * @param field YDoc field name
+ * */
+export function yDocToJSON (ydoc: YDoc, field: string): Record<string, any> {
+  return yDocToProsemirrorJSON(ydoc, field)
+}
+
+/**
+ * Converts ProseMirror JSON object to YDoc without ProseMirror schema
+ * @param json ProseMirror JSON object
+ * @param field YDoc field name
+ * */
+export function jsonToYDoc (json: Record<string, any>, ydoc: YDoc, field: string): void {
+  const nodes = json.type === 'doc' ? json.content ?? [] : [json]
+  const content = nodes.map(nodeToYXmlElement)
+
+  const fragment = ydoc.getXmlFragment(field)
+  fragment.delete(0, fragment.length)
+  fragment.push(content)
+}
+
+/** Convert ProseMirror JSON Node representation to YXmlElement */
+function nodeToYXmlElement (node: JNode): YXmlElement | YXmlText {
+  const elem = node.type === 'text' ?
new YXmlText() : new YXmlElement(node.type) + + if (elem instanceof YXmlElement) { + if (node.content !== undefined && node.content.length > 0) { + const content = node.content.map(nodeToYXmlElement) + elem.push(content) + } + } else { + // https://github.com/yjs/y-prosemirror/blob/master/src/plugins/sync-plugin.js#L777 + const attributes: Record = {} + if (node.marks !== undefined) { + node.marks.forEach((mark) => { + attributes[mark.type] = mark.attrs ?? {} + }) + } + elem.applyDelta([ + { + insert: node.text ?? '', + attributes + } + ]) + } + + if (node.attrs !== undefined) { + Object.entries(node.attrs).forEach(([key, value]) => { + elem.setAttribute(key, value) + }) + } + + return elem +} diff --git a/workers/collaborator/tsconfig copy.json b/workers/collaborator/tsconfig copy.json new file mode 100644 index 00000000000..da8672e6cbc --- /dev/null +++ b/workers/collaborator/tsconfig copy.json @@ -0,0 +1,12 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo", + "types": ["@cloudflare/workers-types", "jest"], + "lib": ["esnext"] + } +} \ No newline at end of file diff --git a/workers/collaborator/tsconfig.json b/workers/collaborator/tsconfig.json new file mode 100644 index 00000000000..da8672e6cbc --- /dev/null +++ b/workers/collaborator/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo", + "types": ["@cloudflare/workers-types", "jest"], + "lib": ["esnext"] + } +} \ No newline at end of file diff --git a/workers/collaborator/worker-configuration.d.ts b/workers/collaborator/worker-configuration.d.ts new file mode 100644 index 00000000000..b004d064f6a --- /dev/null +++ b/workers/collaborator/worker-configuration.d.ts @@ -0,0 +1,6 @@ +// Generated by Wrangler by running `wrangler types` + +interface Env { + COLLABORATOR: DurableObjectNamespace; + DATALAKE: Fetcher; +} diff --git a/workers/collaborator/wrangler.toml b/workers/collaborator/wrangler.toml new file mode 100644 index 00000000000..547576fbe4b --- /dev/null +++ b/workers/collaborator/wrangler.toml @@ -0,0 +1,32 @@ +#:schema node_modules/wrangler/config-schema.json +name = "collaborator-worker" +main = "src/index.ts" +compatibility_date = "2024-07-01" +compatibility_flags = ["nodejs_compat"] +keep_vars = true + +[[services]] +binding = "DATALAKE" +service = "datalake-worker" + +[[durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[env.staging] +name = "collaborator-worker-staging" + +services = [ + { binding = "DATALAKE", service = "datalake-worker-staging" } +] + +[env.dev] +name = "collaborator-worker-dev" + +services = [ + { binding = "DATALAKE", service = "datalake-worker-dev" } +] From 77fc5554ec1dcad1480ab2343844486994c7548f Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 18 Dec 2024 16:01:53 +0700 Subject: [PATCH 2/9] refactor collaborator Signed-off-by: Alexander Onnikov --- dev/docker-compose.yaml | 3 +- .../src/provider/cloud.ts | 6 +- .../src/provider/utils.ts | 2 +- workers/collaborator/package.json | 1 + workers/collaborator/src/collaborator.ts | 375 ++++++++++++------ workers/collaborator/src/connection.ts | 31 
++ workers/collaborator/src/index.ts | 24 +- workers/collaborator/src/metrics.ts | 110 +++-- workers/collaborator/src/types.ts | 22 +- workers/collaborator/src/utils.ts | 33 +- workers/collaborator/wrangler.toml | 30 +- workers/datalake/src/blob.ts | 7 +- workers/datalake/src/index.ts | 14 +- workers/datalake/src/s3.ts | 8 +- 14 files changed, 453 insertions(+), 213 deletions(-) create mode 100644 workers/collaborator/src/connection.ts diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 11bdd463554..851e948e80d 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -239,7 +239,8 @@ services: - CALENDAR_URL=http://host.docker.internal:8095 - TELEGRAM_URL=http://host.docker.internal:8086 - REKONI_URL=http://host.docker.internal:4004 - - COLLABORATOR_URL=ws://host.docker.internal:3078 + - COLLABORATOR=cloud + - COLLABORATOR_URL=wss://cl.huly.net - STORAGE_CONFIG=datalake|https://huly.net - GITHUB_URL=http://host.docker.internal:3500 - PRINT_URL=http://host.docker.internal:4005 diff --git a/plugins/text-editor-resources/src/provider/cloud.ts b/plugins/text-editor-resources/src/provider/cloud.ts index 76136f15908..780e3206fcf 100644 --- a/plugins/text-editor-resources/src/provider/cloud.ts +++ b/plugins/text-editor-resources/src/provider/cloud.ts @@ -24,14 +24,14 @@ export interface DatalakeCollabProviderParameters { token: string document: YDoc - content: Ref | null + source: Ref | null } export class CloudCollabProvider extends WebsocketProvider implements Provider { readonly loaded: Promise - constructor ({ document, url, name, content }: DatalakeCollabProviderParameters) { - const params = content != null ? { content } : undefined + constructor ({ document, url, name, source }: DatalakeCollabProviderParameters) { + const params = source != null ? 
{ source } : undefined super(url, encodeURIComponent(name), document, { params }) diff --git a/plugins/text-editor-resources/src/provider/utils.ts b/plugins/text-editor-resources/src/provider/utils.ts index f075289587b..2fb20d350ab 100644 --- a/plugins/text-editor-resources/src/provider/utils.ts +++ b/plugins/text-editor-resources/src/provider/utils.ts @@ -48,7 +48,7 @@ export function createRemoteProvider (ydoc: Ydoc, doc: CollaborativeDoc, content url: collaboratorUrl, name: documentId, document: ydoc, - content, + source: content, token }) : new HocuspocusCollabProvider({ diff --git a/workers/collaborator/package.json b/workers/collaborator/package.json index 7cb77ed9783..b6c3d1588ce 100644 --- a/workers/collaborator/package.json +++ b/workers/collaborator/package.json @@ -34,6 +34,7 @@ "eslint-plugin-promise": "^6.1.1", "eslint": "^8.54.0", "@types/jest": "^29.5.5", + "@types/node": "~20.11.16", "@hcengineering/cloud-datalake": "^0.6.0" }, "dependencies": { diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts index b2440ab61b5..6c30f631b54 100644 --- a/workers/collaborator/src/collaborator.ts +++ b/workers/collaborator/src/collaborator.ts @@ -14,22 +14,24 @@ // import { DurableObject } from 'cloudflare:workers' -import { type RouterType, Router, error } from 'itty-router' +import { type RouterType, type IRequest, Router, error } from 'itty-router' import * as encoding from 'lib0/encoding' import { applyUpdate, encodeStateAsUpdate } from 'yjs' import { Document } from './document' import { type Env } from './env' +import { ConsoleLogger, type MetricsContext, withMetrics } from './metrics' import * as protocol from './protocol' import type { AwarenessUpdate, - DocumentRequest, + RpcCreateContentRequest, RpcGetContentRequest, RpcRequest, RpcUpdateContentRequest } from './types' +import { decodeDocumentId, extractStrParam, jsonBlobId, ydocBlobId } from './utils' import { jsonToYDoc, yDocToJSON } from './ydoc' -import { parseDocumentName } from './utils' +import { ConnectionManager } from './connection' export const PREFERRED_SAVE_SIZE = 500 export const PREFERRED_SAVE_INTERVAL = 30 * 1000 @@ -44,71 +46,132 @@ export const PREFERRED_SAVE_INTERVAL = 30 * 1000 * - updates: Uint8Array[]: list of pending updates to be saved */ export class Collaborator extends DurableObject { + private readonly logger = new ConsoleLogger() + private readonly connections: ConnectionManager private readonly router: RouterType private readonly doc: Document - private updates: Uint8Array[] + private readonly updates: Uint8Array[] private documentId: string = '' + private source: string = '' private hydrated: boolean = false constructor (ctx: DurableObjectState, env: Env) { super(ctx, env) + this.connections = new ConnectionManager(this.ctx) + this.doc = new Document() + this.updates = [] + this.router = Router() - .get('/:id', async ({ documentId, headers }) => { - if (headers.get('Upgrade') !== 'websocket') { - return new Response('Expected header Upgrade: websocket', { status: 426 }) - } + .get('/:id', async (request) => { + return await withMetrics('connnect', (ctx) => { + return this.handleConnect(ctx, request) + }) + }) + .post('/rpc/:id', async (request, env) => { + return await withMetrics('rpc', (ctx) => { + return this.handleRpc(ctx, request) + }) + }) + } + + async fetch (request: Request): Promise { + return await this.router.fetch(request).catch(error) + } + + async handleConnect (ctx: MetricsContext, request: IRequest): Promise { + const documentId = 
decodeURIComponent(request.params.id) + const source = decodeURIComponent(extractStrParam(request.query.source) ?? '') + const headers = request.headers + + if (headers.get('Upgrade') !== 'websocket') { + return new Response('Expected header Upgrade: websocket', { status: 426 }) + } - const { 0: client, 1: server } = new WebSocketPair() - this.ctx.acceptWebSocket(server) - await this.handleSession(server, documentId) + const { 0: client, 1: server } = new WebSocketPair() + this.connections.accept(server) - return new Response(null, { status: 101, webSocket: client }) + await ctx.with('session', async (ctx) => { + await this.handleSession(ctx, server, documentId, source) }) - .post('/rpc/:id', async (request) => { - const rpc = await request.json() - switch (rpc.method) { - case 'getContent': - return this.handleRpcGetContent(rpc) - case 'updateContent': - return this.handleRpcUpdateContent(rpc) - default: - return Response.json({ error: 'Bad request' }, { status: 400 }) + + return new Response(null, { status: 101, webSocket: client }) + } + + async handleRpc (ctx: MetricsContext, request: IRequest): Promise { + const documentId = decodeURIComponent(request.params.id) + const rpc = await request.json() + + return await ctx.with(rpc.method, async (ctx) => { + try { + switch (rpc.method) { + case 'getContent': + return this.handleRpcGetContent(ctx, documentId, rpc) + case 'createContent': + return await this.handleRpcCreateContent(ctx, documentId, rpc) + case 'updateContent': + return this.handleRpcUpdateContent(ctx, documentId, rpc) + default: + return Response.json({ error: 'Bad request' }, { status: 400 }) + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + ctx.error('failed to perform rpc request', { error: message }) + return error(500) } }) - .all('*', () => error(404)) + } - this.doc = new Document() - this.updates = [] + handleRpcGetContent (ctx: MetricsContext, id: string, request: RpcGetContentRequest): Response { + const content: Record = {} - void this.hydrate() + ctx.withSync('ydoc.read', () => { + for (const field of this.doc.share.keys()) { + content[field] = JSON.stringify(yDocToJSON(this.doc, field)) + } + }) + + return Response.json({ content }, { status: 200 }) } - handleRpcGetContent (request: RpcGetContentRequest): Response { + async handleRpcCreateContent (ctx: MetricsContext, id: string, request: RpcCreateContentRequest): Promise { + const documentId = decodeDocumentId(id) const content: Record = {} - for (const field of this.doc.share.keys()) { - content[field] = JSON.stringify(yDocToJSON(this.doc, field)) + + ctx.withSync('ydoc.write', () => { + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + jsonToYDoc(JSON.parse(value), this.doc, field) + }) + }) + }) + + for (const [field, value] of Object.entries(request.payload.content)) { + const blobId = jsonBlobId(documentId) + await ctx.with('datalake.putBlob', async () => { + await this.env.DATALAKE.putBlob(documentId.workspaceId, blobId, value, 'application/json') + }) + content[field] = blobId } + return Response.json({ content }, { status: 200 }) } - handleRpcUpdateContent (request: RpcUpdateContentRequest): Response { - this.doc.transact(() => { - Object.entries(request.payload.content).forEach(([field, value]) => { - jsonToYDoc(JSON.parse(value), this.doc, field) + handleRpcUpdateContent (ctx: MetricsContext, id: string, request: RpcUpdateContentRequest): Response { + ctx.withSync('ydoc.write', () => { + this.doc.transact(() => { 
+ Object.entries(request.payload.content).forEach(([field, value]) => { + jsonToYDoc(JSON.parse(value), this.doc, field) + }) }) }) return Response.json({}, { status: 200 }) } - async fetch (request: Request): Promise { - return await this.router.fetch(request).catch(error) - } - async webSocketMessage (ws: WebSocket, message: ArrayBuffer | string): Promise { if (typeof message === 'string') { - console.warn('Unexpected message type:', message) + this.logger.warn('unexpected message type', { message }) return } @@ -117,18 +180,19 @@ export class Collaborator extends DurableObject { if (encoding.length(encoder) > 1) { ws.send(encoding.toUint8Array(encoder)) } - } catch (error) { - console.error('WebSocket message error:', error) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + this.logger.error('WebSocket message error', { error }) } } async webSocketClose (ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { - console.log('WebSocket closed:', code, reason, wasClean) + this.logger.log('WebSocket closed', { code, reason, wasClean }) await this.handleClose(ws, 1000) } async webSocketError (ws: WebSocket, error: unknown): Promise { - console.error('WebSocket error:', error) + this.logger.error('WebSocket error', { error }) await this.handleClose(ws, 1011, 'error') } @@ -144,6 +208,7 @@ export class Collaborator extends DurableObject { await this.ctx.blockConcurrencyWhile(async () => { const documentId = (await this.ctx.storage.get('documentId')) ?? '' + const source = (await this.ctx.storage.get('source')) ?? '' if (documentId === '') { // name is not set, hydrate later @@ -151,41 +216,61 @@ export class Collaborator extends DurableObject { } this.documentId = documentId + this.source = source - // restore document state - await this.readDocument() + await withMetrics('hydrate', async (ctx) => { + await ctx.with('readDocument', async (ctx) => { + await this.readDocument(ctx) + }) - this.ctx.getWebSockets().forEach((ws: WebSocket) => { - this.doc.addConnection(ws) - }) + ctx.withSync('restoreConnections', () => { + const connections = this.connections.getConnections() + connections.forEach((ws: WebSocket) => { + this.doc.addConnection(ws) + }) + }) - // enable update listeners only after the document is restored - // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.doc.on('update', this.handleDocUpdate.bind(this)) - this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) + ctx.withSync('restoreListeners', () => { + // enable update listeners only after the document is restored + // eslint-disable-next-line @typescript-eslint/no-misused-promises + this.doc.on('update', this.handleDocUpdate.bind(this)) + this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) + }) + }) this.hydrated = true }) } - async handleSession (ws: WebSocket, documentId: string): Promise { - if (this.documentId === '') { + async handleSession (ctx: MetricsContext, ws: WebSocket, documentId: string, source: string): Promise { + if (this.documentId !== documentId) { this.documentId = documentId await this.ctx.storage.put('documentId', documentId) } - await this.hydrate() + if (this.source !== source) { + this.source = source + await this.ctx.storage.put('source', source) + } - this.doc.addConnection(ws) + await ctx.with('hydrate', async () => { + await this.hydrate() + }) - const encoder = protocol.forceSyncMessage(this.doc) - ws.send(encoding.toUint8Array(encoder)) + this.doc.addConnection(ws) - const clients = 
Array.from(this.doc.awareness.states.keys()) - if (clients.length > 0) { - const encoder = protocol.awarenessMessage(this.doc, clients) + ctx.withSync('forceSync', () => { + const encoder = protocol.forceSyncMessage(this.doc) ws.send(encoding.toUint8Array(encoder)) - } + }) + + ctx.withSync('awareness', () => { + const clients = Array.from(this.doc.awareness.states.keys()) + if (clients.length > 0) { + const encoder = protocol.awarenessMessage(this.doc, clients) + ws.send(encoding.toUint8Array(encoder)) + } + }) } async handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): Promise { @@ -215,8 +300,8 @@ export class Collaborator extends DurableObject { } async broadcastMessage (message: Uint8Array, origin?: any): Promise { - const wss = this.ctx - .getWebSockets() + const connections = this.connections.getConnections() + const wss = connections .filter((ws) => ws !== origin) .filter((ws) => ws.readyState === WebSocket.OPEN) const promises = wss.map(async (ws) => { @@ -228,19 +313,21 @@ export class Collaborator extends DurableObject { async sendMessage (ws: WebSocket, message: Uint8Array): Promise { try { ws.send(message) - } catch (error) { - console.error('Failed to send message:', error) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + this.logger.error('failed to send message', { error }) await this.handleClose(ws, 1011, 'error') } } async handleClose (ws: WebSocket, code: number, reason?: string): Promise { - const clients = this.ctx.getWebSockets().length + const clients = this.connections.count() try { ws.close(code, reason) } catch (err) { - console.error('Failed to close WebSocket:', err) + const error = err instanceof Error ? err.message : String(err) + this.logger.error('failed to close WebSocket', { error }) } this.doc.removeConnection(ws) @@ -251,85 +338,127 @@ export class Collaborator extends DurableObject { } } - async readDocument (): Promise { - console.log('reading document from storage') - + async readDocument (ctx: MetricsContext): Promise { // restore document state from storage or datalake - const { workspaceId, documentId } = parseDocumentName(this.documentId) + const source = this.source + const documentId = decodeDocumentId(this.documentId) + const workspaceId = documentId.workspaceId + + const blobId = ydocBlobId(documentId) - // find the blob id containing last version - const versions = await this.ctx.storage.list({ prefix: 'version-', reverse: true, limit: 1 }) - const blobId = versions.values().next().value ?? documentId + let loaded = false try { - console.log('loading from datalake', workspaceId, documentId, blobId) - const buffer = await this.env.DATALAKE.getBlob(workspaceId, blobId) - applyUpdate(this.doc, new Uint8Array(buffer)) + ctx.log('loading from datalake', { workspaceId, documentId, blobId }) + + await ctx.with('fromYdoc', async (ctx) => { + const buffer = await ctx.with('datalake.getBlob', () => { + return this.env.DATALAKE.getBlob(workspaceId, blobId) + }) + + ctx.withSync('applyUpdate', () => { + applyUpdate(this.doc, new Uint8Array(buffer)) + }) - console.log('loaded from datalake', workspaceId, documentId, blobId) + loaded = true + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + }) } catch (err) { - console.error('loading from datalake error', workspaceId, documentId, blobId, err) // the blob might be missing, ignore errors + const error = err instanceof Error ? 
err.message : String(err) + ctx.error('loading from datalake error', { workspaceId, documentId, blobId, error }) } - // restore cached updates - const updates = await this.ctx.storage.get>('updates') - if (updates !== undefined && updates.length > 0) { - console.log('- restore updates', updates.length) - this.doc.transact(() => { - updates.forEach((update) => { - applyUpdate(this.doc, update) - this.updates.push(update) + if (!loaded && source !== '') { + try { + ctx.log('loading from datalake', { workspaceId, documentId, source }) + + await ctx.with('fromJson', async (ctx) => { + const buffer = await ctx.with('datalake.getBlob', () => { + return this.env.DATALAKE.getBlob(workspaceId, source) + }) + + ctx.withSync('jsonToYDoc', () => { + jsonToYDoc(JSON.parse(String(buffer)), this.doc, documentId.objectAttr) + }) + + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) }) - }) + } catch (err) { + // the blob might be missing, ignore errors + const error = err instanceof Error ? err.message : String(err) + ctx.error('loading from datalake error', { workspaceId, documentId, source, error }) + } } + // restore cached updates + await ctx.with('restore updates', async () => { + try { + const updates = await this.ctx.storage.get>('updates') + if (updates !== undefined && updates.length > 0) { + this.doc.transact(() => { + updates.forEach((update) => { + applyUpdate(this.doc, update) + this.updates.push(update) + }) + }) + } + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to restore updates', { workspaceId, documentId, error }) + } + }) + // restore awareness state - const awareness = await this.ctx.storage.get>('awareness') - if (awareness !== undefined) { - console.log('- restore awareness', awareness) - this.doc.awareness.setLocalState(awareness) - } + await ctx.with('restore awareness', async () => { + try { + const awareness = await this.ctx.storage.get>('awareness') + if (awareness !== undefined) { + this.doc.awareness.setLocalState(awareness) + } + } catch (err) { + const error = err instanceof Error ? 
err.message : String(err) + ctx.error('failed to restore awareness', { workspaceId, documentId, error }) + } + }) } async writeDocument (): Promise { await this.ctx.storage.deleteAlarm() - console.log('saving document to storage') - - const updates = this.updates - this.updates = [] - + const updates = this.updates.splice(0) if (updates.length === 0) { - console.log('no document updates to save') return } - try { - const { workspaceId, documentId } = parseDocumentName(this.documentId) - const versionId = nextVersionId() - const blobId = datalakeBlobId(documentId, versionId) - - const update = encodeStateAsUpdate(this.doc) - await this.env.DATALAKE.putBlob(workspaceId, blobId, new Uint8Array(update), 'application/ydoc') - - void this.ctx.storage.put('updates', []) - void this.ctx.storage.put('version-' + versionId, blobId) - void this.ctx.storage.put('versionId', versionId) - - console.log('saved document', documentId, versionId, blobId) - } catch (error) { - // save failed, restore updates - console.error('Failed to save document:', error) - this.updates.push(...updates) - } - } -} + await withMetrics('write document', async (ctx) => { + try { + const documentId = decodeDocumentId(this.documentId) + const workspaceId = documentId.workspaceId + + // save ydoc content + const update = ctx.withSync('ydoc.encodeStateAsUpdate', () => encodeStateAsUpdate(this.doc)) + await ctx.with('datalake.putBlob', async () => { + const blobId = ydocBlobId(documentId) + await this.env.DATALAKE.putBlob(workspaceId, blobId, new Uint8Array(update), 'application/ydoc') + ctx.log('saved ydoc content to datalake', { documentId, blobId }) + }) -function nextVersionId (): number { - return Date.now() -} + void this.ctx.storage.put('updates', []) -function datalakeBlobId (documentId: string, version: number): string { - return `${documentId}-${version}` + // save json snapshot + const blobId = jsonBlobId(documentId) + const markup = JSON.stringify(yDocToJSON(this.doc, documentId.objectAttr)) + await ctx.with('datalake.putBlob', async () => { + await this.env.DATALAKE.putBlob(workspaceId, blobId, markup, 'application/json') + ctx.log('saved json content to datalake', { documentId, blobId }) + }) + } catch (err) { + // save failed, restore updates + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to save document', { documentId: this.documentId, error }) + this.updates.unshift(...updates) + } + }) + } } diff --git a/workers/collaborator/src/connection.ts b/workers/collaborator/src/connection.ts new file mode 100644 index 00000000000..a34c3997360 --- /dev/null +++ b/workers/collaborator/src/connection.ts @@ -0,0 +1,31 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +export class ConnectionManager { + constructor (private readonly state: DurableObjectState) {} + + count (): number { + return this.state.getWebSockets().length + } + + accept (connection: WebSocket): void { + this.state.acceptWebSocket(connection) + } + + getConnections (): WebSocket[] { + const connections = this.state.getWebSockets() + return [...connections] + } +} diff --git a/workers/collaborator/src/index.ts b/workers/collaborator/src/index.ts index cef1f9b6ad0..e25ddef93e0 100644 --- a/workers/collaborator/src/index.ts +++ b/workers/collaborator/src/index.ts @@ -13,25 +13,19 @@ // limitations under the License. // -import { type IRequestStrict, type RequestHandler, Router, cors, error, html } from 'itty-router' +import { type IRequestStrict, Router, cors, error, html } from 'itty-router' import { type Env } from './env' -import { type DocumentRequest } from './types' export { Collaborator } from './collaborator' const { preflight, corsify } = cors({ maxAge: 86400 }) -const withDocumentId: RequestHandler = (request) => { - if (request.params.id === undefined || request.params.id === '') { - return error(400, 'Missing document id') - } - request.documentId = decodeURIComponent(request.params.id) -} - const router = Router() .options('*', preflight) - .get('/:id', withDocumentId, (request, env) => { - const { documentId, headers } = request + .get('/:id', async (request, env) => { + const { headers } = request + const documentId = decodeURIComponent(request.params.id) + if (headers.get('Upgrade') !== 'websocket') { return new Response('Expected header Upgrade: websocket', { status: 426 }) } @@ -39,15 +33,15 @@ const router = Router() const id = env.COLLABORATOR.idFromName(documentId) const stub = env.COLLABORATOR.get(id) - return stub.fetch(request) + return await stub.fetch(request) }) - .post('/rpc/:id', withDocumentId, async (request, env) => { - const { documentId } = request + .post('/rpc/:id', async (request, env) => { + const documentId = decodeURIComponent(request.params.id) const id = env.COLLABORATOR.idFromName(documentId) const stub = env.COLLABORATOR.get(id) - return stub.fetch(request) + return await stub.fetch(request) }) .all('/', () => html( diff --git a/workers/collaborator/src/metrics.ts b/workers/collaborator/src/metrics.ts index 5c800b879b8..0a44339138d 100644 --- a/workers/collaborator/src/metrics.ts +++ b/workers/collaborator/src/metrics.ts @@ -13,60 +13,115 @@ // limitations under the License. 
 //
 
-import { type Env } from './env'
+export interface Logger {
+  log: (message: string, data?: Record<string, any>) => void
+  warn: (message: string, data?: Record<string, any>) => void
+  error: (message: string, data?: Record<string, any>) => void
+  debug: (message: string, data?: Record<string, any>) => void
+}
+
+export class ConsoleLogger implements Logger {
+  log (message: string, data?: Record<string, any>): void {
+    console.log({ message, ...data })
+  }
+
+  warn (message: string, data?: Record<string, any>): void {
+    console.warn({ message, ...data })
+  }
+
+  error (message: string, data?: Record<string, any>): void {
+    console.error({ message, ...data })
+  }
+
+  debug (message: string, data?: Record<string, any>): void {
+    console.debug({ message, ...data })
+  }
+}
 
-export async function withMetrics<T> (name: string, fn: (ctx: MetricsContext) => Promise<T>): Promise<T> {
-  const ctx = new MetricsContext()
+export async function withMetrics<T> (
+  name: string,
+  fn: (ctx: MetricsContext) => Promise<T>,
+  logger?: Logger
+): Promise<T> {
+  logger ??= new ConsoleLogger()
+  const ctx = new MetricsContext(logger)
   const start = performance.now()
   try {
     return await fn(ctx)
+  } catch (err: any) {
+    logger.error(err instanceof Error ? err.message : String(err))
+    throw err
   } finally {
     const total = performance.now() - start
     const ops = ctx.metrics
     const message = `${name} total=${total} ` + ctx.toString()
-    console.log({ message, total, ops })
+    logger.log(message, { total, ops })
   }
 }
 
-export interface MetricsData {
+interface MetricsData {
   op: string
   time: number
+  error?: string
+  children?: MetricsData[]
 }
 
 export class MetricsContext {
-  metrics: Array<MetricsData> = []
+  readonly metrics: Array<MetricsData> = []
+
+  constructor (private readonly logger: Logger) {}
 
-  debug (...data: any[]): void {
-    console.debug(...data)
+  debug (message: string, data?: Record<string, any>): void {
+    this.logger.debug(message, data)
   }
 
-  log (...data: any[]): void {
-    console.log(...data)
+  log (message: string, data?: Record<string, any>): void {
+    this.logger.log(message, data)
   }
 
-  error (...data: any[]): void {
-    console.error(...data)
+  warn (message: string, data?: Record<string, any>): void {
+    this.logger.warn(message, data)
   }
 
-  async with<T>(op: string, fn: () => Promise<T>): Promise<T> {
+  error (message: string, data?: Record<string, any>): void {
+    this.logger.error(message, data)
+  }
+
+  async with<T>(op: string, fn: (ctx: MetricsContext) => Promise<T>): Promise<T> {
+    const ctx = new MetricsContext(this.logger)
     const start = performance.now()
+
+    let error: string | undefined
+
     try {
-      return await fn()
+      return await fn(ctx)
+    } catch (err: any) {
+      error = err instanceof Error ? err.message : String(err)
+      throw err
     } finally {
       const time = performance.now() - start
-      this.metrics.push({ op, time })
+      const children = ctx.metrics
+      this.metrics.push(error !== undefined ? { op, time, error, children } : { op, time, children })
     }
   }
 
-  withSync<T>(op: string, fn: () => T): T {
+  withSync<T>(op: string, fn: (ctx: MetricsContext) => T): T {
+    const ctx = new MetricsContext(this.logger)
     const start = performance.now()
+
+    let error: string | undefined
+
     try {
-      return fn()
+      return fn(ctx)
+    } catch (err: any) {
+      error = err instanceof Error ? err.message : String(err)
+      throw err
     } finally {
       const time = performance.now() - start
-      this.metrics.push({ op, time })
+      const children = ctx.metrics
+      this.metrics.push(error !== undefined ?
{ op, time, error, children } : { op, time, children }) } } @@ -74,22 +129,3 @@ export class MetricsContext { return this.metrics.map((p) => `${p.op}=${p.time}`).join(' ') } } - -export class LoggedDatalake { - constructor ( - private readonly datalake: Env['DATALAKE'], - private readonly ctx: MetricsContext - ) {} - - async getBlob (workspace: string, name: string): Promise { - return await this.ctx.with('datalake.getBlob', () => { - return this.datalake.getBlob(workspace, name) - }) - } - - async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise { - await this.ctx.with('datalake.putBlob', () => { - return this.datalake.putBlob(workspace, name, data, type) - }) - } -} diff --git a/workers/collaborator/src/types.ts b/workers/collaborator/src/types.ts index 6085d5a8f13..ac102013f5a 100644 --- a/workers/collaborator/src/types.ts +++ b/workers/collaborator/src/types.ts @@ -13,12 +13,6 @@ // limitations under the License. // -import { type IRequestStrict } from 'itty-router' - -export type DocumentRequest = { - documentId: string -} & IRequestStrict - // https://github.com/yjs/y-protocols/blob/master/awareness.js#L134 export interface AwarenessUpdate { added: Array @@ -26,10 +20,16 @@ export interface AwarenessUpdate { removed: Array } -export type RpcRequest = RpcGetContentRequest | RpcUpdateContentRequest +export type RpcRequest = RpcGetContentRequest | RpcCreateContentRequest | RpcUpdateContentRequest export interface RpcGetContentRequest { method: 'getContent' + payload: RpcGetContentPayload +} + +export interface RpcCreateContentRequest { + method: 'createContent' + payload: RpcCreateContentPayload } export interface RpcUpdateContentRequest { @@ -37,6 +37,14 @@ export interface RpcUpdateContentRequest { payload: RpcUpdateContentPayload } +export interface RpcGetContentPayload { + source?: string +} + +export interface RpcCreateContentPayload { + content: Record +} + export interface RpcUpdateContentPayload { content: Record } diff --git a/workers/collaborator/src/utils.ts b/workers/collaborator/src/utils.ts index 5a6c9d0c0d8..a9e5b94b7fb 100644 --- a/workers/collaborator/src/utils.ts +++ b/workers/collaborator/src/utils.ts @@ -13,18 +13,31 @@ // limitations under the License. // -/** - * For the sake of consistency and backwards compatibility, document name has the same format - * as in Huly collaborator: "workspaceId://documentId:HEAD". 
- */ -export function parseDocumentName (name: string): { workspaceId: string, documentId: string, versionId: string } { - const parts = name.split('://') - if (parts.length !== 2) { +export interface DocumentId { + workspaceId: string + objectClass: string + objectId: string + objectAttr: string +} + +export function decodeDocumentId (name: string): DocumentId { + const [workspaceId, objectClass, objectId, objectAttr] = name.split('|') + if (workspaceId == null || objectClass == null || objectId == null || objectAttr == null) { throw new Error('Malformed document id') } + return { workspaceId, objectClass, objectId, objectAttr } +} - const workspaceId = parts[0] - const [documentId, versionId] = parts[1].split(':', 2) +export function ydocBlobId ({ objectId, objectAttr }: DocumentId): string { + // generate ydoc blob id compatible with platform collaborator + return `${objectId}%${objectAttr}` +} + +export function jsonBlobId ({ objectId, objectAttr }: DocumentId): string { + // generate ydoc json id compatible with platform collaborator + return [objectId, objectAttr, Date.now()].join('-') +} - return { workspaceId, documentId, versionId } +export function extractStrParam (value: string | string[] | undefined): string | undefined { + return Array.isArray(value) ? value[0] : value } diff --git a/workers/collaborator/wrangler.toml b/workers/collaborator/wrangler.toml index 547576fbe4b..1624911561a 100644 --- a/workers/collaborator/wrangler.toml +++ b/workers/collaborator/wrangler.toml @@ -1,7 +1,7 @@ #:schema node_modules/wrangler/config-schema.json name = "collaborator-worker" main = "src/index.ts" -compatibility_date = "2024-07-01" +compatibility_date = "2024-11-11" compatibility_flags = ["nodejs_compat"] keep_vars = true @@ -17,6 +17,10 @@ class_name = "Collaborator" tag = "v1" new_classes = ["Collaborator"] +[observability] +enabled = true +head_sampling_rate = 1 + [env.staging] name = "collaborator-worker-staging" @@ -24,9 +28,33 @@ services = [ { binding = "DATALAKE", service = "datalake-worker-staging" } ] +[[env.staging.durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[env.staging.migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[env.staging.observability] +enabled = true +head_sampling_rate = 1 + [env.dev] name = "collaborator-worker-dev" services = [ { binding = "DATALAKE", service = "datalake-worker-dev" } ] + +[[env.dev.durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[env.dev.migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[env.dev.observability] +enabled = true +head_sampling_rate = 1 diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts index fd4782cb6e7..fa0751ab991 100644 --- a/workers/datalake/src/blob.ts +++ b/workers/datalake/src/blob.ts @@ -195,7 +195,7 @@ export async function handleUploadFormData ( const { name, type, lastModified } = file try { const metadata = await withPostgres(env, ctx, metrics, (db) => { - return saveBlob(env, db, file.stream(), file.size, type, workspace, name, lastModified) + return saveBlob(env, db, file.stream(), workspace, name, { type, size: file.size, lastModified }) }) // TODO this probably should happen via queue, let it be here for now @@ -220,14 +220,13 @@ export async function saveBlob ( env: Env, db: BlobDB, stream: ReadableStream, - size: number, - type: string, workspace: string, name: string, - lastModified: number + metadata: Omit ): Promise { const { location, bucket } = selectStorage(env, workspace) + const { size, 
type, lastModified } = metadata const httpMetadata = { contentType: type, cacheControl, lastModified } const filename = getUniqueFilename() diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts index 012b547795d..c4a42172224 100644 --- a/workers/datalake/src/index.ts +++ b/workers/datalake/src/index.ts @@ -14,7 +14,7 @@ // import { WorkerEntrypoint } from 'cloudflare:workers' -import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' +import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' import { handleBlobDelete, handleBlobGet, handleBlobHead, handleBlobList, handleUploadFormData } from './blob' import { cors } from './cors' @@ -91,7 +91,7 @@ router .all('*', () => error(404)) export default class DatalakeWorker extends WorkerEntrypoint { - async fetch (request: IRequest): Promise { + async fetch (request: Request): Promise { const start = performance.now() const context = new MetricsContext() @@ -117,8 +117,8 @@ export default class DatalakeWorker extends WorkerEntrypoint { } async getBlob (workspace: string, name: string): Promise { - const request = new Request(`https://datalake/blob/${workspace}/${name}`) - const response = await router.fetch(request) + const request = new Request(`https://datalake/blob/${workspace}/${encodeURIComponent(name)}`) + const response = await this.fetch(request) if (!response.ok) { console.error({ error: 'datalake error: ' + response.statusText, workspace, name }) @@ -129,13 +129,13 @@ export default class DatalakeWorker extends WorkerEntrypoint { } async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise { - const request = new Request(`https://datalake/upload/form-data/${workspace}`) - const body = new FormData() const blob = new Blob([data], { type }) body.set('file', blob, name) - const response = await router.fetch(request, { method: 'POST', body }) + const request = new Request(`https://datalake/upload/form-data/${workspace}`, { method: 'POST', body }) + + const response = await this.fetch(request) if (!response.ok) { console.error({ error: 'datalake error: ' + response.statusText, workspace, name }) diff --git a/workers/datalake/src/s3.ts b/workers/datalake/src/s3.ts index 2f1e1d1f47e..4c59c8773f2 100644 --- a/workers/datalake/src/s3.ts +++ b/workers/datalake/src/s3.ts @@ -64,14 +64,14 @@ export async function handleS3Blob ( return error(400) } - const contentType = object.headers.get('content-type') ?? 'application/octet-stream' + const type = object.headers.get('content-type') ?? 'application/octet-stream' const contentLengthHeader = object.headers.get('content-length') ?? '0' const lastModifiedHeader = object.headers.get('last-modified') - const contentLength = Number.parseInt(contentLengthHeader) + const size = Number.parseInt(contentLengthHeader) const lastModified = lastModifiedHeader !== null ? 
new Date(lastModifiedHeader).getTime() : Date.now() - const result = await saveBlob(env, db, object.body, contentLength, contentType, workspace, name, lastModified) - return json(result) + const metadata = await saveBlob(env, db, object.body, workspace, name, { lastModified, size, type }) + return json(metadata) }) } From 1f7a73711c9835bde0692d07d6f8a6bfef79433a Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 18 Dec 2024 17:21:30 +0700 Subject: [PATCH 3/9] move broadcast to document Signed-off-by: Alexander Onnikov --- workers/collaborator/src/collaborator.ts | 64 +------------- workers/collaborator/src/connection.ts | 31 ------- workers/collaborator/src/document.ts | 107 +++++++++++++++++++---- 3 files changed, 93 insertions(+), 109 deletions(-) delete mode 100644 workers/collaborator/src/connection.ts diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts index 6c30f631b54..3ee0d24973f 100644 --- a/workers/collaborator/src/collaborator.ts +++ b/workers/collaborator/src/collaborator.ts @@ -15,13 +15,11 @@ import { DurableObject } from 'cloudflare:workers' import { type RouterType, type IRequest, Router, error } from 'itty-router' -import * as encoding from 'lib0/encoding' import { applyUpdate, encodeStateAsUpdate } from 'yjs' import { Document } from './document' import { type Env } from './env' import { ConsoleLogger, type MetricsContext, withMetrics } from './metrics' -import * as protocol from './protocol' import type { AwarenessUpdate, RpcCreateContentRequest, @@ -31,7 +29,6 @@ import type { } from './types' import { decodeDocumentId, extractStrParam, jsonBlobId, ydocBlobId } from './utils' import { jsonToYDoc, yDocToJSON } from './ydoc' -import { ConnectionManager } from './connection' export const PREFERRED_SAVE_SIZE = 500 export const PREFERRED_SAVE_INTERVAL = 30 * 1000 @@ -47,7 +44,6 @@ export const PREFERRED_SAVE_INTERVAL = 30 * 1000 */ export class Collaborator extends DurableObject { private readonly logger = new ConsoleLogger() - private readonly connections: ConnectionManager private readonly router: RouterType private readonly doc: Document private readonly updates: Uint8Array[] @@ -58,7 +54,6 @@ export class Collaborator extends DurableObject { constructor (ctx: DurableObjectState, env: Env) { super(ctx, env) - this.connections = new ConnectionManager(this.ctx) this.doc = new Document() this.updates = [] @@ -89,7 +84,7 @@ export class Collaborator extends DurableObject { } const { 0: client, 1: server } = new WebSocketPair() - this.connections.accept(server) + this.ctx.acceptWebSocket(server) await ctx.with('session', async (ctx) => { await this.handleSession(ctx, server, documentId, source) @@ -175,15 +170,7 @@ export class Collaborator extends DurableObject { return } - try { - const encoder = protocol.handleMessage(this.doc, new Uint8Array(message), ws) - if (encoding.length(encoder) > 1) { - ws.send(encoding.toUint8Array(encoder)) - } - } catch (err) { - const error = err instanceof Error ? 
err.message : String(err) - this.logger.error('WebSocket message error', { error }) - } + this.doc.handleMessage(new Uint8Array(message), ws) } async webSocketClose (ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { @@ -224,7 +211,7 @@ export class Collaborator extends DurableObject { }) ctx.withSync('restoreConnections', () => { - const connections = this.connections.getConnections() + const connections = this.ctx.getWebSockets() connections.forEach((ws: WebSocket) => { this.doc.addConnection(ws) }) @@ -258,27 +245,9 @@ export class Collaborator extends DurableObject { }) this.doc.addConnection(ws) - - ctx.withSync('forceSync', () => { - const encoder = protocol.forceSyncMessage(this.doc) - ws.send(encoding.toUint8Array(encoder)) - }) - - ctx.withSync('awareness', () => { - const clients = Array.from(this.doc.awareness.states.keys()) - if (clients.length > 0) { - const encoder = protocol.awarenessMessage(this.doc, clients) - ws.send(encoding.toUint8Array(encoder)) - } - }) } async handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): Promise { - // broadcast awareness state - const clients = [...added, ...updated, ...removed] - const encoder = protocol.awarenessMessage(this.doc, clients) - await this.broadcastMessage(encoding.toUint8Array(encoder)) - // persist awareness state const state = this.doc.awareness.getLocalState() await this.ctx.storage.put('awareness', state) @@ -293,35 +262,10 @@ export class Collaborator extends DurableObject { if (this.updates.length > PREFERRED_SAVE_SIZE) { void this.writeDocument() } - - // broadcast update - const encoder = protocol.updateMessage(update, origin) - await this.broadcastMessage(encoding.toUint8Array(encoder), origin) - } - - async broadcastMessage (message: Uint8Array, origin?: any): Promise { - const connections = this.connections.getConnections() - const wss = connections - .filter((ws) => ws !== origin) - .filter((ws) => ws.readyState === WebSocket.OPEN) - const promises = wss.map(async (ws) => { - await this.sendMessage(ws, message) - }) - await Promise.all(promises) - } - - async sendMessage (ws: WebSocket, message: Uint8Array): Promise { - try { - ws.send(message) - } catch (err) { - const error = err instanceof Error ? err.message : String(err) - this.logger.error('failed to send message', { error }) - await this.handleClose(ws, 1011, 'error') - } } async handleClose (ws: WebSocket, code: number, reason?: string): Promise { - const clients = this.connections.count() + const clients = this.ctx.getWebSockets().length try { ws.close(code, reason) diff --git a/workers/collaborator/src/connection.ts b/workers/collaborator/src/connection.ts deleted file mode 100644 index a34c3997360..00000000000 --- a/workers/collaborator/src/connection.ts +++ /dev/null @@ -1,31 +0,0 @@ -// -// Copyright © 2024 Hardcore Engineering Inc. -// -// Licensed under the Eclipse Public License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. You may -// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// -// See the License for the specific language governing permissions and -// limitations under the License. 
-// - -export class ConnectionManager { - constructor (private readonly state: DurableObjectState) {} - - count (): number { - return this.state.getWebSockets().length - } - - accept (connection: WebSocket): void { - this.state.acceptWebSocket(connection) - } - - getConnections (): WebSocket[] { - const connections = this.state.getWebSockets() - return [...connections] - } -} diff --git a/workers/collaborator/src/document.ts b/workers/collaborator/src/document.ts index 665534092f1..4e712c233c2 100644 --- a/workers/collaborator/src/document.ts +++ b/workers/collaborator/src/document.ts @@ -15,52 +15,123 @@ import { Doc as YDoc } from 'yjs' import { Awareness, removeAwarenessStates } from 'y-protocols/awareness' -import { type AwarenessUpdate } from './types' -interface SessionState { - clients: Set -} +import * as encoding from 'lib0/encoding' +import * as protocol from './protocol' +import { type AwarenessUpdate } from './types' export class Document extends YDoc { awareness: Awareness - sessions: Map + connections: Map> constructor () { super({ gc: false }) - this.sessions = new Map() + this.connections = new Map() this.awareness = new Awareness(this) this.awareness.setLocalState(null) + this.on('update', this.handleUpdate.bind(this)) this.awareness.on('update', this.handleAwarenessUpdate.bind(this)) } addConnection (ws: WebSocket): void { - const state = ws.deserializeAttachment() ?? { clients: new Set() } - this.sessions.set(ws, state) + const state = ws.deserializeAttachment() ?? new Set() + this.connections.set(ws, state) + + const awareness = this.awareness + + // Force sync document state + const encoder = protocol.forceSyncMessage(this) + ws.send(encoding.toUint8Array(encoder)) + + // Force sync awareness state + if (awareness.states.size > 0) { + const clients = Array.from(awareness.states.keys()) + if (clients.length > 0) { + const encoder = protocol.awarenessMessage(this, clients) + ws.send(encoding.toUint8Array(encoder)) + } + } } removeConnection (ws: WebSocket): void { - const state = this.sessions.get(ws) - if (state !== undefined && state.clients.size > 0) { - removeAwarenessStates(this.awareness, Array.from(state.clients), null) + closeConnection(this, ws) + } + + handleMessage (message: Uint8Array, origin: WebSocket): void { + try { + const encoder = protocol.handleMessage(this, new Uint8Array(message), origin) + if (encoding.length(encoder) > 1) { + origin.send(encoding.toUint8Array(encoder)) + } + } catch (err) { + const error = err instanceof Error ? 
err.message : String(err) + console.error('WebSocket message error', { error }) } + } - this.sessions.delete(ws) + private handleUpdate (update: Uint8Array, origin: any): void { + const encoder = protocol.updateMessage(update, origin) + broadcast(this, encoding.toUint8Array(encoder), [origin]) } - private handleAwarenessUpdate ({ added, removed }: AwarenessUpdate, origin: any): void { + private handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): void { + const changed = [...added, ...updated, ...removed] + const encoder = protocol.awarenessMessage(this, changed) + broadcast(this, encoding.toUint8Array(encoder)) + if (origin == null || !(origin instanceof WebSocket)) return if (added.length > 0 || removed.length > 0) { - const state = this.sessions.get(origin) - if (state !== undefined) { - added.forEach((client) => state.clients.add(client)) - removed.forEach((client) => state.clients.delete(client)) + const connIDs = this.connections.get(origin) + if (connIDs !== undefined) { + added.forEach((client) => connIDs.add(client)) + removed.forEach((client) => connIDs.delete(client)) - origin.serializeAttachment(state) + origin.serializeAttachment(connIDs) } } } } + +function closeConnection (doc: Document, ws: WebSocket): void { + if (doc.connections.has(ws)) { + const connIDs = doc.connections.get(ws) + doc.connections.delete(ws) + + if (connIDs !== undefined && connIDs.size > 0) { + removeAwarenessStates(doc.awareness, Array.from(connIDs), null) + } + } + + try { + ws.close() + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + console.error('failed to close WebSocket', { error }) + } +} + +function broadcast (doc: Document, message: Uint8Array, exclude: any[] = []): void { + doc.connections.forEach((_, ws) => { + if (!exclude.includes(ws)) { + send(doc, ws, message) + } + }) +} + +function send (doc: Document, ws: WebSocket, message: Uint8Array): void { + if (ws.readyState !== undefined && ws.readyState !== WebSocket.CONNECTING && ws.readyState !== WebSocket.OPEN) { + closeConnection(doc, ws) + } + + try { + ws.send(message) + } catch (err) { + const error = err instanceof Error ? 
err.message : String(err) + console.error('failed to send message', { error }) + closeConnection(doc, ws) + } +} From 1e98d543c785cbfbde41d6c651ff72b51446f01a Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 18 Dec 2024 23:49:04 +0700 Subject: [PATCH 4/9] remove sync logger Signed-off-by: Alexander Onnikov --- workers/collaborator/src/collaborator.ts | 68 +++++++++++------------- workers/collaborator/src/document.ts | 10 ++-- workers/collaborator/src/metrics.ts | 18 ------- workers/datalake/src/index.ts | 6 ++- 4 files changed, 42 insertions(+), 60 deletions(-) diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts index 3ee0d24973f..1ffa6ce34e5 100644 --- a/workers/collaborator/src/collaborator.ts +++ b/workers/collaborator/src/collaborator.ts @@ -54,7 +54,7 @@ export class Collaborator extends DurableObject { constructor (ctx: DurableObjectState, env: Env) { super(ctx, env) - this.doc = new Document() + this.doc = new Document(this.logger) this.updates = [] this.router = Router() @@ -120,11 +120,9 @@ export class Collaborator extends DurableObject { handleRpcGetContent (ctx: MetricsContext, id: string, request: RpcGetContentRequest): Response { const content: Record = {} - ctx.withSync('ydoc.read', () => { - for (const field of this.doc.share.keys()) { - content[field] = JSON.stringify(yDocToJSON(this.doc, field)) - } - }) + for (const field of this.doc.share.keys()) { + content[field] = JSON.stringify(yDocToJSON(this.doc, field)) + } return Response.json({ content }, { status: 200 }) } @@ -133,11 +131,12 @@ export class Collaborator extends DurableObject { const documentId = decodeDocumentId(id) const content: Record = {} - ctx.withSync('ydoc.write', () => { - this.doc.transact(() => { - Object.entries(request.payload.content).forEach(([field, value]) => { - jsonToYDoc(JSON.parse(value), this.doc, field) - }) + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + if (value !== undefined && value !== null && value !== '') { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) + } }) }) @@ -153,11 +152,10 @@ export class Collaborator extends DurableObject { } handleRpcUpdateContent (ctx: MetricsContext, id: string, request: RpcUpdateContentRequest): Response { - ctx.withSync('ydoc.write', () => { - this.doc.transact(() => { - Object.entries(request.payload.content).forEach(([field, value]) => { - jsonToYDoc(JSON.parse(value), this.doc, field) - }) + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) }) }) @@ -210,19 +208,15 @@ export class Collaborator extends DurableObject { await this.readDocument(ctx) }) - ctx.withSync('restoreConnections', () => { - const connections = this.ctx.getWebSockets() - connections.forEach((ws: WebSocket) => { - this.doc.addConnection(ws) - }) + const connections = this.ctx.getWebSockets() + connections.forEach((ws: WebSocket) => { + this.doc.addConnection(ws) }) - ctx.withSync('restoreListeners', () => { - // enable update listeners only after the document is restored - // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.doc.on('update', this.handleDocUpdate.bind(this)) - this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) - }) + // enable update listeners only after the document is restored + // eslint-disable-next-line @typescript-eslint/no-misused-promises + 
this.doc.on('update', this.handleDocUpdate.bind(this)) + this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) }) this.hydrated = true @@ -300,12 +294,12 @@ export class Collaborator extends DurableObject { return this.env.DATALAKE.getBlob(workspaceId, blobId) }) - ctx.withSync('applyUpdate', () => { + if (buffer !== undefined) { applyUpdate(this.doc, new Uint8Array(buffer)) - }) - loaded = true - ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + loaded = true + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + } }) } catch (err) { // the blob might be missing, ignore errors @@ -322,11 +316,13 @@ export class Collaborator extends DurableObject { return this.env.DATALAKE.getBlob(workspaceId, source) }) - ctx.withSync('jsonToYDoc', () => { - jsonToYDoc(JSON.parse(String(buffer)), this.doc, documentId.objectAttr) - }) + if (buffer !== undefined) { + const json = JSON.parse(String(buffer)) + jsonToYDoc(json, this.doc, documentId.objectAttr) - ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + loaded = true + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + } }) } catch (err) { // the blob might be missing, ignore errors @@ -381,7 +377,7 @@ export class Collaborator extends DurableObject { const workspaceId = documentId.workspaceId // save ydoc content - const update = ctx.withSync('ydoc.encodeStateAsUpdate', () => encodeStateAsUpdate(this.doc)) + const update = encodeStateAsUpdate(this.doc) await ctx.with('datalake.putBlob', async () => { const blobId = ydocBlobId(documentId) await this.env.DATALAKE.putBlob(workspaceId, blobId, new Uint8Array(update), 'application/ydoc') diff --git a/workers/collaborator/src/document.ts b/workers/collaborator/src/document.ts index 4e712c233c2..75bf1a4e449 100644 --- a/workers/collaborator/src/document.ts +++ b/workers/collaborator/src/document.ts @@ -15,16 +15,16 @@ import { Doc as YDoc } from 'yjs' import { Awareness, removeAwarenessStates } from 'y-protocols/awareness' - import * as encoding from 'lib0/encoding' import * as protocol from './protocol' +import { type Logger } from './metrics' import { type AwarenessUpdate } from './types' export class Document extends YDoc { awareness: Awareness connections: Map> - constructor () { + constructor (readonly logger: Logger) { super({ gc: false }) this.connections = new Map() @@ -68,7 +68,7 @@ export class Document extends YDoc { } } catch (err) { const error = err instanceof Error ? err.message : String(err) - console.error('WebSocket message error', { error }) + this.logger.error('WebSocket message error', { error }) } } @@ -110,7 +110,7 @@ function closeConnection (doc: Document, ws: WebSocket): void { ws.close() } catch (err) { const error = err instanceof Error ? err.message : String(err) - console.error('failed to close WebSocket', { error }) + doc.logger.error('failed to close WebSocket', { error }) } } @@ -131,7 +131,7 @@ function send (doc: Document, ws: WebSocket, message: Uint8Array): void { ws.send(message) } catch (err) { const error = err instanceof Error ? 
err.message : String(err) - console.error('failed to send message', { error }) + doc.logger.error('failed to send message', { error }) closeConnection(doc, ws) } } diff --git a/workers/collaborator/src/metrics.ts b/workers/collaborator/src/metrics.ts index 0a44339138d..64ddf17ec44 100644 --- a/workers/collaborator/src/metrics.ts +++ b/workers/collaborator/src/metrics.ts @@ -107,24 +107,6 @@ export class MetricsContext { } } - withSync(op: string, fn: (ctx: MetricsContext) => T): T { - const ctx = new MetricsContext(this.logger) - const start = performance.now() - - let error: string | undefined - - try { - return fn(ctx) - } catch (err: any) { - error = err instanceof Error ? err.message : String(err) - throw err - } finally { - const time = performance.now() - start - const children = ctx.metrics - this.metrics.push(error !== undefined ? { op, time, error, children } : { op, time, children }) - } - } - toString (): string { return this.metrics.map((p) => `${p.op}=${p.time}`).join(' ') } diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts index c4a42172224..c29eab2ca54 100644 --- a/workers/datalake/src/index.ts +++ b/workers/datalake/src/index.ts @@ -116,10 +116,14 @@ export default class DatalakeWorker extends WorkerEntrypoint { } } - async getBlob (workspace: string, name: string): Promise { + async getBlob (workspace: string, name: string): Promise { const request = new Request(`https://datalake/blob/${workspace}/${encodeURIComponent(name)}`) const response = await this.fetch(request) + if (response.status === 404) { + return undefined + } + if (!response.ok) { console.error({ error: 'datalake error: ' + response.statusText, workspace, name }) throw new Error(`Failed to fetch blob: ${response.statusText}`) From d3c1a9c566a49a932adf26029da57eb095b72662 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Thu, 19 Dec 2024 00:54:11 +0700 Subject: [PATCH 5/9] fix: add token to cloud collaborator Signed-off-by: Alexander Onnikov --- .../src/provider/cloud.ts | 4 +-- workers/collaborator/src/collaborator.ts | 18 ++++++++++--- workers/collaborator/src/index.ts | 27 +++++++++++++++---- 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/plugins/text-editor-resources/src/provider/cloud.ts b/plugins/text-editor-resources/src/provider/cloud.ts index 780e3206fcf..9aa70b8df6e 100644 --- a/plugins/text-editor-resources/src/provider/cloud.ts +++ b/plugins/text-editor-resources/src/provider/cloud.ts @@ -30,8 +30,8 @@ export interface DatalakeCollabProviderParameters { export class CloudCollabProvider extends WebsocketProvider implements Provider { readonly loaded: Promise - constructor ({ document, url, name, source }: DatalakeCollabProviderParameters) { - const params = source != null ? { source } : undefined + constructor ({ document, url, name, source, token }: DatalakeCollabProviderParameters) { + const params = { token, source: source ?? 
'' } super(url, encodeURIComponent(name), document, { params }) diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts index 1ffa6ce34e5..f51e4971e5c 100644 --- a/workers/collaborator/src/collaborator.ts +++ b/workers/collaborator/src/collaborator.ts @@ -134,8 +134,13 @@ export class Collaborator extends DurableObject { this.doc.transact(() => { Object.entries(request.payload.content).forEach(([field, value]) => { if (value !== undefined && value !== null && value !== '') { - const json = JSON.parse(String(value)) - jsonToYDoc(json, this.doc, field) + try { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('Failed to process JSON', { error }) + } } }) }) @@ -154,8 +159,13 @@ export class Collaborator extends DurableObject { handleRpcUpdateContent (ctx: MetricsContext, id: string, request: RpcUpdateContentRequest): Response { this.doc.transact(() => { Object.entries(request.payload.content).forEach(([field, value]) => { - const json = JSON.parse(String(value)) - jsonToYDoc(json, this.doc, field) + try { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('Failed to process JSON', { error }) + } }) }) diff --git a/workers/collaborator/src/index.ts b/workers/collaborator/src/index.ts index e25ddef93e0..c4484dad83b 100644 --- a/workers/collaborator/src/index.ts +++ b/workers/collaborator/src/index.ts @@ -13,16 +13,33 @@ // limitations under the License. // -import { type IRequestStrict, Router, cors, error, html } from 'itty-router' +import { type IRequest, type IRequestStrict, type RequestHandler, Router, cors, error, html } from 'itty-router' import { type Env } from './env' export { Collaborator } from './collaborator' const { preflight, corsify } = cors({ maxAge: 86400 }) +const withToken: RequestHandler = (request: IRequest) => { + let token = request.query.token + + if (token === undefined || token === '') { + const authorization = request.headers.get('Authorization') + if (authorization != null && authorization.startsWith('Bearer ')) { + token = authorization.substring(7) + } + } + + if (token === undefined || token === '') { + return error(401, 'Unauthorized') + } + + request.token = token +} + const router = Router() .options('*', preflight) - .get('/:id', async (request, env) => { + .get('/:id', withToken, async (request, env) => { const { headers } = request const documentId = decodeURIComponent(request.params.id) @@ -33,15 +50,15 @@ const router = Router() const id = env.COLLABORATOR.idFromName(documentId) const stub = env.COLLABORATOR.get(id) - return await stub.fetch(request) + return stub.fetch(request) }) - .post('/rpc/:id', async (request, env) => { + .post('/rpc/:id', withToken, async (request, env) => { const documentId = decodeURIComponent(request.params.id) const id = env.COLLABORATOR.idFromName(documentId) const stub = env.COLLABORATOR.get(id) - return await stub.fetch(request) + return stub.fetch(request) }) .all('/', () => html( From c170f145248f208062b25a65a8e7a8c19fd75139 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Thu, 19 Dec 2024 00:54:57 +0700 Subject: [PATCH 6/9] use db_url in datalake Signed-off-by: Alexander Onnikov --- workers/datalake/src/db.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts 
index be5a03f99e9..c38feffe8e9 100644
--- a/workers/datalake/src/db.ts
+++ b/workers/datalake/src/db.ts
@@ -53,7 +53,8 @@ export async function withPostgres<T> (
   fn: (db: BlobDB) => Promise<T>
 ): Promise<T> {
   const sql = metrics.withSync('db.connect', () => {
-    return postgres(env.HYPERDRIVE.connectionString, {
+    const url = env.DB_URL !== '' && env.DB_URL !== undefined ? env.DB_URL : env.HYPERDRIVE.connectionString
+    return postgres(url, {
       connection: {
         application_name: 'datalake'
       },

From 5b1931b9f3c6d2547b710f2e85bf7dc80f122d05 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Thu, 19 Dec 2024 11:49:39 +0700
Subject: [PATCH 7/9] remove dependency on types/node

Signed-off-by: Alexander Onnikov
---
 workers/collaborator/package.json | 1 -
 1 file changed, 1 deletion(-)

diff --git a/workers/collaborator/package.json b/workers/collaborator/package.json
index b6c3d1588ce..7cb77ed9783 100644
--- a/workers/collaborator/package.json
+++ b/workers/collaborator/package.json
@@ -34,7 +34,6 @@
     "eslint-plugin-promise": "^6.1.1",
     "eslint": "^8.54.0",
     "@types/jest": "^29.5.5",
-    "@types/node": "~20.11.16",
     "@hcengineering/cloud-datalake": "^0.6.0"
   },
   "dependencies": {

From dcbc9d4ddb902a6b4bac1b1482a04fc7a99c60f1 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Thu, 19 Dec 2024 18:30:38 +0700
Subject: [PATCH 8/9] revert docker-compose changes

Signed-off-by: Alexander Onnikov
---
 dev/docker-compose.yaml         | 9 ++++-----
 dev/prod/config.json            | 5 +----
 dev/prod/public/config-dev.json | 5 +----
 dev/prod/public/config.json     | 5 +----
 4 files changed, 7 insertions(+), 17 deletions(-)

diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml
index 851e948e80d..d3f260117c0 100644
--- a/dev/docker-compose.yaml
+++ b/dev/docker-compose.yaml
@@ -94,7 +94,7 @@ services:
       # - REGION_INFO=cockroach|CockroachDB
       - TRANSACTOR_URL=ws://host.docker.internal:3333,ws://host.docker.internal:3331;;pg,ws://host.docker.internal:3332;;cockroach,
       - SES_URL=
-      - STORAGE_CONFIG=datalake|https://huly.net
+      - STORAGE_CONFIG=${STORAGE_CONFIG}
       - FRONT_URL=http://host.docker.internal:8087
       - RESERVED_DB_NAMES=telegram,gmail,github
       - MODEL_ENABLED=*
@@ -159,7 +159,7 @@ services:
       - STATS_URL=http://host.docker.internal:4900
       - SES_URL=
       - REGION=pg
-      - STORAGE_CONFIG=datalake|https://huly.net
+      - STORAGE_CONFIG=${STORAGE_CONFIG}
      - FRONT_URL=http://host.docker.internal:8087
       - RESERVED_DB_NAMES=telegram,gmail,github
       - MODEL_ENABLED=*
@@ -239,9 +239,8 @@ services:
       - CALENDAR_URL=http://host.docker.internal:8095
       - TELEGRAM_URL=http://host.docker.internal:8086
       - REKONI_URL=http://host.docker.internal:4004
-      - COLLABORATOR=cloud
-      - COLLABORATOR_URL=wss://cl.huly.net
-      - STORAGE_CONFIG=datalake|https://huly.net
+      - COLLABORATOR_URL=ws://host.docker.internal:3078
+      - STORAGE_CONFIG=${STORAGE_CONFIG}
       - GITHUB_URL=http://host.docker.internal:3500
       - PRINT_URL=http://host.docker.internal:4005
       - SIGN_URL=http://host.docker.internal:4006
diff --git a/dev/prod/config.json b/dev/prod/config.json
index 9839d5195b7..f03fb9b3338 100644
--- a/dev/prod/config.json
+++ b/dev/prod/config.json
@@ -1,10 +1,7 @@
 {
   "ACCOUNTS_URL":"http://localhost:3000",
   "COLLABORATOR_URL": "ws://localhost:3078",
-  "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace",
-  "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId",
-  "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId",
-  "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId",
+  "UPLOAD_URL":"/files",
   "REKONI_URL":
"http://localhost:4004", "PRINT_URL": "http://localhost:4005", "SIGN_URL": "http://localhost:4006", diff --git a/dev/prod/public/config-dev.json b/dev/prod/public/config-dev.json index cbede7e92f1..8f0b1e22f26 100644 --- a/dev/prod/public/config-dev.json +++ b/dev/prod/public/config-dev.json @@ -1,9 +1,6 @@ { "ACCOUNTS_URL":"https://account.hc.engineering", - "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace", - "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId", - "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId", - "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId", + "UPLOAD_URL":"/files", "MODEL_VERSION": null, "TELEGRAM_URL": "https://telegram.hc.engineering", "GMAIL_URL": "https://gmail.hc.engineering", diff --git a/dev/prod/public/config.json b/dev/prod/public/config.json index 617a1d09160..d2f1c4d3a0a 100644 --- a/dev/prod/public/config.json +++ b/dev/prod/public/config.json @@ -1,10 +1,7 @@ { "ACCOUNTS_URL":"/account", "COLLABORATOR_URL": "ws://localhost:3078", - "UPLOAD_URL":"http://localhost:8787/upload/form-data/:workspace", - "UPLOAD_CONFIG":"signed-url|100|http://localhost:8787/upload/signed-url/:workspace/:blobId", - "FILES_URL":"http://localhost:8787/blob/:workspace/:blobId", - "PREVIEW_CONFIG":"http://localhost:8787/image/fit=scale-down,width=:size/:workspace/:blobId", + "UPLOAD_URL":"/files", "TELEGRAM_URL": "http://localhost:8086", "GMAIL_URL": "http://localhost:8088", "CALENDAR_URL": "http://localhost:8095", From 301d62067f1c9a3be47a4843f1952ca5634e631c Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Thu, 19 Dec 2024 21:51:38 +0700 Subject: [PATCH 9/9] fix rush check issues Signed-off-by: Alexander Onnikov --- common/config/rush/pnpm-lock.yaml | 6 +++--- workers/collaborator/package.json | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index c46843cf616..3f97ba669cf 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -23209,12 +23209,12 @@ packages: dev: false file:projects/cloud-collaborator.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): - resolution: {integrity: sha512-QNx9L+csvt4b90kZlVN5Nl0XFCynmmohJSObAh+NeLx0C4yDLPAfAhiIB7fzraoEfOZ2yzomUYirYHTApx10fA==, tarball: file:projects/cloud-collaborator.tgz} + resolution: {integrity: sha512-fwUF4g20XILvd0nYY4zeEr/p5DhEJ8oh3Ssjd1YFAc0jdlqbhjbmo0OWrKhYDkxvSOwnt0VjFPvqisJ/jH0iQQ==, tarball: file:projects/cloud-collaborator.tgz} id: file:projects/cloud-collaborator.tgz name: '@rush-temp/cloud-collaborator' version: 0.0.0 dependencies: - '@cloudflare/workers-types': 4.20241004.0 + '@cloudflare/workers-types': 4.20241022.0 '@types/jest': 29.5.12 '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.6.2) '@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.6.2) @@ -23229,7 +23229,7 @@ packages: prettier: 3.2.5 ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2) typescript: 5.6.2 - wrangler: 3.80.2(@cloudflare/workers-types@4.20241004.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4) + wrangler: 3.97.0(@cloudflare/workers-types@4.20241022.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4) y-prosemirror: 1.2.12(y-protocols@1.0.6)(yjs@13.6.19) y-protocols: 1.0.6(yjs@13.6.19) yjs: 13.6.19 diff --git a/workers/collaborator/package.json b/workers/collaborator/package.json index 
7cb77ed9783..2f8a7802401 100644
--- a/workers/collaborator/package.json
+++ b/workers/collaborator/package.json
@@ -20,9 +20,9 @@
   },
   "devDependencies": {
     "@hcengineering/platform-rig": "^0.6.0",
-    "@cloudflare/workers-types": "^4.20240729.0",
+    "@cloudflare/workers-types": "^4.20241022.0",
     "typescript": "^5.3.3",
-    "wrangler": "^3.80.1",
+    "wrangler": "^3.97.0",
     "jest": "^29.7.0",
     "prettier": "^3.1.0",
     "ts-jest": "^29.1.1",