Skip to content

Commit

Permalink
feat: postgres datastore implementation - part 2 (#205)
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Fiedorowicz <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Luke Tucker <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2024
1 parent 3b38a6a commit 3f46f8a
Show file tree
Hide file tree
Showing 33 changed files with 1,432 additions and 1,500 deletions.
15 changes: 8 additions & 7 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ func main() {

dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName)

if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil {
s.Logger().Error("failed to run db migrations", "error", err)
os.Exit(1)
if cfg.MigrationEnabled {
if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil {
s.Logger().Error("failed to run db migrations", "error", err)
os.Exit(1)
}
}

dbPool, err := pgxpool.New(ctx, dbURL)
Expand All @@ -41,10 +43,9 @@ func main() {
}
defer dbPool.Close()

ingestionLogRepo := postgres.NewIngestionLogRepository(dbPool)
changeSetRepo := postgres.NewChangeSetRepository(dbPool)
repository := postgres.NewRepository(dbPool)

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ingestionLogRepo, changeSetRepo)
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), repository)
if err != nil {
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
os.Exit(1)
Expand All @@ -55,7 +56,7 @@ func main() {
os.Exit(1)
}

gRPCServer, err := reconciler.NewServer(ctx, s.Logger())
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository)
if err != nil {
s.Logger().Error("failed to instantiate gRPC server", "error", err)
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
-- Create the ingestion_logs table
CREATE TABLE IF NOT EXISTS ingestion_logs
(
id SERIAL PRIMARY KEY,
ingestion_log_ksuid CHAR(27) NOT NULL,
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
external_id VARCHAR(255) NOT NULL,
data_type VARCHAR(255),
state INTEGER,
request_id VARCHAR(255),
Expand All @@ -21,10 +21,10 @@ CREATE TABLE IF NOT EXISTS ingestion_logs
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_ingestion_log_ksuid ON ingestion_logs(ingestion_log_ksuid);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs(data_type);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs(state);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs(request_id);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_external_id ON ingestion_logs (external_id);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs (data_type);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs (state);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs (request_id);

-- +goose Down

Expand Down
63 changes: 40 additions & 23 deletions diode-server/dbstore/postgres/migrations/00002_change_sets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,62 @@
-- Create the change_sets table
CREATE TABLE IF NOT EXISTS change_sets
(
id SERIAL PRIMARY KEY,
change_set_ksuid CHAR(27) NOT NULL,
ingestion_log_id INTEGER NOT NULL,
branch_name VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
external_id VARCHAR(255) NOT NULL,
ingestion_log_id INTEGER NOT NULL,
branch_id VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_change_sets_change_set_ksuid ON change_sets(change_set_ksuid);
CREATE INDEX IF NOT EXISTS idx_change_sets_external_id ON change_sets (external_id);
CREATE INDEX IF NOT EXISTS idx_change_sets_ingestion_log_id ON change_sets (ingestion_log_id);

-- Create the changes table
CREATE TABLE IF NOT EXISTS changes
(
id SERIAL PRIMARY KEY,
change_ksuid CHAR(27) NOT NULL,
change_set_id INTEGER NOT NULL,
change_type VARCHAR(50) NOT NULL,
object_type VARCHAR(100) NOT NULL,
object_id INTEGER,
object_version INTEGER,
data JSONB,
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
external_id VARCHAR(255) NOT NULL,
change_set_id INTEGER NOT NULL,
change_type VARCHAR(50) NOT NULL,
object_type VARCHAR(100) NOT NULL,
object_id INTEGER,
object_version INTEGER,
data JSONB,
sequence_number INTEGER,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_changes_change_ksuid ON changes(change_ksuid);
CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes(change_set_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes(change_type);
CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes(object_type);
CREATE INDEX IF NOT EXISTS idx_changes_external_id ON changes (external_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes (change_set_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes (change_type);
CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes (object_type);

-- Add foreign key constraints
ALTER TABLE change_sets ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs(id);
ALTER TABLE changes ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets(id);
ALTER TABLE change_sets
ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs (id);
ALTER TABLE changes
ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets (id);

-- Create a view to join ingestion_logs with aggregated change_set and changes
CREATE VIEW v_ingestion_logs_with_change_set AS
SELECT ingestion_logs.*,
row_to_json(change_sets.*) AS change_set,
JSON_AGG(changes.* ORDER BY changes.sequence_number ASC) FILTER ( WHERE changes.id IS NOT NULL ) AS changes
FROM ingestion_logs
LEFT JOIN change_sets on ingestion_logs.id = change_sets.ingestion_log_id
LEFT JOIN changes on change_sets.id = changes.change_set_id
GROUP BY ingestion_logs.id, change_sets.id
ORDER BY ingestion_logs.id DESC, change_sets.id DESC;

-- +goose Down

-- Drop the v_ingestion_logs_with_change_set view
DROP VIEW IF EXISTS v_ingestion_logs_with_change_set;

-- Drop the changes table
DROP TABLE changes;

Expand Down
4 changes: 2 additions & 2 deletions diode-server/dbstore/postgres/queries/change_sets.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- name: CreateChangeSet :one

INSERT INTO change_sets (change_set_ksuid, ingestion_log_id, branch_name)
INSERT INTO change_sets (external_id, ingestion_log_id, branch_id)
VALUES ($1, $2, $3)
RETURNING *;

-- name: CreateChange :one

INSERT INTO changes (change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data,
INSERT INTO changes (external_id, change_set_id, change_type, object_type, object_id, object_version, data,
sequence_number)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *;
39 changes: 37 additions & 2 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,39 @@
-- name: CreateIngestionLog :one
INSERT INTO ingestion_logs (ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name,
INSERT INTO ingestion_logs (external_id, data_type, state, request_id, ingestion_ts, producer_app_name,
producer_app_version, sdk_name, sdk_version, entity, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *;

-- name: UpdateIngestionLogStateWithError :exec
UPDATE ingestion_logs
SET state = $2,
    error = $3
WHERE id = $1;

-- name: CountIngestionLogsPerState :many
SELECT state, COUNT(*) AS count
FROM ingestion_logs
GROUP BY state;

-- name: RetrieveIngestionLogs :many
SELECT *
FROM ingestion_logs
WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
AND (data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL)
AND (ingestion_ts >= sqlc.narg('ingestion_ts_start') OR sqlc.narg('ingestion_ts_start') IS NULL)
AND (ingestion_ts <= sqlc.narg('ingestion_ts_end') OR sqlc.narg('ingestion_ts_end') IS NULL)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: RetrieveIngestionLogsWithChangeSets :many
SELECT v_ingestion_logs_with_change_set.*
FROM v_ingestion_logs_with_change_set
WHERE (v_ingestion_logs_with_change_set.state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
AND (v_ingestion_logs_with_change_set.data_type = sqlc.narg('data_type') OR sqlc.narg('data_type') IS NULL)
AND (v_ingestion_logs_with_change_set.ingestion_ts >= sqlc.narg('ingestion_ts_start') OR
sqlc.narg('ingestion_ts_start') IS NULL)
AND (v_ingestion_logs_with_change_set.ingestion_ts <= sqlc.narg('ingestion_ts_end') OR
sqlc.narg('ingestion_ts_end') IS NULL)
ORDER BY v_ingestion_logs_with_change_set.id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');
43 changes: 0 additions & 43 deletions diode-server/dbstore/postgres/repositories.go

This file was deleted.

Loading

0 comments on commit 3f46f8a

Please sign in to comment.