Skip to content

Commit

Permalink
couple of small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Dec 6, 2024
1 parent e622204 commit fad7fe5
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 124 deletions.
6 changes: 3 additions & 3 deletions .github/actions/install/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ runs:
with:
go-version-file: ./apps/agent/go.mod
cache-dependency-path: ./apps/agent/go.sum

- run: go mod download
if: ${{ inputs.go == 'true' }}
shell: bash
Expand All @@ -32,7 +32,7 @@ runs:
- name: Install Task
uses: arduino/setup-task@v2
if: ${{ inputs.go == 'true' }}

- name: Setup Node
if: ${{ inputs.ts == 'true' }}
uses: actions/setup-node@v4
Expand Down Expand Up @@ -69,4 +69,4 @@ runs:
shell: bash
run: |
pnpm install --recursive
npm i -g wrangler
npm i -g wrangler@latest
1 change: 1 addition & 0 deletions .github/workflows/job_clickhouse_migration_preview.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ jobs:
- name: Migrate
run: goose clickhouse "${{ secrets.CLICKHOUSE_URL }}" up
working-directory: internal/clickhouse/schema
e
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"@vitest/ui": "^1.6.0",
"typescript": "^5.5.3",
"vitest": "^1.6.0",
"wrangler": "^3.80.5"
"wrangler": "^3.92.0"
},
"dependencies": {
"@axiomhq/js": "1.0.0-rc.2",
Expand Down
13 changes: 12 additions & 1 deletion apps/api/src/pkg/middleware/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export function metrics(): MiddlewareHandler<HonoEnv> {

let requestBody = await c.req.raw.clone().text();
requestBody = requestBody.replaceAll(/"key":\s*"[a-zA-Z0-9_]+"/g, '"key": "<REDACTED>"');
requestBody = requestBody.replaceAll(
/"plaintext":\s*"[a-zA-Z0-9_]+"/g,
'"plaintext": "<REDACTED>"',
);
const start = performance.now();
const m = {
isolateId: c.get("isolateId"),
Expand Down Expand Up @@ -92,6 +96,13 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
responseHeaders.push(`${k}: ${v}`);
});

let responseBody = await c.res.clone().text();
responseBody = responseBody.replaceAll(/"key":\s*"[a-zA-Z0-9_]+"/g, '"key": "<REDACTED>"');
responseBody = responseBody.replaceAll(
/"plaintext":\s*"[a-zA-Z0-9_]+"/g,
'"plaintext": "<REDACTED>"',
);

c.executionCtx.waitUntil(
analytics.insertApiRequest({
request_id: c.get("requestId"),
Expand All @@ -109,7 +120,7 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
request_body: requestBody,
response_status: c.res.status,
response_headers: responseHeaders,
response_body: await c.res.clone().text(),
response_body: responseBody,
error: m.error ?? "",
service_latency: Date.now() - c.get("requestStartedAt"),
ip_address: c.req.header("True-Client-IP") ?? c.req.header("CF-Connecting-IP") ?? "",
Expand Down
32 changes: 17 additions & 15 deletions apps/api/src/routes/v1_keys_verifyKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,21 +366,23 @@ export const registerV1KeysVerifyKey = (app: App) =>
}
: undefined,
};
c.executionCtx.waitUntil(
// new clickhouse
analytics.insertKeyVerification({
request_id: c.get("requestId"),
time: Date.now(),
workspace_id: val.key.workspaceId,
key_space_id: val.key.keyAuthId,
key_id: val.key.id,
// @ts-expect-error
region: c.req.raw.cf.colo ?? "",
outcome: val.code ?? "VALID",
identity_id: val.identity?.id,
tags: req.tags ?? [],
}),
);
if (val.code) {
c.executionCtx.waitUntil(
// new clickhouse
analytics.insertKeyVerification({
request_id: c.get("requestId"),
time: Date.now(),
workspace_id: val.key.workspaceId,
key_space_id: val.key.keyAuthId,
key_id: val.key.id,
// @ts-expect-error
region: c.req.raw.cf.colo ?? "",
outcome: val.code ?? "",
identity_id: val.identity?.id,
tags: req.tags ?? [],
}),
);
}

return c.json(responseBody);
});
2 changes: 1 addition & 1 deletion apps/logdrain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
"devDependencies": {
"@cloudflare/workers-types": "4.20240603.0",
"typescript": "^5.5.3",
"wrangler": "^3.80.5"
"wrangler": "^3.92.0"
}
}
2 changes: 1 addition & 1 deletion apps/workflows/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"devDependencies": {
"@cloudflare/workers-types": "^4.20241022.0",
"typescript": "^5.0.4",
"wrangler": "^3.83.0"
"wrangler": "^3.92.0"
},
"dependencies": {
"@planetscale/database": "^1.19.0",
Expand Down
5 changes: 4 additions & 1 deletion apps/workflows/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type { Env } from "./lib/env";

export { CountKeys } from "./workflows/count_keys_per_keyspace";
export { RefillRemaining } from "./workflows/refill_keys";

export default {
async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext) {
console.info(event);
switch (event.cron) {
case "*/5 * * * *": {
case "* * * * *": {
const instance = await env.REFILL_REMAINING.create();
console.info(JSON.stringify({ event, instance }));

Expand Down
72 changes: 39 additions & 33 deletions apps/workflows/src/workflows/count_keys_per_keyspace.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowEntrypoint, type WorkflowEvent, type WorkflowStep } from "cloudflare:workers";

import { and, createConnection, eq, isNull, schema, sql } from "../lib/db";
import { and, count, createConnection, eq, isNull, schema } from "../lib/db";
import type { Env } from "../lib/env";

// User-defined params passed to your workflow
Expand All @@ -10,48 +10,54 @@ type Params = {};
// <docs-tag name="workflow-entrypoint">
export class CountKeys extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const now = event.timestamp.getUTCDate();
const now = event.timestamp.getTime();

const db = createConnection({
host: this.env.DATABASE_HOST,
username: this.env.DATABASE_USERNAME,
password: this.env.DATABASE_PASSWORD,
});

let cursor = "";
let done = false;

do {
const keySpaces = await step.do(`fetch keyspaces - cursor:${cursor}`, async () =>
db.query.keyAuth.findMany({
where: (table, { gt, and, isNull, lt }) =>
while (!done) {
/**
* I know all of this is in a single step, which is stupid and does not use steps as intended.
* But they have a 512 step limit and we need like 30k...
*/
await step.do("fetch keyspaces", async () => {
const keySpaces = await db.query.keyAuth.findMany({
where: (table, { or, and, isNull, lt }) =>
and(
gt(table.id, cursor),
isNull(table.deletedAt),
lt(table.sizeLastUpdatedAt, now - 60_000),
), // if older than 60s
limit: 100,
}),
);

for (const keySpace of keySpaces) {
const count = await db
.select({ count: sql<string>`count(*)` })
.from(schema.keys)
.where(and(eq(schema.keys.keyAuthId, keySpace.id), isNull(schema.keys.deletedAt)));

keySpace.sizeApprox = Number.parseInt(count?.at(0)?.count ?? "0");
keySpace.sizeLastUpdatedAt = Date.now();

await db
.update(schema.keyAuth)
.set({
sizeApprox: keySpace.sizeApprox,
sizeLastUpdatedAt: keySpace.sizeLastUpdatedAt,
})
.where(eq(schema.keyAuth.id, keySpace.id));
}
cursor = keySpaces.at(-1)?.id ?? "";
} while (cursor);
or(isNull(table.sizeLastUpdatedAt), lt(table.sizeLastUpdatedAt, now - 600_000)),
),
orderBy: (table, { asc }) => asc(table.sizeLastUpdatedAt),
limit: 490, // we can do 1000 subrequests and need 2 per keyspace + this requests
});
if (keySpaces.length === 0) {
done = true;
}
console.info(`found ${keySpaces.length} key spaces`);

for (const keySpace of keySpaces) {
const rows = await db
.select({ count: count() })
.from(schema.keys)
.where(and(eq(schema.keys.keyAuthId, keySpace.id), isNull(schema.keys.deletedAt)));

await db
.update(schema.keyAuth)
.set({
sizeApprox: rows.at(0)?.count ?? 0,
sizeLastUpdatedAt: Date.now(),
})
.where(eq(schema.keyAuth.id, keySpace.id));
}
// this just prints on the cf dashboard, we don't use the return value
return { keySpaces: keySpaces.length };
});
}
await step.do("heartbeat", async () => {
await fetch(this.env.HEARTBEAT_URL_COUNT_KEYS);
});
Expand Down
3 changes: 2 additions & 1 deletion apps/workflows/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ compatibility_date = "2024-10-22"
enabled = true
head_sampling_rate = 1 # optional. default = 1.


[[workflows]]
# name of your workflow
name = "refill-remaining"
Expand All @@ -26,4 +27,4 @@ binding = "COUNT_KEYS"
class_name = "CountKeys"

[triggers]
crons = ["*/5 * * * *", "0 0 * * *"]
crons = ["* * * * *", "0 0 * * *"]
21 changes: 9 additions & 12 deletions internal/clickhouse/src/verifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ export function getVerificationsPerHour(ch: Querier) {
SELECT
time,
outcome,
sum(count) as count,
tags
FROM verifications.key_verifications_per_hour_v2
sum(count) as count
FROM verifications.key_verifications_per_hour_v1
WHERE
workspace_id = {workspaceId: String}
AND key_space_id = {keySpaceId: String}
AND time >= fromUnixTimestamp64Milli({start: Int64})
AND time <= fromUnixTimestamp64Milli({end: Int64})
${args.keyId ? "AND key_id = {keyId: String}" : ""}
GROUP BY time, outcome, tags
GROUP BY time, outcome
ORDER BY time ASC
WITH FILL
FROM toStartOfHour(fromUnixTimestamp64Milli({start: Int64}))
Expand All @@ -89,16 +88,15 @@ export function getVerificationsPerDay(ch: Querier) {
SELECT
time,
outcome,
sum(count) as count,
tags
FROM verifications.key_verifications_per_day_v2
sum(count) as count
FROM verifications.key_verifications_per_day_v1
WHERE
workspace_id = {workspaceId: String}
AND key_space_id = {keySpaceId: String}
AND time >= fromUnixTimestamp64Milli({start: Int64})
AND time <= fromUnixTimestamp64Milli({end: Int64})
${args.keyId ? "AND key_id = {keyId: String}" : ""}
GROUP BY time, outcome, tags
GROUP BY time, outcome
ORDER BY time ASC
WITH FILL
FROM toStartOfDay(fromUnixTimestamp64Milli({start: Int64}))
Expand All @@ -116,16 +114,15 @@ export function getVerificationsPerMonth(ch: Querier) {
SELECT
time,
outcome,
sum(count) as count,
tags
FROM verifications.key_verifications_per_month_v2
sum(count) as count
FROM verifications.key_verifications_per_month_v1
WHERE
workspace_id = {workspaceId: String}
AND key_space_id = {keySpaceId: String}
AND time >= fromUnixTimestamp64Milli({start: Int64})
AND time <= fromUnixTimestamp64Milli({end: Int64})
${args.keyId ? "AND key_id = {keyId: String}" : ""}
GROUP BY time, outcome, tags
GROUP BY time, outcome
ORDER BY time ASC
WITH FILL
FROM toStartOfDay(fromUnixTimestamp64Milli({start: Int64}))
Expand Down
Loading

0 comments on commit fad7fe5

Please sign in to comment.