diff --git a/cmd/cleanuser/main.go b/cmd/cleanuser/main.go index 6adde56..19db99a 100644 --- a/cmd/cleanuser/main.go +++ b/cmd/cleanuser/main.go @@ -206,7 +206,7 @@ func parseCsv(file string) map[uint64][]int { for i, h := range header { if h == "guild_id" { guildIdIdx = i - } else if h == "ticket_id" { + } else if h == "ticket_id" || h == "id" { ticketIdIdx = i } } diff --git a/cmd/fix-file-names/main.go b/cmd/fix-file-names/main.go index f354392..798e2cd 100644 --- a/cmd/fix-file-names/main.go +++ b/cmd/fix-file-names/main.go @@ -33,7 +33,6 @@ func main() { } ch := client.ListObjects(context.Background(), conf.Bucket, minio.ListObjectsOptions{ - Prefix: "8", Recursive: true, }) diff --git a/cmd/logarchiver/main.go b/cmd/logarchiver/main.go index f8b34f7..ef5a5dd 100644 --- a/cmd/logarchiver/main.go +++ b/cmd/logarchiver/main.go @@ -1,14 +1,16 @@ package main import ( + "context" "fmt" "github.com/TicketsBot/common/observability" "github.com/TicketsBot/logarchiver/pkg/config" "github.com/TicketsBot/logarchiver/pkg/http" + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/s3client" "github.com/getsentry/sentry-go" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" + "time" ) func main() { @@ -41,21 +43,26 @@ func main() { panic(err) } - logger.Debug("Starting minio client...") + logger.Info("Connecting to database...") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - // create minio client - client, err := minio.New(conf.Endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(conf.AccessKey, conf.SecretKey, ""), - Secure: true, - }) + store, err := repository.ConnectPostgres(ctx, conf) if err != nil { - logger.Fatal("Failed to create minio client", zap.Error(err), zap.String("endpoint", conf.Endpoint)) - panic(err) // logger.Fatal should exit already + logger.Fatal("Failed to connect to database", zap.Error(err)) + } + + logger.Info("Connected.") + + logger.Debug("Starting S3 client manager...") + clientManager := s3client.NewShardedClientManager(conf, store) + if err := clientManager.Load(ctx); err != nil { + logger.Fatal("Failed to load S3 clients", zap.Error(err)) } logger.Debug("Starting HTTP server...") - server := http.NewServer(logger, conf, client) + server := http.NewServer(logger, conf, store, clientManager) go server.RemoveQueue.StartReaper() server.RegisterRoutes() server.Start() diff --git a/go.mod b/go.mod index 588f31b..4d8eca7 100644 --- a/go.mod +++ b/go.mod @@ -33,10 +33,12 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.0.1 // indirect - github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.3.0 // indirect github.com/jackc/pgx v3.6.2+incompatible // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect github.com/jackc/puddle v1.1.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect diff --git a/go.sum b/go.sum index 8b37ac7..d517c15 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,8 @@ github.com/jackc/pgproto3/v2 v2.0.1 h1:Rdjp4NFjwHnEslx2b66FfCI2S0LhO4itac3hXz6WX github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 h1:Q3tB+ExeflWUW7AFcAhXqk40s9mnNYLk1nOkKNZ5GnU= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -97,10 +99,14 @@ github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9 github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.6.0 h1:Fh0O9GdlG4gYpjpwOqjdEodJUQM9jzN3Hdv7PN0xmm0= github.com/jackc/pgx/v4 v4.6.0/go.mod h1:vPh43ZzxijXUVJ+t/EmXBtFmbFVO72cuneCT9oAlxAg= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0 h1:musOWczZC/rSbqut475Vfcczg7jJsdUQf0D6oKPLgNU= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -244,6 +250,7 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/migrations/0001-init-schema.sql b/migrations/0001-init-schema.sql new file mode 100644 index 0000000..c2a621a --- /dev/null +++ b/migrations/0001-init-schema.sql @@ -0,0 +1,18 @@ +CREATE TABLE buckets +( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + "endpoint_url" VARCHAR(255) NOT NULL, + "name" VARCHAR(255) NOT NULL, + "active" BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE TABLE objects +( + "guild_id" int8 NOT NULL, + "ticket_id" int4 NOT NULL, + "bucket_id" uuid NOT NULL, + PRIMARY KEY ("guild_id", "ticket_id"), + FOREIGN KEY ("bucket_id") REFERENCES "buckets" ("id") +); + +CREATE INDEX objects_guid_id_idx ON objects ("guild_id"); diff --git a/pkg/config/config.go b/pkg/config/config.go index e7a1a0d..cd2bd5f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,21 +1,33 @@ package config -import "github.com/caarlos0/env" +import ( + "github.com/caarlos0/env" + "github.com/google/uuid" + "reflect" +) type Config struct { Address string `env:"ARCHIVER_ADDR"` - Endpoint string `env:"S3_ENDPOINT"` - Bucket string `env:"S3_BUCKET"` - AccessKey string `env:"S3_ACCESS"` - SecretKey string `env:"S3_SECRET"` + AccessKey string `env:"S3_ACCESS"` + SecretKey string `env:"S3_SECRET"` + DefaultBucketId uuid.UUID `env:"DEFAULT_BUCKET_ID"` SentryDsn string `env:"SENTRY_DSN"` ProductionMode bool `env:"PRODUCTION_MODE" envDefault:"false"` + AdminAuthToken string `env:"ADMIN_AUTH_TOKEN"` + + DatabaseUri string `env:"DATABASE_URI"` } func Parse() (conf Config) { - if err := env.Parse(&conf); err != nil { + parsers := env.CustomParsers{ + reflect.TypeOf(uuid.UUID{}): func(value string) (interface{}, error) { + return uuid.Parse(value) + }, + } + + if err := env.ParseWithFuncs(&conf, parsers); err != nil { panic(err) } diff --git a/pkg/http/admin.go b/pkg/http/admin.go new file mode 100644 index 0000000..66da3b9 --- /dev/null +++ b/pkg/http/admin.go @@ -0,0 +1,112 @@ +package http + +import ( + "errors" + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/repository/model" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "go.uber.org/zap" + "net/http" +) + +func (s *Server) adminListBuckets(ctx *gin.Context) { + var buckets []model.Bucket + if err := s.store.Tx(ctx, func(r repository.Repositories) (err error) { + buckets, err = r.Buckets().ListBuckets(ctx) + return + }); err != nil { + s.Logger.Error("Error fetching buckets", zap.Error(err)) + ctx.JSON(500, gin.H{ + "message": "Error fetching buckets", + }) + return + } + + ctx.JSON(200, buckets) +} + +func (s *Server) adminCreateBucket(ctx *gin.Context) { + type body struct { + EndpointUrl string `json:"endpoint_url"` + Name string `json:"name"` + } + + var b body + if err := ctx.BindJSON(&b); err != nil { + ctx.JSON(400, gin.H{ + "message": "missing endpoint_url or name", + }) + return + } + + var bucketId uuid.UUID + if err := s.store.Tx(ctx, func(r repository.Repositories) (err error) { + bucketId, err = r.Buckets().CreateBucket(ctx, b.EndpointUrl, b.Name) + return + }); err != nil { + s.Logger.Error("Error creating bucket", zap.Error(err), zap.Any("request_data", b)) + ctx.JSON(500, gin.H{ + "message": "Error creating bucket", + }) + return + } + + ctx.JSON(http.StatusCreated, model.Bucket{ + Id: bucketId, + EndpointUrl: b.EndpointUrl, + Name: b.Name, + Active: false, + }) +} + +func (s *Server) adminSetActiveBucket(ctx *gin.Context) { + type body struct { + BucketId uuid.UUID `json:"bucket_id"` + } + + var b body + if err := ctx.BindJSON(&b); err != nil { + ctx.JSON(400, gin.H{ + "message": "missing bucket_id", + }) + return + } + + var ErrBucketNotFound = errors.New("bucket not found") + if err := s.store.Tx(ctx, func(r repository.Repositories) error { + buckets, err := r.Buckets().ListBuckets(ctx) + if err != nil { + return err + } + + found := false + for _, bucket := range buckets { + if bucket.Id == b.BucketId { + found = true + break + } + } + + if !found { + return ErrBucketNotFound + } + + return r.Buckets().SetActiveBucket(ctx, b.BucketId) + }); err != nil { + if errors.Is(err, ErrBucketNotFound) { + ctx.JSON(404, gin.H{ + "message": "bucket not found", + }) + } else { + s.Logger.Error("Error setting active bucket", zap.Error(err), zap.Any("request_data", b)) + ctx.JSON(500, gin.H{ + "message": "Error setting active bucket", + }) + } + + return + } + + ctx.Status(204) +} diff --git a/pkg/http/middleware.go b/pkg/http/middleware.go new file mode 100644 index 0000000..6d4940c --- /dev/null +++ b/pkg/http/middleware.go @@ -0,0 +1,24 @@ +package http + +import "github.com/gin-gonic/gin" + +func (s *Server) middlewareAuthAdmin(ctx *gin.Context) { + if len(s.Config.AdminAuthToken) == 0 { + s.Logger.Error("Admin authentication token not set") + ctx.JSON(500, gin.H{ + "message": "misconfigured server", + }) + ctx.Abort() + return + } + + if ctx.GetHeader("Authorization") != s.Config.AdminAuthToken { + ctx.JSON(401, gin.H{ + "message": "unauthorized", + }) + ctx.Abort() + return + } + + ctx.Next() +} diff --git a/pkg/http/purgeguild.go b/pkg/http/purgeguild.go index 0e9b6d9..751716d 100644 --- a/pkg/http/purgeguild.go +++ b/pkg/http/purgeguild.go @@ -5,11 +5,15 @@ import ( "errors" "fmt" "github.com/TicketsBot/logarchiver/internal" + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/repository/model" "github.com/gin-gonic/gin" "github.com/minio/minio-go/v7" "go.uber.org/zap" "net/http" "strconv" + "strings" + "sync" ) func (s *Server) purgeGuildHandler(ctx *gin.Context) { @@ -22,6 +26,17 @@ func (s *Server) purgeGuildHandler(ctx *gin.Context) { return } + defaultClient, err := s.s3Clients.Get(s.Config.DefaultBucketId) + if err != nil { + s.Logger.Error("Failed to get default S3 client", zap.Error(err), zap.String("bucket_id", s.Config.DefaultBucketId.String())) + + ctx.JSON(500, gin.H{ + "success": false, + "message": "Failed to get default S3 client", + }) + return + } + if err := s.RemoveQueue.StartOperation(guildId); err != nil { if errors.Is(err, internal.ErrOperationInProgress) { ctx.JSON(400, gin.H{ @@ -38,22 +53,33 @@ func (s *Server) purgeGuildHandler(ctx *gin.Context) { return } - removeCh := make(chan minio.ObjectInfo) - go func() { - opts := minio.RemoveObjectsOptions{} + type record struct { + model.Object + Key string + } + removeCh := make(chan record) + go func() { var errCount int - for err := range s.minio.RemoveObjects(context.Background(), s.Config.Bucket, removeCh, opts) { - s.RemoveQueue.AddError(guildId, err.ObjectName, err.Err) + for r := range removeCh { + client, err := s.s3Clients.Get(r.BucketId) + if err != nil { + s.Logger.Error("Failed to get S3 client", zap.Error(err), zap.String("bucket_id", r.BucketId.String())) + continue + } - s.Logger.Error( - "Failed to remove object", - zap.Error(err.Err), - zap.String("object", err.ObjectName), - zap.Uint64("guild", guildId), - ) + if err := client.Minio().RemoveObject(context.Background(), client.BucketName(), r.Key, minio.RemoveObjectOptions{}); err != nil { + s.RemoveQueue.AddError(guildId, r.Key, err) + + s.Logger.Error( + "Failed to remove object", + zap.Error(err), + zap.String("object", r.Key), + zap.Uint64("guild", guildId), + ) - errCount++ + errCount++ + } } if errCount > 0 { @@ -69,8 +95,15 @@ func (s *Server) purgeGuildHandler(ctx *gin.Context) { } }() + // For the default bucket, we'll have to list all objects. For new buckets, we can fetch a list of objects from + // the database. + + latch := sync.WaitGroup{} + latch.Add(2) + + // Fetch from the default bucket go func() { - objCh := s.minio.ListObjects(context.Background(), s.Config.Bucket, minio.ListObjectsOptions{ + objCh := defaultClient.Minio().ListObjects(context.Background(), defaultClient.BucketName(), minio.ListObjectsOptions{ Prefix: fmt.Sprintf("%d/", guildId), Recursive: true, }) @@ -82,13 +115,60 @@ func (s *Server) purgeGuildHandler(ctx *gin.Context) { zap.Uint64("guild", guildId), ) + // Parse ticket ID, in form guildId/ticketId or guildId/free-ticketId + cut := obj.Key[len(fmt.Sprintf("%d/", guildId)):] + if strings.HasPrefix(cut, "free-") { + cut = cut[len("free-"):] + } + + ticketId, err := strconv.Atoi(cut) + if err != nil { + s.Logger.Error("Failed to parse ticket ID", zap.Error(err), zap.String("object", obj.Key), zap.Uint64("guild", guildId)) + s.RemoveQueue.AddError(guildId, obj.Key, err) + continue + } + s.RemoveQueue.AddRemovedObject(guildId, obj.Key) - removeCh <- obj + removeCh <- record{ + Object: model.Object{ + GuildId: guildId, + TicketId: ticketId, + BucketId: s.Config.DefaultBucketId, + }, + Key: obj.Key, + } } close(removeCh) }() + // Fetch from the database + go func() { + var objects []model.Object + if err := s.store.Tx(context.Background(), func(r repository.Repositories) (err error) { + objects, err = r.Objects().ListByGuild(context.Background(), guildId) + return + }); err != nil { + s.Logger.Error("Failed to fetch objects from database", zap.Error(err), zap.Uint64("guild", guildId)) + return + } + + for _, obj := range objects { + s.Logger.Debug("Found object to remove", zap.String("bucket", obj.BucketId.String()), zap.Uint64("guild", guildId), zap.Int("ticket_id", obj.TicketId)) + s.RemoveQueue.AddRemovedObject(guildId, obj.S3Key()) + removeCh <- record{ + Object: obj, + Key: obj.S3Key(), + } + } + }() + + // Close the remove channel when both goroutines have completed + go func() { + latch.Wait() + close(removeCh) + }() + ctx.JSON(http.StatusAccepted, gin.H{ "success": true, }) diff --git a/pkg/http/server.go b/pkg/http/server.go index aa8230b..5e6cf5f 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -1,12 +1,15 @@ package http import ( + "context" "github.com/TicketsBot/logarchiver/internal" "github.com/TicketsBot/logarchiver/pkg/config" + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/repository/model" "github.com/TicketsBot/logarchiver/pkg/s3client" ginzap "github.com/gin-contrib/zap" "github.com/gin-gonic/gin" - "github.com/minio/minio-go/v7" + "github.com/google/uuid" "go.uber.org/zap" "time" ) @@ -16,18 +19,18 @@ type Server struct { Config config.Config RemoveQueue internal.RemoveQueue router *gin.Engine - minio *minio.Client - s3 *s3client.S3Client + store repository.Store + s3Clients *s3client.ShardedClientManager } -func NewServer(logger *zap.Logger, config config.Config, client *minio.Client) *Server { +func NewServer(logger *zap.Logger, config config.Config, store repository.Store, clientManager *s3client.ShardedClientManager) *Server { return &Server{ Logger: logger, Config: config, RemoveQueue: internal.NewRemoveQueue(logger), router: gin.New(), - minio: client, - s3: s3client.NewS3Client(client, config.Bucket), + store: store, + s3Clients: clientManager, } } @@ -41,6 +44,13 @@ func (s *Server) RegisterRoutes() { s.router.GET("/guild/status/:id", s.purgeStatusHandler) s.router.DELETE("/guild/:id", s.purgeGuildHandler) + + adminGroup := s.router.Group("/admin", s.middlewareAuthAdmin) + { + adminGroup.GET("/buckets", s.adminListBuckets) + adminGroup.POST("/buckets", s.adminCreateBucket) + adminGroup.PATCH("/buckets/active", s.adminSetActiveBucket) + } } func (s *Server) Start() { @@ -48,3 +58,53 @@ func (s *Server) Start() { panic(err) } } + +func (s *Server) getActiveClient(ctx context.Context) (*s3client.S3Client, model.Bucket, error) { + var bucket model.Bucket + + if err := s.store.Tx(ctx, func(r repository.Repositories) (err error) { + bucket, err = r.Buckets().GetActiveBucket(ctx) + return + }); err != nil { + return nil, model.Bucket{}, err + } + + client, err := s.s3Clients.Get(bucket.Id) + if err != nil { + return nil, model.Bucket{}, err + } + + return client, bucket, nil +} + +func (s *Server) getClientForObject(ctx context.Context, guild uint64, ticket int) (*s3client.S3Client, bool, error) { + bucketId := uuid.Nil + if err := s.store.Tx(ctx, func(r repository.Repositories) error { + object, ok, err := r.Objects().GetObject(ctx, guild, ticket) + if err != nil { + return err + } + + if !ok { + // TODO: Return status 404 + bucketId = s.Config.DefaultBucketId + } else { + bucketId = object.BucketId + } + + return nil + }); err != nil { + return nil, false, err + } + + if bucketId == uuid.Nil { + return nil, false, nil + } + + client, err := s.s3Clients.Get(bucketId) + if err != nil { + return nil, false, err + } + + return client, true, nil +} diff --git a/pkg/http/ticketdelete.go b/pkg/http/ticketdelete.go index be14c2e..75a2482 100644 --- a/pkg/http/ticketdelete.go +++ b/pkg/http/ticketdelete.go @@ -2,12 +2,13 @@ package http import ( "github.com/gin-gonic/gin" + "go.uber.org/zap" "net/http" "strconv" ) func (s *Server) ticketDeleteHandler(ctx *gin.Context) { - guild, err := strconv.ParseUint(ctx.Query("guild"), 10, 64) + guildId, err := strconv.ParseUint(ctx.Query("guild"), 10, 64) if err != nil { ctx.JSON(400, gin.H{ "message": "missing guild ID", @@ -23,10 +24,32 @@ func (s *Server) ticketDeleteHandler(ctx *gin.Context) { return } - if err := s.s3.DeleteTicket(ctx, guild, id); err != nil { + logger := s.Logger.With(zap.Uint64("guild", guildId), zap.Int("ticket", id)) + + // Find bucket + client, ok, err := s.getClientForObject(ctx, guildId, id) + if err != nil { + logger.Error("Failed to get client for object", zap.Error(err)) ctx.JSON(500, gin.H{ "message": err.Error(), }) + return + } + + if !ok { + logger.Warn("Ticket not found") + ctx.JSON(404, gin.H{ + "message": "ticket not found", + }) + return + } + + if err := client.DeleteTicket(ctx, guildId, id); err != nil { + logger.Error("Failed to delete ticket", zap.Error(err)) + ctx.JSON(500, gin.H{ + "message": err.Error(), + }) + return } ctx.Status(http.StatusNoContent) diff --git a/pkg/http/ticketget.go b/pkg/http/ticketget.go index df3480e..16e1f97 100644 --- a/pkg/http/ticketget.go +++ b/pkg/http/ticketget.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/TicketsBot/logarchiver/pkg/s3client" "github.com/gin-gonic/gin" + "go.uber.org/zap" "strconv" ) @@ -24,7 +25,23 @@ func (s *Server) ticketGetHandler(ctx *gin.Context) { return } - data, err := s.s3.GetTicket(ctx, guild, id) + client, ok, err := s.getClientForObject(ctx, guild, id) + if err != nil { + s.Logger.Error("Failed to get client for object", zap.Error(err)) + ctx.JSON(500, gin.H{ + "message": err.Error(), + }) + return + } + + if !ok { + ctx.JSON(404, gin.H{ + "message": "ticket not found", + }) + return + } + + data, err := client.GetTicket(ctx, guild, id) if err != nil { var statusCode int if errors.Is(err, s3client.ErrTicketNotFound) { diff --git a/pkg/http/ticketupload.go b/pkg/http/ticketupload.go index 9456323..de23a52 100644 --- a/pkg/http/ticketupload.go +++ b/pkg/http/ticketupload.go @@ -1,7 +1,10 @@ package http import ( + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/repository/model" "github.com/gin-gonic/gin" + "go.uber.org/zap" "strconv" ) @@ -30,11 +33,47 @@ func (s *Server) ticketUploadHandler(ctx *gin.Context) { return } - if err := s.s3.StoreTicket(ctx, guild, id, body); err != nil { + // Get active bucket + client, bucket, err := s.getActiveClient(ctx) + if err != nil { + s.Logger.Error("Failed to get active client", zap.Error(err)) + + ctx.JSON(500, gin.H{ + "message": err.Error(), + }) + return + } + + // Create object and commit transaction BEFORE writing to S3, to prevent "lost" objects + if err := s.store.Tx(ctx, func(r repository.Repositories) error { + return r.Objects().CreateObject(ctx, model.Object{ + GuildId: guild, + TicketId: id, + BucketId: bucket.Id, + }) + }); err != nil { + s.Logger.Error("Failed to create object in DB", zap.Error(err)) ctx.JSON(500, gin.H{ "message": err.Error(), }) - } else { - ctx.JSON(200, gin.H{}) + return } + + if err := client.StoreTicket(ctx, guild, id, body); err != nil { + s.Logger.Error("Failed to store ticket", zap.Error(err)) + + // Try to remove object from DB, not the end of the world if it fails + if err := s.store.Tx(ctx, func(r repository.Repositories) error { + return r.Objects().DeleteObject(ctx, guild, id) + }); err != nil { + s.Logger.Error("Failed to delete object from DB", zap.Error(err)) + } + + ctx.JSON(500, gin.H{ + "message": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{}) } diff --git a/pkg/repository/errors.go b/pkg/repository/errors.go new file mode 100644 index 0000000..07a4ec2 --- /dev/null +++ b/pkg/repository/errors.go @@ -0,0 +1,5 @@ +package repository + +import "errors" + +var ErrNoActiveBucket = errors.New("no active bucket") diff --git a/pkg/repository/model/bucket.go b/pkg/repository/model/bucket.go new file mode 100644 index 0000000..7f03831 --- /dev/null +++ b/pkg/repository/model/bucket.go @@ -0,0 +1,10 @@ +package model + +import "github.com/google/uuid" + +type Bucket struct { + Id uuid.UUID `json:"id"` + EndpointUrl string `json:"endpoint_url"` + Name string `json:"name"` + Active bool `json:"active"` +} diff --git a/pkg/repository/model/object.go b/pkg/repository/model/object.go new file mode 100644 index 0000000..041baba --- /dev/null +++ b/pkg/repository/model/object.go @@ -0,0 +1,16 @@ +package model + +import ( + "fmt" + "github.com/google/uuid" +) + +type Object struct { + GuildId uint64 `json:"guild_id,string"` + TicketId int `json:"ticket_id"` + BucketId uuid.UUID `json:"bucket_id"` +} + +func (o Object) S3Key() string { + return fmt.Sprintf("%d/%d", o.GuildId, o.TicketId) +} diff --git a/pkg/repository/postgres.go b/pkg/repository/postgres.go new file mode 100644 index 0000000..79bf57c --- /dev/null +++ b/pkg/repository/postgres.go @@ -0,0 +1,43 @@ +package repository + +import ( + "context" + "github.com/TicketsBot/logarchiver/pkg/config" + "github.com/jackc/pgx/v5/pgxpool" +) + +type PostgresStore struct { + db *pgxpool.Pool +} + +var _ Store = (*PostgresStore)(nil) + +func NewPostgresRepository(db *pgxpool.Pool) *PostgresStore { + return &PostgresStore{ + db: db, + } +} + +func ConnectPostgres(ctx context.Context, config config.Config) (*PostgresStore, error) { + pool, err := pgxpool.New(ctx, config.DatabaseUri) + if err != nil { + return nil, err + } + + return NewPostgresRepository(pool), nil +} + +func (p *PostgresStore) Tx(ctx context.Context, f func(Repositories) error) error { + tx, err := p.db.Begin(ctx) + if err != nil { + return err + } + + defer tx.Rollback(ctx) + + if err := f(newPostgresRepositories(tx)); err != nil { + return err + } + + return tx.Commit(ctx) +} diff --git a/pkg/repository/postgresbuckets.go b/pkg/repository/postgresbuckets.go new file mode 100644 index 0000000..9227b61 --- /dev/null +++ b/pkg/repository/postgresbuckets.go @@ -0,0 +1,106 @@ +package repository + +import ( + "context" + _ "embed" + "errors" + "github.com/TicketsBot/logarchiver/pkg/repository/model" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" +) + +type PostgresBucketRepository struct { + tx pgx.Tx +} + +var _ BucketRepository = (*PostgresBucketRepository)(nil) + +var ( + //go:embed sql/buckets/list.sql + queryListBuckets string + + //go:embed sql/buckets/get_active.sql + queryGetActiveBucket string + + //go:embed sql/buckets/set_active_remove_old.sql + querySetActiveBucketRemoveOld string + + //go:embed sql/buckets/set_active_set_new.sql + querySetActiveBucketSetNew string + + //go:embed sql/buckets/create.sql + queryCreateBucket string + + //go:embed sql/buckets/delete.sql + queryDeleteBucket string +) + +func newPostgresBucketRepository(tx pgx.Tx) *PostgresBucketRepository { + return &PostgresBucketRepository{ + tx: tx, + } +} + +func (p *PostgresBucketRepository) ListBuckets(ctx context.Context) ([]model.Bucket, error) { + res, err := p.tx.Query(ctx, queryListBuckets) + if err != nil { + return nil, err + } + + defer res.Close() + + buckets := make([]model.Bucket, 0) + for res.Next() { + var bucket model.Bucket + if err := res.Scan(&bucket.Id, &bucket.EndpointUrl, &bucket.Name, &bucket.Active); err != nil { + return nil, err + } + + buckets = append(buckets, bucket) + } + + return buckets, nil +} + +func (p *PostgresBucketRepository) GetActiveBucket(ctx context.Context) (model.Bucket, error) { + var bucket model.Bucket + if err := p.tx.QueryRow(ctx, queryGetActiveBucket).Scan(&bucket.Id, &bucket.EndpointUrl, &bucket.Name, &bucket.Active); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return model.Bucket{}, ErrNoActiveBucket + } else { + return model.Bucket{}, err + } + } + + return bucket, nil +} + +func (p *PostgresBucketRepository) SetActiveBucket(ctx context.Context, id uuid.UUID) error { + if _, err := p.tx.Exec(ctx, querySetActiveBucketRemoveOld); err != nil { + return err + } + + if _, err := p.tx.Exec(ctx, querySetActiveBucketSetNew, id); err != nil { + return err + } + + return nil +} + +func (p *PostgresBucketRepository) CreateBucket(ctx context.Context, endpointUrl, name string) (uuid.UUID, error) { + id := uuid.New() + + if _, err := p.tx.Exec(ctx, queryCreateBucket, id, endpointUrl, name); err != nil { + return uuid.Nil, err + } + + return id, nil +} + +func (p *PostgresBucketRepository) DeleteBucket(ctx context.Context, id uuid.UUID) error { + if _, err := p.tx.Exec(ctx, queryDeleteBucket, id); err != nil { + return err + } + + return nil +} diff --git a/pkg/repository/postgresobjects.go b/pkg/repository/postgresobjects.go new file mode 100644 index 0000000..cede87a --- /dev/null +++ b/pkg/repository/postgresobjects.go @@ -0,0 +1,85 @@ +package repository + +import ( + "context" + _ "embed" + "errors" + "github.com/TicketsBot/logarchiver/pkg/repository/model" + "github.com/jackc/pgx/v5" +) + +type PostgresObjectRepository struct { + tx pgx.Tx +} + +var _ ObjectRepository = (*PostgresObjectRepository)(nil) + +var ( + //go:embed sql/objects/get.sql + queryGetObject string + + //go:embed sql/objects/list_by_guild.sql + queryListByGuild string + + //go:embed sql/objects/create.sql + queryCreateObject string + + //go:embed sql/objects/delete.sql + queryDeleteObject string +) + +func newPostgresObjectRepository(tx pgx.Tx) *PostgresObjectRepository { + return &PostgresObjectRepository{ + tx: tx, + } +} + +func (p *PostgresObjectRepository) GetObject(ctx context.Context, guildId uint64, ticketId int) (model.Object, bool, error) { + var object model.Object + if err := p.tx.QueryRow(ctx, queryGetObject, guildId, ticketId).Scan(&object.GuildId, &object.TicketId, &object.BucketId); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return model.Object{}, false, nil + } else { + return model.Object{}, false, err + } + } + + return object, true, nil +} + +func (p *PostgresObjectRepository) ListByGuild(ctx context.Context, guildId uint64) ([]model.Object, error) { + rows, err := p.tx.Query(ctx, queryListByGuild, guildId) + if err != nil { + return nil, err + } + + defer rows.Close() + + var objects []model.Object + for rows.Next() { + var object model.Object + if err := rows.Scan(&object.GuildId, &object.TicketId, &object.BucketId); err != nil { + return nil, err + } + + objects = append(objects, object) + } + + return objects, nil +} + +func (p *PostgresObjectRepository) CreateObject(ctx context.Context, object model.Object) error { + if _, err := p.tx.Exec(ctx, queryCreateObject, object.GuildId, object.TicketId, object.BucketId); err != nil { + return err + } + + return nil +} + +func (p *PostgresObjectRepository) DeleteObject(ctx context.Context, guildId uint64, ticketId int) error { + if _, err := p.tx.Exec(ctx, queryDeleteObject, guildId, ticketId); err != nil { + return err + } + + return nil +} diff --git a/pkg/repository/postgresrepositories.go b/pkg/repository/postgresrepositories.go new file mode 100644 index 0000000..a0d4c31 --- /dev/null +++ b/pkg/repository/postgresrepositories.go @@ -0,0 +1,23 @@ +package repository + +import "github.com/jackc/pgx/v5" + +type PostgresRepositories struct { + tx pgx.Tx +} + +func newPostgresRepositories(tx pgx.Tx) *PostgresRepositories { + return &PostgresRepositories{ + tx: tx, + } +} + +var _ Repositories = (*PostgresRepositories)(nil) + +func (p *PostgresRepositories) Buckets() BucketRepository { + return newPostgresBucketRepository(p.tx) +} + +func (p *PostgresRepositories) Objects() ObjectRepository { + return newPostgresObjectRepository(p.tx) +} diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go new file mode 100644 index 0000000..c140bb8 --- /dev/null +++ b/pkg/repository/repository.go @@ -0,0 +1,31 @@ +package repository + +import ( + "context" + "github.com/TicketsBot/logarchiver/pkg/repository/model" + "github.com/google/uuid" +) + +type Store interface { + Tx(context.Context, func(Repositories) error) error +} + +type Repositories interface { + Buckets() BucketRepository + Objects() ObjectRepository +} + +type BucketRepository interface { + ListBuckets(ctx context.Context) ([]model.Bucket, error) + GetActiveBucket(ctx context.Context) (model.Bucket, error) + SetActiveBucket(ctx context.Context, id uuid.UUID) error + CreateBucket(ctx context.Context, endpointUrl, name string) (uuid.UUID, error) + DeleteBucket(ctx context.Context, id uuid.UUID) error +} + +type ObjectRepository interface { + GetObject(ctx context.Context, guildId uint64, ticketId int) (model.Object, bool, error) + CreateObject(ctx context.Context, object model.Object) error + ListByGuild(ctx context.Context, guildId uint64) ([]model.Object, error) + DeleteObject(ctx context.Context, guildId uint64, ticketId int) error +} diff --git a/pkg/repository/sql/buckets/create.sql b/pkg/repository/sql/buckets/create.sql new file mode 100644 index 0000000..f25fc6a --- /dev/null +++ b/pkg/repository/sql/buckets/create.sql @@ -0,0 +1,2 @@ +INSERT INTO buckets ("id", "endpoint_url", "name", "active") +VALUES ($1, $2, $3, false); \ No newline at end of file diff --git a/pkg/repository/sql/buckets/delete.sql b/pkg/repository/sql/buckets/delete.sql new file mode 100644 index 0000000..36d37cd --- /dev/null +++ b/pkg/repository/sql/buckets/delete.sql @@ -0,0 +1,2 @@ +DELETE FROM buckets +WHERE "id" = $1; \ No newline at end of file diff --git a/pkg/repository/sql/buckets/get_active.sql b/pkg/repository/sql/buckets/get_active.sql new file mode 100644 index 0000000..1ac1d78 --- /dev/null +++ b/pkg/repository/sql/buckets/get_active.sql @@ -0,0 +1,3 @@ +SELECT "id", "endpoint_url", "name", "active" +FROM buckets +WHERE "active" = true; \ No newline at end of file diff --git a/pkg/repository/sql/buckets/list.sql b/pkg/repository/sql/buckets/list.sql new file mode 100644 index 0000000..727ae39 --- /dev/null +++ b/pkg/repository/sql/buckets/list.sql @@ -0,0 +1,2 @@ +SELECT "id", "endpoint_url", "name", "active" +FROM buckets; \ No newline at end of file diff --git a/pkg/repository/sql/buckets/set_active_remove_old.sql b/pkg/repository/sql/buckets/set_active_remove_old.sql new file mode 100644 index 0000000..8f2566f --- /dev/null +++ b/pkg/repository/sql/buckets/set_active_remove_old.sql @@ -0,0 +1,2 @@ +UPDATE buckets +SET active = false; \ No newline at end of file diff --git a/pkg/repository/sql/buckets/set_active_set_new.sql b/pkg/repository/sql/buckets/set_active_set_new.sql new file mode 100644 index 0000000..685fa1b --- /dev/null +++ b/pkg/repository/sql/buckets/set_active_set_new.sql @@ -0,0 +1,3 @@ +UPDATE buckets +SET active = true +WHERE "id" = $1; \ No newline at end of file diff --git a/pkg/repository/sql/objects/create.sql b/pkg/repository/sql/objects/create.sql new file mode 100644 index 0000000..6b308e9 --- /dev/null +++ b/pkg/repository/sql/objects/create.sql @@ -0,0 +1,2 @@ +INSERT INTO objects ("guild_id", "ticket_id", "bucket_id") +VALUES ($1, $2, $3); \ No newline at end of file diff --git a/pkg/repository/sql/objects/delete.sql b/pkg/repository/sql/objects/delete.sql new file mode 100644 index 0000000..9137485 --- /dev/null +++ b/pkg/repository/sql/objects/delete.sql @@ -0,0 +1,2 @@ +DELETE FROM objects +WHERE "guild_id" = $1 AND "ticket_id" = $2; \ No newline at end of file diff --git a/pkg/repository/sql/objects/get.sql b/pkg/repository/sql/objects/get.sql new file mode 100644 index 0000000..b12c6bf --- /dev/null +++ b/pkg/repository/sql/objects/get.sql @@ -0,0 +1,3 @@ +SELECT "guild_id", "ticket_id", "bucket_id" +FROM objects +WHERE "guild_id" = $1 AND "ticket_id" = $2; \ No newline at end of file diff --git a/pkg/repository/sql/objects/list_by_guild.sql b/pkg/repository/sql/objects/list_by_guild.sql new file mode 100644 index 0000000..12a9379 --- /dev/null +++ b/pkg/repository/sql/objects/list_by_guild.sql @@ -0,0 +1,3 @@ +SELECT "guild_id", "ticket_id", "bucket_id" +FROM objects +WHERE "guild_id" = $1; \ No newline at end of file diff --git a/pkg/s3client/client.go b/pkg/s3client/client.go index 15b4902..b46e939 100644 --- a/pkg/s3client/client.go +++ b/pkg/s3client/client.go @@ -5,12 +5,14 @@ import ( "context" "errors" "fmt" + "github.com/TicketsBot/logarchiver/pkg/repository/model" "github.com/minio/minio-go/v7" ) type S3Client struct { client *minio.Client bucketName string + bucket model.Bucket } func NewS3Client(client *minio.Client, bucketName string) *S3Client { @@ -85,6 +87,18 @@ func (c *S3Client) GetAllKeysForGuild(ctx context.Context, guildId uint64) ([]st return keys, nil } +// Minio returns the underlying minio client. This will be removed in the future, once the entries from the default +// bucket are migrated into the database. +func (c *S3Client) Minio() *minio.Client { + return c.client +} + +// BucketName returns the underlying minio client. This will be removed in the future, once the entries from the default +// bucket are migrated into the database. +func (c *S3Client) BucketName() string { + return c.bucketName +} + func isNotFoundErr(err error) bool { var resp minio.ErrorResponse return errors.As(err, &resp) && resp.Code == "NoSuchKey" diff --git a/pkg/s3client/clientmanager.go b/pkg/s3client/clientmanager.go new file mode 100644 index 0000000..a493ee1 --- /dev/null +++ b/pkg/s3client/clientmanager.go @@ -0,0 +1,112 @@ +package s3client + +import ( + "context" + "errors" + "github.com/TicketsBot/logarchiver/pkg/config" + "github.com/TicketsBot/logarchiver/pkg/repository" + "github.com/TicketsBot/logarchiver/pkg/repository/model" + "github.com/google/uuid" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "net/url" + "strings" + "sync" +) + +type ShardedClientManager struct { + config config.Config + store repository.Store + clients map[uuid.UUID]*S3Client + mu sync.RWMutex +} + +var ErrClientNotFound = errors.New("client not found") + +func NewShardedClientManager(config config.Config, store repository.Store) *ShardedClientManager { + return &ShardedClientManager{ + config: config, + store: store, + clients: make(map[uuid.UUID]*S3Client), + } +} + +func (s *ShardedClientManager) Load(ctx context.Context) error { + var buckets []model.Bucket + + if err := s.store.Tx(ctx, func(r repository.Repositories) error { + tmp, err := r.Buckets().ListBuckets(ctx) + if err != nil { + return err + } + + buckets = tmp + + return nil + }); err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.clients = make(map[uuid.UUID]*S3Client) + + for _, bucket := range buckets { + // Extract host from endpoint URL + host, err := extractHost(bucket.EndpointUrl) + if err != nil { + return err + } + + m, err := minio.New(host, &minio.Options{ + Creds: credentials.NewStaticV4(s.config.AccessKey, s.config.SecretKey, ""), + Secure: true, + }) + + if err != nil { + return err + } + + s.clients[bucket.Id] = NewS3Client(m, bucket.Name) + } + + return nil +} + +func (s *ShardedClientManager) Get(bucketId uuid.UUID) (*S3Client, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + client, ok := s.clients[bucketId] + if !ok { + return nil, ErrClientNotFound + } + + return client, nil +} + +func (s *ShardedClientManager) GetAll() []*S3Client { + s.mu.RLock() + defer s.mu.RUnlock() + + clients := make([]*S3Client, 0, len(s.clients)) + for _, client := range s.clients { + clients = append(clients, client) + } + + return clients +} + +func extractHost(endpoint string) (string, error) { + if strings.HasPrefix(endpoint, "https://") || strings.HasPrefix(endpoint, "http://") { + parsed, err := url.Parse(endpoint) + if err != nil { + return "", err + } + + return parsed.Host, nil + } else { + return endpoint, nil + } +}