Skip to content

Commit

Permalink
feat(publisher): implement fs worker (#188)
Browse files Browse the repository at this point in the history
Signed-off-by: wuhuizuo <[email protected]>

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Oct 31, 2024
1 parent 30432bc commit 0de8b53
Show file tree
Hide file tree
Showing 15 changed files with 1,139 additions and 334 deletions.
97 changes: 97 additions & 0 deletions publisher/cmd/worker/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"fmt"
"os"

"gopkg.in/yaml.v3"

"github.com/go-redis/redis/v8"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"

"github.com/PingCAP-QE/ee-apps/publisher/pkg/config"
"github.com/PingCAP-QE/ee-apps/publisher/pkg/impl"
)

// Load and parse configuration.
func loadConfig(configFile string) (config.Worker, error) {
var config config.Worker
{
configData, err := os.ReadFile(configFile)
if err != nil {
return config, fmt.Errorf("error reading config file: %v", err)
}
if err := yaml.Unmarshal(configData, &config); err != nil {
return config, fmt.Errorf("error parsing config file: %v", err)
}
}
return config, nil
}

func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) {
config, err := loadConfig(configFile)
if err != nil {
return nil, nil, err
}

// Configure Redis client.
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

worker, err := impl.NewTiupWorker(&log.Logger, redisClient, config.Options)
if err != nil {
log.Fatal().Err(err).Msg("Error creating tiup publishing worker")
}

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.Kafka.Brokers,
Topic: config.Kafka.Topic,
GroupID: config.Kafka.ConsumerGroup,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: 5000,
Logger: kafka.LoggerFunc(log.Printf),
})

return kafkaReader, worker, nil
}

func initFsWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) {
config, err := loadConfig(configFile)
if err != nil {
log.Fatal().Err(err).Msg("load config failed")
}

// Configure Redis client.
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

var worker impl.Worker
{
worker, err = impl.NewFsWorker(&log.Logger, redisClient, config.Options)
if err != nil {
log.Fatal().Err(err).Msg("Error creating tiup publishing worker")
}
}

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.Kafka.Brokers,
Topic: config.Kafka.Topic,
GroupID: config.Kafka.ConsumerGroup,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: 5000,
Logger: kafka.LoggerFunc(log.Printf),
})

return kafkaReader, worker
}
76 changes: 11 additions & 65 deletions publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,21 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
"gopkg.in/yaml.v3"

"github.com/PingCAP-QE/ee-apps/publisher/pkg/config"
"github.com/PingCAP-QE/ee-apps/publisher/pkg/impl"
)

func main() {
// Parse command-line flags
var (
configFile = flag.String("config", "config.yaml", "Path to config file")
dbgF = flag.Bool("debug", false, "Enable debug mode")
tiupConfigFile = flag.String("tiup-config", "config.yaml", "Path to config file")
fsConfigFile = flag.String("fs-config", "config.yaml", "Path to config file")
dbgF = flag.Bool("debug", false, "Enable debug mode")
)
flag.Parse()

Expand All @@ -38,58 +35,8 @@ func main() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

// Load and parse configuration
var config config.Worker
{
configData, err := os.ReadFile(*configFile)
if err != nil {
log.Fatal().Err(err).Msg("Error reading config file")
}
if err := yaml.Unmarshal(configData, &config); err != nil {
log.Fatal().Err(err).Msg("Error parsing config file")
}
}

// Configure Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

ctx := log.Logger.WithContext(context.Background())
// Create TiUP publisher
var handler *impl.Worker
{
var err error
nigthlyInterval, err := time.ParseDuration(config.Options.NightlyInterval)
if err != nil {
log.Fatal().Err(err).Msg("Error parsing nightly interval")
}
handler, err = impl.NewWorker(&log.Logger, redisClient, impl.WorkerOptions{
MirrorURL: config.Options.MirrorURL,
LarkWebhookURL: config.Options.LarkWebhookURL,
NightlyInterval: nigthlyInterval,
})
if err != nil {
log.Fatal().Err(err).Msg("Error creating handler")
}
}

// Configure Kafka reader
var reader *kafka.Reader
{
reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: config.Kafka.Brokers,
Topic: config.Kafka.Topic,
GroupID: config.Kafka.ConsumerGroup,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: 5000,
Logger: kafka.LoggerFunc(log.Printf),
})
}
tiupPublishRequestKafkaReader, tiupWorker := initFsWorkerFromConfig(*tiupConfigFile)
fsPublishRequestKafkaReader, fsWorker := initFsWorkerFromConfig(*fsConfigFile)

