Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev CLI: cleanup deprecated background worker files (fixes #1572) #1595

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/unlucky-meals-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

cleanup deprecated background worker files (fixes #1572)
6 changes: 6 additions & 0 deletions packages/cli-v3/src/commands/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const DevCommandOptions = CommonCommandOptions.extend({
projectRef: z.string().optional(),
skipUpdateCheck: z.boolean().default(false),
envFile: z.string().optional(),
keepTmpFiles: z.boolean().default(false),
});

export type DevCommandOptions = z.infer<typeof DevCommandOptions>;
Expand All @@ -38,6 +39,10 @@ export function configureDevCommand(program: Command) {
)
.option("--debug-otel", "Enable OpenTelemetry debugging")
.option("--skip-update-check", "Skip checking for @trigger.dev package updates")
.option(
"--keep-tmp-files",
"Keep temporary files after the dev session ends, helpful for debugging"
)
).action(async (options) => {
wrapCommandAction("dev", DevCommandOptions, options, async (opts) => {
await devCommand(opts);
Expand Down Expand Up @@ -151,6 +156,7 @@ async function startDev(options: StartDevOptions) {
initialMode: "local",
dashboardUrl: options.login.dashboardUrl,
showInteractiveDevSession: true,
keepTmpFiles: options.keepTmpFiles,
});
}

Expand Down
43 changes: 13 additions & 30 deletions packages/cli-v3/src/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export class BackgroundWorkerCoordinator {
}

this._backgroundWorkers.set(worker.serverWorker.id, worker);

this.onWorkerRegistered.post({
worker,
id: worker.serverWorker.id,
Expand All @@ -126,14 +127,6 @@ export class BackgroundWorkerCoordinator {
});
}

close() {
for (const worker of this._backgroundWorkers.values()) {
worker.close();
}

this._backgroundWorkers.clear();
}

async executeTaskRun(id: string, payload: TaskRunExecutionPayload, messageId: string) {
const worker = this._backgroundWorkers.get(id);

Expand Down Expand Up @@ -186,11 +179,11 @@ export class BackgroundWorkerCoordinator {
export type BackgroundWorkerOptions = {
env: Record<string, string>;
cwd: string;
stop: () => void;
};

export class BackgroundWorker {
public onTaskRunHeartbeat: Evt<string> = new Evt();
private _onClose: Evt<void> = new Evt();

public deprecated: boolean = false;
public manifest: WorkerManifest | undefined;
Expand All @@ -199,33 +192,27 @@ export class BackgroundWorker {
_taskRunProcesses: Map<string, TaskRunProcess> = new Map();
private _taskRunProcessesBeingKilled: Map<number, TaskRunProcess> = new Map();

private _closed: boolean = false;

constructor(
public build: BuildManifest,
public params: BackgroundWorkerOptions
) {}

deprecate() {
this.deprecated = true;
}

close() {
if (this._closed) {
if (this.deprecated) {
return;
}

this._closed = true;
this.deprecated = true;

this.onTaskRunHeartbeat.detach();
this.#tryStopWorker();
}

// We need to close all the task run processes
for (const taskRunProcess of this._taskRunProcesses.values()) {
taskRunProcess.cleanup(true);
}
#tryStopWorker() {
if (this.deprecated && this._taskRunProcesses.size === 0) {
logger.debug("Worker deprecated, stopping", { outputPath: this.build.outputPath });

// Delete worker files
this._onClose.post();
this.params.stop();
}
}

get inProgressRuns(): Array<string> {
Expand Down Expand Up @@ -301,8 +288,6 @@ export class BackgroundWorker {
throw new Error("Worker not initialized");
}

this._closed = false;

logger.debug(this.#prefixedMessage(payload, "killing current task run process before attempt"));

await this.#killCurrentTaskRunProcessBeforeAttempt(payload.execution.run.id);
Expand Down Expand Up @@ -332,6 +317,8 @@ export class BackgroundWorker {
// Only delete the task run process if the pid matches
if (taskRunProcess?.pid === pid) {
this._taskRunProcesses.delete(payload.execution.run.id);

this.#tryStopWorker();
}

if (pid) {
Expand Down Expand Up @@ -435,10 +422,6 @@ export class BackgroundWorker {
payload: TaskRunExecutionPayload,
messageId: string
): Promise<TaskRunExecutionResult> {
if (this._closed) {
throw new Error("Worker is closed");
}

if (!this.manifest) {
throw new Error("Worker not initialized");
}
Expand Down
20 changes: 16 additions & 4 deletions packages/cli-v3/src/dev/devSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import { type DevCommandOptions } from "../commands/dev.js";
import { eventBus } from "../utilities/eventBus.js";
import { logger } from "../utilities/logger.js";
import { resolveFileSources } from "../utilities/sourceFiles.js";
import { EphemeralDirectory, getTmpDir } from "../utilities/tempDirectories.js";
import { clearTmpDirs, EphemeralDirectory, getTmpDir } from "../utilities/tempDirectories.js";
import { VERSION } from "../version.js";
import { startDevOutput } from "./devOutput.js";
import { startWorkerRuntime } from "./workerRuntime.js";
import { existsSync, mkdirSync, rmSync } from "node:fs";

export type DevSessionOptions = {
name: string | undefined;
Expand All @@ -37,6 +38,7 @@ export type DevSessionOptions = {
rawArgs: DevCommandOptions;
client: CliApiClient;
onErr?: (error: Error) => void;
keepTmpFiles: boolean;
};

export type DevSessionInstance = {
Expand All @@ -49,8 +51,10 @@ export async function startDevSession({
rawArgs,
client,
dashboardUrl,
keepTmpFiles,
}: DevSessionOptions): Promise<DevSessionInstance> {
const destination = getTmpDir(rawConfig.workingDir, "build");
clearTmpDirs(rawConfig.workingDir);
const destination = getTmpDir(rawConfig.workingDir, "build", keepTmpFiles);

const runtime = await startWorkerRuntime({
name,
Expand Down Expand Up @@ -96,7 +100,7 @@ export async function startDevSession({
try {
logger.debug("Updated bundle", { bundle, buildManifest });

await runtime.initializeWorker(buildManifest);
await runtime.initializeWorker(buildManifest, workerDir?.remove ?? (() => {}));
} catch (error) {
if (error instanceof Error) {
eventBus.emit("backgroundWorkerIndexingError", buildManifest, error);
Expand Down Expand Up @@ -124,6 +128,14 @@ export async function startDevSession({
if (bundled) {
eventBus.emit("rebuildStarted", "dev");
}

const outdir = b.initialOptions.outdir;
if (outdir && existsSync(outdir)) {
logger.debug("Removing outdir", { outdir });

rmSync(outdir, { recursive: true, force: true });
mkdirSync(outdir, { recursive: true });
}
});
b.onEnd(async (result: esbuild.BuildResult) => {
const errors = result.errors;
Expand All @@ -141,7 +153,7 @@ export async function startDevSession({
// First bundle, no need to update bundle
bundled = true;
} else {
const workerDir = getTmpDir(rawConfig.workingDir, "build");
const workerDir = getTmpDir(rawConfig.workingDir, "build", keepTmpFiles);

await updateBuild(result, workerDir);
}
Expand Down
9 changes: 7 additions & 2 deletions packages/cli-v3/src/dev/workerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { VERSION } from "../version.js";

export interface WorkerRuntime {
shutdown(): Promise<void>;
initializeWorker(manifest: BuildManifest): Promise<void>;
initializeWorker(manifest: BuildManifest, stop: () => void): Promise<void>;
}

export type WorkerRuntimeOptions = {
Expand Down Expand Up @@ -167,9 +167,10 @@ class DevWorkerRuntime implements WorkerRuntime {
}
}

async initializeWorker(manifest: BuildManifest, options?: { cwd?: string }): Promise<void> {
async initializeWorker(manifest: BuildManifest, stop: () => void): Promise<void> {
if (this.lastBuild && this.lastBuild.contentHash === manifest.contentHash) {
eventBus.emit("workerSkipped");
stop();
return;
}

Expand All @@ -178,18 +179,21 @@ class DevWorkerRuntime implements WorkerRuntime {
const backgroundWorker = new BackgroundWorker(manifest, {
env,
cwd: this.options.config.workingDir,
stop,
});

await backgroundWorker.initialize();

if (!backgroundWorker.manifest) {
stop();
throw new Error("Could not initialize worker");
}

const issues = validateWorkerManifest(backgroundWorker.manifest);

if (issues.length > 0) {
issues.forEach((issue) => logger.error(issue));
stop();
return;
}

Expand All @@ -213,6 +217,7 @@ class DevWorkerRuntime implements WorkerRuntime {
);

if (!backgroundWorkerRecord.success) {
stop();
throw new Error(backgroundWorkerRecord.error);
}

Expand Down
21 changes: 19 additions & 2 deletions packages/cli-v3/src/utilities/tempDirectories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ export function getTmpDir(
const tmpPrefix = path.join(tmpRoot, `${prefix}-`);
const tmpDir = fs.realpathSync(fs.mkdtempSync(tmpPrefix));

let removeDir = keep ? () => {} : () => fs.rmSync(tmpDir, { recursive: true, force: true });
let removeExitListener = keep ? () => {} : onExit(removeDir);
const removeDir = () => {
try {
return fs.rmSync(tmpDir, { recursive: true, force: true });
} catch (e) {
// This sometimes fails on Windows with EBUSY
}
};
const removeExitListener = keep ? () => {} : onExit(removeDir);

return {
path: tmpDir,
Expand All @@ -41,3 +47,14 @@ export function getTmpDir(
},
};
}

export function clearTmpDirs(projectRoot: string | undefined) {
projectRoot ??= process.cwd();
const tmpRoot = path.join(projectRoot, ".trigger", "tmp");

try {
fs.rmSync(tmpRoot, { recursive: true, force: true });
} catch (e) {
// This sometimes fails on Windows with EBUSY
}
}
Loading