Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job priority support and prioritize user initiated indexing requests #2051

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Live reloads are not available in this mode, however, if you use start the serve

#### Using `start:all`

Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_COUNT` to add additional workers. By default there is 1 worker for each realm server. Here's what is spun up with `start:all`:
Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_HIGH_PRIORITY_COUNT` to add additional workers that service only user initiated requests and `WORKER_ALL_PRIORITY_COUNT` to add workers that service all jobs (system or user initiated). By default there is 1 all priority worker for each realm server. Here's what is spun up with `start:all`:

| Port | Description | Running `start:all` | Running `start:base` |
| ----- | ------------------------------------------------------------- | ------------------- | -------------------- |
Expand Down Expand Up @@ -123,7 +123,7 @@ When running tests we isolate the database between each test run by actually cre
If you wish to drop the development databases you can execute:

```
pnpm drop-all-dbs
pnpm full-reset
```

You can then run `pnpm migrate up` (with `PGDATABASE` set accordingly if you want to migrate a database other than `boxel`) or just start the realm server (`pnpm start:all`) to create the database again.
Expand Down
25 changes: 15 additions & 10 deletions packages/host/app/lib/browser-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
private jobs: {
jobId: number;
jobType: string;
arg: PgPrimitive;
args: PgPrimitive;
notifier: Deferred<any>;
}[] = [];
private types: Map<string, (arg: any) => Promise<any>> = new Map();
Expand Down Expand Up @@ -50,12 +50,17 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
this.debouncedDrainJobs();
}

async publish<T>(
jobType: string,
_concurrencyGroup: string | null,
_timeout: number,
arg: PgPrimitive,
): Promise<Job<T>> {
async publish<T>({
jobType,
concurrencyGroup: _concurrencyGroup,
timeout: _timeout,
args,
}: {
jobType: string;
concurrencyGroup: string | null;
timeout: number;
args: PgPrimitive;
}): Promise<Job<T>> {
if (this.isDestroyed) {
throw new Error(`Cannot publish job on a destroyed Queue`);
}
Expand All @@ -66,7 +71,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
jobId,
notifier,
jobType,
arg,
args,
});
this.debouncedDrainJobs();
return job;
Expand All @@ -84,7 +89,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
let jobs = [...this.jobs];
this.jobs = [];
for (let workItem of jobs) {
let { jobId, jobType, notifier, arg } = workItem;
let { jobId, jobType, notifier, args } = workItem;
let handler = this.types.get(jobType);
if (!handler) {
// no handler for this job, add it back to the queue
Expand All @@ -96,7 +101,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
this.onBeforeJob(jobId);
}
try {
notifier.fulfill(await handler(arg));
notifier.fulfill(await handler(args));
} catch (e: any) {
notifier.reject(e);
}
Expand Down
16 changes: 16 additions & 0 deletions packages/postgres/migrations/1736950557343_queue-priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
exports.up = (pgm) => {
pgm.sql('delete from job_reservations');
pgm.sql('delete from jobs');

pgm.addColumns('jobs', {
priority: { type: 'integer', notNull: true, default: 0 },
});
pgm.createIndex('jobs', 'priority');
};

exports.down = (pgm) => {
pgm.dropIndex('jobs', 'priority');
pgm.dropColumns('jobs', {
priority: 'integer',
});
};
Loading
Loading