Skip to content

Commit

Permalink
⚡️ boilerplate to add cloudflare queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Bierman committed Jul 16, 2024
1 parent bc20b1f commit e32c992
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
19 changes: 14 additions & 5 deletions server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
import { fetchHandler } from 'trpc-playground/handlers/fetch';
import { appRouter } from './routes/trpcRouter';
import { honoTRPCServer } from './trpc/server';
import { cors } from 'hono/cors';
import { securityHeaders } from './middleware/securityHeaders';
import { enforceHttps } from './middleware/enforceHttps';
import router from './routes';
import { CORS_METHODS } from './config';
import { queue } from './queue';

interface Bindings {
export interface Bindings {
[key: string]: any;
DB: IDBDatabase;
JWT_VERIFICATION_KEY: string;
Expand Down Expand Up @@ -48,8 +50,7 @@ app.use('*', async (c, next) => {
});

// SETUP LOGGING
// tRPC is already logging requests, but you can add your own middleware
// app.use('*', logger());
app.use('*', logger());

// SETUP TRPC SERVER
app.use(`${TRPC_API_ENDPOINT}/*`, honoTRPCServer({ router: appRouter }));
Expand All @@ -67,4 +68,12 @@ app.use(TRPC_PLAYGROUND_ENDPOINT, async (c, next) => {
// SET UP HTTP ROUTES
app.route(`${HTTP_ENDPOINT}`, router);

export default app;
// SETUP CLOUDFLARE WORKER WITH EVENT HANDLERS
const worker = {
...app,
fetch: app.fetch,
queue,
};

// EXPORT WORKER
export default worker;
41 changes: 41 additions & 0 deletions server/src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { MessageBatch } from '@cloudflare/workers-types';
import { Bindings } from '..';

async function handleLogQueue(batch: MessageBatch<Error>, env: Bindings) {
for (const message of batch.messages) {
console.log('Processing log message:', message.body);
// Add your log processing logic here
}
}

const handleQueueDefault = async (
batch: MessageBatch<Error>,
env: Bindings,
) => {
console.error(`No handler found for queue: ${batch.queue}`);
// Optionally, implement a default behavior or error handling
};

// Add more handler functions as needed

// Create a Map for handlers
const queueHandlersMap = new Map<
string,
(batch: MessageBatch<Error>, env: Bindings) => Promise<void>
>([
['log-queue', handleLogQueue],
// Add more handlers here
]);

export async function queue(
batch: MessageBatch<Error>,
env: Bindings,
): Promise<void> {
try {
const handler = queueHandlersMap.get(batch.queue) || handleQueueDefault;
await handler(batch, env);
} catch (error) {
console.error(`Error processing queue: ${batch.queue}`, error);
// Optionally, add more error handling logic here
}
}
17 changes: 15 additions & 2 deletions server/wrangler.toml.example
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
name = "packrat_api"
main = "src/index.ts"
compatibility_date = "2024-03-14"
compatibility_flags = ["nodejs_compat"]
# node_compat = true # Sometimes this is needed for tests
compatibility_flags = ["nodejs_compat"] # Sometimes this is needed for tests
# node_compat = true

[[ d1_databases ]]
binding = "DB"
database_name = "production"
database_id = "3f9677cd-7dd9-4a2c-92c6-be5dbbf47baa"

[[queues.producers]]
queue = "packrat-error-queue"
binding = "ERROR_QUEUE"

[[queues.consumers]]
queue = "packrat-error-queue"
max_batch_size = 100
max_batch_timeout = 30

[[r2_buckets]]
bucket_name = "packrat-error-bucket"
binding = "ERROR_BUCKET"

# Add vars below
# [vars]
# GOOGLE_CLIENT_ID=""
Expand Down

0 comments on commit e32c992

Please sign in to comment.