Skip to content

Commit

Permalink
refactor: clickhouse migration unkeyed#2 (unkeyed#2135)
Browse files Browse the repository at this point in the history
* feat: clickhouse keyverification tables

* feat(clickhouse): add new file latest_verifications.ts for querying latest verifications from Clickhouse
feat(tinybird): remove unused import 'time' from node:console
feat(tinybird): remove unused functions and pipes related to verifications and active keys
feat(tinybird): add comment indicating completion of changes by @andreas

* [autofix.ci] apply automated fixes

* feat: add ratelimits

* refactor: clickhouse table names

* refactor: replace more pipes

* refactor: replace more pipes

* refactor: replace more pipes

* wip: everything broken

* [autofix.ci] apply automated fixes

* wip

* feat: billing verifications

* wip

* fix: merge conflicts

* feat: ratelimits on clickhouse

* feat: ratelimit logs

* wip

* feat: it builds again

* feat: downmigrations

* feat: downmigrations

* fix: type export

* Update internal/clickhouse/src/ratelimits.ts

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* fix: table engine

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent aec5dc6 commit 0552549
Show file tree
Hide file tree
Showing 87 changed files with 2,148 additions and 2,422 deletions.
2 changes: 1 addition & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tasks:
env:
GOOSE_DRIVER: clickhouse
GOOSE_DBSTRING: "tcp://default:[email protected]:9000"
GOOSE_MIGRATION_DIR: ./apps/agent/pkg/clickhouse/schema
GOOSE_MIGRATION_DIR: ./internal/clickhouse/schema
cmds:
- goose up

Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"@hono/zod-validator": "^0.2.1",
"@planetscale/database": "^1.16.0",
"@unkey/cache": "workspace:^",
"@unkey/clickhouse-zod": "workspace:^",
"@unkey/clickhouse": "workspace:^",
"@unkey/db": "workspace:^",
"@unkey/encryption": "workspace:^",
"@unkey/error": "workspace:^",
Expand Down
36 changes: 27 additions & 9 deletions apps/api/src/pkg/analytics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { NoopTinybird, Tinybird } from "@chronark/zod-bird";
import * as ch from "@unkey/clickhouse-zod";
import { ClickHouse } from "@unkey/clickhouse";
import { newId } from "@unkey/id";
import { auditLogSchemaV1, unkeyAuditLogEvents } from "@unkey/schema/src/auditlog";
import { ratelimitSchemaV1 } from "@unkey/schema/src/ratelimit-tinybird";
Expand All @@ -18,7 +18,7 @@ const dateToUnixMilli = z.string().transform((t) => new Date(t.split(" ").at(0)
export class Analytics {
public readonly readClient: Tinybird | NoopTinybird;
public readonly writeClient: Tinybird | NoopTinybird;
private clickhouse: ch.Clickhouse;
private clickhouse: ClickHouse;

constructor(opts: {
tinybirdToken?: string;
Expand All @@ -38,12 +38,12 @@ export class Analytics {
? new Tinybird({ token: opts.tinybirdProxy.token, baseUrl: opts.tinybirdProxy.url })
: this.readClient;

this.clickhouse = opts.clickhouse ? new ch.Client({ url: opts.clickhouse.url }) : new ch.Noop();
this.clickhouse = new ClickHouse({ url: opts.clickhouse?.url });
}

public get insertSdkTelemetry() {
return this.clickhouse.insert({
table: "default.raw_telemetry_sdks_v1",
return this.clickhouse.client.insert({
table: "telemetry.raw_sdks_v1",
schema: z.object({
request_id: z.string(),
time: z.number().int(),
Expand Down Expand Up @@ -107,6 +107,20 @@ export class Analytics {
})),
});
}
public get insertRatelimit() {
return this.clickhouse.client.insert({
table: "ratelimits.raw_ratelimits_v1",
schema: z.object({
request_id: z.string(),
time: z.number().int(),
workspace_id: z.string(),
namespace_id: z.string(),
identifier: z.string(),
passed: z.boolean(),
}),
});
}

//tinybird
public get ingestRatelimit() {
return this.writeClient.buildIngestEndpoint({
Expand All @@ -116,8 +130,8 @@ export class Analytics {
}

public get insertKeyVerification() {
return this.clickhouse.insert({
table: "default.raw_key_verifications_v1",
return this.clickhouse.client.insert({
table: "verifications.raw_key_verifications_v1",
schema: z.object({
request_id: z.string(),
time: z.number().int(),
Expand All @@ -140,8 +154,8 @@ export class Analytics {
}

public get insertApiRequest() {
return this.clickhouse.insert({
table: "default.raw_api_requests_v1",
return this.clickhouse.client.insert({
table: "metrics.raw_api_requests_v1",
schema: z.object({
request_id: z.string(),
time: z.number().int(),
Expand All @@ -158,6 +172,10 @@ export class Analytics {
service_latency: z.number().int(),
user_agent: z.string(),
ip_address: z.string(),
continent: z.string().nullable().default(""),
city: z.string().nullable().default(""),
country: z.string().nullable().default(""),
colo: z.string().nullable().default(""),
}),
});
}
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/pkg/keys/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ export class KeyService {
}

return [
res.val.pass,
res.val.passed,
{
remaining: res.val.remaining,
limit: ratelimits.default?.limit,
Expand Down
8 changes: 8 additions & 0 deletions apps/api/src/pkg/middleware/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
service_latency: Date.now() - c.get("requestStartedAt"),
ip_address: c.req.header("True-Client-IP") ?? c.req.header("CF-Connecting-IP") ?? "",
user_agent: c.req.header("User-Agent") ?? "",
// @ts-ignore - this is a bug in the types
continent: c.req.raw?.cf?.continent,
// @ts-ignore - this is a bug in the types
country: c.req.raw?.cf?.country,
// @ts-ignore - this is a bug in the types
colo: c.req.raw?.cf?.colo,
// @ts-ignore - this is a bug in the types
city: c.req.raw?.cf?.city,
}),
);
}
Expand Down
16 changes: 8 additions & 8 deletions apps/api/src/pkg/ratelimit/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class AgentRatelimiter implements RateLimiter {
source: "cloudflare",
});
return Ok({
pass: res.success,
passed: res.success,
reset: -1,
current: -1,
remaining: -1,
Expand All @@ -109,7 +109,7 @@ export class AgentRatelimiter implements RateLimiter {
identifier: req.identifier,
mode: req.async ? "async" : "sync",
error: !!res.err,
success: res?.val?.pass,
success: res?.val?.passed,
source: "agent",
});
return res;
Expand All @@ -127,7 +127,7 @@ export class AgentRatelimiter implements RateLimiter {
if (r.err) {
return r;
}
if (!r.val.pass) {
if (!r.val.passed) {
return r;
}
}
Expand All @@ -137,7 +137,7 @@ export class AgentRatelimiter implements RateLimiter {

return Ok({
current: -1,
pass: true,
passed: true,
reset: -1,
remaining: -1,
triggered: null,
Expand All @@ -162,7 +162,7 @@ export class AgentRatelimiter implements RateLimiter {
const cached = this.cache.get(id) ?? { current: 0, reset: 0 };
if (cached.current >= req.limit) {
return Ok({
pass: false,
passed: false,
current: cached.current,
reset,
remaining: 0,
Expand Down Expand Up @@ -224,7 +224,7 @@ export class AgentRatelimiter implements RateLimiter {
if (cached.current + cost > req.limit) {
return Ok({
current: cached.current,
pass: false,
passed: false,
reset,
remaining: req.limit - cached.current,
triggered: req.name,
Expand All @@ -234,7 +234,7 @@ export class AgentRatelimiter implements RateLimiter {
this.setCacheMax(id, cached.current, reset);

return Ok({
pass: true,
passed: true,
current: cached.current,
reset,
remaining: req.limit - cached.current,
Expand Down Expand Up @@ -288,7 +288,7 @@ export class AgentRatelimiter implements RateLimiter {
return Ok({
current: Number(res.limit - res.remaining),
reset: Number(res.reset),
pass: res.success,
passed: res.success,
remaining: Number(res.remaining),
triggered: res.success ? null : req.name,
});
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/pkg/ratelimit/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const ratelimitResponseSchema = z.object({
current: z.number(),
remaining: z.number(),
reset: z.number(),
pass: z.boolean(),
passed: z.boolean(),
/**
* The name of the limit that triggered a rejection
*/
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/pkg/ratelimit/noop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ export class NoopRateLimiter implements RateLimiter {
_c: Context,
_req: RatelimitRequest,
): Promise<Result<RatelimitResponse, RatelimitError>> {
return Ok({ current: 0, pass: true, reset: 0, remaining: 0, triggered: null });
return Ok({ current: 0, passed: true, reset: 0, remaining: 0, triggered: null });
}
public async multiLimit(
_c: Context,
_req: Array<RatelimitRequest>,
): Promise<Result<RatelimitResponse, RatelimitError>> {
return Ok({ current: 0, pass: true, reset: 0, remaining: 0, triggered: null });
return Ok({ current: 0, passed: true, reset: 0, remaining: 0, triggered: null });
}
}
23 changes: 17 additions & 6 deletions apps/api/src/routes/v1_ratelimit_limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,17 @@ export const registerV1RatelimitLimit = (app: App) =>
});
}
const remaining = Math.max(0, limit - ratelimitResponse.current);

c.executionCtx.waitUntil(
analytics.insertRatelimit({
workspace_id: rootKey.authorizedWorkspaceId,
namespace_id: namespace.id,
request_id: c.get("requestId"),
identifier: req.identifier,
time: Date.now(),
passed: ratelimitResponse.passed,
}),
);
c.executionCtx.waitUntil(
analytics
.ingestRatelimit({
Expand All @@ -353,7 +364,7 @@ export const registerV1RatelimitLimit = (app: App) =>

time: Date.now(),
serviceLatency: -1,
success: ratelimitResponse.pass,
success: ratelimitResponse.passed,
remaining,
config: {
limit,
Expand Down Expand Up @@ -392,12 +403,12 @@ export const registerV1RatelimitLimit = (app: App) =>
id: rootKey.key.id,
},
description: "ratelimit",
event: ratelimitResponse.pass ? "ratelimit.success" : "ratelimit.denied",
event: ratelimitResponse.passed ? "ratelimit.success" : "ratelimit.denied",
meta: {
requestId: c.get("requestId"),
namespacId: namespace.id,
identifier: req.identifier,
success: ratelimitResponse.pass,
success: ratelimitResponse.passed,
},
time: Date.now(),
resources: req.resources ?? [],
Expand All @@ -417,12 +428,12 @@ export const registerV1RatelimitLimit = (app: App) =>
id: rootKey.key.id,
},
description: "ratelimit",
event: ratelimitResponse.pass ? "ratelimit.success" : "ratelimit.denied",
event: ratelimitResponse.passed ? "ratelimit.success" : "ratelimit.denied",
meta: {
requestId: c.get("requestId"),
namespacId: namespace.id,
identifier: req.identifier,
success: ratelimitResponse.pass,
success: ratelimitResponse.passed,
},
time: Date.now(),
resources: req.resources,
Expand All @@ -438,6 +449,6 @@ export const registerV1RatelimitLimit = (app: App) =>
limit,
remaining,
reset: ratelimitResponse.reset,
success: ratelimitResponse.pass,
success: ratelimitResponse.passed,
});
});
Loading

0 comments on commit 0552549

Please sign in to comment.