Skip to content

Commit

Permalink
return worker group details from connect call
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktrn committed Dec 20, 2024
1 parent 482a98d commit d3386e2
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 12 deletions.
13 changes: 8 additions & 5 deletions apps/webapp/app/routes/api.v1.worker-actions.connect.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import {
WorkerApiConnectRequestBody,
WorkerApiConnectResponseBody,
} from "@trigger.dev/worker";
import { WorkerApiConnectRequestBody, WorkerApiConnectResponseBody } from "@trigger.dev/worker";
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const action = createActionWorkerApiRoute(
Expand All @@ -11,6 +8,12 @@ export const action = createActionWorkerApiRoute(
},
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiConnectResponseBody>> => {
await authenticatedWorker.connect(body.metadata);
return json({ ok: true });
return json({
ok: true,
workerGroup: {
type: authenticatedWorker.type,
name: authenticatedWorker.name,
},
});
}
);
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export class WorkerGroupTokenService extends WithRunEngine {
prisma: this._prisma,
engine: this._engine,
type: WorkerInstanceGroupType.MANAGED,
name: workerGroup.name,
workerGroupId: workerGroup.id,
workerInstanceId: workerInstance.id,
masterQueue: workerGroup.masterQueue,
Expand Down Expand Up @@ -242,6 +243,7 @@ export class WorkerGroupTokenService extends WithRunEngine {
prisma: this._prisma,
engine: this._engine,
type: WorkerInstanceGroupType.UNMANAGED,
name: workerGroup.name,
workerGroupId: workerGroup.id,
workerInstanceId: workerInstance.id,
masterQueue: workerGroup.masterQueue,
Expand Down Expand Up @@ -481,6 +483,7 @@ export type WorkerInstanceEnv = z.infer<typeof WorkerInstanceEnv>;

export type AuthenticatedWorkerInstanceOptions = WithRunEngineOptions<{
type: WorkerInstanceGroupType;
name: string;
workerGroupId: string;
workerInstanceId: string;
masterQueue: string;
Expand All @@ -492,20 +495,22 @@ export type AuthenticatedWorkerInstanceOptions = WithRunEngineOptions<{

export class AuthenticatedWorkerInstance extends WithRunEngine {
readonly type: WorkerInstanceGroupType;
readonly name: string;
readonly workerGroupId: string;
readonly workerInstanceId: string;
readonly masterQueue: string;
readonly environment: RuntimeEnvironment | null;
readonly deploymentId?: string;
readonly backgroundWorkerId?: string;

// FIXME
// FIXME: Required for unmanaged workers
readonly isLatestDeployment = true;

constructor(opts: AuthenticatedWorkerInstanceOptions) {
super({ prisma: opts.prisma, engine: opts.engine });

this.type = opts.type;
this.name = opts.name;
this.workerGroupId = opts.workerGroupId;
this.workerInstanceId = opts.workerInstanceId;
this.masterQueue = opts.masterQueue;
Expand Down Expand Up @@ -715,6 +720,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
if (this.type === WorkerInstanceGroupType.MANAGED) {
return {
type: WorkerInstanceGroupType.MANAGED,
name: this.name,
workerGroupId: this.workerGroupId,
workerInstanceId: this.workerInstanceId,
masterQueue: this.masterQueue,
Expand All @@ -723,6 +729,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {

return {
type: WorkerInstanceGroupType.UNMANAGED,
name: this.name,
workerGroupId: this.workerGroupId,
workerInstanceId: this.workerInstanceId,
masterQueue: this.masterQueue,
Expand Down Expand Up @@ -761,12 +768,14 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
export type WorkerGroupTokenAuthenticationResponse =
| {
type: typeof WorkerInstanceGroupType.MANAGED;
name: string;
workerGroupId: string;
workerInstanceId: string;
masterQueue: string;
}
| {
type: typeof WorkerInstanceGroupType.UNMANAGED;
name: string;
workerGroupId: string;
workerInstanceId: string;
masterQueue: string;
Expand Down
4 changes: 4 additions & 0 deletions packages/worker/src/supervisor/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export type WorkerApiConnectRequestBody = z.infer<typeof WorkerApiConnectRequest

export const WorkerApiConnectResponseBody = z.object({
ok: z.literal(true),
workerGroup: z.object({
type: z.string(),
name: z.string(),
}),
});
export type WorkerApiConnectResponseBody = z.infer<typeof WorkerApiConnectResponseBody>;

Expand Down
19 changes: 13 additions & 6 deletions packages/worker/src/supervisor/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
extraHeaders: getDefaultWorkerHeaders(this.opts),
});
this.socket.on("run:notify", ({ version, run }) => {
console.log("[WorkerSession] Received run notification", { version, run });
console.log("[WorkerSession][WS] Received run notification", { version, run });
this.emit("runNotification", { time: new Date(), run });
});
this.socket.on("connect", () => {
console.log("[WorkerSession] Connected to platform");
console.log("[WorkerSession][WS] Connected to platform");
});
this.socket.on("connect_error", (error) => {
console.error("[WorkerSession] Connection error", { error });
console.error("[WorkerSession][WS] Connection error", { error });
});
this.socket.on("disconnect", (reason, description) => {
console.log("[WorkerSession] Disconnected from platform", { reason, description });
console.log("[WorkerSession][WS] Disconnected from platform", { reason, description });
});
}

Expand All @@ -122,10 +122,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
});

if (!connect.success) {
console.error("[WorkerSession] Failed to connect via HTTP client", { error: connect.error });
throw new Error("[WorkerSession] Failed to connect via HTTP client");
console.error("[WorkerSession][HTTP] Failed to connect", { error: connect.error });
throw new Error("[WorkerSession][HTTP] Failed to connect");
}

const { workerGroup } = connect.data;

console.log("[WorkerSession][HTTP] Connected to platform", {
type: workerGroup.type,
name: workerGroup.name,
});

this.queueConsumer.start();
this.heartbeatService.start();
this.createSocket();
Expand Down

0 comments on commit d3386e2

Please sign in to comment.