From 90bdf6149bbc7951035f530941d3a3e986c4a60a Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Sun, 30 Jul 2023 08:56:48 +0200 Subject: [PATCH 1/9] reconcile subscriptions --- .../loadtest/events/GenericEvent/events.go | 203 ---------- .../loadtest/events/GenericFactory/factory.go | 58 --- internal/loadtest/events/events.go | 24 -- internal/loadtest/events/factory.go | 150 ++++++++ internal/loadtest/events/factory_test.go | 75 ++++ internal/loadtest/events/generator.go | 154 ++++++++ internal/loadtest/publisher/publisher.go | 39 +- internal/loadtest/sender/cloudevent/sender.go | 4 +- internal/loadtest/sender/interface.go | 4 +- .../loadtest/sender/legacyevent/sender.go | 3 +- internal/loadtest/sender/sender.go | 361 +++++------------- 11 files changed, 490 insertions(+), 585 deletions(-) delete mode 100644 internal/loadtest/events/GenericEvent/events.go delete mode 100644 internal/loadtest/events/GenericFactory/factory.go create mode 100644 internal/loadtest/events/factory.go create mode 100644 internal/loadtest/events/factory_test.go create mode 100644 internal/loadtest/events/generator.go diff --git a/internal/loadtest/events/GenericEvent/events.go b/internal/loadtest/events/GenericEvent/events.go deleted file mode 100644 index dfb147b..0000000 --- a/internal/loadtest/events/GenericEvent/events.go +++ /dev/null @@ -1,203 +0,0 @@ -package GenericEvent - -import ( - "context" - "fmt" - "log" - "sync" - "time" - - cev2 "github.com/cloudevents/sdk-go/v2" - - "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" - "github.com/kyma-project/eventing-tools/internal/tree" -) - -type Event struct { - source string - version string - name string - eps int - starttime string - feedback chan int - counter chan int - success chan int - events chan *Event - cancel context.CancelFunc - successes *tree.Node - eventtype string - wg *sync.WaitGroup - running bool - stopper sync.Mutex -} - -func (e *Event) Events() <-chan *Event { - return e.events -} - -func (e *Event) Source() string { - return e.source -} - -func (e *Event) Feedback() chan<- int { - return e.feedback -} - -func (e *Event) Success() chan<- int { - return e.success -} - -func (e *Event) Eps() int { - return e.eps -} - -func (e *Event) Counter() <-chan int { - return e.counter - -} - -func NewEvent(format, name, source string, eps int) *Event { - e := Event{ - version: format, - name: name, - eps: eps, - starttime: time.Now().Format("2006-01-02T15:04:05"), - source: source, - eventtype: fmt.Sprintf("%s.%s", name, format), - wg: &sync.WaitGroup{}, - } - return &e -} - -func (e *Event) handleSuccess(ctx context.Context) { - defer e.wg.Done() - for { - select { - case <-ctx.Done(): - fmt.Printf("%v.%v: %v\n", e.starttime, e.name, e.successes) - fmt.Printf("DONE success %v.%v\n", e.source, e.eventtype) - return - case val := <-e.success: - e.successes = tree.InsertInt(e.successes, val) - } - } -} - -func (e *Event) PrintStats() string { - return fmt.Sprintf("%v.%v.%v.%v: %v\n", e.starttime, e.source, e.name, e.version, e.successes) -} - -func (e *Event) fillCounter(ctx context.Context) { - defer e.wg.Done() - var c int - var next int - for { - select { - case next = <-e.feedback: - break - default: - next = c - c++ - } - select { - case <-ctx.Done(): - fmt.Printf("DONE counter %v.%v\n", e.source, e.eventtype) - return - case e.counter <- next: - break - } - } -} - -func (e *Event) queueEvent(ctx context.Context) { - defer e.wg.Done() - defer func() { - if r := recover(); r != nil { - log.Println("recovered from in queueEvent: ", r) - } - }() - - t := time.NewTicker(time.Second) - defer t.Stop() - - // queue event immediately - for { - select { - case <-t.C: - for i := 0; i < e.eps; i++ { - select { - case <-ctx.Done(): - close(e.events) - fmt.Printf("DONE queue %v.%v\n", e.source, e.eventtype) - return - case e.events <- e: - continue - } - } - case <-ctx.Done(): - close(e.events) - fmt.Printf("DONE queue %v.%v\n", e.source, e.eventtype) - return - } - } -} - -func (e *Event) Stop() { - e.stopper.Lock() - if !e.running { - return - } - e.cancel() - fmt.Printf("waiting for %v.%v\n", e.source, e.eventtype) - e.wg.Wait() - fmt.Printf("DONE waiting for %v.%v\n", e.source, e.eventtype) - e.running = false - e.stopper.Unlock() -} - -func (e *Event) Start() { - if e.running { - return - } - e.running = true - e.events = make(chan *Event, e.eps) - e.counter = make(chan int, e.eps*4) - e.feedback = make(chan int, e.eps*4) - e.success = make(chan int, e.eps*4) - ctx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel - e.successes = nil - e.wg.Add(1) - go e.fillCounter(ctx) - e.wg.Add(1) - go e.handleSuccess(ctx) - e.wg.Add(1) - go e.queueEvent(ctx) -} - -func (e *Event) ToLegacyEvent(seq int) payload.LegacyEvent { - d := payload.DTO{ - Start: e.starttime, - Value: seq, - } - return payload.LegacyEvent{ - Data: d, - EventType: e.name, - EventTypeVersion: e.version, - EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), - EventTracing: true, - } -} - -func (e *Event) ToCloudEvent(seq int) (cev2.Event, error) { - - ce := cev2.NewEvent() - ce.SetType(e.eventtype) - ce.SetSource(e.source) - d := payload.DTO{ - Start: e.starttime, - Value: seq, - } - err := ce.SetData(cev2.ApplicationJSON, d) - return ce, err -} diff --git a/internal/loadtest/events/GenericFactory/factory.go b/internal/loadtest/events/GenericFactory/factory.go deleted file mode 100644 index a11a17a..0000000 --- a/internal/loadtest/events/GenericFactory/factory.go +++ /dev/null @@ -1,58 +0,0 @@ -package GenericFactory - -import ( - "fmt" - "regexp" - "strconv" - "strings" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" -) - -type GenericEventFactory struct { -} - -const ( - formatLabel = "eventing-loadtest" -) - -func New() *GenericEventFactory { - return &GenericEventFactory{} -} - -func (g *GenericEventFactory) FromSubscription(subscription *unstructured.Unstructured, eventFormat string) []*GenericEvent.Event { - sub, err := v1alpha2.ToSubscription(subscription) - if err != nil { - return nil - } - events := []*GenericEvent.Event{} - - // for now we support only type matching standard - if sub.Spec.TypeMatching != v1alpha2.Standard { - return events - } - - if sub.GetLabels()[formatLabel] != eventFormat { - return events - } - - re := regexp.MustCompile(`.v(\d+)$`) - for _, et := range sub.Spec.Types { - if !re.MatchString(et) { - continue - } - rss := re.FindStringSubmatch(et) - for _, rs := range rss { - rate, err := strconv.Atoi(rs) - if err != nil { - continue - } - name := strings.TrimSuffix(et, fmt.Sprintf(".v%v", rs)) // trim the `.v` to get a clean event type without trailing `.`. - events = append(events, GenericEvent.NewEvent(fmt.Sprintf("v%v", rs), name, sub.Spec.Source, rate)) // create a clean `v` version indicator. - } - } - return events -} diff --git a/internal/loadtest/events/events.go b/internal/loadtest/events/events.go index 5b960b8..ea71ba4 100644 --- a/internal/loadtest/events/events.go +++ b/internal/loadtest/events/events.go @@ -1,30 +1,6 @@ package events -import ( - cev2 "github.com/cloudevents/sdk-go/v2" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" -) - const ( LegacyFormat = "legacy" CloudeventFormat = "cloudevent" ) - -type Event interface { - ToCloudEvent(int, string) (cev2.Event, error) - ToLegacyEvent(int) payload.LegacyEvent - Stop() - Eps() int - Counter() <-chan int - Source() string - Feedback() chan<- int - Success() chan<- int - Events() <-chan Event -} - -type EventFactory interface { - FromSubscription(*unstructured.Unstructured, string) []*GenericEvent.Event -} diff --git a/internal/loadtest/events/factory.go b/internal/loadtest/events/factory.go new file mode 100644 index 0000000..46fb92e --- /dev/null +++ b/internal/loadtest/events/factory.go @@ -0,0 +1,150 @@ +package events + +import ( + "fmt" + "regexp" + "strconv" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" + "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" +) + +type eventGenerator map[string]*Generator + +type Factory struct { + generators map[NamespaceName]eventGenerator + senderC chan<- Event +} + +func (f *Factory) OnNewSubscription(subscription *unstructured.Unstructured) { + f.FromSubscription(subscription) +} + +func (f *Factory) OnChangedSubscription(subscription *unstructured.Unstructured) { + f.FromSubscription(subscription) +} + +func (f *Factory) OnDeleteSubscription(subscription *unstructured.Unstructured) { + f.FromSubscription(subscription) +} + +func NewGeneratorFactory(senderC chan<- Event) *Factory { + return &Factory{ + generators: map[NamespaceName]eventGenerator{}, + senderC: senderC, + } +} + +type NamespaceName struct { + Name, Namespace string +} + +const ( + formatLabel = "eventing-loadtest" +) + +var _ subscription.Notifiable = &Factory{} + +func (f *Factory) FromSubscription(subscription *unstructured.Unstructured) error { + sub, err := v1alpha2.ToSubscription(subscription) + if err != nil { + return err + } + return f.reconcile(sub) +} + +func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { + // for now we support only type matching standard + if sub.Spec.TypeMatching != v1alpha2.Standard { + return nil + } + nn := NamespaceName{ + Name: sub.Name, + Namespace: sub.Namespace, + } + + // delete subscription + if sub.DeletionTimestamp != nil { + if g, ok := f.generators[nn]; ok { + f.stopGenerators(g) + delete(f.generators, nn) + return nil + } + } + + // create default eventGenerator (empty) + if _, ok := f.generators[nn]; !ok { + f.generators[nn] = eventGenerator{} + } + + eg := f.generators[nn] + + // check if eventtypes have been removed + for etgen, gen := range eg { + for _, et := range EventTypeFromSubscription(sub) { + if et == etgen { + break + } + } + // remove generators for removed eventType + gen.Stop() + delete(eg, etgen) + } + + // handle adding EventTypes + for _, et := range EventTypeFromSubscription(sub) { + //for etgen, gen := range eg { + for etgen, gen := range eg { + if et == etgen { + // update the eventFormat to the one specified in the subscription + updateGeneratorFormat(sub, gen) + break + } + } + // create a new EventGenerator for the found eventType and start it + gen := f.ConfigureAndStartGenerator(sub, et) + if gen != nil { + eg[et] = gen + } + } + return nil +} + +func (f *Factory) Stop() { + for _, gens := range f.generators { + for _, gen := range gens { + gen.Stop() + } + } +} + +func (f *Factory) ConfigureAndStartGenerator(sub *v1alpha2.Subscription, eventType string) *Generator { + re := regexp.MustCompile(`.v(\d+)$`) + if !re.MatchString(eventType) { + return nil + } + rs := re.FindStringSubmatch(eventType)[1] + rate, err := strconv.Atoi(rs) + if err != nil { + return nil + } + gen := NewGenerator(eventType, sub.Spec.Source, rate, sub.GetLabels()[formatLabel], f.senderC) // create a clean `v` version indicator. + gen.Start() + return gen +} + +func (f *Factory) stopGenerators(generators eventGenerator) { + for _, g := range generators { + g.Stop() + } +} + +func EventTypeFromSubscription(sub *v1alpha2.Subscription) []string { + var ets []string + for _, t := range sub.Spec.Types { + ets = append(ets, fmt.Sprintf("%v.%v", sub.Spec.Source, t)) + } + return ets +} diff --git a/internal/loadtest/events/factory_test.go b/internal/loadtest/events/factory_test.go new file mode 100644 index 0000000..ab36a44 --- /dev/null +++ b/internal/loadtest/events/factory_test.go @@ -0,0 +1,75 @@ +package events + +import ( + "testing" + "time" + + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" +) + +func TestFactory_reconcile(t *testing.T) { + type fields struct { + generators map[NamespaceName]eventGenerator + senderC chan Event + } + type args struct { + sub *v1alpha2.Subscription + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + {name: "new subscription", + fields: fields{ + generators: map[NamespaceName]eventGenerator{}, + senderC: make(chan Event), + }, + args: args{ + sub: &v1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: "new", + Namespace: "new", + }, + Spec: v1alpha2.SubscriptionSpec{ + Sink: "", + TypeMatching: v1alpha2.Standard, + Source: "source", + Types: []string{"foo.bar.v1", "bar.foo.v1"}, + Config: nil, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case e := <-tt.fields.senderC: + t.Logf("%+v", e) + } + } + }(ctx) + f := &Factory{ + generators: tt.fields.generators, + senderC: tt.fields.senderC, + } + if err := f.reconcile(tt.args.sub); (err != nil) != tt.wantErr { + t.Errorf("reconcile() error = %v, wantErr %v", err, tt.wantErr) + } + time.Sleep(1 * time.Second) + f.Stop() + cancel() + }) + } +} diff --git a/internal/loadtest/events/generator.go b/internal/loadtest/events/generator.go new file mode 100644 index 0000000..a9f89dd --- /dev/null +++ b/internal/loadtest/events/generator.go @@ -0,0 +1,154 @@ +package events + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" +) + +type Generator struct { + source string + version string + name string + eps int + starttime string + cancel context.CancelFunc + eventtype string + running bool + counter int + counterLock sync.Mutex + id int + sink string + c chan<- Event + wg sync.WaitGroup + format EventFormat +} + +type Event struct { + eventtype string + source string + sink string + id int + startTime string + format EventFormat +} + +func NewGenerator(eventType, source string, eps int, format string, senderC chan<- Event) *Generator { + e := Generator{ + eps: eps, + starttime: time.Now().Format("2006-01-02T15:04:05"), + source: source, + eventtype: eventType, + format: EventFormatFromString(format), + c: senderC, + } + return &e +} + +type EventStats struct { + eventtype, source, startTime string + sent int +} + +func updateGeneratorFormat(sub *v1alpha2.Subscription, gen *Generator) { + f := EventFormatFromString(sub.GetLabels()[formatLabel]) + if gen.format != f { + gen.format = f + gen.starttime = time.Now().Format("2006-01-02T15:04:05") + } +} + +func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { + t := time.NewTicker(time.Second) + defer t.Stop() + remaining := e.eps + sent := 0 + id := 0 + for { + select { + case <-ctx.Done(): + fmt.Printf("DONE counter %v.%v\n", e.source, e.eventtype) + return + case <-t.C: + remaining = e.eps + //stats <- EventStats{ + // eventtype: e.eventtype, + // source: e.source, + // startTime: e.starttime, + // sent: sent, + //} + sent = 0 + default: + if remaining > 0 { + c <- Event{ + eventtype: e.eventtype, + source: e.source, + sink: e.sink, + id: id, + startTime: e.starttime, + format: e.format, + } + sent++ + remaining-- + } + } + } +} + +func (e *Generator) Start() { + ctx, cancel := context.WithCancel(context.Background()) + e.cancel = cancel + go func() { + e.wg.Add(1) + defer e.wg.Done() + e.fillChan(ctx, e.c) + }() +} + +func (e *Generator) Stop() { + e.cancel() + e.wg.Wait() +} + +// func (e *Generator) ToLegacyEvent(seq int) payload.LegacyEvent { +// d := payload.DTO{ +// Start: e.starttime, +// Value: seq, +// } +// return payload.LegacyEvent{ +// Data: d, +// EventType: e.name, +// EventTypeVersion: e.version, +// EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), +// EventTracing: true, +// } +// } +// +// func (e *Generator) ToCloudEvent(seq int) (cev2.Event, error) { +// ce := cev2.NewEvent() +// ce.SetType(e.eventtype) +// ce.SetSource(e.source) +// d := payload.DTO{ +// Start: e.starttime, +// Value: seq, +// } +// err := ce.SetData(cev2.ApplicationJSON, d) +// return ce, err +// } + +func EventFormatFromString(format string) EventFormat { + if format == "legacy" { + return Legacy + } + return CloudEvent +} + +type EventFormat int + +const ( + Legacy EventFormat = iota + CloudEvent +) diff --git a/internal/loadtest/publisher/publisher.go b/internal/loadtest/publisher/publisher.go index 1b3a704..a5fe31b 100644 --- a/internal/loadtest/publisher/publisher.go +++ b/internal/loadtest/publisher/publisher.go @@ -7,10 +7,8 @@ import ( "k8s.io/client-go/dynamic" "github.com/kyma-project/eventing-tools/internal/k8s" - "github.com/kyma-project/eventing-tools/internal/loadtest/config" + "github.com/kyma-project/eventing-tools/internal/loadtest/events" sender2 "github.com/kyma-project/eventing-tools/internal/loadtest/sender" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender/cloudevent" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender/legacyevent" "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" "github.com/kyma-project/eventing-tools/internal/logger" "github.com/kyma-project/eventing-tools/internal/probes" @@ -23,34 +21,27 @@ const ( ) func Start(port int) { - appConfig := config.New() + //appConfig := config.New() k8sConfig := k8s.ConfigOrDie() - k8sClient := k8s.ClientOrDie(k8sConfig) + //k8sClient := k8s.ClientOrDie(k8sConfig) dynamicClient := dynamic.NewForConfigOrDie(k8sConfig) - legacySender := legacyevent.NewSender(appConfig) - legacyEventSender := sender2.NewSender(appConfig, legacySender) + sender, senderC := sender2.NewSender() + factory := events.NewGeneratorFactory(senderC) - ceSender := cloudevent.NewSender(appConfig) - ceEventSender := sender2.NewSender(appConfig, ceSender) + //config.NewWatcher(k8sClient, Namespace, ConfigMapName). + // OnAddNotify(ceEventSender). + // OnUpdateNotify(ceEventSender). + // OnDeleteNotify(ceEventSender). + // OnDeleteNotifyMe(). + // Watch() - config.NewWatcher(k8sClient, Namespace, ConfigMapName). - OnAddNotify(legacyEventSender). - OnUpdateNotify(legacyEventSender). - OnDeleteNotify(legacyEventSender). - OnAddNotify(ceEventSender). - OnUpdateNotify(ceEventSender). - OnDeleteNotify(ceEventSender). - OnDeleteNotifyMe(). - Watch() + sender.Start() subscription.NewWatcher(dynamicClient). - OnAddNotify(ceEventSender). - OnUpdateNotify(ceEventSender). - OnDeleteNotify(ceEventSender). - OnAddNotify(legacyEventSender). - OnUpdateNotify(legacyEventSender). - OnDeleteNotify(legacyEventSender). + OnAddNotify(factory). + OnUpdateNotify(factory). + OnDeleteNotify(factory). Watch() http.HandleFunc(probes.EndpointReadyz, probes.DefaultHandler) diff --git a/internal/loadtest/sender/cloudevent/sender.go b/internal/loadtest/sender/cloudevent/sender.go index dea14b2..7f98711 100644 --- a/internal/loadtest/sender/cloudevent/sender.go +++ b/internal/loadtest/sender/cloudevent/sender.go @@ -11,7 +11,6 @@ import ( "github.com/kyma-project/eventing-tools/internal/client/cloudevents" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" "github.com/kyma-project/eventing-tools/internal/loadtest/sender" ) @@ -38,8 +37,7 @@ func NewSender(conf *config.Config) *Sender { return s } -func (s *Sender) SendEvent(evt *GenericEvent.Event, ack chan<- int, nack chan<- int, undelivered chan<- int) { - +func (s *Sender) SendEvent(evt *events.Generator, ack chan<- int, nack chan<- int, undelivered chan<- int) { seq := <-evt.Counter() ce, err := evt.ToCloudEvent(seq) diff --git a/internal/loadtest/sender/interface.go b/internal/loadtest/sender/interface.go index 1247d1c..d8a1f18 100644 --- a/internal/loadtest/sender/interface.go +++ b/internal/loadtest/sender/interface.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/kyma-project/eventing-tools/internal/loadtest/config" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" + "github.com/kyma-project/eventing-tools/internal/loadtest/events" ) type ConfigHandler interface { @@ -14,7 +14,7 @@ type ConfigHandler interface { } type Sender interface { - SendEvent(e *GenericEvent.Event, ack chan<- int, nack chan<- int, undelivered chan<- int) + SendEvent(e *events.Generator, ack chan<- int, nack chan<- int, undelivered chan<- int) Format() string Init(t *http.Transport, cfg *config.Config) } diff --git a/internal/loadtest/sender/legacyevent/sender.go b/internal/loadtest/sender/legacyevent/sender.go index 53188cb..abe0253 100644 --- a/internal/loadtest/sender/legacyevent/sender.go +++ b/internal/loadtest/sender/legacyevent/sender.go @@ -10,7 +10,6 @@ import ( "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" "github.com/kyma-project/eventing-tools/internal/loadtest/sender" ) @@ -39,7 +38,7 @@ func NewSender(conf *config.Config) *Sender { return s } -func (s *Sender) SendEvent(evt *GenericEvent.Event, ack, nack, undelivered chan<- int) { +func (s *Sender) SendEvent(evt *events.Generator, ack, nack, undelivered chan<- int) { seq := <-evt.Counter() diff --git a/internal/loadtest/sender/sender.go b/internal/loadtest/sender/sender.go index 0712bb5..723e0e5 100644 --- a/internal/loadtest/sender/sender.go +++ b/internal/loadtest/sender/sender.go @@ -3,36 +3,17 @@ package sender import ( "context" "fmt" - "log" - "reflect" - "strings" "sync" - "sync/atomic" - "time" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/kyma-project/eventing-tools/internal/client/transport" - "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericFactory" - "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" ) // compile-time check for interfaces implementation. -var _ subscription.Notifiable = &EventSender{} -// Sender sends cloud events. +// Sender sends cloud factories. type EventSender struct { ctx context.Context cancel context.CancelFunc - config *config.Config - events map[string][]*GenericEvent.Event - factory events.EventFactory - process chan bool - running bool undelivered int32 ack int32 nack int32 @@ -41,270 +22,112 @@ type EventSender struct { stopper sync.Mutex sender Sender acks, nacks, undelivereds chan int + events chan events.Event + cnclCtx context.Context } -func (s *EventSender) FormatName() string { - // TODO implement me - panic("implement me") -} - -func NewSender(conf *config.Config, sender Sender) *EventSender { - s := &EventSender{config: conf} - s.undelivered = 0 - s.ack = 0 - s.nack = 0 - s.events = make(map[string][]*GenericEvent.Event) - s.factory = GenericFactory.New() - s.sender = sender - s.acks = make(chan int) - s.nacks = make(chan int) - s.undelivereds = make(chan int) - - return s -} - -func (s *EventSender) NotifyAdd(cm *corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() - config.Map(cm, s.config) - t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) - s.sender.Init(t, s.config) - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.process = make(chan bool, s.config.EpsLimit) - s.start() -} - -func (s *EventSender) NotifyUpdate(cm *corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() - config.Map(cm, s.config) - t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) - s.sender.Init(t, s.config) - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.process = make(chan bool, s.config.EpsLimit) - s.start() -} - -func (s *EventSender) NotifyDelete(*corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() -} - -func (s *EventSender) OnNewSubscription(subscription *unstructured.Unstructured) { - ne := s.factory.FromSubscription(subscription, s.sender.Format()) - if len(ne) == 0 { - return - } - s.stopper.Lock() - defer s.stopper.Unlock() - - // s.queue = make(chan events.Event, buffer) - for _, e := range ne { - e.Start() - } - s.mapLock.Lock() - defer s.mapLock.Unlock() - s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] = ne -} - -func (s *EventSender) OnChangedSubscription(subscription *unstructured.Unstructured) { - if subscription.GetDeletionTimestamp() != nil { - return - } - s.stopper.Lock() - defer s.stopper.Unlock() - for _, e := range s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] { - e.Stop() - } - s.mapLock.Lock() - defer s.mapLock.Unlock() - delete(s.events, fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())) - - ne := s.factory.FromSubscription(subscription, s.sender.Format()) - if len(ne) == 0 { - return - } - - for _, e := range ne { - e.Start() - } - s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] = ne -} - -func (s *EventSender) OnDeleteSubscription(sub *unstructured.Unstructured) { - s.stopper.Lock() - s.mapLock.Lock() - defer s.mapLock.Unlock() - defer s.stopper.Unlock() - for _, e := range s.events[fmt.Sprintf("%v/%v", sub.GetNamespace(), sub.GetName())] { - e.Stop() - } - delete(s.events, fmt.Sprintf("%v/%v", sub.GetNamespace(), sub.GetName())) -} - -func (s *EventSender) init() { -} - -func (s *EventSender) start() { - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - e.Start() - } +func NewSender() (*EventSender, chan<- events.Event) { + eventsC := make(chan events.Event) + s := &EventSender{ + events: eventsC, } - s.mapLock.RUnlock() - s.sendEventsAsync() - s.wg.Add(1) - go s.refillMaxEps(time.Second) - s.wg.Add(1) - go s.reportUsageAsync(time.Second, 20*time.Second) -} - -func (s *EventSender) stop() { - // recover from closing already closed channels - defer func() { - if r := recover(); r != nil { - log.Println("recovered from: ", r) - } + return s, eventsC + //s.undelivered = 0 + //s.ack = 0 + //s.nack = 0 + //s.factories = make(map[string][]*events.Generator) + //s.factory = Factory.New() + //s.sender = sender + //s.acks = make(chan int) + //s.nacks = make(chan int) + //s.undelivereds = make(chan int) + // + //return s +} + +func (s *EventSender) Start() { + s.ctx, s.cancel = context.WithCancel(context.Background()) + go func() { + s.wg.Add(1) + defer s.wg.Done() + s.sendEvents() }() - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - e.Stop() - } - } - s.mapLock.RUnlock() +} - s.running = false +func (s *EventSender) Stop() { s.cancel() - close(s.process) s.wg.Wait() } -func (s *EventSender) sendEventsAsync() { - for i := 0; i < s.config.Workers; i++ { - s.wg.Add(1) - go s.sendEvents() - } -} - -func (s *EventSender) reportUsageAsync(send, success time.Duration) { - - defer func() { - s.wg.Done() - }() - - sendt := time.NewTicker(send) - defer sendt.Stop() - succt := time.NewTicker(success) - defer succt.Stop() - - for { - select { - case na := <-s.acks: - atomic.AddInt32(&s.ack, int32(na)) - case nn := <-s.nacks: - atomic.AddInt32(&s.nack, int32(nn)) - case nu := <-s.undelivereds: - atomic.AddInt32(&s.undelivered, int32(nu)) - - case <-s.ctx.Done(): - targetEPS := s.ComputeTotalEventsPerSecond() - log.Printf( - "%v: | target_eps:% 4d (% 4d)| undelivered:% 4d | ack:% 4d | nack:% 4d | sum:% 4d |", - s.sender.Format(), targetEPS, s.config.EpsLimit, s.undelivered, s.ack, s.nack, s.undelivered+s.ack+s.nack, - ) - return - case <-sendt.C: - targetEPS := s.ComputeTotalEventsPerSecond() - if targetEPS == 0 { - continue - } - log.Printf( - "%v: | target_eps:% 4d (% 4d)| undelivered:% 4d | ack:% 4d | nack:% 4d | sum:% 4d |", - s.sender.Format(), targetEPS, s.config.EpsLimit, s.undelivered, s.ack, s.nack, s.undelivered+s.ack+s.nack, - ) - // reset counts for last report - atomic.StoreInt32(&s.undelivered, 0) - atomic.StoreInt32(&s.ack, 0) - atomic.StoreInt32(&s.nack, 0) - case <-succt.C: - s.mapLock.RLock() - stats := []string{fmt.Sprintf("%v:", s.sender.Format())} - for _, subs := range s.events { - for _, e := range subs { - stats = append(stats, e.PrintStats()) - } - } - fmt.Println(strings.Join(stats, "\n\t")) - s.mapLock.RUnlock() - } - } -} +//func (s *EventSender) NotifyAdd(cm *corev1.ConfigMap) { +// s.stopper.Lock() +// defer s.stopper.Unlock() +// s.stop() +// config.Map(cm, s.config) +// t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) +// s.sender.Init(t, s.config) +// s.ctx, s.cancel = context.WithCancel(context.TODO()) +// s.process = make(chan bool, s.config.EpsLimit) +// s.start() +//} +// +//func (s *EventSender) NotifyUpdate(cm *corev1.ConfigMap) { +// s.stopper.Lock() +// defer s.stopper.Unlock() +// s.stop() +// config.Map(cm, s.config) +// t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) +// s.sender.Init(t, s.config) +// s.ctx, s.cancel = context.WithCancel(context.TODO()) +// s.process = make(chan bool, s.config.EpsLimit) +// s.start() +//} +// +//func (s *EventSender) NotifyDelete(*corev1.ConfigMap) { +// s.stopper.Lock() +// defer s.stopper.Unlock() +// s.stop() +//} func (s *EventSender) sendEvents() { - for { - var cases []reflect.SelectCase - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - if e.Events() != nil { - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(e.Events()), - }) - } - } - } - s.mapLock.RUnlock() - if len(cases) == 0 { - time.Sleep(500 * time.Millisecond) - continue - } - - _, value, ok := reflect.Select(cases) - if !ok { - continue - } - - e := value.Interface().(*GenericEvent.Event) - <-s.process - go s.sender.SendEvent(e, s.acks, s.nacks, s.undelivereds) - } -} - -func (s *EventSender) ComputeTotalEventsPerSecond() int { - eps := 0 - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - eps += e.Eps() - } - } - s.mapLock.RUnlock() - return eps -} - -func (s *EventSender) refillMaxEps(d time.Duration) { - defer func() { - if r := recover(); r != nil { - fmt.Println("Recovered in refillMaxEps", r) - } - }() - - t := time.NewTicker(d) for { select { - case <-t.C: - for i := 0; i < s.config.EpsLimit; i++ { - s.process <- true - } + case e := <-s.events: + fmt.Sprintf("%+v", e) case <-s.ctx.Done(): return } } } + +//func (s *EventSender) ComputeTotalEventsPerSecond() int { +// eps := 0 +// s.mapLock.RLock() +// for _, subs := range s.factories { +// for _, e := range subs { +// eps += e.Eps() +// } +// } +// s.mapLock.RUnlock() +// return eps +//} + +//func (s *EventSender) refillMaxEps(d time.Duration) { +// defer func() { +// if r := recover(); r != nil { +// fmt.Println("Recovered in refillMaxEps", r) +// } +// }() +// +// t := time.NewTicker(d) +// for { +// select { +// case <-t.C: +// for i := 0; i < s.config.EpsLimit; i++ { +// s.process <- true +// } +// case <-s.ctx.Done(): +// return +// } +// } +//} From 301aa5b123581a24f6fe763a5e6bade7683afddc Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Tue, 1 Aug 2023 11:57:46 +0200 Subject: [PATCH 2/9] simplify loadtest --- internal/loadtest/config/config.go | 62 +---- internal/loadtest/config/notifiable.go | 6 + internal/loadtest/events/events.go | 6 - internal/loadtest/events/factory.go | 9 +- internal/loadtest/events/factory_test.go | 2 +- internal/loadtest/events/generator.go | 93 +++----- internal/loadtest/publisher/publisher.go | 14 +- internal/loadtest/sender/cloudevent/sender.go | 57 +++-- .../sender/{ => interface}/interface.go | 9 +- .../loadtest/sender/legacyevent/sender.go | 72 +++--- internal/loadtest/sender/sender.go | 217 +++++++++++------- resources/loadtest/base/kustomization.yaml | 1 + 12 files changed, 278 insertions(+), 270 deletions(-) delete mode 100644 internal/loadtest/events/events.go rename internal/loadtest/sender/{ => interface}/interface.go (60%) diff --git a/internal/loadtest/config/config.go b/internal/loadtest/config/config.go index 832da24..48db1bf 100644 --- a/internal/loadtest/config/config.go +++ b/internal/loadtest/config/config.go @@ -3,33 +3,17 @@ package config import ( "flag" "fmt" - "strings" "time" ) // Config represents environment config. type Config struct { - ServerAddress string - PublishEndpoint string `config:"publish_endpoint"` - UseLegacyEvents bool `config:"use_legacy_events"` - EventSource string `config:"event_source"` - MaxInflightMessages0 string `config:"max_inflight_messages_0"` - MaxInflightMessages1 string `config:"max_inflight_messages_1"` - EventName0 string `config:"event_name_0"` - EventName1 string `config:"event_name_1"` - VersionFormat string `config:"version_format"` - GenerateCount0 int `config:"generate_count_0"` - GenerateCount1 int `config:"generate_count_1"` - EpsStart0 int `config:"eps_start_0"` - EpsStart1 int `config:"eps_start_1"` - EpsIncrement0 int `config:"eps_increment_0"` - EpsIncrement1 int `config:"eps_increment_1"` - EpsLimit int `config:"eps_limit"` - Workers int `config:"workers"` - MaxIdleConns int `config:"max_idle_conns"` - MaxConnsPerHost int `config:"max_conns_per_host"` - MaxIdleConnsPerHost int `config:"max_idle_conns_per_host"` - IdleConnTimeout time.Duration `config:"idle_conn_timeout"` + ServerAddress string + PublishHost string `config:"publish_host"` + MaxIdleConns int `config:"max_idle_conns"` + MaxConnsPerHost int `config:"max_conns_per_host"` + MaxIdleConnsPerHost int `config:"max_idle_conns_per_host"` + IdleConnTimeout time.Duration `config:"idle_conn_timeout"` } func New() *Config { @@ -39,39 +23,9 @@ func New() *Config { return c } -// IsVersionFormatEmpty returns true if event version is empty. -func (c *Config) IsVersionFormatEmpty() bool { - return len(strings.TrimSpace(c.VersionFormat)) == 0 -} - -// IsEventName0Empty returns true if event name 0 is empty. -func (c *Config) IsEventName0Empty() bool { - return len(strings.TrimSpace(c.EventName0)) == 0 -} - -// IsEventName1Empty returns true if event name 1 is empty. -func (c *Config) IsEventName1Empty() bool { - return len(strings.TrimSpace(c.EventName1)) == 0 -} - -// ComputeEventsCount returns the count of events to generate. -func (c *Config) ComputeEventsCount() int { - count := 0 - if c.IsVersionFormatEmpty() { - return count - } - if !c.IsEventName0Empty() { - count += c.GenerateCount0 - } - if !c.IsEventName1Empty() { - count += c.GenerateCount1 - } - return count -} - func (c *Config) String() string { return fmt.Sprintf( - "ServerAddress: %v PublishEndpoint: %v MaxInflightMessages0: %v MaxInflightMessages1: %v EventFormat0: %v EventFormat1: %v GenerateCount0: %v GenerateCount1: %v EpsStart0: %v EpsStart1: %v EpsIncrement0: %v EpsIncrement1: %v EpsLimit: %v Workers: %v MaxIdleConns: %v MaxConnsPerHost : %v MaxIdleConnsPerHost: %v IdleConnTimeout: %v", - c.ServerAddress, c.PublishEndpoint, c.MaxInflightMessages0, c.MaxInflightMessages1, c.EventName0, c.EventName1, c.GenerateCount0, c.GenerateCount1, c.EpsStart0, c.EpsStart1, c.EpsIncrement0, c.EpsIncrement1, c.EpsLimit, c.Workers, c.MaxIdleConns, c.MaxConnsPerHost, c.MaxIdleConnsPerHost, c.IdleConnTimeout, + "ServerAddress: %v PublishHost: %v MaxIdleConns: %v MaxConnsPerHost : %v MaxIdleConnsPerHost: %v IdleConnTimeout: %v", + c.ServerAddress, c.PublishHost, c.MaxIdleConns, c.MaxConnsPerHost, c.MaxIdleConnsPerHost, c.IdleConnTimeout, ) } diff --git a/internal/loadtest/config/notifiable.go b/internal/loadtest/config/notifiable.go index e4999ad..9f2e2bf 100644 --- a/internal/loadtest/config/notifiable.go +++ b/internal/loadtest/config/notifiable.go @@ -2,6 +2,12 @@ package config import corev1 "k8s.io/api/core/v1" +type Notifiable interface { + AddNotifiable + UpdateNotifiable + DeleteNotifiable +} + type AddNotifiable interface { NotifyAdd(*corev1.ConfigMap) } diff --git a/internal/loadtest/events/events.go b/internal/loadtest/events/events.go deleted file mode 100644 index ea71ba4..0000000 --- a/internal/loadtest/events/events.go +++ /dev/null @@ -1,6 +0,0 @@ -package events - -const ( - LegacyFormat = "legacy" - CloudeventFormat = "cloudevent" -) diff --git a/internal/loadtest/events/factory.go b/internal/loadtest/events/factory.go index 46fb92e..a2adc19 100644 --- a/internal/loadtest/events/factory.go +++ b/internal/loadtest/events/factory.go @@ -1,7 +1,6 @@ package events import ( - "fmt" "regexp" "strconv" @@ -82,10 +81,11 @@ func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { eg := f.generators[nn] // check if eventtypes have been removed +currentGenerator: for etgen, gen := range eg { for _, et := range EventTypeFromSubscription(sub) { if et == etgen { - break + continue currentGenerator } } // remove generators for removed eventType @@ -94,13 +94,14 @@ func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { } // handle adding EventTypes +currentEventType: for _, et := range EventTypeFromSubscription(sub) { //for etgen, gen := range eg { for etgen, gen := range eg { if et == etgen { // update the eventFormat to the one specified in the subscription updateGeneratorFormat(sub, gen) - break + continue currentEventType } } // create a new EventGenerator for the found eventType and start it @@ -144,7 +145,7 @@ func (f *Factory) stopGenerators(generators eventGenerator) { func EventTypeFromSubscription(sub *v1alpha2.Subscription) []string { var ets []string for _, t := range sub.Spec.Types { - ets = append(ets, fmt.Sprintf("%v.%v", sub.Spec.Source, t)) + ets = append(ets, t) } return ets } diff --git a/internal/loadtest/events/factory_test.go b/internal/loadtest/events/factory_test.go index ab36a44..68c6c02 100644 --- a/internal/loadtest/events/factory_test.go +++ b/internal/loadtest/events/factory_test.go @@ -38,7 +38,7 @@ func TestFactory_reconcile(t *testing.T) { Spec: v1alpha2.SubscriptionSpec{ Sink: "", TypeMatching: v1alpha2.Standard, - Source: "source", + Source: "Source", Types: []string{"foo.bar.v1", "bar.foo.v1"}, Config: nil, }, diff --git a/internal/loadtest/events/generator.go b/internal/loadtest/events/generator.go index a9f89dd..f6d1d89 100644 --- a/internal/loadtest/events/generator.go +++ b/internal/loadtest/events/generator.go @@ -10,32 +10,27 @@ import ( ) type Generator struct { - source string - version string - name string - eps int - starttime string - cancel context.CancelFunc - eventtype string - running bool - counter int - counterLock sync.Mutex - id int - sink string - c chan<- Event - wg sync.WaitGroup - format EventFormat -} - -type Event struct { - eventtype string source string - sink string + eps int + starttime string + cancel context.CancelFunc + eventtype string id int - startTime string + sink string + c chan<- Event + wg sync.WaitGroup format EventFormat } +type Event struct { + EventType string + Source string + Sink string + ID int + StartTime string + Format EventFormat +} + func NewGenerator(eventType, source string, eps int, format string, senderC chan<- Event) *Generator { e := Generator{ eps: eps, @@ -57,6 +52,7 @@ func updateGeneratorFormat(sub *v1alpha2.Subscription, gen *Generator) { f := EventFormatFromString(sub.GetLabels()[formatLabel]) if gen.format != f { gen.format = f + gen.id = 0 gen.starttime = time.Now().Format("2006-01-02T15:04:05") } } @@ -65,7 +61,6 @@ func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { t := time.NewTicker(time.Second) defer t.Stop() remaining := e.eps - sent := 0 id := 0 for { select { @@ -74,25 +69,18 @@ func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { return case <-t.C: remaining = e.eps - //stats <- EventStats{ - // eventtype: e.eventtype, - // source: e.source, - // startTime: e.starttime, - // sent: sent, - //} - sent = 0 default: if remaining > 0 { c <- Event{ - eventtype: e.eventtype, - source: e.source, - sink: e.sink, - id: id, - startTime: e.starttime, - format: e.format, + EventType: e.eventtype, + Source: e.source, + Sink: e.sink, + ID: id, + StartTime: e.starttime, + Format: e.format, } - sent++ remaining-- + id++ } } } @@ -113,32 +101,6 @@ func (e *Generator) Stop() { e.wg.Wait() } -// func (e *Generator) ToLegacyEvent(seq int) payload.LegacyEvent { -// d := payload.DTO{ -// Start: e.starttime, -// Value: seq, -// } -// return payload.LegacyEvent{ -// Data: d, -// EventType: e.name, -// EventTypeVersion: e.version, -// EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), -// EventTracing: true, -// } -// } -// -// func (e *Generator) ToCloudEvent(seq int) (cev2.Event, error) { -// ce := cev2.NewEvent() -// ce.SetType(e.eventtype) -// ce.SetSource(e.source) -// d := payload.DTO{ -// Start: e.starttime, -// Value: seq, -// } -// err := ce.SetData(cev2.ApplicationJSON, d) -// return ce, err -// } - func EventFormatFromString(format string) EventFormat { if format == "legacy" { return Legacy @@ -148,6 +110,13 @@ func EventFormatFromString(format string) EventFormat { type EventFormat int +func (e EventFormat) String() string { + if e == Legacy { + return "legacy" + } + return "cloudevent" +} + const ( Legacy EventFormat = iota CloudEvent diff --git a/internal/loadtest/publisher/publisher.go b/internal/loadtest/publisher/publisher.go index a5fe31b..946aca1 100644 --- a/internal/loadtest/publisher/publisher.go +++ b/internal/loadtest/publisher/publisher.go @@ -7,6 +7,7 @@ import ( "k8s.io/client-go/dynamic" "github.com/kyma-project/eventing-tools/internal/k8s" + "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" sender2 "github.com/kyma-project/eventing-tools/internal/loadtest/sender" "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" @@ -23,18 +24,17 @@ const ( func Start(port int) { //appConfig := config.New() k8sConfig := k8s.ConfigOrDie() - //k8sClient := k8s.ClientOrDie(k8sConfig) + k8sClient := k8s.ClientOrDie(k8sConfig) dynamicClient := dynamic.NewForConfigOrDie(k8sConfig) sender, senderC := sender2.NewSender() factory := events.NewGeneratorFactory(senderC) - //config.NewWatcher(k8sClient, Namespace, ConfigMapName). - // OnAddNotify(ceEventSender). - // OnUpdateNotify(ceEventSender). - // OnDeleteNotify(ceEventSender). - // OnDeleteNotifyMe(). - // Watch() + config.NewWatcher(k8sClient, Namespace, ConfigMapName). + OnAddNotify(sender). + OnUpdateNotify(sender). + OnDeleteNotify(sender). + Watch() sender.Start() diff --git a/internal/loadtest/sender/cloudevent/sender.go b/internal/loadtest/sender/cloudevent/sender.go index 7f98711..2334ff4 100644 --- a/internal/loadtest/sender/cloudevent/sender.go +++ b/internal/loadtest/sender/cloudevent/sender.go @@ -3,66 +3,77 @@ package cloudevent import ( "context" "fmt" - "net/http" cev2 "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/client" "github.com/kyma-project/eventing-tools/internal/client/cloudevents" + "github.com/kyma-project/eventing-tools/internal/client/transport" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender" + "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" ) // compile-time check for interfaces implementation. -var _ sender.Sender = &Sender{} +var _ _interface.Sender = &Sender{} // Sender sends cloud events. type Sender struct { - client client.Client - config *config.Config + ackC, nackC, undeliveredC chan<- events.Event + client client.Client + config config.Config } -func (s *Sender) Format() string { - return events.CloudeventFormat +func (s *Sender) Format() events.EventFormat { + return events.CloudEvent } -func (s *Sender) Init(t *http.Transport, cfg *config.Config) { - s.config = cfg +func NewSender(conf config.Config, ackC, nackC, undeliveredC chan<- events.Event) *Sender { + t := transport.New(conf.MaxIdleConns, conf.MaxConnsPerHost, conf.MaxIdleConnsPerHost, conf.IdleConnTimeout) + s := &Sender{ + config: conf, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, + } s.client = cloudevents.NewClientOrDie(t) -} - -func NewSender(conf *config.Config) *Sender { - s := &Sender{config: conf} return s } -func (s *Sender) SendEvent(evt *events.Generator, ack chan<- int, nack chan<- int, undelivered chan<- int) { - seq := <-evt.Counter() +func ToCloudEvent(event events.Event) (cev2.Event, error) { + ce := cev2.NewEvent() + ce.SetType(event.EventType) + ce.SetSource(event.Source) + d := payload.DTO{ + Start: event.StartTime, + Value: event.ID, + } + err := ce.SetData(cev2.ApplicationJSON, d) + return ce, err +} - ce, err := evt.ToCloudEvent(seq) +func (s *Sender) SendEvent(event events.Event) { + ce, err := ToCloudEvent(event) if err != nil { return } - endpoint := fmt.Sprintf("%v/publish", s.config.PublishEndpoint) + endpoint := fmt.Sprintf("%v/publish", s.config.PublishHost) ctx := cev2.WithEncodingStructured(cev2.ContextWithTarget(context.Background(), endpoint)) resp := s.client.Send(ctx, ce) switch { case cev2.IsUndelivered(resp): { - undelivered <- 1 - evt.Feedback() <- seq + s.undeliveredC <- event } case cev2.IsACK(resp): { - ack <- 1 - evt.Success() <- seq + s.ackC <- event } case cev2.IsNACK(resp): { - nack <- 1 - evt.Feedback() <- seq + s.nackC <- event } } } diff --git a/internal/loadtest/sender/interface.go b/internal/loadtest/sender/interface/interface.go similarity index 60% rename from internal/loadtest/sender/interface.go rename to internal/loadtest/sender/interface/interface.go index d8a1f18..d9fb537 100644 --- a/internal/loadtest/sender/interface.go +++ b/internal/loadtest/sender/interface/interface.go @@ -1,8 +1,6 @@ -package sender +package _interface import ( - "net/http" - "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" ) @@ -14,7 +12,6 @@ type ConfigHandler interface { } type Sender interface { - SendEvent(e *events.Generator, ack chan<- int, nack chan<- int, undelivered chan<- int) - Format() string - Init(t *http.Transport, cfg *config.Config) + SendEvent(event events.Event) + Format() events.EventFormat } diff --git a/internal/loadtest/sender/legacyevent/sender.go b/internal/loadtest/sender/legacyevent/sender.go index abe0253..3056057 100644 --- a/internal/loadtest/sender/legacyevent/sender.go +++ b/internal/loadtest/sender/legacyevent/sender.go @@ -7,50 +7,73 @@ import ( "fmt" "io" "net/http" + "strings" + "time" + "github.com/kyma-project/eventing-tools/internal/client/transport" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender" + "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" ) -var _ sender.Sender = &Sender{} +var _ _interface.Sender = &Sender{} // Sender sends legacy events. type Sender struct { - client *http.Client - config *config.Config + ackC, nackC, undeliveredC chan<- events.Event + client *http.Client + config config.Config } -func (s *Sender) Init(t *http.Transport, cfg *config.Config) { - s.config = cfg - s.client = &http.Client{ - Transport: t, - } -} - -func (s *Sender) Format() string { - return events.LegacyFormat +func (s *Sender) Format() events.EventFormat { + return events.Legacy } -func NewSender(conf *config.Config) *Sender { - s := &Sender{config: conf} +func NewSender(cfg config.Config, ackC, nackC, undeliveredC chan<- events.Event) *Sender { + s := &Sender{ + config: cfg, + client: &http.Client{ + Transport: transport.New(cfg.MaxIdleConns, cfg.MaxConnsPerHost, cfg.MaxIdleConnsPerHost, cfg.IdleConnTimeout), + }, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, + } return s } -func (s *Sender) SendEvent(evt *events.Generator, ack, nack, undelivered chan<- int) { +func ToLegacyEvent(event events.Event) payload.LegacyEvent { + d := payload.DTO{ + Start: event.StartTime, + Value: event.ID, + } + eventtype, version := splitEventType(event.EventType) + return payload.LegacyEvent{ + Data: d, + EventType: eventtype, + EventTypeVersion: version, + EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), + EventTracing: true, + } +} - seq := <-evt.Counter() +func splitEventType(eventType string) (string, string) { + i := strings.LastIndex(eventType, ".") + return eventType[0:i], eventType[i+1:] +} +func (s *Sender) SendEvent(event events.Event) { // Build a http request out of the legacy event. - le := evt.ToLegacyEvent(seq) + le := ToLegacyEvent(event) b, err := json.Marshal(le) if err != nil { return } r := bytes.NewReader(b) - legacyEndpoint := fmt.Sprintf("%s/%s/v1/events", s.config.PublishEndpoint, evt.Source()) + legacyEndpoint := fmt.Sprintf("%s/%s/v1/events", s.config.PublishHost, event.Source) rq, err := http.NewRequestWithContext(context.TODO(), http.MethodPost, legacyEndpoint, r) if err != nil { return @@ -59,13 +82,12 @@ func (s *Sender) SendEvent(evt *events.Generator, ack, nack, undelivered chan<- // Send the http request. if s.client == nil { - undelivered <- 1 + s.undeliveredC <- event return } resp, err := s.client.Do(rq) if err != nil { - undelivered <- 1 - evt.Feedback() <- seq + s.undeliveredC <- event return } @@ -76,13 +98,11 @@ func (s *Sender) SendEvent(evt *events.Generator, ack, nack, undelivered chan<- switch { case resp.StatusCode/100 == 2: { - ack <- 1 - evt.Success() <- seq + s.ackC <- event } default: { - nack <- 1 - evt.Feedback() <- seq + s.nackC <- event } } } diff --git a/internal/loadtest/sender/sender.go b/internal/loadtest/sender/sender.go index 723e0e5..c8834b7 100644 --- a/internal/loadtest/sender/sender.go +++ b/internal/loadtest/sender/sender.go @@ -3,50 +3,82 @@ package sender import ( "context" "fmt" + "os" + "sort" "sync" + "text/tabwriter" + "time" + corev1 "k8s.io/api/core/v1" + + "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/cloudevent" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/legacyevent" ) // compile-time check for interfaces implementation. +var _ config.Notifiable = &EventSender{} // Sender sends cloud factories. type EventSender struct { + cfg config.Config ctx context.Context cancel context.CancelFunc - undelivered int32 - ack int32 - nack int32 - mapLock sync.RWMutex + ackC, nackC, undeliveredC chan events.Event wg sync.WaitGroup - stopper sync.Mutex - sender Sender acks, nacks, undelivereds chan int events chan events.Event - cnclCtx context.Context + senders []_interface.Sender + limitC chan any + writer *tabwriter.Writer +} + +func (s *EventSender) NotifyAdd(configMap *corev1.ConfigMap) { + // TODO update config + config.Map(configMap, &s.cfg) + s.senders = append(s.senders, cloudevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) + s.senders = append(s.senders, legacyevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) + +} + +func (s *EventSender) NotifyUpdate(configMap *corev1.ConfigMap) { + //TODO implement me + config.Map(configMap, &s.cfg) + s.senders = make([]_interface.Sender, 0) + s.senders = append(s.senders, cloudevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) + s.senders = append(s.senders, legacyevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) +} + +func (s *EventSender) NotifyDelete(_ *corev1.ConfigMap) { + s.senders = make([]_interface.Sender, 0) } func NewSender() (*EventSender, chan<- events.Event) { eventsC := make(chan events.Event) + ackC := make(chan events.Event) + nackC := make(chan events.Event) + undeliveredC := make(chan events.Event) s := &EventSender{ - events: eventsC, + writer: new(tabwriter.Writer), + events: eventsC, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, } + s.writer.Init(os.Stdout, 8, 8, 0, '\t', tabwriter.AlignRight) return s, eventsC - //s.undelivered = 0 - //s.ack = 0 - //s.nack = 0 - //s.factories = make(map[string][]*events.Generator) - //s.factory = Factory.New() - //s.sender = sender - //s.acks = make(chan int) - //s.nacks = make(chan int) - //s.undelivereds = make(chan int) - // - //return s } func (s *EventSender) Start() { s.ctx, s.cancel = context.WithCancel(context.Background()) + s.limitC = make(chan any, 3000) + go func() { + s.wg.Add(1) + defer s.wg.Done() + s.doAccounting() + }() go func() { s.wg.Add(1) defer s.wg.Done() @@ -59,75 +91,98 @@ func (s *EventSender) Stop() { s.wg.Wait() } -//func (s *EventSender) NotifyAdd(cm *corev1.ConfigMap) { -// s.stopper.Lock() -// defer s.stopper.Unlock() -// s.stop() -// config.Map(cm, s.config) -// t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) -// s.sender.Init(t, s.config) -// s.ctx, s.cancel = context.WithCancel(context.TODO()) -// s.process = make(chan bool, s.config.EpsLimit) -// s.start() -//} -// -//func (s *EventSender) NotifyUpdate(cm *corev1.ConfigMap) { -// s.stopper.Lock() -// defer s.stopper.Unlock() -// s.stop() -// config.Map(cm, s.config) -// t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) -// s.sender.Init(t, s.config) -// s.ctx, s.cancel = context.WithCancel(context.TODO()) -// s.process = make(chan bool, s.config.EpsLimit) -// s.start() -//} -// -//func (s *EventSender) NotifyDelete(*corev1.ConfigMap) { -// s.stopper.Lock() -// defer s.stopper.Unlock() -// s.stop() -//} - func (s *EventSender) sendEvents() { for { select { case e := <-s.events: - fmt.Sprintf("%+v", e) + // here we have to actually send messages to the sink + for _, es := range s.senders { + if es.Format() == e.Format { + s.limitC <- struct{}{} + go func() { + es.SendEvent(e) + <-s.limitC + }() + break + } + } case <-s.ctx.Done(): return } } } -//func (s *EventSender) ComputeTotalEventsPerSecond() int { -// eps := 0 -// s.mapLock.RLock() -// for _, subs := range s.factories { -// for _, e := range subs { -// eps += e.Eps() -// } -// } -// s.mapLock.RUnlock() -// return eps -//} - -//func (s *EventSender) refillMaxEps(d time.Duration) { -// defer func() { -// if r := recover(); r != nil { -// fmt.Println("Recovered in refillMaxEps", r) -// } -// }() -// -// t := time.NewTicker(d) -// for { -// select { -// case <-t.C: -// for i := 0; i < s.config.EpsLimit; i++ { -// s.process <- true -// } -// case <-s.ctx.Done(): -// return -// } -// } -//} +type stat struct { + acks, nacks, undelivered int +} + +type stats map[string]stat + +func (s *EventSender) doAccounting() { + dur := 10 * time.Second + tickOne := time.NewTicker(1 * time.Second) + tickTen := time.NewTicker(dur) + defer tickOne.Stop() + defer tickTen.Stop() + cs := make(stats) + var all stat + fmt.Fprintf(s.writer, "%s\t%s\t%s", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s", "----", "----", "----") + s.writer.Flush() + + for { + select { + case <-tickOne.C: + fmt.Fprintf(s.writer, "\n%v\t%v\t%v", all.acks, all.nacks, all.undelivered) + s.writer.Flush() + all = stat{} + case <-tickTen.C: + s.printStats(cs, dur) + cs = make(stats) + case e := <-s.ackC: + all.acks++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.acks++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st + case e := <-s.nackC: + all.nacks++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.nacks++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st + case e := <-s.undeliveredC: + all.undelivered++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.undelivered++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st + case <-s.ctx.Done(): + fmt.Fprintf(s.writer, "\n%v\t%v\t%v\n", all.acks, all.nacks, all.undelivered) + s.writer.Flush() + s.printStats(cs, dur) + return + } + } +} + +func (s *EventSender) printStats(cs stats, interval time.Duration) { + // initialize tabwriter + + fmt.Fprint(s.writer, "\n--------------------------------------------") + s.writer.Flush() + + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t%s", "TYPE", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t%s\t", "----", "----", "----", "----") + + var ak []string + for k := range cs { + ak = append(ak, k) + } + sort.Strings(ak) + for _, k := range ak { + fmt.Fprintf(s.writer, "\n%v\t%v\t%v\t%v", k, float64(cs[k].acks)/interval.Seconds(), float64(cs[k].nacks)/interval.Seconds(), float64(cs[k].undelivered)/interval.Seconds()) + } + s.writer.Flush() + fmt.Fprintln(s.writer, "\n--------------------------------------------") + s.writer.Flush() + fmt.Fprintf(s.writer, "%s\t%s\t%s\t", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t", "----", "----", "----") +} diff --git a/resources/loadtest/base/kustomization.yaml b/resources/loadtest/base/kustomization.yaml index 1573173..f4216b7 100644 --- a/resources/loadtest/base/kustomization.yaml +++ b/resources/loadtest/base/kustomization.yaml @@ -17,3 +17,4 @@ resources: - rbac.yaml - service.yaml - subscriber.yaml +- subscriptions.yaml From 871a354d8f17a648855e8c45a29f4f589bcb8da4 Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Tue, 1 Aug 2023 11:58:13 +0200 Subject: [PATCH 3/9] add subscriptions --- resources/loadtest/base/subscriptions.yaml | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 resources/loadtest/base/subscriptions.yaml diff --git a/resources/loadtest/base/subscriptions.yaml b/resources/loadtest/base/subscriptions.yaml new file mode 100644 index 0000000..d4702d3 --- /dev/null +++ b/resources/loadtest/base/subscriptions.yaml @@ -0,0 +1,41 @@ +apiVersion: v1 +kind: List +items: + - apiVersion: eventing.kyma-project.io/v1alpha2 + kind: Subscription + metadata: + labels: + app: partner-handler-queue + eventing-loadtest: cloudevent + name: cloudevents + namespace: eventing-test + spec: + sink: http://loadtest-subscriber-0.eventing-test.svc.cluster.local + source: "cloudevent" + typeMatching: standard + types: + - CE.v8 + - CE.v16 + - CE.v32 + - CE.v64 + - CE.v128 + - CE.v256 + - apiVersion: eventing.kyma-project.io/v1alpha2 + kind: Subscription + metadata: + labels: + app: partner-handler-queue + eventing-loadtest: legacy + name: legacy + namespace: eventing-test + spec: + sink: http://loadtest-subscriber-1.eventing-test.svc.cluster.local + source: "legacy" + typeMatching: standard + types: + - LEGACY.v8 + - LEGACY.v16 + - LEGACY.v32 + - LEGACY.v64 + - LEGACY.v128 + - LEGACY.v256 From 524f2cbc0036a1cbe658e9e60f226c208172e2bd Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Tue, 8 Aug 2023 15:39:05 +0200 Subject: [PATCH 4/9] synchronize id generation --- internal/loadtest/events/generator.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/internal/loadtest/events/generator.go b/internal/loadtest/events/generator.go index f6d1d89..7333d72 100644 --- a/internal/loadtest/events/generator.go +++ b/internal/loadtest/events/generator.go @@ -20,6 +20,7 @@ type Generator struct { c chan<- Event wg sync.WaitGroup format EventFormat + lock sync.Mutex } type Event struct { @@ -51,9 +52,7 @@ type EventStats struct { func updateGeneratorFormat(sub *v1alpha2.Subscription, gen *Generator) { f := EventFormatFromString(sub.GetLabels()[formatLabel]) if gen.format != f { - gen.format = f - gen.id = 0 - gen.starttime = time.Now().Format("2006-01-02T15:04:05") + gen.Update(f) } } @@ -61,7 +60,6 @@ func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { t := time.NewTicker(time.Second) defer t.Stop() remaining := e.eps - id := 0 for { select { case <-ctx.Done(): @@ -71,6 +69,12 @@ func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { remaining = e.eps default: if remaining > 0 { + // ensure nowone resets the id atm + e.lock.Lock() + id := e.id + e.id++ + e.lock.Unlock() + c <- Event{ EventType: e.eventtype, Source: e.source, @@ -80,12 +84,20 @@ func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { Format: e.format, } remaining-- - id++ } } } } +func (e *Generator) Update(format EventFormat) { + // let's stop all concurrency for a while + e.lock.Lock() + e.format = format + e.id = 0 + e.starttime = time.Now().Format("2006-01-02T15:04:05") + e.lock.Unlock() +} + func (e *Generator) Start() { ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel From ca87d16ba3c87045c50743085d2777972eb3750c Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Tue, 8 Aug 2023 16:31:11 +0200 Subject: [PATCH 5/9] change base config --- resources/loadtest/base/configmap.yaml | 2 +- resources/loadtest/base/subscriptions.yaml | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/resources/loadtest/base/configmap.yaml b/resources/loadtest/base/configmap.yaml index 382e9f3..d1415df 100644 --- a/resources/loadtest/base/configmap.yaml +++ b/resources/loadtest/base/configmap.yaml @@ -4,7 +4,7 @@ metadata: name: loadtest-publisher namespace: eventing-test data: - publish_endpoint: "http://eventing-publisher-proxy.kyma-system" + publish_host: "http://eventing-publisher-proxy.kyma-system" use_legacy_events: "false" event_source: "noapp" version_format: "v%04d" diff --git a/resources/loadtest/base/subscriptions.yaml b/resources/loadtest/base/subscriptions.yaml index d4702d3..fd2f52c 100644 --- a/resources/loadtest/base/subscriptions.yaml +++ b/resources/loadtest/base/subscriptions.yaml @@ -11,6 +11,8 @@ items: namespace: eventing-test spec: sink: http://loadtest-subscriber-0.eventing-test.svc.cluster.local + config: + maxInFlightMessages: "200" source: "cloudevent" typeMatching: standard types: @@ -20,6 +22,7 @@ items: - CE.v64 - CE.v128 - CE.v256 + - CE.v512 - apiVersion: eventing.kyma-project.io/v1alpha2 kind: Subscription metadata: @@ -30,6 +33,8 @@ items: namespace: eventing-test spec: sink: http://loadtest-subscriber-1.eventing-test.svc.cluster.local + config: + maxInFlightMessages: "200" source: "legacy" typeMatching: standard types: @@ -39,3 +44,4 @@ items: - LEGACY.v64 - LEGACY.v128 - LEGACY.v256 + - LEGACY.v512 From 11bd1f6094a1cfe61a50e956efbe98eed675fa2c Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Tue, 8 Aug 2023 16:51:16 +0200 Subject: [PATCH 6/9] code cleanup --- internal/loadtest/events/factory.go | 26 ++++++++++---------------- internal/loadtest/events/generator.go | 7 +------ internal/loadtest/sender/sender.go | 5 ++--- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/internal/loadtest/events/factory.go b/internal/loadtest/events/factory.go index a2adc19..846c15e 100644 --- a/internal/loadtest/events/factory.go +++ b/internal/loadtest/events/factory.go @@ -18,15 +18,15 @@ type Factory struct { } func (f *Factory) OnNewSubscription(subscription *unstructured.Unstructured) { - f.FromSubscription(subscription) + f.StartReconcile(subscription) } func (f *Factory) OnChangedSubscription(subscription *unstructured.Unstructured) { - f.FromSubscription(subscription) + f.StartReconcile(subscription) } func (f *Factory) OnDeleteSubscription(subscription *unstructured.Unstructured) { - f.FromSubscription(subscription) + f.StartReconcile(subscription) } func NewGeneratorFactory(senderC chan<- Event) *Factory { @@ -46,12 +46,14 @@ const ( var _ subscription.Notifiable = &Factory{} -func (f *Factory) FromSubscription(subscription *unstructured.Unstructured) error { +func (f *Factory) StartReconcile(subscription *unstructured.Unstructured) { sub, err := v1alpha2.ToSubscription(subscription) if err != nil { - return err + // // TODO[k15r]: log + return } - return f.reconcile(sub) + //nolint:errcheck // TODO[k15r]: get rid of these errors + f.reconcile(sub) } func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { @@ -83,7 +85,7 @@ func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { // check if eventtypes have been removed currentGenerator: for etgen, gen := range eg { - for _, et := range EventTypeFromSubscription(sub) { + for _, et := range sub.Spec.Types { if et == etgen { continue currentGenerator } @@ -95,7 +97,7 @@ currentGenerator: // handle adding EventTypes currentEventType: - for _, et := range EventTypeFromSubscription(sub) { + for _, et := range sub.Spec.Types { //for etgen, gen := range eg { for etgen, gen := range eg { if et == etgen { @@ -141,11 +143,3 @@ func (f *Factory) stopGenerators(generators eventGenerator) { g.Stop() } } - -func EventTypeFromSubscription(sub *v1alpha2.Subscription) []string { - var ets []string - for _, t := range sub.Spec.Types { - ets = append(ets, t) - } - return ets -} diff --git a/internal/loadtest/events/generator.go b/internal/loadtest/events/generator.go index 7333d72..0edc4a3 100644 --- a/internal/loadtest/events/generator.go +++ b/internal/loadtest/events/generator.go @@ -44,11 +44,6 @@ func NewGenerator(eventType, source string, eps int, format string, senderC chan return &e } -type EventStats struct { - eventtype, source, startTime string - sent int -} - func updateGeneratorFormat(sub *v1alpha2.Subscription, gen *Generator) { f := EventFormatFromString(sub.GetLabels()[formatLabel]) if gen.format != f { @@ -101,8 +96,8 @@ func (e *Generator) Update(format EventFormat) { func (e *Generator) Start() { ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel + e.wg.Add(1) go func() { - e.wg.Add(1) defer e.wg.Done() e.fillChan(ctx, e.c) }() diff --git a/internal/loadtest/sender/sender.go b/internal/loadtest/sender/sender.go index c8834b7..5bdd1d9 100644 --- a/internal/loadtest/sender/sender.go +++ b/internal/loadtest/sender/sender.go @@ -28,7 +28,6 @@ type EventSender struct { cancel context.CancelFunc ackC, nackC, undeliveredC chan events.Event wg sync.WaitGroup - acks, nacks, undelivereds chan int events chan events.Event senders []_interface.Sender limitC chan any @@ -74,13 +73,13 @@ func NewSender() (*EventSender, chan<- events.Event) { func (s *EventSender) Start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.limitC = make(chan any, 3000) + s.wg.Add(1) go func() { - s.wg.Add(1) defer s.wg.Done() s.doAccounting() }() + s.wg.Add(1) go func() { - s.wg.Add(1) defer s.wg.Done() s.sendEvents() }() From 9154ce2a54f44674d3035a289b9f8519ca806587 Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Wed, 9 Aug 2023 09:36:17 +0200 Subject: [PATCH 7/9] add unit test github action --- .github/workflows/lint.yml | 48 ----------------------------------- .github/workflows/quality.yml | 39 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 48 deletions(-) delete mode 100644 .github/workflows/lint.yml create mode 100644 .github/workflows/quality.yml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml deleted file mode 100644 index 3f0bceb..0000000 --- a/.github/workflows/lint.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: golangci-lint -on: - push: - tags: - - v* - branches: - - main - pull_request: - branches: - - main -permissions: - contents: read - # Optional: allow read access to pull request. Use with `only-new-issues` option. - pull-requests: read -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.19' - - uses: actions/checkout@v3 - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.51 - args: --timeout=5m - - # Optional: working directory, useful for monorepos - # working-directory: somedir - - # Optional: golangci-lint command line arguments. - # args: --issues-exit-code=0 - - # Optional: show only new issues if it's a pull request. The default value is `false`. - # only-new-issues: true - - # Optional: if set to true then the all caching functionality will be complete disabled, - # takes precedence over all other caching options. - # skip-cache: true - - # Optional: if set to true then the action don't cache or restore ~/go/pkg. - # skip-pkg-cache: true - - # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. - # skip-build-cache: true diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml new file mode 100644 index 0000000..44ea705 --- /dev/null +++ b/.github/workflows/quality.yml @@ -0,0 +1,39 @@ +name: Code Quality +on: + push: + tags: + - v* + branches: + - main + pull_request: + branches: + - main +permissions: + contents: read + # Optional: allow read access to pull request. Use with `only-new-issues` option. + pull-requests: read +jobs: + lint: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: '1.19' + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version + version: v1.51 + args: --timeout=5m + test: + name: Unit Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version: '1.20' + - name: test + run: go test ./... From 245db882f6283ef4fb4ae21a10605e118e5101d1 Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Wed, 9 Aug 2023 10:34:15 +0200 Subject: [PATCH 8/9] add build action --- .github/workflows/build.yml | 108 ++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 .github/workflows/build.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..8224cba --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,108 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Test + +on: + push: + branches: [ "main" ] + tags: [ '*.*.*' ] + pull_request: + branches: [ "main" ] + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + +jobs: + + build: + runs-on: ubuntu-latest + permissions: write-all + + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.19 + cache: true + + - name: Build + run: go build -v ./... + + - name: Test + run: go test -v ./... + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=sha + type=semver,pattern={{version}},event=tag + + - name: Build Docker image + id: build-and-push + uses: docker/build-push-action@v4 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + load: ${{ github.event_name == 'pull_request' }} + cache-from: type=gha + cache-to: type=gha,mode=max + + # - name: K3s test + # run: | + # curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION="v1.25.5+k3s1" K3S_KUBECONFIG_MODE=644 INSTALL_K3S_EXEC="server --docker --disable traefik" sh - + # mkdir -p ~/.kube + # cp /etc/rancher/k3s/k3s.yaml ~/.kube/config + # chmod 600 ~/.kube/config + # make install + # make deploy IMG=$(echo $DOCKER_METADATA_OUTPUT_JSON | jq -r .tags[0]) + # make test USE_EXISTING_CLUSTER=true + # + # + # - name: Generate release artifacts + # if: startsWith(github.ref, 'refs/tags/') + # run: | + # kubectl kustomize config/default >cluster-ip-operator.yaml + # curl -Lo kyma https://storage.googleapis.com/kyma-cli-unstable/kyma-linux + # chmod +x kyma + # ./kyma alpha create module -n kyma-project.io/cluster-ip --version $GITHUB_REF_NAME \ + # --registry ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-module \ + # -o cluster-ip-module-template.yaml -v + # + # - name: Setup tmate session + # if: failure() + # uses: mxschmitt/action-tmate@v3 + # with: + # limit-access-to-actor: true + # + # - name: Release + # uses: softprops/action-gh-release@v1 + # if: startsWith(github.ref, 'refs/tags/') + # with: + # files: | + # cluster-ip-operator.yaml + # cluster-ip-module-template.yaml + # From b79426eb89f2e65dbb8438841460977bfb3f6144 Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Fri, 11 Aug 2023 15:22:22 +0200 Subject: [PATCH 9/9] check if events get sent during test --- internal/loadtest/events/factory_test.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/loadtest/events/factory_test.go b/internal/loadtest/events/factory_test.go index 68c6c02..c8c7f0a 100644 --- a/internal/loadtest/events/factory_test.go +++ b/internal/loadtest/events/factory_test.go @@ -19,12 +19,13 @@ func TestFactory_reconcile(t *testing.T) { sub *v1alpha2.Subscription } tests := []struct { - name string - fields fields - args args - wantErr bool + name string + fields fields + args args + wantErr bool + checkEventsFunc func([]Event) bool }{ - {name: "new subscription", + {name: "new subscription - starts sending events", fields: fields{ generators: map[NamespaceName]eventGenerator{}, senderC: make(chan Event), @@ -45,18 +46,22 @@ func TestFactory_reconcile(t *testing.T) { }, }, wantErr: false, + checkEventsFunc: func(events []Event) bool { + return len(events) > 0 + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + var receivedEvents []Event go func(ctx context.Context) { for { select { case <-ctx.Done(): return case e := <-tt.fields.senderC: - t.Logf("%+v", e) + receivedEvents = append(receivedEvents, e) } } }(ctx) @@ -70,6 +75,9 @@ func TestFactory_reconcile(t *testing.T) { time.Sleep(1 * time.Second) f.Stop() cancel() + if !tt.checkEventsFunc(receivedEvents) { + t.Fail() + } }) } }