Improve reliability by reverting to transaction mode #12

Merged · Feb 6, 2025 · 22 commits

Commits (22)
d40c7be
refactor(postgres): disable statement preparation across edge worker …
jumski Jan 31, 2025
812ff39
refactor(logging): Improve task execution logging verbosity
jumski Jan 31, 2025
1d3eeb1
chore(deps): update dependencies and adjust Supabase connection pool
jumski Feb 1, 2025
a0a56b4
refactor(queue): replace p-queue with @henrygd/queue
jumski Feb 3, 2025
0d84999
refactor(edge-worker): simplify delay mechanism in max concurrency fu…
jumski Feb 3, 2025
53c8cce
feat(exports): expand module public API with internal exports
jumski Feb 3, 2025
b5e5ca1
docs(edge-worker): Update Supabase edge worker installation documenta…
jumski Feb 3, 2025
cc01a23
docs(edge-worker): Update Supabase pooler configuration example
jumski Feb 3, 2025
2133b3a
docs(edge-worker): remove troubleshooting documentation
jumski Feb 3, 2025
4432895
chore(version): bump edge-worker package version
jumski Feb 3, 2025
183bbaa
refactor(worker): prevent race conditions in Worker.stop()
jumski Feb 5, 2025
f4952b2
feat(observability): Add SQL debug queries for connection and process…
jumski Feb 5, 2025
040c057
test(performance): Increase message volume and remove unnecessary delay
jumski Feb 5, 2025
425056e
refactor(edge-worker): consolidate database and utility functions
jumski Feb 6, 2025
7838581
docs(edge-worker): Update project status documentation
jumski Feb 6, 2025
a022485
docs(website): Update project status page and navigation label
jumski Feb 6, 2025
8ae50ed
style(animation): enhance hero image with scale and glow animations
jumski Feb 6, 2025
15bfa6e
style(css): Update logo glow effect with theme-aware color variables
jumski Feb 6, 2025
707a200
refactor(sql): Enhance debug processing gaps query with retry count a…
jumski Feb 6, 2025
9ad1558
docs(edge-worker): Update getting started documentation with detailed…
jumski Feb 6, 2025
4d5671f
feat(edge-worker): enhance URL scraping with fetch response logging
jumski Feb 6, 2025
0079118
docs(edge-worker): Update web scraping worker getting started guide
jumski Feb 6, 2025
10 changes: 5 additions & 5 deletions pkgs/edge-worker/deno.json
@@ -1,14 +1,14 @@
 {
   "name": "@pgflow/edge-worker",
-  "version": "0.0.2",
+  "version": "0.0.3",
   "license": "MIT",
   "exports": "./mod.ts",
   "imports": {
-    "postgres": "npm:[email protected]",
-    "@std/async": "jsr:@std/async",
+    "@henrygd/queue": "jsr:@henrygd/queue@^1.0.7",
     "@std/assert": "jsr:@std/assert",
-    "@std/testing/mock": "jsr:@std/testing/mock",
+    "@std/async": "jsr:@std/async",
     "@std/log": "jsr:@std/log@^0.224.13",
-    "p-queue": "npm:p-queue@^8.0.1"
+    "@std/testing/mock": "jsr:@std/testing/mock",
+    "postgres": "npm:[email protected]"
   }
 }
22 changes: 5 additions & 17 deletions pkgs/edge-worker/deno.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions pkgs/edge-worker/mod.ts
@@ -1 +1,7 @@
 export { EdgeWorker, type EdgeWorkerConfig } from './src/EdgeWorker.ts';
+
+// Internal exports - use with caution
+export { Worker, type WorkerConfig } from './src/Worker.ts';
+export { Queries } from './src/Queries.ts';
+export { Queue } from './src/Queue.ts';
+export * as types from './src/types.ts';
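
With these exports in place, consumers can reach the lower-level building blocks directly. A minimal sketch of what that looks like (the bare specifier assumes the package is consumed under its deno.json name, @pgflow/edge-worker):

// Stable public API:
import { EdgeWorker, type EdgeWorkerConfig } from '@pgflow/edge-worker';

// Internal exports - subject to change, use with caution:
import { Worker, type WorkerConfig, Queries, Queue, types } from '@pgflow/edge-worker';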
32 changes: 32 additions & 0 deletions pkgs/edge-worker/sql/queries/debug_connections.sql
@@ -0,0 +1,32 @@
WITH pg_conns AS (
SELECT
state,
count(*) AS count,
(SELECT setting::int FROM pg_settings WHERE name = 'max_connections') AS max_connections
FROM pg_stat_activity
WHERE application_name = 'supavisor'
GROUP BY state
),
client_conns AS (
SELECT
state,
count(*) AS count,
(SELECT current_setting('supavisor.max_client_connections')::int) AS max_client_connections
FROM supavisor_clients
GROUP BY state
)
SELECT
'from_supavisor_to_postgres' AS connection_type,
state,
count,
max_connections,
round(count::numeric * 100 / max_connections, 2) AS percent_of_limit
FROM pg_conns
UNION ALL
SELECT
'to_supavisor' AS connection_type,
state,
count,
max_client_connections,
round(count::numeric * 100 / max_client_connections, 2) AS percent_of_limit
FROM client_conns;
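
A quick way to run this check from a Deno script instead of psql — a minimal sketch, assuming EDGE_WORKER_DB_URL is set and the query file lives at the path shown in the diff header:

import postgres from 'npm:[email protected]';

const sql = postgres(Deno.env.get('EDGE_WORKER_DB_URL')!, { prepare: false });
const query = await Deno.readTextFile('pkgs/edge-worker/sql/queries/debug_connections.sql');

// unsafe() executes a raw SQL string as-is (no parameter interpolation)
console.table(await sql.unsafe(query));
await sql.end();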
115 changes: 115 additions & 0 deletions pkgs/edge-worker/sql/queries/debug_processing_gaps.sql
@@ -0,0 +1,115 @@
-- select count(*) from pgmq.a_max_concurrency;
-- select read_ct, count(*) from pgmq.q_max_concurrency group by read_ct;
select read_ct - 1 as retries_count, count(*)
from pgmq.a_max_concurrency group by read_ct order by read_ct;

select * from pgmq.metrics('max_concurrency');

select * from pgmq.a_max_concurrency limit 10;
select EXTRACT(EPOCH FROM (max(archived_at) - min(enqueued_at))) as total_seconds from pgmq.a_max_concurrency;

-- Processing time ranges per read_ct
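-- NOTE: the "vt - make_interval(secs => 3)" expressions below appear to assume
-- a 3-second visibility timeout, so subtracting it from vt approximates the
-- moment the message was last read.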
SELECT
read_ct - 1 as retry_count,
COUNT(*) as messages,
round(avg(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as avg_s,
round(min(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as min_s,
round(max(EXTRACT(EPOCH FROM (vt - make_interval(secs => 3) - enqueued_at))), 2) as max_s
FROM pgmq.a_max_concurrency
GROUP BY read_ct
ORDER BY read_ct;

-- Processing time percentiles
WITH processing_times AS (
SELECT archived_at - (vt - make_interval(secs=>3)) as processing_time
FROM pgmq.a_max_concurrency
)
SELECT
ROUND(EXTRACT(epoch FROM percentile_cont(0.50) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p50_ms,
ROUND(EXTRACT(epoch FROM percentile_cont(0.75) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p75_ms,
ROUND(EXTRACT(epoch FROM percentile_cont(0.90) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p90_ms,
ROUND(EXTRACT(epoch FROM percentile_cont(0.95) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p95_ms,
ROUND(EXTRACT(epoch FROM percentile_cont(0.99) WITHIN GROUP (ORDER BY processing_time)) * 1000) as p99_ms,
ROUND(EXTRACT(epoch FROM MIN(processing_time)) * 1000) as min_ms,
ROUND(EXTRACT(epoch FROM MAX(processing_time)) * 1000) as max_ms
FROM processing_times;

-- Total processing time for messages with read_ct 1 or 2
SELECT
round(sum(EXTRACT(EPOCH FROM (archived_at - enqueued_at))), 2) as total_processing_seconds
FROM pgmq.a_max_concurrency
WHERE read_ct IN (1, 2);

-- Distribution of processing times in configurable intervals
WITH
interval_conf AS (
SELECT 1 as interval_seconds
),
processing_times AS (
SELECT
EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as seconds
FROM pgmq.a_max_concurrency
)
SELECT
((floor(seconds / interval_seconds) * interval_seconds) || '-' ||
(floor(seconds / interval_seconds) * interval_seconds + interval_seconds) || 's')::text as time_bucket,
COUNT(*) as message_count,
round((COUNT(*)::numeric / interval_seconds), 1) as messages_per_second,
SUM(COUNT(*)) OVER (ORDER BY floor(seconds / interval_seconds)) as total_processed_so_far
FROM processing_times, interval_conf
GROUP BY floor(seconds / interval_seconds), interval_seconds
ORDER BY floor(seconds / interval_seconds);


-- First let's check the raw distribution
WITH processing_times AS (
SELECT
EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as seconds
FROM pgmq.a_max_concurrency
)
SELECT
floor(seconds) as seconds,
COUNT(*) as message_count
FROM processing_times
WHERE seconds BETWEEN 165 AND 381
GROUP BY floor(seconds)
ORDER BY floor(seconds);


-- Examine messages around the gap
WITH processing_times AS (
SELECT
msg_id,
enqueued_at,
archived_at,
EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as processing_time,
read_ct
FROM pgmq.a_max_concurrency
)
SELECT
msg_id,
enqueued_at,
archived_at,
round(processing_time::numeric, 2) as processing_seconds,
read_ct
FROM processing_times
WHERE
processing_time BETWEEN 164 AND 380
ORDER BY processing_time;

-- Show processing time distribution by retry count
WITH processing_times AS (
SELECT
EXTRACT(EPOCH FROM (archived_at - enqueued_at)) as processing_time,
read_ct,
width_bucket(EXTRACT(EPOCH FROM (archived_at - enqueued_at)), 0, 400, 20) as time_bucket
FROM pgmq.a_max_concurrency
)
SELECT
((time_bucket - 1) * 20) || '-' || (time_bucket * 20) || 's' as time_range,
read_ct,
COUNT(*) as message_count
FROM processing_times
GROUP BY time_bucket, read_ct
ORDER BY time_bucket, read_ct;

16 changes: 9 additions & 7 deletions pkgs/edge-worker/src/ExecutionController.ts
@@ -1,4 +1,4 @@
-import PQueue from 'p-queue';
+import { newQueue, type Queue as PromiseQueue } from '@henrygd/queue';
 import { MessageExecutor } from './MessageExecutor.ts';
 import { Queue } from './Queue.ts';
 import { Json } from './types.ts';
@@ -15,7 +15,7 @@ export interface ExecutionConfig {
 export class ExecutionController<MessagePayload extends Json> {
   private logger = getLogger('ExecutionController');
   private queue: Queue<MessagePayload>;
-  private pqueue: PQueue;
+  private promiseQueue: PromiseQueue;
   private archiver: BatchArchiver<MessagePayload>;
   private signal: AbortSignal;
   private retryLimit: number;
@@ -30,7 +30,7 @@ export class ExecutionController<MessagePayload extends Json> {
     this.signal = abortSignal;
     this.retryLimit = config.retryLimit;
     this.retryDelay = config.retryDelay;
-    this.pqueue = new PQueue({ concurrency: config.maxConcurrent });
+    this.promiseQueue = newQueue(config.maxConcurrent);
     this.archiver = new BatchArchiver(queue);
   }

@@ -48,11 +48,13 @@ export class ExecutionController<MessagePayload extends Json> {
       this.retryDelay
     );

-    this.logger.info(`Starting execution for ${executor.msgId}`);
+    this.logger.info(`Scheduling execution of task ${executor.msgId}`);

-    return await this.pqueue.add(async () => {
+    return await this.promiseQueue.add(async () => {
       try {
+        this.logger.debug(`Executing task ${executor.msgId}...`);
         await executor.execute();
+        this.logger.debug(`Execution successful for ${executor.msgId}`);
       } catch (error) {
         this.logger.error(`Execution failed for ${executor.msgId}:`, error);
         throw error;
@@ -61,7 +63,7 @@
   }

   async awaitCompletion() {
-    await this.pqueue.onIdle();
-    await this.archiver.flush();
+    await this.promiseQueue.done();
+    // await this.archiver.flush();
   }
 }
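
The p-queue → @henrygd/queue migration maps nearly one-to-one. A minimal sketch of the corresponding calls (concurrency value and task body are illustrative):

import { newQueue } from '@henrygd/queue';

// was: const queue = new PQueue({ concurrency: 10 });
const queue = newQueue(10);

// add() returns a promise that settles with the task's result
const result = queue.add(async () => 'ok');

// was: await queue.onIdle();
await queue.done();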
2 changes: 2 additions & 0 deletions pkgs/edge-worker/src/MessageExecutor.ts
@@ -47,12 +47,14 @@ export class MessageExecutor<MessagePayload extends Json> {
     // Check if already aborted before starting
     this.signal.throwIfAborted();

+    this.logger.debug(`Executing task ${this.msgId}...`);
     await this.messageHandler(this.record.message!);

     this.logger.debug(
       `Task ${this.msgId} completed successfully, archiving...`
     );
     await this.queue.archive(this.msgId);
+    this.logger.debug(`Archived task ${this.msgId} successfully`);

     // TODO: uncomment when ready to debug this
     // await this.batchArchiver.add(this.msgId);
11 changes: 8 additions & 3 deletions pkgs/edge-worker/src/Worker.ts
@@ -53,7 +53,7 @@ export class Worker<MessagePayload extends Json> {

     this.sql = postgres(this.config.connectionString, {
       max: this.config.maxPgConnections,
-      prepare: true,
+      prepare: false,
     });

     const queue = new Queue<MessagePayload>(this.sql, this.config.queueName);
@@ -85,7 +85,7 @@ export class Worker<MessagePayload extends Json> {
   }

   async startOnlyOnce(workerBootstrap: WorkerBootstrap) {
-    if (this.lifecycle.isRunning()) {
+    if (this.lifecycle.isRunning) {
       this.logger.debug('Worker already running, ignoring start request');
       return;
     }
@@ -121,6 +121,11 @@
   }

   async stop() {
+    // If the worker is already stopping or stopped, do nothing
+    if (this.lifecycle.isStopping || this.lifecycle.isStopped) {
+      return;
+    }
+
     this.lifecycle.transitionToStopping();

     try {
@@ -150,7 +155,7 @@
    * Returns true if worker state is Running and worker was not stopped
    */
   private get isMainLoopActive() {
-    return this.lifecycle.isRunning() && !this.isAborted;
+    return this.lifecycle.isRunning && !this.isAborted;
   }

   private get abortSignal() {
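
The prepare: false change is what makes the transaction-mode pooler in this PR safe to use: in transaction pooling mode, consecutive statements from one client can be routed to different Postgres backends, so server-side prepared statements created on one backend may not exist on the backend serving the next statement. A minimal sketch of the resulting connection setup (env var name taken from this repo; pool size illustrative):

import postgres from 'postgres';

const sql = postgres(Deno.env.get('EDGE_WORKER_DB_URL')!, {
  max: 4,         // client-side pool cap, mirrors maxPgConnections
  prepare: false, // required behind transaction-mode poolers such as Supavisor or PgBouncer
});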
10 changes: 9 additions & 1 deletion pkgs/edge-worker/src/WorkerLifecycle.ts
@@ -75,10 +75,18 @@ export class WorkerLifecycle<MessagePayload extends Json> {
     await this.heartbeat?.send();
   }

-  isRunning(): boolean {
+  get isRunning() {
     return this.workerState.isRunning;
   }

+  get isStopping() {
+    return this.workerState.isStopping;
+  }
+
+  get isStopped() {
+    return this.workerState.isStopped;
+  }
+
   transitionToStopping() {
     this.workerState.transitionTo(States.Stopping);
   }
4 changes: 4 additions & 0 deletions pkgs/edge-worker/src/WorkerState.ts
@@ -59,6 +59,10 @@ export class WorkerState {
     return this.state === States.Stopping;
   }

+  get isStopped() {
+    return this.state === States.Stopped;
+  }
+
   transitionTo(state: States) {
     this.logger.debug(
       `[WorkerState] Starting transition to '${state}' (current state: ${this.state})`
4 changes: 2 additions & 2 deletions pkgs/edge-worker/supabase/config.toml
@@ -12,9 +12,9 @@ major_version = 15
 [db.pooler]
 enabled = true
 port = 50329
-pool_mode = "session"
+pool_mode = "transaction"
 default_pool_size = 200
-max_client_conn = 200
+max_client_conn = 250

 [db.seed]
 enabled = true
7 changes: 1 addition & 6 deletions pkgs/edge-worker/supabase/functions/cpu_intensive/index.ts
@@ -1,11 +1,6 @@
 import { EdgeWorker } from '../_src/EdgeWorker.ts';
-import postgres from 'postgres';
 import { crypto } from 'jsr:@std/crypto';
-
-const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!;
-console.log('EDGE_WORKER_DB_URL', EDGE_WORKER_DB_URL);
-
-const sql = postgres(EDGE_WORKER_DB_URL, { prepare: true });
+import { sql } from '../utils.ts';

 async function cpuIntensiveTask() {
   let data = new TextEncoder().encode('burn');
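
The shared ../utils.ts module is not included in this diff; a hypothetical reconstruction based on the code it replaces might look like:

// supabase/functions/utils.ts (hypothetical sketch, not part of this diff)
import postgres from 'postgres';

const EDGE_WORKER_DB_URL = Deno.env.get('EDGE_WORKER_DB_URL')!;

// prepare: false matches the transaction-mode pooler change in this PR
export const sql = postgres(EDGE_WORKER_DB_URL, { prepare: false });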