From d40c7be3b57b2de606ccd3ecd8284d969c7f46f5 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 31 Jan 2025 16:22:50 +0100 Subject: [PATCH 01/22] refactor(postgres): disable statement preparation across edge worker functions Modify postgres connection configuration to set prepare to false in: - Worker.ts - cpu_intensive/index.ts - max_concurrency/index.ts - serial_sleep/index.ts - tests/sql.ts This change updates the postgres connection settings to disable statement preparation, potentially addressing performance or compatibility issues across multiple edge worker functions and test configurations. --- pkgs/edge-worker/src/Worker.ts | 2 +- pkgs/edge-worker/supabase/config.toml | 2 +- pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts | 2 +- pkgs/edge-worker/supabase/functions/max_concurrency/index.ts | 2 +- pkgs/edge-worker/supabase/functions/serial_sleep/index.ts | 2 +- pkgs/edge-worker/tests/sql.ts | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkgs/edge-worker/src/Worker.ts b/pkgs/edge-worker/src/Worker.ts index d980c05..40076eb 100644 --- a/pkgs/edge-worker/src/Worker.ts +++ b/pkgs/edge-worker/src/Worker.ts @@ -53,7 +53,7 @@ export class Worker { this.sql = postgres(this.config.connectionString, { max: this.config.maxPgConnections, - prepare: true, + prepare: false, }); const queue = new Queue(this.sql, this.config.queueName); diff --git a/pkgs/edge-worker/supabase/config.toml b/pkgs/edge-worker/supabase/config.toml index 2820cb4..22c6dc4 100644 --- a/pkgs/edge-worker/supabase/config.toml +++ b/pkgs/edge-worker/supabase/config.toml @@ -12,7 +12,7 @@ major_version = 15 [db.pooler] enabled = true port = 50329 -pool_mode = "session" +pool_mode = "transaction" default_pool_size = 200 max_client_conn = 200 diff --git a/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts b/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts index 445178e..884e70b 100644 --- 
a/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts +++ b/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts @@ -5,7 +5,7 @@ import { crypto } from 'jsr:@std/crypto'; const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true }); +const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); async function cpuIntensiveTask() { let data = new TextEncoder().encode('burn'); diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 029ca3d..2867da4 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -5,7 +5,7 @@ import { delay } from 'jsr:@std/async'; const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true }); +const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); async function incrementSeq() { // await delay(1000); diff --git a/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts b/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts index 607928f..52a69fd 100644 --- a/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts +++ b/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts @@ -3,7 +3,7 @@ import { delay } from 'jsr:@std/async'; import postgres from 'postgres'; const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true }); +const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`; await sql`SELECT pgmq.create('serial_sleep')`; diff --git a/pkgs/edge-worker/tests/sql.ts b/pkgs/edge-worker/tests/sql.ts index 5b2cdf0..b20dfad 100644 --- a/pkgs/edge-worker/tests/sql.ts +++ 
b/pkgs/edge-worker/tests/sql.ts @@ -4,7 +4,7 @@ const DB_URL = 'postgresql://postgres:postgres@127.0.0.1:50322/postgres'; export function createSql() { return postgres(DB_URL, { - prepare: true, + prepare: false, onnotice(_: unknown) { // no-op to silence notices }, From 812ff391855cedcc72cae4e7485899b1ea7465a5 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 31 Jan 2025 16:44:27 +0100 Subject: [PATCH 02/22] refactor(logging): Improve task execution logging verbosity Enhance logging for task execution and archiving process with more descriptive debug messages. Updated log statements to provide clearer insights into task lifecycle and execution status. --- pkgs/edge-worker/src/ExecutionController.ts | 4 +++- pkgs/edge-worker/src/MessageExecutor.ts | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkgs/edge-worker/src/ExecutionController.ts b/pkgs/edge-worker/src/ExecutionController.ts index b3ca307..d7a28dd 100644 --- a/pkgs/edge-worker/src/ExecutionController.ts +++ b/pkgs/edge-worker/src/ExecutionController.ts @@ -48,11 +48,13 @@ export class ExecutionController { this.retryDelay ); - this.logger.info(`Starting execution for ${executor.msgId}`); + this.logger.info(`Scheduling execution of task ${executor.msgId}`); return await this.pqueue.add(async () => { try { + this.logger.debug(`Executing task ${executor.msgId}...`); await executor.execute(); + this.logger.debug(`Execution successful for ${executor.msgId}`); } catch (error) { this.logger.error(`Execution failed for ${executor.msgId}:`, error); throw error; diff --git a/pkgs/edge-worker/src/MessageExecutor.ts b/pkgs/edge-worker/src/MessageExecutor.ts index 96dcef0..362fb53 100644 --- a/pkgs/edge-worker/src/MessageExecutor.ts +++ b/pkgs/edge-worker/src/MessageExecutor.ts @@ -47,12 +47,14 @@ export class MessageExecutor { // Check if already aborted before starting this.signal.throwIfAborted(); + this.logger.debug(`Executing task ${this.msgId}...`); await 
this.messageHandler(this.record.message!); this.logger.debug( `Task ${this.msgId} completed successfully, archiving...` ); await this.queue.archive(this.msgId); + this.logger.debug(`Archived task ${this.msgId} successfully`); // TODO: uncomment when ready to debug this // await this.batchArchiver.add(this.msgId); From 1d3eeb12884bd51288de29cfdf9603dc825fadaf Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sat, 1 Feb 2025 23:44:12 +0100 Subject: [PATCH 03/22] chore(deps): update dependencies and adjust Supabase connection pool Update import statements in deno.json to reorganize and add new dependencies, and increase max client connections in Supabase configuration from 200 to 250 --- pkgs/edge-worker/deno.json | 9 +++++---- pkgs/edge-worker/supabase/config.toml | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkgs/edge-worker/deno.json b/pkgs/edge-worker/deno.json index 30a76b2..92a5d98 100644 --- a/pkgs/edge-worker/deno.json +++ b/pkgs/edge-worker/deno.json @@ -4,11 +4,12 @@ "license": "MIT", "exports": "./mod.ts", "imports": { - "postgres": "npm:postgres@3.4.5", - "@std/async": "jsr:@std/async", + "@henrygd/queue": "jsr:@henrygd/queue@^1.0.7", "@std/assert": "jsr:@std/assert", - "@std/testing/mock": "jsr:@std/testing/mock", + "@std/async": "jsr:@std/async", "@std/log": "jsr:@std/log@^0.224.13", - "p-queue": "npm:p-queue@^8.0.1" + "@std/testing/mock": "jsr:@std/testing/mock", + "p-queue": "npm:p-queue@^8.0.1", + "postgres": "npm:postgres@3.4.5" } } diff --git a/pkgs/edge-worker/supabase/config.toml b/pkgs/edge-worker/supabase/config.toml index 22c6dc4..73fdbfb 100644 --- a/pkgs/edge-worker/supabase/config.toml +++ b/pkgs/edge-worker/supabase/config.toml @@ -14,7 +14,7 @@ enabled = true port = 50329 pool_mode = "transaction" default_pool_size = 200 -max_client_conn = 200 +max_client_conn = 250 [db.seed] enabled = true From a0a56b457b75b08934cfd29c05be31766c4728fa Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 13:17:29 
+0100 Subject: [PATCH 04/22] refactor(queue): replace p-queue with @henrygd/queue Replace deprecated p-queue library with @henrygd/queue, updating import statements, queue initialization, and method calls in ExecutionController. Remove unnecessary dependencies and update project lock file accordingly. Breaking changes: - Switched from npm:p-queue to jsr:@henrygd/queue - Modified queue initialization and method usage - Commented out archiver flush method --- pkgs/edge-worker/deno.json | 1 - pkgs/edge-worker/deno.lock | 22 +++++---------------- pkgs/edge-worker/src/ExecutionController.ts | 12 +++++------ 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/pkgs/edge-worker/deno.json b/pkgs/edge-worker/deno.json index 92a5d98..8e214ca 100644 --- a/pkgs/edge-worker/deno.json +++ b/pkgs/edge-worker/deno.json @@ -9,7 +9,6 @@ "@std/async": "jsr:@std/async", "@std/log": "jsr:@std/log@^0.224.13", "@std/testing/mock": "jsr:@std/testing/mock", - "p-queue": "npm:p-queue@^8.0.1", "postgres": "npm:postgres@3.4.5" } } diff --git a/pkgs/edge-worker/deno.lock b/pkgs/edge-worker/deno.lock index 5829935..45d6d4d 100644 --- a/pkgs/edge-worker/deno.lock +++ b/pkgs/edge-worker/deno.lock @@ -3,6 +3,7 @@ "packages": { "specifiers": { "jsr:@deno-library/progress": "jsr:@deno-library/progress@1.5.1", + "jsr:@henrygd/queue@^1.0.7": "jsr:@henrygd/queue@1.0.7", "jsr:@std/assert": "jsr:@std/assert@1.0.10", "jsr:@std/assert@^1.0.10": "jsr:@std/assert@1.0.10", "jsr:@std/async": "jsr:@std/async@1.0.9", @@ -14,7 +15,6 @@ "jsr:@std/io@^0.225.0": "jsr:@std/io@0.225.0", "jsr:@std/log@^0.224.13": "jsr:@std/log@0.224.13", "jsr:@std/testing": "jsr:@std/testing@1.0.9", - "npm:p-queue@^8.0.1": "npm:p-queue@8.1.0", "npm:postgres@3.4.5": "npm:postgres@3.4.5" }, "jsr": { @@ -25,6 +25,9 @@ "jsr:@std/io@0.225.0" ] }, + "@henrygd/queue@1.0.7": { + "integrity": "98cade132744bb420957c5413393f76eb8ba7261826f026c8a89a562b8fa2961" + }, "@std/assert@1.0.10": { "integrity": 
"59b5cbac5bd55459a19045d95cc7c2ff787b4f8527c0dd195078ff6f9481fbb3", "dependencies": [ @@ -65,21 +68,6 @@ } }, "npm": { - "eventemitter3@5.0.1": { - "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", - "dependencies": {} - }, - "p-queue@8.1.0": { - "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", - "dependencies": { - "eventemitter3": "eventemitter3@5.0.1", - "p-timeout": "p-timeout@6.1.3" - } - }, - "p-timeout@6.1.3": { - "integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==", - "dependencies": {} - }, "postgres@3.4.5": { "integrity": "sha512-cDWgoah1Gez9rN3H4165peY9qfpEo+SA61oQv65O3cRUE1pOEoJWwddwcqKE8XZYjbblOJlYDlLV4h67HrEVDg==", "dependencies": {} @@ -370,11 +358,11 @@ }, "workspace": { "dependencies": [ + "jsr:@henrygd/queue@^1.0.7", "jsr:@std/assert", "jsr:@std/async", "jsr:@std/log@^0.224.13", "jsr:@std/testing", - "npm:p-queue@^8.0.1", "npm:postgres@3.4.5" ] } diff --git a/pkgs/edge-worker/src/ExecutionController.ts b/pkgs/edge-worker/src/ExecutionController.ts index d7a28dd..90e6e0a 100644 --- a/pkgs/edge-worker/src/ExecutionController.ts +++ b/pkgs/edge-worker/src/ExecutionController.ts @@ -1,4 +1,4 @@ -import PQueue from 'p-queue'; +import { newQueue, type Queue as PromiseQueue } from '@henrygd/queue'; import { MessageExecutor } from './MessageExecutor.ts'; import { Queue } from './Queue.ts'; import { Json } from './types.ts'; @@ -15,7 +15,7 @@ export interface ExecutionConfig { export class ExecutionController { private logger = getLogger('ExecutionController'); private queue: Queue; - private pqueue: PQueue; + private promiseQueue: PromiseQueue; private archiver: BatchArchiver; private signal: AbortSignal; private retryLimit: number; @@ -30,7 +30,7 @@ export class ExecutionController { this.signal = abortSignal; this.retryLimit = config.retryLimit; this.retryDelay = 
config.retryDelay; - this.pqueue = new PQueue({ concurrency: config.maxConcurrent }); + this.promiseQueue = newQueue(config.maxConcurrent); this.archiver = new BatchArchiver(queue); } @@ -50,7 +50,7 @@ export class ExecutionController { this.logger.info(`Scheduling execution of task ${executor.msgId}`); - return await this.pqueue.add(async () => { + return await this.promiseQueue.add(async () => { try { this.logger.debug(`Executing task ${executor.msgId}...`); await executor.execute(); @@ -63,7 +63,7 @@ export class ExecutionController { } async awaitCompletion() { - await this.pqueue.onIdle(); - await this.archiver.flush(); + await this.promiseQueue.done(); + // await this.archiver.flush(); } } From 0d84999e8ba656980f17eb3efb8f0a4b20f01be0 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 14:21:27 +0100 Subject: [PATCH 05/22] refactor(edge-worker): simplify delay mechanism in max concurrency function Remove random delay and replace with minimal delay to optimize function behavior --- .../edge-worker/supabase/functions/max_concurrency/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 2867da4..494b42c 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -8,9 +8,9 @@ console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); async function incrementSeq() { - // await delay(1000); - const randTimeMs = Math.floor(Math.random() * 100 + 50); - await delay(randTimeMs); + await delay(0); + // const randTimeMs = Math.floor(Math.random() * 10 + 5); + // await delay(randTimeMs); console.log( '[max_concurrency] last_val =', await sql`SELECT nextval('test_seq')` From 53c8cce4bf81e6efb551c99baeba4c818468854f Mon Sep 17 00:00:00 2001 From: Wojtek Majewski 
Date: Mon, 3 Feb 2025 14:57:43 +0100 Subject: [PATCH 06/22] feat(exports): expand module public API with internal exports Add additional internal exports for Worker, Queries, Queue, and types to provide more comprehensive access to edge worker package internals --- pkgs/edge-worker/mod.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkgs/edge-worker/mod.ts b/pkgs/edge-worker/mod.ts index a422cee..700d2a0 100644 --- a/pkgs/edge-worker/mod.ts +++ b/pkgs/edge-worker/mod.ts @@ -1 +1,7 @@ export { EdgeWorker, type EdgeWorkerConfig } from './src/EdgeWorker.ts'; + +// Internal exports - use with caution +export { Worker, type WorkerConfig } from './src/Worker.ts'; +export { Queries } from './src/Queries.ts'; +export { Queue } from './src/Queue.ts'; +export * as types from './src/types.ts'; From b5e5ca1b48dc0f1cc21f6a7089633b10dda8ddb5 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 15:13:16 +0100 Subject: [PATCH 07/22] docs(edge-worker): Update Supabase edge worker installation documentation Update connection pooling configuration and add note about transaction mode requirements for edge workers. Modify config.toml example to enable pooler and clarify connection settings. --- .../getting-started/install-edge-worker.mdx | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx b/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx index e86d8a6..9d8ddd9 100644 --- a/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx +++ b/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx @@ -41,9 +41,6 @@ If you haven't installed the CLI yet or need to upgrade, see Supabase's [install Your worker needs to connect to your Supabase project's database. 
- Since the worker is a long-lived process, we should use Supavisor **Session Mode** connection - (see more in [Connect to your database](https://supabase.com/docs/guides/database/connecting-to-postgres#more-about-connection-pooling) docs). - Edge Worker looks for the connection string in the `EDGE_WORKER_DB_URL` environment variable. For local development, put this in `supabase/functions/.env`: @@ -55,9 +52,9 @@ If you haven't installed the CLI yet or need to upgrade, see Supabase's [install 1. ### Setup Connection Pool Modify the `db.pooler` section in your `supabase/config.toml` file - to enable pooler and switch it to Supavisor **Session Mode**: + to enable pooler and make sure that `db.pool_mode` is set to `"transaction"`. - ```diff lang="toml" + ```diff lang="toml" /pool_mode = "transaction"/ [db.pooler] - enabled = false + enabled = true @@ -65,14 +62,18 @@ If you haven't installed the CLI yet or need to upgrade, see Supabase's [install port = 54329 # Specifies when a server connection can be reused by other clients. # Configure one of the supported pooler modes: `transaction`, `session`. - - pool_mode = "transaction" - + pool_mode = "session" + pool_mode = "transaction" # How many server connections to allow per user/database pair. default_pool_size = 20 # Maximum number of client connections allowed. max_client_conn = 100 ``` + :::note[Transaction Mode] + Edge Worker requires **transaction mode** connection because of Edge + Function early termination. This will change in the future. + ::: + 1. 
### Setup Edge Runtime policy We need to change the Edge Runtime policy to `per_worker` to enable From cc01a23fb3a621dc710395592038586485cbdfd0 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 15:18:15 +0100 Subject: [PATCH 08/22] docs(edge-worker): Update Supabase pooler configuration example Enable database pooler in config example and update code block formatting --- .../docs/edge-worker/getting-started/install-edge-worker.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx b/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx index 9d8ddd9..dcc6cb2 100644 --- a/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx +++ b/pkgs/website/src/content/docs/edge-worker/getting-started/install-edge-worker.mdx @@ -54,7 +54,7 @@ If you haven't installed the CLI yet or need to upgrade, see Supabase's [install Modify the `db.pooler` section in your `supabase/config.toml` file to enable pooler and make sure that `db.pool_mode` is set to `"transaction"`. 
- ```diff lang="toml" /pool_mode = "transaction"/ + ```diff lang="toml" {8} [db.pooler] - enabled = false + enabled = true From 2133b3abdbe20b218c1bd8a47b8fbb8c59b58ee4 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 15:22:54 +0100 Subject: [PATCH 09/22] docs(edge-worker): remove troubleshooting documentation Deleted documentation file for edge worker troubleshooting, which contained details about worker message processing issues and connection timeout errors --- .../edge-worker/_drafts/troubleshooting.md | 37 ------------------- 1 file changed, 37 deletions(-) delete mode 100644 pkgs/website/src/content/docs/edge-worker/_drafts/troubleshooting.md diff --git a/pkgs/website/src/content/docs/edge-worker/_drafts/troubleshooting.md b/pkgs/website/src/content/docs/edge-worker/_drafts/troubleshooting.md deleted file mode 100644 index c275e3f..0000000 --- a/pkgs/website/src/content/docs/edge-worker/_drafts/troubleshooting.md +++ /dev/null @@ -1,37 +0,0 @@ ---- -title: Troubleshooting -draft: true -prev: - link: /edge-worker/observability - label: Observability -next: - link: /edge-worker/ideas - label: Ideas ---- - -This page serves as a place to document common issues and their solutions. - -## Worker stopped processing messages - -This is the main issue with current implementation and I believe it is due -to abruptly terminated SQL connections - we use Session Mode after all, -so any abruptly-closed connection will rely on pooler's `idle_timeout` -to close it. If the timeout is too big it can lead to the depletion -of the pool - Workers will create more connections than will be reclained. 
- -``` -[Error] Timeout-triggered archive failed: PostgresError: DbHandler exited - at ErrorResponse (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:791:26) - at handle (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:477:6) - at data (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:318:9) - at https://deno.land/x/postgresjs@v3.4.5/polyfills.js:138:30 - at Array.forEach () - at call (https://deno.land/x/postgresjs@v3.4.5/polyfills.js:138:16) - at success (https://deno.land/x/postgresjs@v3.4.5/polyfills.js:98:9) - at eventLoopTick (ext:core/01_core.js:168:7) { - name: "PostgresError", - severity_local: "FATAL", - severity: "FATAL", - code: "XX000" -} -``` From 443289505e4fc128044b6b670eb98045d5236af3 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Feb 2025 16:00:14 +0100 Subject: [PATCH 10/22] chore(version): bump edge-worker package version Incremented package version from 0.0.2 to 0.0.3 in deno.json configuration --- pkgs/edge-worker/deno.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/edge-worker/deno.json b/pkgs/edge-worker/deno.json index 8e214ca..0fa954a 100644 --- a/pkgs/edge-worker/deno.json +++ b/pkgs/edge-worker/deno.json @@ -1,6 +1,6 @@ { "name": "@pgflow/edge-worker", - "version": "0.0.2", + "version": "0.0.3", "license": "MIT", "exports": "./mod.ts", "imports": { From 183bbaae919c163872e61156b7f8810a30966a51 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Wed, 5 Feb 2025 11:23:41 +0100 Subject: [PATCH 11/22] refactor(worker): prevent race conditions in Worker.stop() Adds a check to the Worker.stop() that just returns if worker is stopping or stopped. This should never happen because `onbeforeunload` is called only once, but for some reason in high concurrency scenarios it is called again and triggers transition error on WorkerState (good thing!). 
--- pkgs/edge-worker/src/Worker.ts | 9 +++++++-- pkgs/edge-worker/src/WorkerLifecycle.ts | 10 +++++++++- pkgs/edge-worker/src/WorkerState.ts | 4 ++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkgs/edge-worker/src/Worker.ts b/pkgs/edge-worker/src/Worker.ts index 40076eb..dbeb071 100644 --- a/pkgs/edge-worker/src/Worker.ts +++ b/pkgs/edge-worker/src/Worker.ts @@ -85,7 +85,7 @@ export class Worker { } async startOnlyOnce(workerBootstrap: WorkerBootstrap) { - if (this.lifecycle.isRunning()) { + if (this.lifecycle.isRunning) { this.logger.debug('Worker already running, ignoring start request'); return; } @@ -121,6 +121,11 @@ export class Worker { } async stop() { + // If the worker is already stopping or stopped, do nothing + if (this.lifecycle.isStopping || this.lifecycle.isStopped) { + return; + } + this.lifecycle.transitionToStopping(); try { @@ -150,7 +155,7 @@ export class Worker { * Returns true if worker state is Running and worker was not stopped */ private get isMainLoopActive() { - return this.lifecycle.isRunning() && !this.isAborted; + return this.lifecycle.isRunning && !this.isAborted; } private get abortSignal() { diff --git a/pkgs/edge-worker/src/WorkerLifecycle.ts b/pkgs/edge-worker/src/WorkerLifecycle.ts index 6ca7063..f4d78a4 100644 --- a/pkgs/edge-worker/src/WorkerLifecycle.ts +++ b/pkgs/edge-worker/src/WorkerLifecycle.ts @@ -75,10 +75,18 @@ export class WorkerLifecycle { await this.heartbeat?.send(); } - isRunning(): boolean { + get isRunning() { return this.workerState.isRunning; } + get isStopping() { + return this.workerState.isStopping; + } + + get isStopped() { + return this.workerState.isStopped; + } + transitionToStopping() { this.workerState.transitionTo(States.Stopping); } diff --git a/pkgs/edge-worker/src/WorkerState.ts b/pkgs/edge-worker/src/WorkerState.ts index c478d85..8f870ad 100644 --- a/pkgs/edge-worker/src/WorkerState.ts +++ b/pkgs/edge-worker/src/WorkerState.ts @@ -59,6 +59,10 @@ export class WorkerState { return 
this.state === States.Stopping; } + get isStopped() { + return this.state === States.Stopped; + } + transitionTo(state: States) { this.logger.debug( `[WorkerState] Starting transition to '${state}' (current state: ${this.state})` From f4952b2826977b764a3a7662f665f22cc4aa6c73 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Wed, 5 Feb 2025 17:26:29 +0100 Subject: [PATCH 12/22] feat(observability): Add SQL debug queries for connection and processing metrics Add two new SQL query files to help diagnose system performance: - debug_connections.sql: Tracks connection states and usage for Supavisor and Postgres - debug_processing_gaps.sql: Provides detailed analysis of message processing times, retries, and performance bottlenecks --- .../sql/queries/debug_connections.sql | 32 ++++++ .../sql/queries/debug_processing_gaps.sql | 99 +++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 pkgs/edge-worker/sql/queries/debug_connections.sql create mode 100644 pkgs/edge-worker/sql/queries/debug_processing_gaps.sql diff --git a/pkgs/edge-worker/sql/queries/debug_connections.sql b/pkgs/edge-worker/sql/queries/debug_connections.sql new file mode 100644 index 0000000..4cf0ce1 --- /dev/null +++ b/pkgs/edge-worker/sql/queries/debug_connections.sql @@ -0,0 +1,32 @@ +WITH pg_conns AS ( + SELECT + state, + count(*) AS count, + (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') AS max_connections + FROM pg_stat_activity + WHERE application_name = 'supavisor' + GROUP BY state +), +client_conns AS ( + SELECT + state, + count(*) AS count, + (SELECT current_setting('supavisor.max_client_connections')::int) AS max_client_connections + FROM supavisor_clients + GROUP BY state +) +SELECT + 'from_supavisor_to_postgres' AS connection_type, + state, + count, + max_connections, + round(count::numeric * 100 / max_connections, 2) AS percent_of_limit +FROM pg_conns +UNION ALL +SELECT + 'to_supavisor' AS connection_type, + state, + count, + max_client_connections, 
+ round(count::numeric * 100 / max_client_connections, 2) AS percent_of_limit +FROM client_conns; diff --git a/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql b/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql new file mode 100644 index 0000000..a4dda38 --- /dev/null +++ b/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql @@ -0,0 +1,99 @@ +-- select count(*) from pgmq.a_max_concurrency; +-- select read_ct, count(*) from pgmq.q_max_concurrency group by read_ct; +select read_ct - 1 as retries_count, count(*) +from pgmq.a_max_concurrency group by read_ct order by read_ct; + +select * from pgmq.metrics('max_concurrency'); + +select * from pgmq.a_max_concurrency limit 10; +select EXTRACT(EPOCH FROM (max(archived_at) - min(enqueued_at))) as total_seconds from pgmq.a_max_concurrency; + +-- Processing time ranges per read_ct +SELECT + read_ct, + COUNT(*) as messages, + round(avg(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as avg_s, + round(min(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as min_s, + round(max(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as max_s +FROM pgmq.a_max_concurrency +GROUP BY read_ct +ORDER BY read_ct; + +-- Total processing time for messages with read_ct 1 or 2 +SELECT + round(sum(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as total_processing_seconds +FROM pgmq.a_max_concurrency +WHERE read_ct IN (1, 2); + +-- Distribution of processing times in configurable intervals +WITH +interval_conf AS ( + SELECT 1 as interval_seconds +), +processing_times AS ( + SELECT + EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as seconds + FROM pgmq.a_max_concurrency +) +SELECT + ((floor(seconds / interval_seconds) * interval_seconds) || '-' || + (floor(seconds / interval_seconds) * interval_seconds + interval_seconds) || 's')::text as time_bucket, + COUNT(*) as message_count, + round((COUNT(*)::numeric / interval_seconds), 1) as messages_per_second, + SUM(COUNT(*)) OVER (ORDER BY floor(seconds / 
interval_seconds)) as total_processed_so_far +FROM processing_times, interval_conf +GROUP BY floor(seconds / interval_seconds), interval_seconds +ORDER BY floor(seconds / interval_seconds); + + +-- First let's check the raw distribution +WITH processing_times AS ( + SELECT + EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as seconds + FROM pgmq.a_max_concurrency +) +SELECT + floor(seconds) as seconds, + COUNT(*) as message_count +FROM processing_times +WHERE seconds BETWEEN 165 AND 381 +GROUP BY floor(seconds) +ORDER BY floor(seconds); + + +-- Examine messages around the gap +WITH processing_times AS ( + SELECT + msg_id, + enqueued_at, + archived_at, + EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as processing_time, + read_ct + FROM pgmq.a_max_concurrency +) +SELECT + msg_id, + enqueued_at, + archived_at, + round(processing_time::numeric, 2) as processing_seconds, + read_ct +FROM processing_times +WHERE + processing_time BETWEEN 164 AND 380 +ORDER BY processing_time; + +-- Show processing time distribution by retry count +WITH processing_times AS ( + SELECT + EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as processing_time, + read_ct, + width_bucket(EXTRACT(EPOCH FROM (archived_at - enqueued_at)), 0, 400, 20) as time_bucket + FROM pgmq.a_max_concurrency +) +SELECT + ((time_bucket - 1) * 20) || '-' || (time_bucket * 20) || 's' as time_range, + read_ct, + COUNT(*) as message_count +FROM processing_times +GROUP BY time_bucket, read_ct +ORDER BY time_bucket, read_ct; From 040c057b1c9473e7b1f90e4e07d4e026531a5341 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Wed, 5 Feb 2025 17:29:53 +0100 Subject: [PATCH 13/22] test(performance): Increase message volume and remove unnecessary delay Adjust test configuration by increasing message count and remove redundant delay in sequence increment function --- pkgs/edge-worker/supabase/functions/max_concurrency/index.ts | 4 ---- pkgs/edge-worker/tests/e2e/performance.test.ts | 2 +- 2 files changed, 1 insertion(+), 5 
deletions(-) diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 494b42c..726b25c 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -1,6 +1,5 @@ import { EdgeWorker } from '../_src/EdgeWorker.ts'; import postgres from 'postgres'; -import { delay } from 'jsr:@std/async'; const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); @@ -8,9 +7,6 @@ console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); async function incrementSeq() { - await delay(0); - // const randTimeMs = Math.floor(Math.random() * 10 + 5); - // await delay(randTimeMs); console.log( '[max_concurrency] last_val =', await sql`SELECT nextval('test_seq')` diff --git a/pkgs/edge-worker/tests/e2e/performance.test.ts b/pkgs/edge-worker/tests/e2e/performance.test.ts index d9a69f2..b9bcce4 100644 --- a/pkgs/edge-worker/tests/e2e/performance.test.ts +++ b/pkgs/edge-worker/tests/e2e/performance.test.ts @@ -8,7 +8,7 @@ import { waitForBatchArchiver, } from './_helpers.ts'; -const MESSAGES_TO_SEND = 20000; +const MESSAGES_TO_SEND = 100000; const WORKER_NAME = 'max_concurrency'; Deno.test( From 425056ef11c36a8d98cb50fda2223b1c45a900fe Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 10:21:32 +0100 Subject: [PATCH 14/22] refactor(edge-worker): consolidate database and utility functions Extract common database and utility functions into a centralized utils module, simplifying imports and reducing code duplication across edge worker functions. Reduce boilerplate by moving shared configurations and utility methods to a single file. 
- Create new utils.ts with shared SQL connection, sleep function, and random integer generator - Remove redundant database URL logging and connection setup in individual functions - Simplify imports across multiple edge worker function files - Reduce test message count in performance test --- .../supabase/functions/cpu_intensive/index.ts | 7 +------ .../supabase/functions/increment_sequence/index.ts | 10 +++------- .../supabase/functions/max_concurrency/index.ts | 9 +++------ .../supabase/functions/serial_sleep/index.ts | 10 ++-------- pkgs/edge-worker/supabase/functions/utils.ts | 13 +++++++++++++ pkgs/edge-worker/tests/e2e/performance.test.ts | 2 +- 6 files changed, 23 insertions(+), 28 deletions(-) create mode 100644 pkgs/edge-worker/supabase/functions/utils.ts diff --git a/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts b/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts index 884e70b..ecbb486 100644 --- a/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts +++ b/pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts @@ -1,11 +1,6 @@ import { EdgeWorker } from '../_src/EdgeWorker.ts'; -import postgres from 'postgres'; import { crypto } from 'jsr:@std/crypto'; - -const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; -console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); - -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); +import { sql } from '../utils.ts'; async function cpuIntensiveTask() { let data = new TextEncoder().encode('burn'); diff --git a/pkgs/edge-worker/supabase/functions/increment_sequence/index.ts b/pkgs/edge-worker/supabase/functions/increment_sequence/index.ts index fd19aee..856cf90 100644 --- a/pkgs/edge-worker/supabase/functions/increment_sequence/index.ts +++ b/pkgs/edge-worker/supabase/functions/increment_sequence/index.ts @@ -1,12 +1,8 @@ import { EdgeWorker } from '../_src/EdgeWorker.ts'; -import postgres from 'postgres'; +import { sql } from '../utils.ts'; -const EDGE_WORKER_DB_URL = 
Deno.env.get('EDGE_WORKER_DB_URL')!; -console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); - -const sql = postgres(EDGE_WORKER_DB_URL); -await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`; -await sql`SELECT pgmq.create('increment_sequence')`; +// await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`; +// await sql`SELECT pgmq.create('increment_sequence')`; async function incrementCounter() { console.log( diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts index 726b25c..12d8de6 100644 --- a/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts +++ b/pkgs/edge-worker/supabase/functions/max_concurrency/index.ts @@ -1,12 +1,9 @@ import { EdgeWorker } from '../_src/EdgeWorker.ts'; -import postgres from 'postgres'; - -const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; -console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); - -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); +import { sleep, sql } from '../utils.ts'; async function incrementSeq() { + await sleep(50); + console.log( '[max_concurrency] last_val =', await sql`SELECT nextval('test_seq')` diff --git a/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts b/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts index 52a69fd..ab432ef 100644 --- a/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts +++ b/pkgs/edge-worker/supabase/functions/serial_sleep/index.ts @@ -1,17 +1,11 @@ import { EdgeWorker } from '../_src/EdgeWorker.ts'; -import { delay } from 'jsr:@std/async'; -import postgres from 'postgres'; - -const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; -const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); -await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`; -await sql`SELECT pgmq.create('serial_sleep')`; +import { sql, sleep } from '../utils.ts'; const sleep1s = async () => { console.time('Task time'); const lastVal = await sql`SELECT nextval('test_seq')`; 
console.log('[serial_sleep] lastVal =', lastVal); - await delay(1000); + await sleep(1000); console.timeEnd('Task time'); }; diff --git a/pkgs/edge-worker/supabase/functions/utils.ts b/pkgs/edge-worker/supabase/functions/utils.ts new file mode 100644 index 0000000..147dcac --- /dev/null +++ b/pkgs/edge-worker/supabase/functions/utils.ts @@ -0,0 +1,13 @@ +import postgres from 'postgres'; +import { delay } from '@std/async'; + +const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!; +console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL); + +export const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false }); + +export const sleep = delay; + +export function randomInt(min: number, max: number) { + return Math.floor(Math.random() * (max - min + 1)) + min; +} diff --git a/pkgs/edge-worker/tests/e2e/performance.test.ts b/pkgs/edge-worker/tests/e2e/performance.test.ts index b9bcce4..d9a69f2 100644 --- a/pkgs/edge-worker/tests/e2e/performance.test.ts +++ b/pkgs/edge-worker/tests/e2e/performance.test.ts @@ -8,7 +8,7 @@ import { waitForBatchArchiver, } from './_helpers.ts'; -const MESSAGES_TO_SEND = 100000; +const MESSAGES_TO_SEND = 20000; const WORKER_NAME = 'max_concurrency'; Deno.test( From 78385818f90719303c83bf497b643c7a93ada3d9 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 13:55:03 +0100 Subject: [PATCH 15/22] docs(edge-worker): Update project status documentation Refine documentation for Edge Worker project status, including: - Restructure content for clarity - Update description of connection pool saturation issue - Revise section on high-concurrency challenges - Add planned next steps and architectural improvement notes Provides more focused and precise overview of current system limitations and future improvements --- .../docs/edge-worker/project-status.md | 49 +++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/pkgs/website/src/content/docs/edge-worker/project-status.md 
b/pkgs/website/src/content/docs/edge-worker/project-status.md index 6eb1151..c85520e 100644 --- a/pkgs/website/src/content/docs/edge-worker/project-status.md +++ b/pkgs/website/src/content/docs/edge-worker/project-status.md @@ -14,42 +14,30 @@ will be reorganized into logical sub-configurations. The current configuration s considered stable and will change in future releases. ::: -I am actively working on fixing various issues and improving the overall system reliability. +This page is an overview of the issues that have been observed but are not yet fully resolved. -Here are few that I managed to observe but have not yet fixed: +> I am actively working and communicating with the Supabase Edge Runtime team to make +> the worker as robust as possible, so it can be a solid foundation +> for the Workflow Orchestration Engine I am building. -### PostgresError: DbHandler exited +### Connection Pool Saturation Under High Load -When processing a high volume of jobs with increased concurrency (10 or more), -the system occasionally fails with the following error: +**Scenario:** +A large volume of messages is processed continuously at high concurrency (10 or more) with fast handlers (<50ms execution time).
-``` -[Error] Timeout-triggered archive failed: PostgresError: DbHandler exited - at ErrorResponse (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:791:26) - at handle (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:477:6) - at data (https://deno.land/x/postgresjs@v3.4.5/src/connection.js:318:9) - at https://deno.land/x/postgresjs@v3.4.5/polyfills.js:138:30 - at Array.forEach () - at call (https://deno.land/x/postgresjs@v3.4.5/polyfills.js:138:16) - at success (https://deno.land/x/postgresjs@v3.4.5/polyfills.js:98:9) - at eventLoopTick (ext:core/01_core.js:168:7) { - name: "PostgresError", - severity_local: "FATAL", - severity: "FATAL", - code: "XX000" -} -``` +**Observed Behavior:** + - Some connections are not properly closed by the worker before being hard-terminated + - This results in zombie connections + - The connection pooler should reclaim these connections after `client_idle_timeout` + - However, if the worker respawns too quickly, the pooler cannot keep up + - This can trigger **"Max client connections reached"** errors + - These errors automatically resolve after zombie connections are reclaimed, but will reoccur if high load persists -This issue appears to be related to abruptly terminated SQL connections in Session Mode. -When this error occurs, it prevents the system from spawning new instances. +**Impact:** +Most users under normal operating conditions will not encounter this behavior. -### Postgres Deadlocks - -In high-concurrency scenarios, I've observed occasional deadlocks. These occur due to -race conditions between message archiving and message pickup -when visibility timeouts expire (educated guess). - -The planned solution involves implementing worker-side retries for SQL queries. +**Next Steps:** +An RFC for updates to Supabase Edge Runtime is in progress. 
### Planned Architecture Improvements @@ -57,6 +45,7 @@ Following the resolution of current issues, a major architectural refactor is pl The main goals are to: #### Implement proper dependency injection + - Introduce a factory/builder pattern - Enable easy component swapping, including: - MessageExecutor (required for pgflow orchestrator integration) From a022485bad225391ba348a4b567e692fab43375b Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 13:59:03 +0100 Subject: [PATCH 16/22] docs(website): Update project status page and navigation label Remove warning emoji from project status title and navigation Add badge to highlight project status importance --- pkgs/website/astro.config.mjs | 3 ++- pkgs/website/src/content/docs/edge-worker/project-status.md | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkgs/website/astro.config.mjs b/pkgs/website/astro.config.mjs index 8304e75..eecaf0d 100644 --- a/pkgs/website/astro.config.mjs +++ b/pkgs/website/astro.config.mjs @@ -26,7 +26,8 @@ export default defineConfig({ autogenerate: { directory: 'edge-worker/getting-started' }, }, { - label: '⚠️ Project Status', + label: 'Project Status', + badge: { text: 'important', variant: 'caution' }, link: '/edge-worker/project-status', }, ], diff --git a/pkgs/website/src/content/docs/edge-worker/project-status.md b/pkgs/website/src/content/docs/edge-worker/project-status.md index c85520e..22fa110 100644 --- a/pkgs/website/src/content/docs/edge-worker/project-status.md +++ b/pkgs/website/src/content/docs/edge-worker/project-status.md @@ -1,5 +1,5 @@ --- -title: ⚠️ Project Status +title: Project Status --- :::danger[Not ready for production!] 
From 8ae50ed2d89a7dd1f143f15ca4db2228b6072bfa Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 16:32:05 +0100 Subject: [PATCH 17/22] style(animation): enhance hero image with scale and glow animations Add dynamic scaling and breathing glow animations for hero image - Replace static hover transform with keyframe animations - Implement scale-up animation from 0.7 to 1 - Create breathing-glow effect with subtle shadow pulsing --- pkgs/website/src/styles/global.css | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkgs/website/src/styles/global.css b/pkgs/website/src/styles/global.css index eaff4a8..640a435 100644 --- a/pkgs/website/src/styles/global.css +++ b/pkgs/website/src/styles/global.css @@ -39,10 +39,27 @@ svg .secondary { } .hero img { - transition: all 4s ease; + animation: scale-up 4s ease-in-out forwards, + breathing-glow 8s cubic-bezier(0.4, 0, 0.6, 1) infinite; } -.hero img:hover { - transform: scale(1.2); - filter: drop-shadow(0 0 15px var(--sl-color-accent)); +@keyframes scale-up { + from { + transform: scale(0.7); + } + + to { + transform: scale(1); + } +} + +@keyframes breathing-glow { + 0%, + 100% { + filter: drop-shadow(0 0 0 transparent); + } + + 50% { + filter: drop-shadow(0 0 30px rgba(153, 121, 211, 0.6)); + } } From 15bfa6e583456063d9ffa4e081688b74853eb754 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 16:38:41 +0100 Subject: [PATCH 18/22] style(css): Update logo glow effect with theme-aware color variables Add theme-specific logo glow color variables and replace hardcoded color with dynamic variable reference --- pkgs/website/src/styles/global.css | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkgs/website/src/styles/global.css b/pkgs/website/src/styles/global.css index 640a435..52c0326 100644 --- a/pkgs/website/src/styles/global.css +++ b/pkgs/website/src/styles/global.css @@ -29,6 +29,15 @@ --sl-color-black: #ffffff; } +/* glow 
variables */ +:root { + --logo-glow-color: rgba(153, 121, 211, 0.4); +} + +:root[data-theme='light'] { + --logo-glow-color: rgba(153, 121, 211, 0.7); +} + /* Logo styling */ svg .primary { fill: var(--sl-color-accent); @@ -60,6 +69,6 @@ svg .secondary { } 50% { - filter: drop-shadow(0 0 30px rgba(153, 121, 211, 0.6)); + filter: drop-shadow(0 0 30px var(--logo-glow-color)); } } From 707a200f68f440b97c39fa7b50e9009cd5f83d20 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 16:40:40 +0100 Subject: [PATCH 19/22] refactor(sql): Enhance debug processing gaps query with retry count and percentile metrics Modify SQL query to: - Add retry count calculation - Adjust processing time calculations - Introduce percentile-based processing time metrics - Include min/max processing time measurements --- .../sql/queries/debug_processing_gaps.sql | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql b/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql index a4dda38..66cb7ec 100644 --- a/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql +++ b/pkgs/edge-worker/sql/queries/debug_processing_gaps.sql @@ -10,15 +10,30 @@ select EXTRACT(EPOCH FROM (max(archived_at) - min(enqueued_at))) as total_second -- Processing time ranges per read_ct SELECT - read_ct, + read_ct - 1 as retry_count, COUNT(*) as messages, - round(avg(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as avg_s, - round(min(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as min_s, - round(max(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as max_s + round(avg(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as avg_s, + round(min(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as min_s, + round(max(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as max_s FROM pgmq.a_max_concurrency GROUP BY read_ct ORDER BY read_ct; +-- Processing 
time percentiles +WITH processing_times AS ( + SELECT archived_at - (vt - make_interval(secs=>3)) as processing_time + FROM pgmq.a_max_concurrency +) +SELECT + ROUND(EXTRACT(epoch FROM percentile_cont(0.50) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p50_ms, + ROUND(EXTRACT(epoch FROM percentile_cont(0.75) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p75_ms, + ROUND(EXTRACT(epoch FROM percentile_cont(0.90) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p90_ms, + ROUND(EXTRACT(epoch FROM percentile_cont(0.95) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p95_ms, + ROUND(EXTRACT(epoch FROM percentile_cont(0.99) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p99_ms, + ROUND(EXTRACT(epoch FROM MIN(processing_time)) * 1000) as min_ms, + ROUND(EXTRACT(epoch FROM MAX(processing_time)) * 1000) as max_ms +FROM processing_times; + -- Total processing time for messages with read_ct 1 or 2 SELECT round(sum(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as total_processing_seconds @@ -97,3 +112,4 @@ SELECT FROM processing_times GROUP BY time_bucket, read_ct ORDER BY time_bucket, read_ct; + From 9ad1558270455a3cac5c476e714eda9c6ce10ea9 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 6 Feb 2025 17:12:11 +0100 Subject: [PATCH 20/22] docs(edge-worker): Update getting started documentation with detailed worker output Enhance example logs with more specific worker execution details, including worker ID and task scheduling information --- .../getting-started/create-first-worker.mdx | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkgs/website/src/content/docs/edge-worker/getting-started/create-first-worker.mdx b/pkgs/website/src/content/docs/edge-worker/getting-started/create-first-worker.mdx index 230dba1..0998461 100644 --- a/pkgs/website/src/content/docs/edge-worker/getting-started/create-first-worker.mdx +++ b/pkgs/website/src/content/docs/edge-worker/getting-started/create-first-worker.mdx @@ -57,7 +57,11 
@@ Before starting, please read the [Install Edge Worker](/edge-worker/getting-star curl http://localhost:54321/functions/v1/ ``` - This will boot a new instance and start your worker. + This will boot a new instance and start your worker: + + ```bash frame="none" + [Info] worker_id= [WorkerLifecycle] Ensuring queue 'tasks' exists... + ``` 4. ### Process your first message @@ -75,7 +79,9 @@ Before starting, please read the [Install Edge Worker](/edge-worker/getting-star The message will be processed immediately and you should see the following output: ```bash frame="none" - [Info] Scraping URL... https://example.com/ + [Info] worker_id= [ExecutionController] Scheduling execution of task 1 + + [Info] Scraped website! { url: "https://example.com", status: 200 } ```