Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: postgres datastore implementation - part 2 #205

Merged
merged 29 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
be2497c
feat: change postgres PK type
mfiedorowicz Dec 4, 2024
0199542
feat: reconciler - implement postgres.CreateIngestionLog
mfiedorowicz Dec 4, 2024
da90760
feat: change postgres char to varchar
mfiedorowicz Dec 4, 2024
b469c38
chore: SQL format
mfiedorowicz Dec 5, 2024
6e174f6
Merge branch 'develop' into postgres-db-store-pt-2
mfiedorowicz Dec 9, 2024
013e630
feat: update postgres schemas and queries
mfiedorowicz Dec 17, 2024
ad4918c
chore: regenerate mocks
mfiedorowicz Dec 17, 2024
8bcc199
feat: add branch ID to change set
mfiedorowicz Dec 17, 2024
413b6c2
feat: implement storage of ingestion logs and change sets in postgres…
mfiedorowicz Dec 17, 2024
df235eb
feat: retrieve ingestion logs from postgres
mfiedorowicz Dec 17, 2024
a72bdc4
chore: adjust unit tests
mfiedorowicz Dec 17, 2024
35b87cd
chore(deps): bump golang.org/x/crypto in /diode-server (#200)
dependabot[bot] Dec 12, 2024
19897cc
chore: docker compose development overrides (#201)
ltucker Dec 12, 2024
57d9c37
feat: add branch support (#202)
ltucker Dec 16, 2024
9a3b235
feat: ActionIngestionLog rpc (#203)
ltucker Dec 16, 2024
78d16a0
chore: enable gofumpt lint (#204)
ltucker Dec 16, 2024
e3012cc
chore: gofumpt
mfiedorowicz Dec 17, 2024
c12f1d1
chore: remove redis migrations
mfiedorowicz Dec 17, 2024
c2048dc
chore: remove writing ingestion log to redis
mfiedorowicz Dec 17, 2024
5269c76
fix: adjust unit tests
mfiedorowicz Dec 17, 2024
7228a54
Merge branch 'feat-netbox-assurance' into postgres-db-store-pt-2
mfiedorowicz Dec 17, 2024
394698b
chore: gofumpt
mfiedorowicz Dec 17, 2024
9303d33
Merge branch 'feat-netbox-assurance' into postgres-db-store-pt-2
mfiedorowicz Dec 19, 2024
02f1314
fix: retrieving ingestion logs with or without change sets
mfiedorowicz Dec 19, 2024
d835510
feat: use single repository for ingestion logs and change sets
mfiedorowicz Dec 19, 2024
e8c32fc
feat: add postgres view retrieving ingestion log with latest change s…
mfiedorowicz Dec 20, 2024
bb7fe25
feat: refactor retrieving ingestion logs
mfiedorowicz Dec 20, 2024
0ad52e5
chore: go mod tidy
mfiedorowicz Dec 20, 2024
4145fed
feat: postgres tables with hybrid IDs
mfiedorowicz Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ func main() {

dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName)

if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil {
s.Logger().Error("failed to run db migrations", "error", err)
os.Exit(1)
if cfg.MigrationEnabled {
if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil {
s.Logger().Error("failed to run db migrations", "error", err)
os.Exit(1)
}
}

dbPool, err := pgxpool.New(ctx, dbURL)
Expand All @@ -41,10 +43,9 @@ func main() {
}
defer dbPool.Close()

ingestionLogRepo := postgres.NewIngestionLogRepository(dbPool)
changeSetRepo := postgres.NewChangeSetRepository(dbPool)
repository := postgres.NewRepository(dbPool)

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ingestionLogRepo, changeSetRepo)
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), repository)
if err != nil {
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
os.Exit(1)
Expand All @@ -55,7 +56,7 @@ func main() {
os.Exit(1)
}

gRPCServer, err := reconciler.NewServer(ctx, s.Logger())
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository)
if err != nil {
s.Logger().Error("failed to instantiate gRPC server", "error", err)
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
-- Create the ingestion_logs table
CREATE TABLE IF NOT EXISTS ingestion_logs
(
id SERIAL PRIMARY KEY,
ingestion_log_ksuid CHAR(27) NOT NULL,
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
ingestion_log_uuid VARCHAR(255) NOT NULL,
data_type VARCHAR(255),
state INTEGER,
request_id VARCHAR(255),
Expand All @@ -21,10 +21,10 @@ CREATE TABLE IF NOT EXISTS ingestion_logs
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_ingestion_log_ksuid ON ingestion_logs(ingestion_log_ksuid);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs(data_type);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs(state);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs(request_id);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_ingestion_log_uuid ON ingestion_logs (ingestion_log_uuid);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs (data_type);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs (state);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs (request_id);

-- +goose Down

Expand Down
60 changes: 37 additions & 23 deletions diode-server/dbstore/postgres/migrations/00002_change_sets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,59 @@
-- Create the change_sets table
CREATE TABLE IF NOT EXISTS change_sets
(
id SERIAL PRIMARY KEY,
change_set_ksuid CHAR(27) NOT NULL,
ingestion_log_id INTEGER NOT NULL,
branch_name VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
change_set_uuid VARCHAR(255) NOT NULL,
ingestion_log_id INTEGER NOT NULL,
branch_id VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_change_sets_change_set_ksuid ON change_sets(change_set_ksuid);
CREATE INDEX IF NOT EXISTS idx_change_sets_change_set_uuid ON change_sets (change_set_uuid);

-- Create the changes table
CREATE TABLE IF NOT EXISTS changes
(
id SERIAL PRIMARY KEY,
change_ksuid CHAR(27) NOT NULL,
change_set_id INTEGER NOT NULL,
change_type VARCHAR(50) NOT NULL,
object_type VARCHAR(100) NOT NULL,
object_id INTEGER,
object_version INTEGER,
data JSONB,
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
change_uuid VARCHAR(255) NOT NULL,
change_set_id INTEGER NOT NULL,
change_type VARCHAR(50) NOT NULL,
object_type VARCHAR(100) NOT NULL,
object_id INTEGER,
object_version INTEGER,
data JSONB,
sequence_number INTEGER,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_changes_change_ksuid ON changes(change_ksuid);
CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes(change_set_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes(change_type);
CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes(object_type);
CREATE INDEX IF NOT EXISTS idx_changes_change_uuid ON changes (change_uuid);
CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes (change_set_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes (change_type);
CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes (object_type);

-- Add foreign key constraints
ALTER TABLE change_sets ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs(id);
ALTER TABLE changes ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets(id);
ALTER TABLE change_sets
ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs (id);
ALTER TABLE changes
ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets (id);

-- Create a view to join ingestion_logs with aggregated change_set and changes
CREATE VIEW v_ingestion_logs_with_change_set AS
SELECT ingestion_logs.*, row_to_json(change_sets.*) AS change_set, JSON_AGG(changes.* ORDER BY changes.sequence_number ASC) FILTER ( WHERE changes.id IS NOT NULL ) AS changes
FROM ingestion_logs
LEFT JOIN change_sets on ingestion_logs.id = change_sets.ingestion_log_id
LEFT JOIN changes on change_sets.id = changes.change_set_id
GROUP BY ingestion_logs.id, change_sets.id
ORDER BY ingestion_logs.id DESC, change_sets.id DESC;

-- +goose Down

-- Drop the v_ingestion_logs_with_change_sets view
DROP VIEW IF EXISTS v_ingestion_logs_with_change_sets;

-- Drop the changes table
DROP TABLE changes;

Expand Down
4 changes: 2 additions & 2 deletions diode-server/dbstore/postgres/queries/change_sets.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- name: CreateChangeSet :one

INSERT INTO change_sets (change_set_ksuid, ingestion_log_id, branch_name)
INSERT INTO change_sets (change_set_uuid, ingestion_log_id, branch_id)
VALUES ($1, $2, $3)
RETURNING *;

-- name: CreateChange :one

INSERT INTO changes (change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data,
INSERT INTO changes (change_uuid, change_set_id, change_type, object_type, object_id, object_version, data,
sequence_number)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *;
38 changes: 36 additions & 2 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,38 @@
-- name: CreateIngestionLog :one
INSERT INTO ingestion_logs (ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name,
INSERT INTO ingestion_logs (ingestion_log_uuid, data_type, state, request_id, ingestion_ts, producer_app_name,
producer_app_version, sdk_name, sdk_version, entity, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *;

-- name: UpdateIngestionLogStateWithError :exec
UPDATE ingestion_logs
SET state = $2,
error = $3
WHERE id = $1
RETURNING *;

-- name: CountIngestionLogsPerState :many
SELECT state, COUNT(*) AS count
FROM ingestion_logs
GROUP BY state;

-- name: RetrieveIngestionLogs :many
SELECT *
FROM ingestion_logs
WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
AND (data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL)
AND (ingestion_ts >= sqlc.narg('ingestion_ts_start') OR sqlc.narg('ingestion_ts_start') IS NULL)
AND (ingestion_ts <= sqlc.narg('ingestion_ts_end') OR sqlc.narg('ingestion_ts_end') IS NULL)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: RetrieveIngestionLogsWithChangeSets :many
SELECT v_ingestion_logs_with_change_set.*
FROM v_ingestion_logs_with_change_set
WHERE (v_ingestion_logs_with_change_set.state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
AND (v_ingestion_logs_with_change_set.data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL)
AND (v_ingestion_logs_with_change_set.ingestion_ts >= sqlc.narg('ingestion_ts_start') OR sqlc.narg('ingestion_ts_start') IS NULL)
AND (v_ingestion_logs_with_change_set.ingestion_ts <= sqlc.narg('ingestion_ts_end') OR sqlc.narg('ingestion_ts_end') IS NULL)
ORDER BY v_ingestion_logs_with_change_set.id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

43 changes: 0 additions & 43 deletions diode-server/dbstore/postgres/repositories.go

This file was deleted.

Loading
Loading