Skip to content

Commit

Permalink
dev CLI: cleanup deprecated background worker files (fixes #1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Jan 9, 2025
1 parent 51a054f commit 7b35c4c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 38 deletions.
6 changes: 6 additions & 0 deletions packages/cli-v3/src/commands/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const DevCommandOptions = CommonCommandOptions.extend({
projectRef: z.string().optional(),
skipUpdateCheck: z.boolean().default(false),
envFile: z.string().optional(),
keepTmpFiles: z.boolean().default(false),
});

export type DevCommandOptions = z.infer<typeof DevCommandOptions>;
Expand All @@ -38,6 +39,10 @@ export function configureDevCommand(program: Command) {
)
.option("--debug-otel", "Enable OpenTelemetry debugging")
.option("--skip-update-check", "Skip checking for @trigger.dev package updates")
.option(
"--keep-tmp-files",
"Keep temporary files after the dev session ends, helpful for debugging"
)
).action(async (options) => {
wrapCommandAction("dev", DevCommandOptions, options, async (opts) => {
await devCommand(opts);
Expand Down Expand Up @@ -151,6 +156,7 @@ async function startDev(options: StartDevOptions) {
initialMode: "local",
dashboardUrl: options.login.dashboardUrl,
showInteractiveDevSession: true,
keepTmpFiles: options.keepTmpFiles,
});
}

Expand Down
43 changes: 13 additions & 30 deletions packages/cli-v3/src/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export class BackgroundWorkerCoordinator {
}

this._backgroundWorkers.set(worker.serverWorker.id, worker);

this.onWorkerRegistered.post({
worker,
id: worker.serverWorker.id,
Expand All @@ -126,14 +127,6 @@ export class BackgroundWorkerCoordinator {
});
}

close() {
for (const worker of this._backgroundWorkers.values()) {
worker.close();
}

this._backgroundWorkers.clear();
}

async executeTaskRun(id: string, payload: TaskRunExecutionPayload, messageId: string) {
const worker = this._backgroundWorkers.get(id);

Expand Down Expand Up @@ -186,11 +179,11 @@ export class BackgroundWorkerCoordinator {
export type BackgroundWorkerOptions = {
env: Record<string, string>;
cwd: string;
stop: () => void;
};

export class BackgroundWorker {
public onTaskRunHeartbeat: Evt<string> = new Evt();
private _onClose: Evt<void> = new Evt();

public deprecated: boolean = false;
public manifest: WorkerManifest | undefined;
Expand All @@ -199,33 +192,27 @@ export class BackgroundWorker {
_taskRunProcesses: Map<string, TaskRunProcess> = new Map();
private _taskRunProcessesBeingKilled: Map<number, TaskRunProcess> = new Map();

private _closed: boolean = false;

constructor(
public build: BuildManifest,
public params: BackgroundWorkerOptions
) {}

deprecate() {
this.deprecated = true;
}

close() {
if (this._closed) {
if (this.deprecated) {
return;
}

this._closed = true;
this.deprecated = true;

this.onTaskRunHeartbeat.detach();
this.#tryStopWorker();
}

// We need to close all the task run processes
for (const taskRunProcess of this._taskRunProcesses.values()) {
taskRunProcess.cleanup(true);
}
#tryStopWorker() {
if (this.deprecated && this._taskRunProcesses.size === 0) {
logger.debug("Worker deprecated, stopping", { outputPath: this.build.outputPath });

// Delete worker files
this._onClose.post();
this.params.stop();
}
}

get inProgressRuns(): Array<string> {
Expand Down Expand Up @@ -301,8 +288,6 @@ export class BackgroundWorker {
throw new Error("Worker not initialized");
}

this._closed = false;

logger.debug(this.#prefixedMessage(payload, "killing current task run process before attempt"));

await this.#killCurrentTaskRunProcessBeforeAttempt(payload.execution.run.id);
Expand Down Expand Up @@ -332,6 +317,8 @@ export class BackgroundWorker {
// Only delete the task run process if the pid matches
if (taskRunProcess?.pid === pid) {
this._taskRunProcesses.delete(payload.execution.run.id);

this.#tryStopWorker();
}

if (pid) {
Expand Down Expand Up @@ -435,10 +422,6 @@ export class BackgroundWorker {
payload: TaskRunExecutionPayload,
messageId: string
): Promise<TaskRunExecutionResult> {
if (this._closed) {
throw new Error("Worker is closed");
}

if (!this.manifest) {
throw new Error("Worker not initialized");
}
Expand Down
20 changes: 16 additions & 4 deletions packages/cli-v3/src/dev/devSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import { type DevCommandOptions } from "../commands/dev.js";
import { eventBus } from "../utilities/eventBus.js";
import { logger } from "../utilities/logger.js";
import { resolveFileSources } from "../utilities/sourceFiles.js";
import { EphemeralDirectory, getTmpDir } from "../utilities/tempDirectories.js";
import { clearTmpDirs, EphemeralDirectory, getTmpDir } from "../utilities/tempDirectories.js";
import { VERSION } from "../version.js";
import { startDevOutput } from "./devOutput.js";
import { startWorkerRuntime } from "./workerRuntime.js";
import { existsSync, mkdirSync, rmSync } from "node:fs";

export type DevSessionOptions = {
name: string | undefined;
Expand All @@ -37,6 +38,7 @@ export type DevSessionOptions = {
rawArgs: DevCommandOptions;
client: CliApiClient;
onErr?: (error: Error) => void;
keepTmpFiles: boolean;
};

export type DevSessionInstance = {
Expand All @@ -49,8 +51,10 @@ export async function startDevSession({
rawArgs,
client,
dashboardUrl,
keepTmpFiles,
}: DevSessionOptions): Promise<DevSessionInstance> {
const destination = getTmpDir(rawConfig.workingDir, "build");
clearTmpDirs(rawConfig.workingDir);
const destination = getTmpDir(rawConfig.workingDir, "build", keepTmpFiles);

const runtime = await startWorkerRuntime({
name,
Expand Down Expand Up @@ -96,7 +100,7 @@ export async function startDevSession({
try {
logger.debug("Updated bundle", { bundle, buildManifest });

await runtime.initializeWorker(buildManifest);
await runtime.initializeWorker(buildManifest, workerDir?.remove ?? (() => {}));
} catch (error) {
if (error instanceof Error) {
eventBus.emit("backgroundWorkerIndexingError", buildManifest, error);
Expand Down Expand Up @@ -124,6 +128,14 @@ export async function startDevSession({
if (bundled) {
eventBus.emit("rebuildStarted", "dev");
}

const outdir = b.initialOptions.outdir;
if (outdir && existsSync(outdir)) {
logger.debug("Removing outdir", { outdir });

rmSync(outdir, { recursive: true, force: true });
mkdirSync(outdir, { recursive: true });
}
});
b.onEnd(async (result: esbuild.BuildResult) => {
const errors = result.errors;
Expand All @@ -141,7 +153,7 @@ export async function startDevSession({
// First bundle, no need to update bundle
bundled = true;
} else {
const workerDir = getTmpDir(rawConfig.workingDir, "build");
const workerDir = getTmpDir(rawConfig.workingDir, "build", keepTmpFiles);

await updateBuild(result, workerDir);
}
Expand Down
9 changes: 7 additions & 2 deletions packages/cli-v3/src/dev/workerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { VERSION } from "../version.js";

export interface WorkerRuntime {
shutdown(): Promise<void>;
initializeWorker(manifest: BuildManifest): Promise<void>;
initializeWorker(manifest: BuildManifest, stop: () => void): Promise<void>;
}

export type WorkerRuntimeOptions = {
Expand Down Expand Up @@ -167,9 +167,10 @@ class DevWorkerRuntime implements WorkerRuntime {
}
}

async initializeWorker(manifest: BuildManifest, options?: { cwd?: string }): Promise<void> {
async initializeWorker(manifest: BuildManifest, stop: () => void): Promise<void> {
if (this.lastBuild && this.lastBuild.contentHash === manifest.contentHash) {
eventBus.emit("workerSkipped");
stop();
return;
}

Expand All @@ -178,18 +179,21 @@ class DevWorkerRuntime implements WorkerRuntime {
const backgroundWorker = new BackgroundWorker(manifest, {
env,
cwd: this.options.config.workingDir,
stop,
});

await backgroundWorker.initialize();

if (!backgroundWorker.manifest) {
stop();
throw new Error("Could not initialize worker");
}

const issues = validateWorkerManifest(backgroundWorker.manifest);

if (issues.length > 0) {
issues.forEach((issue) => logger.error(issue));
stop();
return;
}

Expand All @@ -213,6 +217,7 @@ class DevWorkerRuntime implements WorkerRuntime {
);

if (!backgroundWorkerRecord.success) {
stop();
throw new Error(backgroundWorkerRecord.error);
}

Expand Down
21 changes: 19 additions & 2 deletions packages/cli-v3/src/utilities/tempDirectories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ export function getTmpDir(
const tmpPrefix = path.join(tmpRoot, `${prefix}-`);
const tmpDir = fs.realpathSync(fs.mkdtempSync(tmpPrefix));

let removeDir = keep ? () => {} : () => fs.rmSync(tmpDir, { recursive: true, force: true });
let removeExitListener = keep ? () => {} : onExit(removeDir);
const removeDir = () => {
try {
return fs.rmSync(tmpDir, { recursive: true, force: true });
} catch (e) {
// This sometimes fails on Windows with EBUSY
}
};
const removeExitListener = keep ? () => {} : onExit(removeDir);

return {
path: tmpDir,
Expand All @@ -41,3 +47,14 @@ export function getTmpDir(
},
};
}

export function clearTmpDirs(projectRoot: string | undefined) {
projectRoot ??= process.cwd();
const tmpRoot = path.join(projectRoot, ".trigger", "tmp");

try {
fs.rmSync(tmpRoot, { recursive: true, force: true });
} catch (e) {
// This sometimes fails on Windows with EBUSY
}
}

0 comments on commit 7b35c4c

Please sign in to comment.