Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce postgres dbstore with migrations #198

Merged
merged 14 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions diode-server/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mockname: "{{.InterfaceName}}"
structname: "{{.InterfaceName}}.go"
filename: "{{.InterfaceName | lower }}.go"
with-expecter: true
issue-845-fix: true
packages:
github.com/netboxlabs/diode/diode-server/netboxdiodeplugin:
config:
Expand Down
26 changes: 24 additions & 2 deletions diode-server/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
.PHONY: deps lint test test-coverage build-all docker-all docker-compose-up docker-compose-down clean reconciler-proto-gen

SERVICES := $(shell find ./cmd/* -type d -exec basename {} \;)
BUILD_SERVICES = $(addprefix build-,$(SERVICES))
DOCKER_SERVICES = $(addprefix docker-,$(SERVICES))
Expand All @@ -23,15 +21,25 @@ else
DOCKER_COMPOSE := docker-compose
endif

.PHONY: install-dev-tools
install-dev-tools:
@go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
@go install github.com/vektra/mockery/v2@latest
@go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest

.PHONY: deps
deps:
@go mod tidy

.PHONY: lint
lint:
@golangci-lint run ./... --config ../.github/golangci.yaml

.PHONY: test
test:
@go test -race ./...

.PHONY: test-coverage
test-coverage:
@mkdir -p .coverage
@go test -race -cover -json -coverprofile=.coverage/cover.out.tmp ./... | grep -Ev "diodepb|reconcilerpb|cmd|mocks|sentry" | tparse -format=markdown > .coverage/test-report.md
Expand All @@ -44,6 +52,7 @@ $(BUILD_SERVICES):
@CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM)
@go build -ldflags "$(LD_FLAGS)" -o $(BUILD_DIR)/$(SVC) ./cmd/$(SVC)

.PHONY: build-all
build-all: $(BUILD_SERVICES)

.PHONY: $(DOCKER_SERVICES)
Expand All @@ -61,11 +70,14 @@ $(DOCKER_SERVICES):
--tag=$(ORG_NAME)/$(REPO_NAME)-$(SVC):$(DIODE_VERSION)-$(COMMIT_SHA) \
-f docker/Dockerfile .

.PHONY: docker-all
docker-all: $(DOCKER_SERVICES)

.PHONY: docker-compose-up
docker-compose-up:
$(DOCKER_COMPOSE) --env-file docker/sample.env -f docker/docker-compose.yaml up -d --build

.PHONY: docker-compose-down
docker-compose-down:
$(DOCKER_COMPOSE) --env-file docker/sample.env -f docker/docker-compose.yaml down --remove-orphans

Expand All @@ -79,12 +91,22 @@ docker-compose-dev-down:
@DIODE_VERSION=$(DIODE_VERSION) COMMIT_SHA=$(COMMIT_SHA) DIODE_TAG=$(DIODE_VERSION)-$(COMMIT_SHA) PROJECT_NAME=diode-dev \
$(DOCKER_COMPOSE) --env-file docker/sample.env -f docker/docker-compose.yaml -f docker/docker-compose.dev.yaml down --remove-orphans

.PHONY: docker-compose-netbox-up
docker-compose-netbox-up:
$(DOCKER_COMPOSE) -f docker/docker-compose.netbox.yaml up -d --build

.PHONY: docker-compose-netbox-down
docker-compose-netbox-down:
$(DOCKER_COMPOSE) -f docker/docker-compose.netbox.yaml down

.PHONY: clean
clean:
@rm -rf $(BUILD_DIR)/*

.PHONY: gen-mocks
gen-mocks:
mockery

.PHONY: gen-dbstore
gen-dbstore:
sqlc generate
4 changes: 2 additions & 2 deletions diode-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ cd /opt/diode
Download the default `docker-compose.yaml` and `.env` files from this repository:

```bash
curl -o docker-compose.yaml https://raw.githubusercontent.com/netboxlabs/diode/develop/diode-server/docker/docker-compose.yaml
curl -o .env https://raw.githubusercontent.com/netboxlabs/diode/develop/diode-server/docker/sample.env
curl -o docker-compose.yaml https://raw.githubusercontent.com/netboxlabs/diode/release/diode-server/docker/docker-compose.yaml
curl -o .env https://raw.githubusercontent.com/netboxlabs/diode/release/diode-server/docker/sample.env
```

Edit the `.env` to match your environment:
Expand Down
53 changes: 52 additions & 1 deletion diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package main

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/getsentry/sentry-go"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/jackc/pgx/v5/stdlib" // pgx to database/sql compatibility
"github.com/kelseyhightower/envconfig"
"github.com/pressly/goose/v3"

"github.com/netboxlabs/diode/diode-server/dbstore/postgres"
"github.com/netboxlabs/diode/diode-server/migrator"
"github.com/netboxlabs/diode/diode-server/reconciler"
"github.com/netboxlabs/diode/diode-server/server"
)
Expand All @@ -16,7 +24,27 @@ func main() {

defer s.Recover(sentry.CurrentHub())

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger())
var cfg reconciler.Config
envconfig.MustProcess("", &cfg)

dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName)

if err := runDBMigrations(ctx, s.Logger(), dbURL); err != nil {
s.Logger().Error("failed to run db migrations", "error", err)
os.Exit(1)
}

dbPool, err := pgxpool.New(ctx, dbURL)
if err != nil {
s.Logger().Error("failed to connect to postgres database", "error", err)
os.Exit(1)
}
defer dbPool.Close()

ingestionLogRepo := postgres.NewIngestionLogRepository(dbPool)
changeSetRepo := postgres.NewChangeSetRepository(dbPool)

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ingestionLogRepo, changeSetRepo)
if err != nil {
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -45,3 +73,26 @@ func main() {
os.Exit(1)
}
}

func runDBMigrations(ctx context.Context, logger *slog.Logger, dbURL string) error {
dbDialect := "postgres"
db, err := goose.OpenDBWithDriver(dbDialect, dbURL)
if err != nil {
return fmt.Errorf("failed to open connection to database: %v", err)
}
defer func() {
if err := db.Close(); err != nil {
logger.Error("failed to close connection to database", "error", err)
}
}()

m, err := migrator.NewMigrator(logger, "postgres", db, "/etc/diode/migrations")
if err != nil {
return fmt.Errorf("failed to create migrator: %v", err)
}
if err := m.Run(ctx, migrator.OperationUp); err != nil {
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- +goose Up

-- Create the ingestion_logs table
CREATE TABLE IF NOT EXISTS ingestion_logs
(
id SERIAL PRIMARY KEY,
ingestion_log_ksuid CHAR(27) NOT NULL,
data_type VARCHAR(255),
state INTEGER,
request_id VARCHAR(255),
ingestion_ts BIGINT,
producer_app_name VARCHAR(255),
producer_app_version VARCHAR(255),
sdk_name VARCHAR(255),
sdk_version VARCHAR(255),
entity JSONB,
error JSONB,
source_metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_ingestion_log_ksuid ON ingestion_logs(ingestion_log_ksuid);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_data_type ON ingestion_logs(data_type);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_state ON ingestion_logs(state);
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_request_id ON ingestion_logs(request_id);

-- +goose Down

-- Drop the ingestion_logs table
DROP TABLE ingestion_logs;
49 changes: 49 additions & 0 deletions diode-server/dbstore/postgres/migrations/00002_change_sets.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- +goose Up

-- Create the change_sets table
CREATE TABLE IF NOT EXISTS change_sets
(
id SERIAL PRIMARY KEY,
change_set_ksuid CHAR(27) NOT NULL,
ingestion_log_id INTEGER NOT NULL,
branch_name VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_change_sets_change_set_ksuid ON change_sets(change_set_ksuid);

-- Create the changes table
CREATE TABLE IF NOT EXISTS changes
(
id SERIAL PRIMARY KEY,
change_ksuid CHAR(27) NOT NULL,
change_set_id INTEGER NOT NULL,
change_type VARCHAR(50) NOT NULL,
object_type VARCHAR(100) NOT NULL,
object_id INTEGER,
object_version INTEGER,
data JSONB,
sequence_number INTEGER,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Create indices
CREATE INDEX IF NOT EXISTS idx_changes_change_ksuid ON changes(change_ksuid);
CREATE INDEX IF NOT EXISTS idx_changes_change_set_id ON changes(change_set_id);
CREATE INDEX IF NOT EXISTS idx_changes_change_type ON changes(change_type);
CREATE INDEX IF NOT EXISTS idx_changes_object_type ON changes(object_type);

-- Add foreign key constraints
ALTER TABLE change_sets ADD CONSTRAINT fk_change_sets_ingestion_logs FOREIGN KEY (ingestion_log_id) REFERENCES ingestion_logs(id);
ALTER TABLE changes ADD CONSTRAINT fk_changes_change_sets FOREIGN KEY (change_set_id) REFERENCES change_sets(id);

-- +goose Down

-- Drop the changes table
DROP TABLE changes;

-- Drop the change_sets table
DROP TABLE change_sets;
12 changes: 12 additions & 0 deletions diode-server/dbstore/postgres/queries/change_sets.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- name: CreateChangeSet :one

INSERT INTO change_sets (change_set_ksuid, ingestion_log_id, branch_name)
VALUES ($1, $2, $3)
RETURNING *;

-- name: CreateChange :one

INSERT INTO changes (change_ksuid, change_set_id, change_type, object_type, object_id, object_version, data,
sequence_number)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *;
4 changes: 4 additions & 0 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- name: CreateIngestionLog :one
INSERT INTO ingestion_logs (ingestion_log_ksuid, data_type, state, request_id, ingestion_ts, producer_app_name,
producer_app_version, sdk_name, sdk_version, entity, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
43 changes: 43 additions & 0 deletions diode-server/dbstore/postgres/repositories.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package postgres

import (
"context"
"errors"

"github.com/netboxlabs/diode/diode-server/gen/dbstore/postgres"
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
)

// IngestionLogRepository allows interacting with ingestion logs.
type IngestionLogRepository struct {
queries *postgres.Queries
}

// NewIngestionLogRepository creates a new IngestionLogRepository.
func NewIngestionLogRepository(db postgres.DBTX) *IngestionLogRepository {
return &IngestionLogRepository{
queries: postgres.New(db),
}
}

// CreateIngestionLog creates a new ingestion log.
func (r *IngestionLogRepository) CreateIngestionLog(_ context.Context, _ *reconcilerpb.IngestionLog, _ []byte) error {
return errors.New("not implemented")
}

// ChangeSetRepository allows interacting with change sets.
type ChangeSetRepository struct {
queries *postgres.Queries
}

// NewChangeSetRepository creates a new ChangeSetRepository.
func NewChangeSetRepository(db postgres.DBTX) *ChangeSetRepository {
return &ChangeSetRepository{
queries: postgres.New(db),
}
}

// CreateChangeSet creates a new change set.
func (r *ChangeSetRepository) CreateChangeSet(_ context.Context, _ *reconcilerpb.ChangeSet) error {
return errors.New("not implemented")
}
33 changes: 32 additions & 1 deletion diode-server/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,21 @@ services:
- MIGRATION_ENABLED=${MIGRATION_ENABLED}
- RECONCILER_RATE_LIMITER_RPS=${RECONCILER_RATE_LIMITER_RPS}
- RECONCILER_RATE_LIMITER_BURST=${RECONCILER_RATE_LIMITER_BURST}
- POSTGRES_HOST=${POSTGRES_HOST}
- POSTGRES_PORT=${POSTGRES_PORT}
- POSTGRES_DB_NAME=${POSTGRES_DB_NAME}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
restart: always
ports: [ ]
volumes:
- ../dbstore/postgres/migrations:/etc/diode/migrations:z,ro
depends_on:
- diode-redis
diode-redis:
condition: service_started
postgres:
condition: service_healthy

diode-redis:
image: redis/redis-stack-server:latest
command:
Expand All @@ -82,6 +93,26 @@ services:
ports: [ ]
volumes:
- diode-redis-data:/data

postgres:
image: docker.io/postgres:16-alpine
environment:
- POSTGRES_DB=${POSTGRES_DB_NAME}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_USER=${POSTGRES_USER}
ports:
- ${POSTGRES_PORT:-5432}:5432
healthcheck:
test: pg_isready -q -t 2 -d $$POSTGRES_DB -U $$POSTGRES_USER
start_period: 20s
interval: 1s
timeout: 5s
retries: 5
volumes:
- diode-postgres-data:/var/lib/postgresql/data

volumes:
diode-redis-data:
driver: local
diode-postgres-data:
driver: local
5 changes: 5 additions & 0 deletions diode-server/docker/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ SENTRY_DSN=
MIGRATION_ENABLED=true
RECONCILER_RATE_LIMITER_RPS=20
RECONCILER_RATE_LIMITER_BURST=1
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB_NAME=diode
POSTGRES_USER=diode
POSTGRES_PASSWORD=CHANGE.ME
Loading
Loading