Skip to content

Commit

Permalink
Merge pull request #2842 from Infisical/misc/add-pg-queue-init-flag
Browse files Browse the repository at this point in the history
misc: added pg queue init flag
  • Loading branch information
maidul98 authored Dec 5, 2024
2 parents ef82c66 + 0a1242d commit 2098bd3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 93 deletions.
178 changes: 90 additions & 88 deletions backend/src/ee/services/audit-log/audit-log-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const auditLogQueueServiceFactory = async ({
const appCfg = getConfig();

const pushToLog = async (data: TCreateAuditLogDTO) => {
if (appCfg.USE_PG_QUEUE) {
if (appCfg.USE_PG_QUEUE && appCfg.SHOULD_INIT_PG_QUEUE) {
await queueService.queuePg<QueueName.AuditLog>(QueueJobs.AuditLog, data, {
retryLimit: 10,
retryBackoff: true
Expand All @@ -52,96 +52,98 @@ export const auditLogQueueServiceFactory = async ({
}
};

await queueService.startPg<QueueName.AuditLog>(
QueueJobs.AuditLog,
async ([job]) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
let { orgId } = job.data;
const MS_IN_DAY = 24 * 60 * 60 * 1000;
let project;

if (!orgId) {
// it will never be undefined for both org and project id
// TODO(akhilmhdh): use caching here in dal to avoid db calls
project = await projectDAL.findById(projectId as string);
orgId = project.orgId;
}

const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
}
if (appCfg.SHOULD_INIT_PG_QUEUE) {
await queueService.startPg<QueueName.AuditLog>(
QueueJobs.AuditLog,
async ([job]) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
let { orgId } = job.data;
const MS_IN_DAY = 24 * 60 * 60 * 1000;
let project;

if (!orgId) {
// it will never be undefined for both org and project id
// TODO(akhilmhdh): use caching here in dal to avoid db calls
project = await projectDAL.findById(projectId as string);
orgId = project.orgId;
}

// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;

const ttl = ttlInDays * MS_IN_DAY;

const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
}

const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
infisicalSymmetricDecrypt({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];

const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };

if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;

const ttl = ttlInDays * MS_IN_DAY;

const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});

const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
infisicalSymmetricDecrypt({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];

const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };

if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
});

return request.post(url, auditLog, {
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
});

return request.post(url, auditLog, {
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
});
}
)
);
},
{
batchSize: 1,
workerCount: 30,
pollingIntervalSeconds: 0.5
}
);
}
)
);
},
{
batchSize: 1,
workerCount: 30,
pollingIntervalSeconds: 0.5
}
);
}

queueService.start(QueueName.AuditLog, async (job) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
Expand Down
3 changes: 2 additions & 1 deletion backend/src/lib/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ const envSchema = z
HSM_KEY_LABEL: zpStr(z.string().optional()),
HSM_SLOT: z.coerce.number().optional().default(0),

USE_PG_QUEUE: zodStrBool.default("false")
USE_PG_QUEUE: zodStrBool.default("false"),
SHOULD_INIT_PG_QUEUE: zodStrBool.default("false")
})
// To ensure that basic encryption is always possible.
.refine(
Expand Down
13 changes: 9 additions & 4 deletions backend/src/queue/queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
TScanFullRepoEventPayload,
TScanPushEventPayload
} from "@app/ee/services/secret-scanning/secret-scanning-queue/secret-scanning-queue-types";
import { getConfig } from "@app/lib/config/env";
import { logger } from "@app/lib/logger";
import {
TFailedIntegrationSyncEmailsPayload,
Expand Down Expand Up @@ -208,11 +209,15 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) =
>;

const initialize = async () => {
await pgBoss.start();
const appCfg = getConfig();
if (appCfg.SHOULD_INIT_PG_QUEUE) {
logger.info("Initializing pg-queue...");
await pgBoss.start();

pgBoss.on("error", (error) => {
logger.error(error, "pg-queue error");
});
pgBoss.on("error", (error) => {
logger.error(error, "pg-queue error");
});
}
};

const start = <T extends QueueName>(
Expand Down

0 comments on commit 2098bd3

Please sign in to comment.