Skip to content

Commit

Permalink
Implement bucket sharding (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
rxdn authored Sep 3, 2024
1 parent bd57603 commit 07eab92
Show file tree
Hide file tree
Showing 34 changed files with 915 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/cleanuser/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func parseCsv(file string) map[uint64][]int {
for i, h := range header {
if h == "guild_id" {
guildIdIdx = i
} else if h == "ticket_id" {
} else if h == "ticket_id" || h == "id" {
ticketIdIdx = i
}
}
Expand Down
1 change: 0 additions & 1 deletion cmd/fix-file-names/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func main() {
}

ch := client.ListObjects(context.Background(), conf.Bucket, minio.ListObjectsOptions{
Prefix: "8",
Recursive: true,
})

Expand Down
29 changes: 18 additions & 11 deletions cmd/logarchiver/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package main

import (
"context"
"fmt"
"github.com/TicketsBot/common/observability"
"github.com/TicketsBot/logarchiver/pkg/config"
"github.com/TicketsBot/logarchiver/pkg/http"
"github.com/TicketsBot/logarchiver/pkg/repository"
"github.com/TicketsBot/logarchiver/pkg/s3client"
"github.com/getsentry/sentry-go"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"time"
)

func main() {
Expand Down Expand Up @@ -41,21 +43,26 @@ func main() {
panic(err)
}

logger.Debug("Starting minio client...")
logger.Info("Connecting to database...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// create minio client
client, err := minio.New(conf.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(conf.AccessKey, conf.SecretKey, ""),
Secure: true,
})
store, err := repository.ConnectPostgres(ctx, conf)
if err != nil {
logger.Fatal("Failed to create minio client", zap.Error(err), zap.String("endpoint", conf.Endpoint))
panic(err) // logger.Fatal should exit already
logger.Fatal("Failed to connect to database", zap.Error(err))
}

logger.Info("Connected.")

logger.Debug("Starting S3 client manager...")
clientManager := s3client.NewShardedClientManager(conf, store)
if err := clientManager.Load(ctx); err != nil {
logger.Fatal("Failed to load S3 clients", zap.Error(err))
}

logger.Debug("Starting HTTP server...")

server := http.NewServer(logger, conf, client)
server := http.NewServer(logger, conf, store, clientManager)
go server.RemoveQueue.StartReaper()
server.RegisterRoutes()
server.Start()
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ require (
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.0.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.3.0 // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle v1.1.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ github.com/jackc/pgproto3/v2 v2.0.1 h1:Rdjp4NFjwHnEslx2b66FfCI2S0LhO4itac3hXz6WX
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 h1:Q3tB+ExeflWUW7AFcAhXqk40s9mnNYLk1nOkKNZ5GnU=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
Expand All @@ -97,10 +99,14 @@ github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.6.0 h1:Fh0O9GdlG4gYpjpwOqjdEodJUQM9jzN3Hdv7PN0xmm0=
github.com/jackc/pgx/v4 v4.6.0/go.mod h1:vPh43ZzxijXUVJ+t/EmXBtFmbFVO72cuneCT9oAlxAg=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.0 h1:musOWczZC/rSbqut475Vfcczg7jJsdUQf0D6oKPLgNU=
github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
Expand Down Expand Up @@ -244,6 +250,7 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
18 changes: 18 additions & 0 deletions migrations/0001-init-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE buckets
(
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid(),
"endpoint_url" VARCHAR(255) NOT NULL,
"name" VARCHAR(255) NOT NULL,
"active" BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE TABLE objects
(
"guild_id" int8 NOT NULL,
"ticket_id" int4 NOT NULL,
"bucket_id" uuid NOT NULL,
PRIMARY KEY ("guild_id", "ticket_id"),
FOREIGN KEY ("bucket_id") REFERENCES "buckets" ("id")
);

CREATE INDEX objects_guid_id_idx ON objects ("guild_id");
24 changes: 18 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
package config

import "github.com/caarlos0/env"
import (
"github.com/caarlos0/env"
"github.com/google/uuid"
"reflect"
)

type Config struct {
Address string `env:"ARCHIVER_ADDR"`

Endpoint string `env:"S3_ENDPOINT"`
Bucket string `env:"S3_BUCKET"`
AccessKey string `env:"S3_ACCESS"`
SecretKey string `env:"S3_SECRET"`
AccessKey string `env:"S3_ACCESS"`
SecretKey string `env:"S3_SECRET"`
DefaultBucketId uuid.UUID `env:"DEFAULT_BUCKET_ID"`

SentryDsn string `env:"SENTRY_DSN"`
ProductionMode bool `env:"PRODUCTION_MODE" envDefault:"false"`
AdminAuthToken string `env:"ADMIN_AUTH_TOKEN"`

DatabaseUri string `env:"DATABASE_URI"`
}

func Parse() (conf Config) {
if err := env.Parse(&conf); err != nil {
parsers := env.CustomParsers{
reflect.TypeOf(uuid.UUID{}): func(value string) (interface{}, error) {
return uuid.Parse(value)
},
}

if err := env.ParseWithFuncs(&conf, parsers); err != nil {
panic(err)
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/http/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package http

import (
"errors"
"github.com/TicketsBot/logarchiver/pkg/repository"
"github.com/TicketsBot/logarchiver/pkg/repository/model"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.uber.org/zap"
"net/http"
)

func (s *Server) adminListBuckets(ctx *gin.Context) {
var buckets []model.Bucket
if err := s.store.Tx(ctx, func(r repository.Repositories) (err error) {
buckets, err = r.Buckets().ListBuckets(ctx)
return
}); err != nil {
s.Logger.Error("Error fetching buckets", zap.Error(err))
ctx.JSON(500, gin.H{
"message": "Error fetching buckets",
})
return
}

ctx.JSON(200, buckets)
}

func (s *Server) adminCreateBucket(ctx *gin.Context) {
type body struct {
EndpointUrl string `json:"endpoint_url"`
Name string `json:"name"`
}

var b body
if err := ctx.BindJSON(&b); err != nil {
ctx.JSON(400, gin.H{
"message": "missing endpoint_url or name",
})
return
}

var bucketId uuid.UUID
if err := s.store.Tx(ctx, func(r repository.Repositories) (err error) {
bucketId, err = r.Buckets().CreateBucket(ctx, b.EndpointUrl, b.Name)
return
}); err != nil {
s.Logger.Error("Error creating bucket", zap.Error(err), zap.Any("request_data", b))
ctx.JSON(500, gin.H{
"message": "Error creating bucket",
})
return
}

ctx.JSON(http.StatusCreated, model.Bucket{
Id: bucketId,
EndpointUrl: b.EndpointUrl,
Name: b.Name,
Active: false,
})
}

func (s *Server) adminSetActiveBucket(ctx *gin.Context) {
type body struct {
BucketId uuid.UUID `json:"bucket_id"`
}

var b body
if err := ctx.BindJSON(&b); err != nil {
ctx.JSON(400, gin.H{
"message": "missing bucket_id",
})
return
}

var ErrBucketNotFound = errors.New("bucket not found")
if err := s.store.Tx(ctx, func(r repository.Repositories) error {
buckets, err := r.Buckets().ListBuckets(ctx)
if err != nil {
return err
}

found := false
for _, bucket := range buckets {
if bucket.Id == b.BucketId {
found = true
break
}
}

if !found {
return ErrBucketNotFound
}

return r.Buckets().SetActiveBucket(ctx, b.BucketId)
}); err != nil {
if errors.Is(err, ErrBucketNotFound) {
ctx.JSON(404, gin.H{
"message": "bucket not found",
})
} else {
s.Logger.Error("Error setting active bucket", zap.Error(err), zap.Any("request_data", b))
ctx.JSON(500, gin.H{
"message": "Error setting active bucket",
})
}

return
}

ctx.Status(204)
}
24 changes: 24 additions & 0 deletions pkg/http/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package http

import "github.com/gin-gonic/gin"

func (s *Server) middlewareAuthAdmin(ctx *gin.Context) {
if len(s.Config.AdminAuthToken) == 0 {
s.Logger.Error("Admin authentication token not set")
ctx.JSON(500, gin.H{
"message": "misconfigured server",
})
ctx.Abort()
return
}

if ctx.GetHeader("Authorization") != s.Config.AdminAuthToken {
ctx.JSON(401, gin.H{
"message": "unauthorized",
})
ctx.Abort()
return
}

ctx.Next()
}
Loading

0 comments on commit 07eab92

Please sign in to comment.