// Create channel used by both the signal handler and server goroutines
// to notify the main goroutine when to stop the server.
Expand All @@ -103,22 +50,21 @@ func main() {
errc <- fmt.Errorf("%s", <-c)
}()

// Start workers.
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)

start(ctx, reader, handler, &wg, errc)
ctx, cancel := context.WithCancel(context.Background())
startWorker(ctx, &wg, tiupPublishRequestKafkaReader, tiupWorker)
startWorker(ctx, &wg, fsPublishRequestKafkaReader, fsWorker)

// Wait for signal.
log.Warn().Msgf("exiting (%v)", <-errc)

// Send cancellation signal to the goroutines.
cancel()

wg.Wait()
log.Warn().Msg("exited")
}

func start(ctx context.Context, reader *kafka.Reader, handler *impl.Worker, wg *sync.WaitGroup, errc chan error) {
func startWorker(ctx context.Context, wg *sync.WaitGroup, reader *kafka.Reader, worker impl.Worker) {
(*wg).Add(1)
go func() {
defer (*wg).Done()
Expand All @@ -142,7 +88,7 @@ func start(ctx context.Context, reader *kafka.Reader, handler *impl.Worker, wg *
}

log.Debug().Str("ce-id", cloudEvent.ID()).Str("ce-type", cloudEvent.Type()).Msg("received cloud event")
handler.Handle(cloudEvent)
worker.Handle(cloudEvent)
}
}
}()
Expand Down
8 changes: 4 additions & 4 deletions publisher/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.23
require (
github.com/PingCAP-QE/ee-apps/dl v0.0.0-20240926143418-da3f56b05e46
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/rs/zerolog v1.33.0
github.com/segmentio/kafka-go v0.4.47
goa.design/clue v1.0.6
Expand All @@ -14,22 +16,20 @@ require (
oras.land/oras-go/v2 v2.5.0
)

require github.com/fsnotify/fsnotify v1.7.0 // indirect

require (
github.com/aws/smithy-go v1.20.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/ks3sdklib/aws-sdk-go v1.3.0
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
4 changes: 4 additions & 0 deletions publisher/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ks3sdklib/aws-sdk-go v1.3.0 h1:EmZaUWlrxrYo33v8auaRhyG3es3gK6WtiEI6I+8BdGU=
github.com/ks3sdklib/aws-sdk-go v1.3.0/go.mod h1:jGcsV0dJgMmStAyqjkKVUu6F167pAXYZAS3LqoZMmtM=
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d h1:Zj+PHjnhRYWBK6RqCDBcAhLXoi3TzC27Zad/Vn+gnVQ=
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d/go.mod h1:WZy8Q5coAB1zhY9AOBJP0O6J4BuDfbupUDavKY+I3+s=
github.com/manveru/gobdd v0.0.0-20131210092515-f1a17fdd710b h1:3E44bLeN8uKYdfQqVQycPnaVviZdBLbizFhU49mtbe4=
Expand Down Expand Up @@ -85,9 +87,11 @@ github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUan
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down
8 changes: 2 additions & 6 deletions publisher/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ type Worker struct {
KafkaBasic `yaml:",inline" json:",inline"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"`
} `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
Options struct {
MirrorURL string `yaml:"mirror_url" json:"mirror_url,omitempty"`
LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"`
NightlyInterval string `yaml:"nightly_interval" json:"nightly_interval,omitempty"`
}
Redis Redis `yaml:"redis" json:"redis,omitempty"`
Options map[string]string `yaml:"options" json:"options,omitempty"`
}

type Service struct {
Expand Down
2 changes: 1 addition & 1 deletion publisher/pkg/impl/fileserver_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewFileserver(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClien
func (s *fileserversrvc) RequestToPublish(ctx context.Context, p *fileserver.RequestToPublishPayload) (res []string, err error) {
s.logger.Info().Msgf("fileserver.request-to-publish")
// 1. Analyze the artifact_url to get the repo and tag and the tiup package information.
publishRequests, err := analyzeFromOciArtifactUrl(p.ArtifactURL)
publishRequests, err := analyzeFsFromOciArtifactUrl(p.ArtifactURL)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0de8b53

Please sign in to comment.