From 0de8b53e0feb9e2a0ad656fb30b712fd84e8e173 Mon Sep 17 00:00:00 2001 From: wuhuizuo Date: Thu, 31 Oct 2024 21:31:33 +0800 Subject: [PATCH] feat(publisher): implement fs worker (#188) Signed-off-by: wuhuizuo Signed-off-by: wuhuizuo --- publisher/cmd/worker/init.go | 97 +++++ publisher/cmd/worker/main.go | 76 +--- publisher/go.mod | 8 +- publisher/go.sum | 4 + publisher/pkg/config/config.go | 8 +- publisher/pkg/impl/fileserver_service.go | 2 +- publisher/pkg/impl/fileserver_worker.go | 154 +++++++ publisher/pkg/impl/funcs.go | 236 +++-------- publisher/pkg/impl/funcs_fs.go | 215 ++++++++++ publisher/pkg/impl/funcs_fs_test.go | 386 ++++++++++++++++++ publisher/pkg/impl/funcs_test.go | 6 +- publisher/pkg/impl/funcs_tiup.go | 174 ++++++++ publisher/pkg/impl/tiup_service.go | 2 +- .../pkg/impl/{worker.go => tiup_worker.go} | 80 ++-- publisher/pkg/impl/types.go | 25 +- 15 files changed, 1139 insertions(+), 334 deletions(-) create mode 100644 publisher/cmd/worker/init.go create mode 100644 publisher/pkg/impl/fileserver_worker.go create mode 100644 publisher/pkg/impl/funcs_fs.go create mode 100644 publisher/pkg/impl/funcs_fs_test.go create mode 100644 publisher/pkg/impl/funcs_tiup.go rename publisher/pkg/impl/{worker.go => tiup_worker.go} (70%) diff --git a/publisher/cmd/worker/init.go b/publisher/cmd/worker/init.go new file mode 100644 index 0000000..da48f89 --- /dev/null +++ b/publisher/cmd/worker/init.go @@ -0,0 +1,97 @@ +package main + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" + "github.com/segmentio/kafka-go" + + "github.com/PingCAP-QE/ee-apps/publisher/pkg/config" + "github.com/PingCAP-QE/ee-apps/publisher/pkg/impl" +) + +// Load and parse configuration. +func loadConfig(configFile string) (config.Worker, error) { + var config config.Worker + { + configData, err := os.ReadFile(configFile) + if err != nil { + return config, fmt.Errorf("error reading config file: %v", err) + } + if err := yaml.Unmarshal(configData, &config); err != nil { + return config, fmt.Errorf("error parsing config file: %v", err) + } + } + return config, nil +} + +func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) { + config, err := loadConfig(configFile) + if err != nil { + return nil, nil, err + } + + // Configure Redis client. + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + + worker, err := impl.NewTiupWorker(&log.Logger, redisClient, config.Options) + if err != nil { + log.Fatal().Err(err).Msg("Error creating tiup publishing worker") + } + + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: config.Kafka.Brokers, + Topic: config.Kafka.Topic, + GroupID: config.Kafka.ConsumerGroup, + MinBytes: 10e3, + MaxBytes: 10e6, + CommitInterval: 5000, + Logger: kafka.LoggerFunc(log.Printf), + }) + + return kafkaReader, worker, nil +} + +func initFsWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) { + config, err := loadConfig(configFile) + if err != nil { + log.Fatal().Err(err).Msg("load config failed") + } + + // Configure Redis client. + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + + var worker impl.Worker + { + worker, err = impl.NewFsWorker(&log.Logger, redisClient, config.Options) + if err != nil { + log.Fatal().Err(err).Msg("Error creating tiup publishing worker") + } + } + + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: config.Kafka.Brokers, + Topic: config.Kafka.Topic, + GroupID: config.Kafka.ConsumerGroup, + MinBytes: 10e3, + MaxBytes: 10e6, + CommitInterval: 5000, + Logger: kafka.LoggerFunc(log.Printf), + }) + + return kafkaReader, worker +} diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index cec0ec1..5c47fce 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -9,24 +9,21 @@ import ( "os/signal" "sync" "syscall" - "time" "github.com/cloudevents/sdk-go/v2/event" - "github.com/go-redis/redis/v8" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" - "gopkg.in/yaml.v3" - "github.com/PingCAP-QE/ee-apps/publisher/pkg/config" "github.com/PingCAP-QE/ee-apps/publisher/pkg/impl" ) func main() { // Parse command-line flags var ( - configFile = flag.String("config", "config.yaml", "Path to config file") - dbgF = flag.Bool("debug", false, "Enable debug mode") + tiupConfigFile = flag.String("tiup-config", "config.yaml", "Path to config file") + fsConfigFile = flag.String("fs-config", "config.yaml", "Path to config file") + dbgF = flag.Bool("debug", false, "Enable debug mode") ) flag.Parse() @@ -38,58 +35,8 @@ func main() { zerolog.SetGlobalLevel(zerolog.InfoLevel) } - // Load and parse configuration - var config config.Worker - { - configData, err := os.ReadFile(*configFile) - if err != nil { - log.Fatal().Err(err).Msg("Error reading config file") - } - if err := yaml.Unmarshal(configData, &config); err != nil { - log.Fatal().Err(err).Msg("Error parsing config file") - } - } - - // Configure Redis client - redisClient := redis.NewClient(&redis.Options{ - Addr: config.Redis.Addr, - Password: config.Redis.Password, - Username: config.Redis.Username, - DB: config.Redis.DB, - }) - - ctx := log.Logger.WithContext(context.Background()) - // Create TiUP publisher - var handler *impl.Worker - { - var err error - nigthlyInterval, err := time.ParseDuration(config.Options.NightlyInterval) - if err != nil { - log.Fatal().Err(err).Msg("Error parsing nightly interval") - } - handler, err = impl.NewWorker(&log.Logger, redisClient, impl.WorkerOptions{ - MirrorURL: config.Options.MirrorURL, - LarkWebhookURL: config.Options.LarkWebhookURL, - NightlyInterval: nigthlyInterval, - }) - if err != nil { - log.Fatal().Err(err).Msg("Error creating handler") - } - } - - // Configure Kafka reader - var reader *kafka.Reader - { - reader = kafka.NewReader(kafka.ReaderConfig{ - Brokers: config.Kafka.Brokers, - Topic: config.Kafka.Topic, - GroupID: config.Kafka.ConsumerGroup, - MinBytes: 10e3, - MaxBytes: 10e6, - CommitInterval: 5000, - Logger: kafka.LoggerFunc(log.Printf), - }) - } + tiupPublishRequestKafkaReader, tiupWorker := initFsWorkerFromConfig(*tiupConfigFile) + fsPublishRequestKafkaReader, fsWorker := initFsWorkerFromConfig(*fsConfigFile) // Create channel used by both the signal handler and server goroutines // to notify the main goroutine when to stop the server. @@ -103,22 +50,21 @@ func main() { errc <- fmt.Errorf("%s", <-c) }() + // Start workers. var wg sync.WaitGroup - ctx, cancel := context.WithCancel(ctx) - - start(ctx, reader, handler, &wg, errc) + ctx, cancel := context.WithCancel(context.Background()) + startWorker(ctx, &wg, tiupPublishRequestKafkaReader, tiupWorker) + startWorker(ctx, &wg, fsPublishRequestKafkaReader, fsWorker) // Wait for signal. log.Warn().Msgf("exiting (%v)", <-errc) - // Send cancellation signal to the goroutines. cancel() - wg.Wait() log.Warn().Msg("exited") } -func start(ctx context.Context, reader *kafka.Reader, handler *impl.Worker, wg *sync.WaitGroup, errc chan error) { +func startWorker(ctx context.Context, wg *sync.WaitGroup, reader *kafka.Reader, worker impl.Worker) { (*wg).Add(1) go func() { defer (*wg).Done() @@ -142,7 +88,7 @@ func start(ctx context.Context, reader *kafka.Reader, handler *impl.Worker, wg * } log.Debug().Str("ce-id", cloudEvent.ID()).Str("ce-type", cloudEvent.Type()).Msg("received cloud event") - handler.Handle(cloudEvent) + worker.Handle(cloudEvent) } } }() diff --git a/publisher/go.mod b/publisher/go.mod index 085ac21..50bb03f 100644 --- a/publisher/go.mod +++ b/publisher/go.mod @@ -5,6 +5,8 @@ go 1.23 require ( github.com/PingCAP-QE/ee-apps/dl v0.0.0-20240926143418-da3f56b05e46 github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/go-redis/redis/v8 v8.11.5 + github.com/google/uuid v1.6.0 github.com/rs/zerolog v1.33.0 github.com/segmentio/kafka-go v0.4.47 goa.design/clue v1.0.6 @@ -14,22 +16,20 @@ require ( oras.land/oras-go/v2 v2.5.0 ) -require github.com/fsnotify/fsnotify v1.7.0 // indirect - require ( github.com/aws/smithy-go v1.20.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect - github.com/go-redis/redis/v8 v8.11.5 - github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/ks3sdklib/aws-sdk-go v1.3.0 github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/publisher/go.sum b/publisher/go.sum index 2aa68dc..5ae52aa 100644 --- a/publisher/go.sum +++ b/publisher/go.sum @@ -44,6 +44,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/ks3sdklib/aws-sdk-go v1.3.0 h1:EmZaUWlrxrYo33v8auaRhyG3es3gK6WtiEI6I+8BdGU= +github.com/ks3sdklib/aws-sdk-go v1.3.0/go.mod h1:jGcsV0dJgMmStAyqjkKVUu6F167pAXYZAS3LqoZMmtM= github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d h1:Zj+PHjnhRYWBK6RqCDBcAhLXoi3TzC27Zad/Vn+gnVQ= github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d/go.mod h1:WZy8Q5coAB1zhY9AOBJP0O6J4BuDfbupUDavKY+I3+s= github.com/manveru/gobdd v0.0.0-20131210092515-f1a17fdd710b h1:3E44bLeN8uKYdfQqVQycPnaVviZdBLbizFhU49mtbe4= @@ -85,9 +87,11 @@ github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUan github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/publisher/pkg/config/config.go b/publisher/pkg/config/config.go index fd794e1..2405685 100644 --- a/publisher/pkg/config/config.go +++ b/publisher/pkg/config/config.go @@ -5,12 +5,8 @@ type Worker struct { KafkaBasic `yaml:",inline" json:",inline"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"` } `yaml:"kafka" json:"kafka,omitempty"` - Redis Redis `yaml:"redis" json:"redis,omitempty"` - Options struct { - MirrorURL string `yaml:"mirror_url" json:"mirror_url,omitempty"` - LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` - NightlyInterval string `yaml:"nightly_interval" json:"nightly_interval,omitempty"` - } + Redis Redis `yaml:"redis" json:"redis,omitempty"` + Options map[string]string `yaml:"options" json:"options,omitempty"` } type Service struct { diff --git a/publisher/pkg/impl/fileserver_service.go b/publisher/pkg/impl/fileserver_service.go index 47fe1f9..04cca9b 100644 --- a/publisher/pkg/impl/fileserver_service.go +++ b/publisher/pkg/impl/fileserver_service.go @@ -39,7 +39,7 @@ func NewFileserver(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClien func (s *fileserversrvc) RequestToPublish(ctx context.Context, p *fileserver.RequestToPublishPayload) (res []string, err error) { s.logger.Info().Msgf("fileserver.request-to-publish") // 1. Analyze the artifact_url to get the repo and tag and the tiup package information. - publishRequests, err := analyzeFromOciArtifactUrl(p.ArtifactURL) + publishRequests, err := analyzeFsFromOciArtifactUrl(p.ArtifactURL) if err != nil { return nil, err } diff --git a/publisher/pkg/impl/fileserver_worker.go b/publisher/pkg/impl/fileserver_worker.go new file mode 100644 index 0000000..f26bf45 --- /dev/null +++ b/publisher/pkg/impl/fileserver_worker.go @@ -0,0 +1,154 @@ +package impl + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "slices" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/go-redis/redis/v8" + "github.com/ks3sdklib/aws-sdk-go/aws" + "github.com/ks3sdklib/aws-sdk-go/aws/awsutil" + "github.com/ks3sdklib/aws-sdk-go/aws/credentials" + "github.com/ks3sdklib/aws-sdk-go/service/s3" + "github.com/rs/zerolog" +) + +type fsWorker struct { + logger zerolog.Logger + redisClient redis.Cmdable + options struct { + LarkWebhookURL string + S3 struct { + BucketName string + } + } + s3Client *s3.S3 +} + +func NewFsWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[string]string) (*fsWorker, error) { + handler := fsWorker{redisClient: redisClient} + if logger == nil { + handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() + } else { + handler.logger = *logger + } + handler.options.LarkWebhookURL = options["lark_webhook_url"] + handler.options.S3.BucketName = options["s3.bucket_name"] + + cre := credentials.NewStaticCredentials(options["s3_access_key_id"], + options["s3_secret_access_key"], + options["s3_session_token"]) + handler.s3Client = s3.New(&aws.Config{ + Credentials: cre, + Region: options["s3.region"], + Endpoint: options["s3.endpoint"], + }) + + return &handler, nil +} + +func (p *fsWorker) SupportEventTypes() []string { + return []string{EventTypeFsPublishRequest} +} + +// Handle for test case run events +func (p *fsWorker) Handle(event cloudevents.Event) cloudevents.Result { + if !slices.Contains(p.SupportEventTypes(), event.Type()) { + return cloudevents.ResultNACK + } + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL) + + data := new(PublishRequest) + if err := event.DataAs(&data); err != nil { + return cloudevents.NewReceipt(false, "invalid data: %v", err) + } + + result := p.handle(data) + switch { + case cloudevents.IsACK(result): + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) + case cloudevents.IsNACK(result): + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL) + p.notifyLark(&data.Publish, result) + default: + p.redisClient.SetXX(context.Background(), event.ID(), PublishStateCanceled, redis.KeepTTL) + } + + return result +} + +func (p *fsWorker) handle(data *PublishRequest) cloudevents.Result { + // 1. get the the file content from data.From. + err := doWithOCIFile(data.From.Oci, func(input io.Reader) error { + return doWithTempFileFromReader(input, func(inputF *os.File) error { + // 2. publish the tarball. + return p.publish(inputF, &data.Publish) + }) + }) + if err != nil { + p.logger.Err(err).Msg("publish to fileserver failed") + return cloudevents.NewReceipt(false, "publish to fileserver failed: %v", err) + } + p.logger.Info().Msg("publish to fileserver success") + + return cloudevents.ResultACK +} + +func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) { + if p.options.LarkWebhookURL == "" { + return + } + + message := fmt.Sprintf("Failed to publish %s/%s/%s file to fileserver: %v", + publishInfo.Name, + publishInfo.Version, + publishInfo.EntryPoint, + err) + + payload := map[string]interface{}{ + "msg_type": "text", + "content": map[string]string{ + "text": message, + }, + } + + jsonPayload, err := json.Marshal(payload) + if err != nil { + p.logger.Err(err).Msg("failed to marshal JSON payload") + } + + resp, err := http.Post(p.options.LarkWebhookURL, "application/json", bytes.NewBuffer(jsonPayload)) + if err != nil { + p.logger.Err(err).Msg("failed to send notification to Lark") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + p.logger.Error().Msgf("Lark API returned non-OK status: %d", resp.StatusCode) + } +} + +func (p *fsWorker) publish(content io.ReadSeeker, info *PublishInfo) error { + targetPath := targetFsFullPaths(info) + // upload file to the KingSoft cloud object bucket with the target path as key. + + key := targetPath[0] + resp, err := p.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(p.options.S3.BucketName), // 存储空间名称,必填 + Key: aws.String(key), // 对象的key,必填 + Body: content, // 要上传的文件,必填 + ACL: aws.String("public-read"), // 对象的访问权限,非必填 + }) + if err != nil { + return err + } + fmt.Println(awsutil.StringValue(resp)) + return nil + +} diff --git a/publisher/pkg/impl/funcs.go b/publisher/pkg/impl/funcs.go index 179e7be..92f5eec 100644 --- a/publisher/pkg/impl/funcs.go +++ b/publisher/pkg/impl/funcs.go @@ -2,14 +2,11 @@ package impl import ( "context" - "crypto/sha256" "encoding/json" "fmt" "io" "net/http" "os" - "path/filepath" - "regexp" "strings" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -22,67 +19,6 @@ import ( "github.com/PingCAP-QE/ee-apps/dl/pkg/oci" ) -const nightlyVerSuffix = "-nightly" - -var ( - pkgNameRegex = regexp.MustCompile(`^(.+)-v\d+\.\d+\.\d+`) - pkgVersionNightlyRegex = regexp.MustCompile(`(-\d+-g[0-9a-f]{7,})$`) - ociGATagRegex = regexp.MustCompile(`^(v\d+\.\d+\.\d+)_(linux|darwin)_(amd64|arm64)$`) - ociNightlyTagRegex = regexp.MustCompile(`^(master|main)_(linux|darwin)_(amd64|arm64)$`) - tiupVersionRegex = regexp.MustCompile(`^v\d+\.\d+\.\d+.*(-nightly)$`) -) - -// # GA case: -// # when -// # - the version is "vX.Y.Z-pre" and -// # - the artifact_url has suffix: "vX.Y.Z_(linux|darwin)_(amd64|arm64)", -// # then -// # - set the version to "vX.Y.Z" - -// check the remote file after published. -func postCheckTiupPkg(localFile, remoteFileURL string) error { - // 1. Calculate the sha256sum of the local file - localSum, err := calculateSHA256(localFile) - if err != nil { - return fmt.Errorf("failed to calculate local file sha256: %v", err) - } - - // 2. Download the remote file - tempFile, err := downloadHTTPFile(remoteFileURL) - if err != nil { - return fmt.Errorf("failed to download remote file: %v", err) - } - defer os.Remove(tempFile) - - // 3. Calculate the sha256sum of the remote file - remoteSum, err := calculateSHA256(tempFile) - if err != nil { - return fmt.Errorf("failed to calculate remote file sha256: %v", err) - } - - // 4. Compare the two sha256sums - if localSum != remoteSum { - return fmt.Errorf("sha256 mismatch: local %s, remote %s", localSum, remoteSum) - } - - return nil -} - -func calculateSHA256(filePath string) (string, error) { - f, err := os.Open(filePath) - if err != nil { - return "", err - } - defer f.Close() - - h := sha256.New() - if _, err := io.Copy(h, f); err != nil { - return "", err - } - - return fmt.Sprintf("%x", h.Sum(nil)), nil -} - func downloadFile(data *PublishRequest) (string, error) { switch data.From.Type { case FromTypeOci: @@ -95,53 +31,71 @@ func downloadFile(data *PublishRequest) (string, error) { } } -func downloadHTTPFile(url string) (string, error) { - resp, err := http.Get(url) - if err != nil { - return "", err - } - defer resp.Body.Close() +func downloadOCIFile(from *FromOci) (ret string, err error) { + err = doWithOCIFile(from, func(input io.Reader) error { + ret, err = downloadFileFromReader(input) + return err + }) - if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } + return +} + +func downloadHTTPFile(url string) (ret string, err error) { + err = doWithHttpFile(url, func(input io.Reader) error { + ret, err = downloadFileFromReader(input) + return err + }) + return +} - tempFile, err := os.CreateTemp("", "remote_file_*") +func downloadFileFromReader(input io.Reader) (ret string, err error) { + doWithTempFileFromReader(input, func(input *os.File) error { + ret = input.Name() + return nil + }) + + return +} + +func doWithTempFileFromReader(input io.Reader, fn func(input *os.File) error) error { + tempFile, err := os.CreateTemp("", "download_file_*") if err != nil { - return "", err + return err } defer tempFile.Close() + if _, err := io.Copy(tempFile, input); err != nil { + return err + } + + return fn(tempFile) +} - _, err = io.Copy(tempFile, resp.Body) +func doWithHttpFile(url string, fn func(input io.Reader) error) error { + resp, err := http.Get(url) if err != nil { - return "", err + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return tempFile.Name(), nil + return fn(resp.Body) } -func downloadOCIFile(from *FromOci) (string, error) { +func doWithOCIFile(from *FromOci, fn func(input io.Reader) error) error { repo, err := getOciRepo(from.Repo) if err != nil { - return "", err + return err } rc, _, err := oci.NewFileReadCloser(context.Background(), repo, from.Tag, from.File) if err != nil { - return "", err + return err } defer rc.Close() - tempFile, err := os.CreateTemp("", "oci_download_*.tar.gz") - if err != nil { - return "", err - } - defer tempFile.Close() - - if _, err := io.Copy(tempFile, rc); err != nil { - return "", err - } - - return tempFile.Name(), nil + return fn(rc) } func getOciRepo(repo string) (*remote.Repository, error) { @@ -160,83 +114,6 @@ func getOciRepo(repo string) (*remote.Repository, error) { return repository, nil } -// Anlyze the artifact config and return the publish requests. -// -// Steps: -// 1. fetch the artifact config like "oras manifest fetch-config $repo:$tag" command, but we just use go code. -// 2. judge if the key "net.pingcap.tibuild.tiup" existed in the result of previous result. If not we stop and return empty. -// 3. loop for every element of the values of "net.pingcap.tibuild.tiup". -// 3.1 set the publish `from` part: -// 3.1.1 set the publish from type as "oci" -// 3.1.2 set the publish from repo and tag from param `repo`, `tag`. -// 3.1.3 set the publish from file with value of "file" key in the element. -// 3.2. set the publish info -// 3.2.1 set the publish info version with value of "org.opencontainers.image.version" key in top config. -// 3.2.2 set the publish info os with value of "net.pingcap.tibuild.os" key in top config. -// 3.2.3 set the publish info arch with value of "net.pingcap.tibuild.architecture" key in top config. -// 3.2.4 set the publish info name with prefix part of the value of "file" key in the element, right trim from the "-vX.Y.Z" part. -// 3.2.5 set the publish info description, entrypoint with value of same key in the element. -func AnalyzeFromOciArtifact(repo, tag string) ([]PublishRequest, error) { - // 1. Fetch the artifact config - config, ociDigest, err := fetchOCIArtifactConfig(repo, tag) - if err != nil { - return nil, err - } - - // 2. Check if "net.pingcap.tibuild.tiup" exists - tiupPackages, ok := config["net.pingcap.tibuild.tiup"].([]interface{}) - if !ok || len(tiupPackages) == 0 { - return nil, nil // No TiUP packages to publish - } - - // Get common information - os := config["net.pingcap.tibuild.os"].(string) - arch := config["net.pingcap.tibuild.architecture"].(string) - version := transformVer(config["org.opencontainers.image.version"].(string), tag) - - // 3. Loop through TiUP packages - var publishRequests []PublishRequest - for _, pkg := range tiupPackages { - pkgMap := pkg.(map[string]interface{}) - file := pkgMap["file"].(string) - - // 3.1 Set the publish 'from' part - from := From{ - Type: FromTypeOci, - Oci: &FromOci{ - Repo: repo, - File: file, - // use digest to avoid the problem of new override on the tag. - Tag: ociDigest, - }, - } - - // 3.2 Set the publish info - publishInfo := PublishInfo{ - Name: pkgName(file), - Version: version, - OS: os, - Arch: arch, - Description: pkgMap["description"].(string), - EntryPoint: pkgMap["entrypoint"].(string), - } - publishRequests = append(publishRequests, PublishRequest{ - From: from, - Publish: publishInfo, - }) - } - - return publishRequests, nil -} - -func analyzeFromOciArtifactUrl(url string) ([]PublishRequest, error) { - repo, tag, err := splitRepoAndTag(url) - if err != nil { - return nil, err - } - return AnalyzeFromOciArtifact(repo, tag) -} - func splitRepoAndTag(url string) (string, string, error) { splitor := ":" if strings.Contains(url, "@sha256:") { @@ -281,24 +158,3 @@ func fetchOCIArtifactConfig(repo, tag string) (map[string]interface{}, string, e return config, desc.Digest.String(), nil } - -// Get tiup pkg name from tarball filename -func pkgName(tarballPath string) string { - matches := pkgNameRegex.FindStringSubmatch(filepath.Base(tarballPath)) - if len(matches) > 1 { - return matches[1] - } - return "" -} - -func transformVer(version, tag string) string { - switch { - case ociGATagRegex.MatchString(tag): // GA case - return strings.TrimSuffix(version, "-pre") - case ociNightlyTagRegex.MatchString(tag): // Nightly case - // we replace the suffix part of version: '-[0-9]+-g[0-9a-f]+$' to "-nightly" - return pkgVersionNightlyRegex.ReplaceAllString(version, "") + "-nightly" - default: - return version - } -} diff --git a/publisher/pkg/impl/funcs_fs.go b/publisher/pkg/impl/funcs_fs.go new file mode 100644 index 0000000..d2f0482 --- /dev/null +++ b/publisher/pkg/impl/funcs_fs.go @@ -0,0 +1,215 @@ +package impl + +import ( + "context" + "fmt" + "path/filepath" + "regexp" + "strings" + + "github.com/PingCAP-QE/ee-apps/dl/pkg/oci" +) + +var reTagOsArchSuffix = regexp.MustCompile(`_(linux|darwin)_(amd64|arm64)$`) + +// Anlyze the artifact config and return the publish requests. +// +// Steps: +// 1. fetch the artifact config like "oras manifest fetch-config $repo:$tag" command, but we just use go code. +// 2. judge if the key "net.pingcap.tibuild.tiup" existed in the result of previous result. If not we stop and return empty. +// 3. loop for every element of the values of "net.pingcap.tibuild.tiup". +// 3.1 set the publish `from` part: +// 3.1.1 set the publish from type as "oci" +// 3.1.2 set the publish from repo and tag from param `repo`, `tag`. +// 3.1.3 set the publish from file with value of "file" key in the element. +// 3.2. set the publish info +// 3.2.1 set the publish info version with value of "org.opencontainers.image.version" key in top config. +// 3.2.2 set the publish info os with value of "net.pingcap.tibuild.os" key in top config. +// 3.2.3 set the publish info arch with value of "net.pingcap.tibuild.architecture" key in top config. +// 3.2.4 set the publish info name with prefix part of the value of "file" key in the element, right trim from the "-vX.Y.Z" part. +// 3.2.5 set the publish info description, entrypoint with value of same key in the element. +func analyzeFsFromOciArtifact(repo, tag string) ([]PublishRequest, error) { + switch { + case strings.HasSuffix(repo, "/pingcap/tidb-tools/package"): + return analyzeFsFromOciArtifactForTiDBTools(repo, tag) + case strings.HasSuffix(repo, "/pingcap/tidb-binlog/package"): + return analyzeFsFromOciArtifactForTiDBBinlog(repo, tag) + // more special cases can be added here. + } + + // 1. Fetch the artifact config + config, ociDigest, err := fetchOCIArtifactConfig(repo, tag) + if err != nil { + return nil, err + } + + // 2. Check if "net.pingcap.tibuild.tiup" exists + tiupPackages, ok := config["net.pingcap.tibuild.tiup"].([]interface{}) + if !ok || len(tiupPackages) == 0 { + return nil, nil // No fileserver packages to publish + } + + // Get common information + version := transformFsVer(config["net.pingcap.tibuild.git-sha"].(string), tag) + + // 3. Loop through TiUP packages + var publishRequests []PublishRequest + for _, pkg := range tiupPackages { + pkgMap := pkg.(map[string]interface{}) + file := pkgMap["file"].(string) + + // 3.1 Set the publish 'from' part + from := From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: repo, + File: file, + // use digest to avoid the problem of new override on the tag. + Tag: ociDigest, + }, + } + + // 3.2 Set the publish info + publishInfo := PublishInfo{ + Name: tiupPkgName(file), + Version: version, + // TODO: if the pkgName is "tidb", then the entry point should be "tidb-server.tar.gz" + EntryPoint: transformFsEntryPoint(file), + } + publishRequests = append(publishRequests, PublishRequest{ + From: from, + Publish: publishInfo, + }) + } + + return publishRequests, nil +} + +func analyzeFsFromOciArtifactForTiDBTools(repo, tag string) ([]PublishRequest, error) { + // 1. Fetch the artifact config + config, ociDigest, err := fetchOCIArtifactConfig(repo, tag) + if err != nil { + return nil, err + } + + // Get common information + version := transformFsVer(config["net.pingcap.tibuild.git-sha"].(string), tag) + + // Find the file. + repository, err := getOciRepo(repo) + if err != nil { + return nil, fmt.Errorf("failed to get OCI repository: %v", err) + } + + files, err := oci.ListFiles(context.Background(), repository, tag) + if err != nil { + return nil, fmt.Errorf("failed to get file list: %v", err) + } + var file string + for _, f := range files { + if strings.HasSuffix(f, ".tar.gz") && strings.HasPrefix(f, "tidb-tools-") { + file = f + break + } + } + + // 3.1 Set the publish 'from' part + from := From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: repo, + File: file, + Tag: ociDigest, + }, + } + + // 3.2 Set the publish info + publishInfo := PublishInfo{ + Name: "tidb-tools", + Version: version, + EntryPoint: "centos7/tidb-tools.tar.gz", + } + return []PublishRequest{{From: from, Publish: publishInfo}}, nil +} + +func analyzeFsFromOciArtifactForTiDBBinlog(repo, tag string) ([]PublishRequest, error) { + // 1. Fetch the artifact config + config, ociDigest, err := fetchOCIArtifactConfig(repo, tag) + if err != nil { + return nil, err + } + + // Get common information + version := transformFsVer(config["net.pingcap.tibuild.git-sha"].(string), tag) + + // Find the file. + repository, err := getOciRepo(repo) + if err != nil { + return nil, fmt.Errorf("failed to get OCI repository: %v", err) + } + + files, err := oci.ListFiles(context.Background(), repository, tag) + if err != nil { + return nil, fmt.Errorf("failed to get file list: %v", err) + } + var file string + for _, f := range files { + if strings.HasSuffix(f, ".tar.gz") && strings.HasPrefix(f, "binaries-") { + file = f + break + } + } + + // 3.1 Set the publish 'from' part + from := From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: repo, + File: file, + Tag: ociDigest, + }, + } + + // 3.2 Set the publish info + publishInfo := PublishInfo{ + Name: "tidb-binlog", + Version: version, + EntryPoint: "centos7/tidb-binlog.tar.gz", + } + return []PublishRequest{{From: from, Publish: publishInfo}}, nil +} + +func analyzeFsFromOciArtifactUrl(url string) ([]PublishRequest, error) { + repo, tag, err := splitRepoAndTag(url) + if err != nil { + return nil, err + } + return analyzeFsFromOciArtifact(repo, tag) +} + +func transformFsVer(commitSHA1, tag string) string { + // Remove OS and arch suffix using regex + branch := reTagOsArchSuffix.ReplaceAllString(tag, "") + return fmt.Sprintf("%s#%s", branch, commitSHA1) +} + +func transformFsEntryPoint(file string) string { + base := tiupPkgName(file) + switch base { + case "tidb", "pd", "tikv": + return fmt.Sprintf("centos7/%s-server.tar.gz", base) + default: + return fmt.Sprintf("centos7/%s.tar.gz", base) + } +} + +func targetFsFullPaths(p *PublishInfo) []string { + var ret []string + + // the / path: pingcap//// + ret = append(ret, filepath.Join("pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint)) + // the / path: pingcap/// + ret = append(ret, filepath.Join("pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint)) + + return ret +} diff --git a/publisher/pkg/impl/funcs_fs_test.go b/publisher/pkg/impl/funcs_fs_test.go new file mode 100644 index 0000000..79c32b2 --- /dev/null +++ b/publisher/pkg/impl/funcs_fs_test.go @@ -0,0 +1,386 @@ +package impl + +import ( + "encoding/json" + "reflect" + "testing" +) + +func Test_analyzeFsFromOciArtifact(t *testing.T) { + // t.Skipf("maybe it is out of date") + type args struct { + repo string + tag string + } + tests := []struct { + name string + args args + want []PublishRequest + wantErr bool + }{ + { + name: "Empty config", + args: args{ + repo: "hub.pingcap.net/pingcap/tidb/package", + tag: "empty_config", + }, + want: nil, + wantErr: true, + }, + { + name: "Valid fs config - tidb", + args: args{ + repo: "hub.pingcap.net/pingcap/tidb/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb/package", + Tag: "sha256:1b146aa1b65e0a4fb2044f464a7fbceb025fcd6a2ddf0c28ab53f671503eea9d", + File: "tidb-v8.5.0-alpha-22-ga22fc590cc-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tidb", + Version: "master#a22fc590cc9efb13c025386712f39338c9821187", + EntryPoint: "centos7/tidb-server.tar.gz", + }, + }, + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb/package", + Tag: "sha256:1b146aa1b65e0a4fb2044f464a7fbceb025fcd6a2ddf0c28ab53f671503eea9d", + File: "br-v8.5.0-alpha-22-ga22fc590cc-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "br", + Version: "master#a22fc590cc9efb13c025386712f39338c9821187", + EntryPoint: "centos7/br.tar.gz", + }, + }, + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb/package", + Tag: "sha256:1b146aa1b65e0a4fb2044f464a7fbceb025fcd6a2ddf0c28ab53f671503eea9d", + File: "dumpling-v8.5.0-alpha-22-ga22fc590cc-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "dumpling", + Version: "master#a22fc590cc9efb13c025386712f39338c9821187", + EntryPoint: "centos7/dumpling.tar.gz", + }, + }, + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb/package", + Tag: "sha256:1b146aa1b65e0a4fb2044f464a7fbceb025fcd6a2ddf0c28ab53f671503eea9d", + File: "tidb-lightning-v8.5.0-alpha-22-ga22fc590cc-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tidb-lightning", + Version: "master#a22fc590cc9efb13c025386712f39338c9821187", + EntryPoint: "centos7/tidb-lightning.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - tiflow", + args: args{ + repo: "hub.pingcap.net/pingcap/tiflow/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: "oci", + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tiflow/package", + Tag: "sha256:3bd51f5057646e3d7894573d07af1af63d94336f9323c67106caf265c191054f", + File: "cdc-v8.5.0-alpha-3-g0510cf054-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "cdc", + Version: "master#0510cf05400ec2302052a517d281bf3aff7cfc04", + EntryPoint: "centos7/cdc.tar.gz", + }, + }, + { + From: From{ + Type: "oci", + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tiflow/package", + Tag: "sha256:3bd51f5057646e3d7894573d07af1af63d94336f9323c67106caf265c191054f", + File: "dm-master-v8.5.0-alpha-3-g0510cf054-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "dm-master", + Version: "master#0510cf05400ec2302052a517d281bf3aff7cfc04", + EntryPoint: "centos7/dm-master.tar.gz", + }, + }, + { + From: From{ + Type: "oci", + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tiflow/package", + Tag: "sha256:3bd51f5057646e3d7894573d07af1af63d94336f9323c67106caf265c191054f", + File: "dm-worker-v8.5.0-alpha-3-g0510cf054-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "dm-worker", + Version: "master#0510cf05400ec2302052a517d281bf3aff7cfc04", + EntryPoint: "centos7/dm-worker.tar.gz", + }, + }, + { + From: From{ + Type: "oci", + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tiflow/package", + Tag: "sha256:3bd51f5057646e3d7894573d07af1af63d94336f9323c67106caf265c191054f", + File: "dmctl-v8.5.0-alpha-3-g0510cf054-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "dmctl", + Version: "master#0510cf05400ec2302052a517d281bf3aff7cfc04", + EntryPoint: "centos7/dmctl.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - tiflash", + args: args{ + repo: "hub.pingcap.net/pingcap/tiflash/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tiflash/package", + Tag: "sha256:0903955bbcf4ce8306af7f3138863ed899c5000112cd64d04ce07bb7dd9a816a", + File: "tiflash-v8.5.0-alpha-7-g5dd3a733a-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tiflash", + Version: "master#5dd3a733a2dec9ee3548353356583b21cd03d54d", + EntryPoint: "centos7/tiflash.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - pd", + args: args{ + repo: "hub.pingcap.net/tikv/pd/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/tikv/pd/package", + Tag: "sha256:eac8e279cd5acb2fa900640175418837c3f8517299d76a7509afd5ab9c6f939d", + File: "pd-v8.5.0-alpha-2-g649393a4-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "pd", + Version: "master#649393a40725e9c91397790ede93dd3a0b7f08ef", + EntryPoint: "centos7/pd.tar.gz", + }, + }, + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/tikv/pd/package", + Tag: "sha256:eac8e279cd5acb2fa900640175418837c3f8517299d76a7509afd5ab9c6f939d", + File: "pd-recover-v8.5.0-alpha-2-g649393a4-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "pd-recover", + Version: "master#649393a40725e9c91397790ede93dd3a0b7f08ef", + EntryPoint: "centos7/pd-recover.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - tikv", + args: args{ + repo: "hub.pingcap.net/tikv/tikv/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/tikv/tikv/package", + Tag: "sha256:c7a9b221bed19c885bf8a1bc00af1fb35cc5d789337c842508682eea4064f2fa", + File: "tikv-v8.5.0-alpha-2-gdc9cd3dbd-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tikv", + Version: "master#dc9cd3dbdace23ac1f590aaee7966705a7e12825", + EntryPoint: "centos7/tikv.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - tidb-tools", + args: args{ + repo: "hub.pingcap.net/pingcap/tidb-tools/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb-tools/package", + Tag: "sha256:315dbe7cc91fb998f9b2f3cd256f11d236cf235537a0b8f27457381c6cd30b19", + File: "tidb-tools-v7.5.3-2-gd226440-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tidb-tools", + Version: "master#d226440121147098eb5eb99cbc1efb94092ec68e", + EntryPoint: "centos7/tidb-tools.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Valid fs config - tidb-binlog", + args: args{ + repo: "hub.pingcap.net/pingcap/tidb-binlog/package", + tag: "master_linux_amd64", + }, + want: []PublishRequest{ + { + From: From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: "hub.pingcap.net/pingcap/tidb-binlog/package", + Tag: "sha256:437cf5943318cebae48adc6698380537a2c5173422ba7048196124c198aaa1b8", + File: "binaries-v8.3.0-alpha-1-g6905951-linux-amd64.tar.gz", + }, + }, + Publish: PublishInfo{ + Name: "tidb-binlog", + Version: "master#6905951ca9460e2d4e5a82273e01f6a36b4d1ef3", + EntryPoint: "centos7/tidb-binlog.tar.gz", + }, + }, + }, + wantErr: false, + }, + { + name: "Invalid repo", + args: args{ + repo: "invalid-repo", + tag: "v1.0.0", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := analyzeFsFromOciArtifact(tt.args.repo, tt.args.tag) + if (err != nil) != tt.wantErr { + t.Errorf("analyzeFsFromOciArtifact() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + gotBytes, _ := json.MarshalIndent(got, "", " ") + wantBytes, _ := json.MarshalIndent(tt.want, "", " ") + t.Errorf("analyzeFsFromOciArtifact() = \n%s, want \n%s", gotBytes, wantBytes) + } + }) + } +} +func Test_targetFsFullPaths(t *testing.T) { + tests := []struct { + name string + p *PublishInfo + want []string + }{ + { + name: "Basic version with commit hash", + p: &PublishInfo{ + Name: "tidb", + Version: "master#abc123def", + EntryPoint: "bin/tidb-server", + }, + want: []string{ + "pingcap/tidb/master/abc123def/bin/tidb-server", + "pingcap/tidb/abc123def/bin/tidb-server", + }, + }, + { + name: "Version with multiple hash symbols", + p: &PublishInfo{ + Name: "pd", + Version: "release-5.0#abc", + EntryPoint: "pd-server", + }, + want: []string{ + "pingcap/pd/release-5.0/abc/pd-server", + "pingcap/pd/abc/pd-server", + }, + }, + { + name: "Complex entry point path", + p: &PublishInfo{ + Name: "tiflash", + Version: "nightly#xyz789", + EntryPoint: "opt/tiflash/bin/tiflash-server", + }, + want: []string{ + "pingcap/tiflash/nightly/xyz789/opt/tiflash/bin/tiflash-server", + "pingcap/tiflash/xyz789/opt/tiflash/bin/tiflash-server", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := targetFsFullPaths(tt.p) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("targetFsFullPaths() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/publisher/pkg/impl/funcs_test.go b/publisher/pkg/impl/funcs_test.go index 54a5923..de0ffd2 100644 --- a/publisher/pkg/impl/funcs_test.go +++ b/publisher/pkg/impl/funcs_test.go @@ -221,7 +221,7 @@ func Test_analyzeFromOciArtifact(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := AnalyzeFromOciArtifact(tt.args.repo, tt.args.tag) + got, err := analyzeTiupFromOciArtifact(tt.args.repo, tt.args.tag) if (err != nil) != tt.wantErr { t.Errorf("analyzeFromOciArtifact() error = %v, wantErr %v", err, tt.wantErr) return @@ -271,7 +271,7 @@ func Test_pkgName(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := pkgName(tt.tarballPath); got != tt.want { + if got := tiupPkgName(tt.tarballPath); got != tt.want { t.Errorf("pkgName() = %v, want %v", got, tt.want) } }) @@ -330,7 +330,7 @@ func Test_transformVer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := transformVer(tt.version, tt.tag) + got := transformTiupVer(tt.version, tt.tag) if got != tt.want { t.Errorf("transformVer() = %v, want %v", got, tt.want) } diff --git a/publisher/pkg/impl/funcs_tiup.go b/publisher/pkg/impl/funcs_tiup.go new file mode 100644 index 0000000..1ea4c30 --- /dev/null +++ b/publisher/pkg/impl/funcs_tiup.go @@ -0,0 +1,174 @@ +package impl + +import ( + "crypto/sha256" + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "strings" +) + +const nightlyVerSuffix = "-nightly" + +var ( + pkgNameRegex = regexp.MustCompile(`^(.+)-v\d+\.\d+\.\d+`) + pkgVersionNightlyRegex = regexp.MustCompile(`(-\d+-g[0-9a-f]{7,})$`) + ociGATagRegex = regexp.MustCompile(`^(v\d+\.\d+\.\d+)_(linux|darwin)_(amd64|arm64)$`) + ociNightlyTagRegex = regexp.MustCompile(`^(master|main)_(linux|darwin)_(amd64|arm64)$`) + tiupVersionRegex = regexp.MustCompile(`^v\d+\.\d+\.\d+.*(-nightly)$`) +) + +// # GA case: +// # when +// # - the version is "vX.Y.Z-pre" and +// # - the artifact_url has suffix: "vX.Y.Z_(linux|darwin)_(amd64|arm64)", +// # then +// # - set the version to "vX.Y.Z" + +// check the remote file after published. +func postCheckTiupPkg(localFile, remoteFileURL string) error { + // 1. Calculate the sha256sum of the local file + localSum, err := calculateSHA256(localFile) + if err != nil { + return fmt.Errorf("failed to calculate local file sha256: %v", err) + } + + // 2. Download the remote file + tempFile, err := downloadHTTPFile(remoteFileURL) + if err != nil { + return fmt.Errorf("failed to download remote file: %v", err) + } + defer os.Remove(tempFile) + + // 3. Calculate the sha256sum of the remote file + remoteSum, err := calculateSHA256(tempFile) + if err != nil { + return fmt.Errorf("failed to calculate remote file sha256: %v", err) + } + + // 4. Compare the two sha256sums + if localSum != remoteSum { + return fmt.Errorf("sha256 mismatch: local %s, remote %s", localSum, remoteSum) + } + + return nil +} + +func calculateSHA256(filePath string) (string, error) { + f, err := os.Open(filePath) + if err != nil { + return "", err + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return "", err + } + + return fmt.Sprintf("%x", h.Sum(nil)), nil +} + +// Anlyze the artifact config and return the publish requests. +// +// Steps: +// 1. fetch the artifact config like "oras manifest fetch-config $repo:$tag" command, but we just use go code. +// 2. judge if the key "net.pingcap.tibuild.tiup" existed in the result of previous result. If not we stop and return empty. +// 3. loop for every element of the values of "net.pingcap.tibuild.tiup". +// 3.1 set the publish `from` part: +// 3.1.1 set the publish from type as "oci" +// 3.1.2 set the publish from repo and tag from param `repo`, `tag`. +// 3.1.3 set the publish from file with value of "file" key in the element. +// 3.2. set the publish info +// 3.2.1 set the publish info version with value of "org.opencontainers.image.version" key in top config. +// 3.2.2 set the publish info os with value of "net.pingcap.tibuild.os" key in top config. +// 3.2.3 set the publish info arch with value of "net.pingcap.tibuild.architecture" key in top config. +// 3.2.4 set the publish info name with prefix part of the value of "file" key in the element, right trim from the "-vX.Y.Z" part. +// 3.2.5 set the publish info description, entrypoint with value of same key in the element. +func analyzeTiupFromOciArtifact(repo, tag string) ([]PublishRequest, error) { + // 1. Fetch the artifact config + config, ociDigest, err := fetchOCIArtifactConfig(repo, tag) + if err != nil { + return nil, err + } + + // 2. Check if "net.pingcap.tibuild.tiup" exists + tiupPackages, ok := config["net.pingcap.tibuild.tiup"].([]interface{}) + if !ok || len(tiupPackages) == 0 { + return nil, nil // No TiUP packages to publish + } + + // Get common information + os := config["net.pingcap.tibuild.os"].(string) + arch := config["net.pingcap.tibuild.architecture"].(string) + version := transformTiupVer(config["org.opencontainers.image.version"].(string), tag) + + // 3. Loop through TiUP packages + var publishRequests []PublishRequest + for _, pkg := range tiupPackages { + pkgMap := pkg.(map[string]interface{}) + file := pkgMap["file"].(string) + + // 3.1 Set the publish 'from' part + from := From{ + Type: FromTypeOci, + Oci: &FromOci{ + Repo: repo, + File: file, + // use digest to avoid the problem of new override on the tag. + Tag: ociDigest, + }, + } + + // 3.2 Set the publish info + publishInfo := PublishInfo{ + Name: tiupPkgName(file), + Version: version, + OS: os, + Arch: arch, + Description: pkgMap["description"].(string), + EntryPoint: pkgMap["entrypoint"].(string), + } + publishRequests = append(publishRequests, PublishRequest{ + From: from, + Publish: publishInfo, + }) + } + + return publishRequests, nil +} + +func analyzeTiupFromOciArtifactUrl(url string) ([]PublishRequest, error) { + repo, tag, err := splitRepoAndTag(url) + if err != nil { + return nil, err + } + return analyzeTiupFromOciArtifact(repo, tag) +} + +// Get tiup pkg name from tarball filename +func tiupPkgName(tarballPath string) string { + matches := pkgNameRegex.FindStringSubmatch(filepath.Base(tarballPath)) + if len(matches) > 1 { + return matches[1] + } + return "" +} + +func transformTiupVer(version, tag string) string { + switch { + case ociGATagRegex.MatchString(tag): // GA case + return strings.TrimSuffix(version, "-pre") + case ociNightlyTagRegex.MatchString(tag): // Nightly case + // we replace the suffix part of version: '-[0-9]+-g[0-9a-f]+$' to "-nightly" + return pkgVersionNightlyRegex.ReplaceAllString(version, "") + nightlyVerSuffix + default: + return version + } +} + +func isNightlyTiup(p PublishInfo) bool { + return tiupVersionRegex.MatchString(p.Version) +} diff --git a/publisher/pkg/impl/tiup_service.go b/publisher/pkg/impl/tiup_service.go index c5abf48..e196b85 100644 --- a/publisher/pkg/impl/tiup_service.go +++ b/publisher/pkg/impl/tiup_service.go @@ -39,7 +39,7 @@ func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redi func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPublishPayload) (res []string, err error) { s.logger.Info().Msgf("tiup.request-to-publish") // 1. Analyze the artifact_url to get the repo and tag and the tiup package information. - publishRequests, err := analyzeFromOciArtifactUrl(p.ArtifactURL) + publishRequests, err := analyzeTiupFromOciArtifactUrl(p.ArtifactURL) if err != nil { return nil, err } diff --git a/publisher/pkg/impl/worker.go b/publisher/pkg/impl/tiup_worker.go similarity index 70% rename from publisher/pkg/impl/worker.go rename to publisher/pkg/impl/tiup_worker.go index dae80c9..4f655de 100644 --- a/publisher/pkg/impl/worker.go +++ b/publisher/pkg/impl/tiup_worker.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "net/http" "os" @@ -17,35 +16,41 @@ import ( "github.com/rs/zerolog" ) -type Worker struct { +type tiupWorker struct { logger zerolog.Logger redisClient redis.Cmdable - options WorkerOptions -} - -type WorkerOptions struct { - LarkWebhookURL string - MirrorURL string - NightlyInterval time.Duration + options struct { + LarkWebhookURL string + MirrorURL string + NightlyInterval time.Duration + } } -func NewWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options WorkerOptions) (*Worker, error) { - handler := Worker{options: options, redisClient: redisClient} +func NewTiupWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[string]string) (*tiupWorker, error) { + handler := tiupWorker{redisClient: redisClient} if logger == nil { handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() } else { handler.logger = *logger } + nigthlyInterval, err := time.ParseDuration(options["nightly_interval"]) + if err != nil { + return nil, fmt.Errorf("parsing nightly interval failed: %v", err) + } + handler.options.NightlyInterval = nigthlyInterval + handler.options.MirrorURL = options["mirror_url"] + handler.options.LarkWebhookURL = options["lark_webhook_url"] + return &handler, nil } -func (p *Worker) SupportEventTypes() []string { - return []string{EventTypeTiupPublishRequest, EventTypeFsPublishRequest} +func (p *tiupWorker) SupportEventTypes() []string { + return []string{EventTypeTiupPublishRequest} } // Handle for test case run events -func (p *Worker) Handle(event cloudevents.Event) cloudevents.Result { +func (p *tiupWorker) Handle(event cloudevents.Event) cloudevents.Result { if !slices.Contains(p.SupportEventTypes(), event.Type()) { return cloudevents.ResultNACK } @@ -56,16 +61,7 @@ func (p *Worker) Handle(event cloudevents.Event) cloudevents.Result { return cloudevents.NewReceipt(false, "invalid data: %v", err) } - var result cloudevents.Result - switch event.Type() { - case EventTypeTiupPublishRequest: - result = p.rateLimitForTiup(data, p.options.NightlyInterval, p.handleTiup) - case EventTypeFsPublishRequest: - result = p.handleFs(data) - default: - result = fmt.Errorf("unsupported event type: %s", event.Type()) - } - + result := p.rateLimit(data, p.options.NightlyInterval, p.handle) switch { case cloudevents.IsACK(result): p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) @@ -79,9 +75,9 @@ func (p *Worker) Handle(event cloudevents.Event) cloudevents.Result { return result } -func (p *Worker) rateLimitForTiup(data *PublishRequest, ttl time.Duration, run func(*PublishRequest) cloudevents.Result) cloudevents.Result { +func (p *tiupWorker) rateLimit(data *PublishRequest, ttl time.Duration, run func(*PublishRequest) cloudevents.Result) cloudevents.Result { // Skip rate limiting for no nightly builds - if !data.Publish.IsNightlyTiup() || ttl <= 0 { + if !isNightlyTiup(data.Publish) || ttl <= 0 { return run(data) } @@ -112,7 +108,7 @@ func (p *Worker) rateLimitForTiup(data *PublishRequest, ttl time.Duration, run f return fmt.Errorf("skip: rate limit exceeded for package %s", data.Publish.Name) } -func (p *Worker) handleTiup(data *PublishRequest) cloudevents.Result { +func (p *tiupWorker) handle(data *PublishRequest) cloudevents.Result { // 1. get the the tarball from data.From. saveTo, err := downloadFile(data) if err != nil { @@ -122,7 +118,7 @@ func (p *Worker) handleTiup(data *PublishRequest) cloudevents.Result { p.logger.Info().Msg("download file success") // 2. publish the tarball to the mirror. - if err := p.publishTiupPkg(saveTo, &data.Publish); err != nil { + if err := p.publish(saveTo, &data.Publish); err != nil { p.logger.Err(err).Msg("publish to mirror failed") return cloudevents.NewReceipt(false, "publish to mirror failed: %v", err) } @@ -140,26 +136,7 @@ func (p *Worker) handleTiup(data *PublishRequest) cloudevents.Result { return cloudevents.ResultACK } -func (p *Worker) handleFs(data *PublishRequest) cloudevents.Result { - // 1. get the the tarball from data.From. - saveTo, err := downloadFile(data) - if err != nil { - p.logger.Err(err).Msg("download file failed") - return cloudevents.NewReceipt(false, "download file failed: %v", err) - } - p.logger.Info().Msg("download file success") - - // 2. publish the tarball to the mirror. - if err := p.publishFileserver(saveTo, &data.Publish); err != nil { - p.logger.Err(err).Msg("publish to fileserver failed") - return cloudevents.NewReceipt(false, "publish to fileserver failed: %v", err) - } - p.logger.Info().Msg("publish to fs success") - - return cloudevents.ResultACK -} - -func (p *Worker) notifyLark(publishInfo *PublishInfo, err error) { +func (p *tiupWorker) notifyLark(publishInfo *PublishInfo, err error) { if p.options.LarkWebhookURL == "" { return } @@ -195,7 +172,7 @@ func (p *Worker) notifyLark(publishInfo *PublishInfo, err error) { } } -func (p *Worker) publishTiupPkg(file string, info *PublishInfo) error { +func (p *tiupWorker) publish(file string, info *PublishInfo) error { args := []string{"mirror", "publish", info.Name, info.Version, file, info.EntryPoint, "--os", info.OS, "--arch", info.Arch, "--desc", info.Description} if info.Standalone { args = append(args, "--standalone") @@ -216,8 +193,3 @@ func (p *Worker) publishTiupPkg(file string, info *PublishInfo) error { return nil } - -func (p *Worker) publishFileserver(file string, info *PublishInfo) error { - // to implement - return errors.New("not implemented") -} diff --git a/publisher/pkg/impl/types.go b/publisher/pkg/impl/types.go index 57ae9e2..5b44202 100644 --- a/publisher/pkg/impl/types.go +++ b/publisher/pkg/impl/types.go @@ -1,6 +1,10 @@ package impl -import "time" +import ( + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) const ( EventTypeTiupPublishRequest = "net.pingcap.tibuild.tiup-publish-request" @@ -31,19 +35,15 @@ type From struct { } type PublishInfo struct { - Name string `json:"name,omitempty"` // tiup pkg name or component name for fileserver - OS string `json:"os,omitempty"` - Arch string `json:"arch,omitempty"` - Version string `json:"version,omitempty"` + Name string `json:"name,omitempty"` // tiup pkg name or component name for fileserver + OS string `json:"os,omitempty"` // ignore for `EventTypeFsPublishRequest` + Arch string `json:"arch,omitempty"` // ignore for `EventTypeFsPublishRequest` + Version string `json:"version,omitempty"` // SemVer format for `EventTypeTiupPublishRequest` and "#" for `EventTypeFsPublishRequest` Description string `json:"description,omitempty"` // ignore for `EventTypeFsPublishRequest` - EntryPoint string `json:"entry_point,omitempty"` // ignore for `EventTypeFsPublishRequest` + EntryPoint string `json:"entry_point,omitempty"` // if event is `EventTypeFsPublishRequest`, the the value is the basename for store file, like tidb-server.tar.gz Standalone bool `json:"standalone,omitempty"` // ignore for `EventTypeFsPublishRequest` } -func (p *PublishInfo) IsNightlyTiup() bool { - return tiupVersionRegex.MatchString(p.Version) -} - type FromOci struct { Repo string `json:"repo,omitempty"` Tag string `json:"tag,omitempty"` @@ -53,3 +53,8 @@ type FromOci struct { type FromHTTP struct { URL string `json:"url,omitempty"` } + +// Worker provides handling for cloud events. +type Worker interface { + Handle(event cloudevents.Event) cloudevents.Result +}