diff --git a/internal/loadtest/config/config.go b/internal/loadtest/config/config.go index 832da24..48db1bf 100644 --- a/internal/loadtest/config/config.go +++ b/internal/loadtest/config/config.go @@ -3,33 +3,17 @@ package config import ( "flag" "fmt" - "strings" "time" ) // Config represents environment config. type Config struct { - ServerAddress string - PublishEndpoint string `config:"publish_endpoint"` - UseLegacyEvents bool `config:"use_legacy_events"` - EventSource string `config:"event_source"` - MaxInflightMessages0 string `config:"max_inflight_messages_0"` - MaxInflightMessages1 string `config:"max_inflight_messages_1"` - EventName0 string `config:"event_name_0"` - EventName1 string `config:"event_name_1"` - VersionFormat string `config:"version_format"` - GenerateCount0 int `config:"generate_count_0"` - GenerateCount1 int `config:"generate_count_1"` - EpsStart0 int `config:"eps_start_0"` - EpsStart1 int `config:"eps_start_1"` - EpsIncrement0 int `config:"eps_increment_0"` - EpsIncrement1 int `config:"eps_increment_1"` - EpsLimit int `config:"eps_limit"` - Workers int `config:"workers"` - MaxIdleConns int `config:"max_idle_conns"` - MaxConnsPerHost int `config:"max_conns_per_host"` - MaxIdleConnsPerHost int `config:"max_idle_conns_per_host"` - IdleConnTimeout time.Duration `config:"idle_conn_timeout"` + ServerAddress string + PublishHost string `config:"publish_host"` + MaxIdleConns int `config:"max_idle_conns"` + MaxConnsPerHost int `config:"max_conns_per_host"` + MaxIdleConnsPerHost int `config:"max_idle_conns_per_host"` + IdleConnTimeout time.Duration `config:"idle_conn_timeout"` } func New() *Config { @@ -39,39 +23,9 @@ func New() *Config { return c } -// IsVersionFormatEmpty returns true if event version is empty. -func (c *Config) IsVersionFormatEmpty() bool { - return len(strings.TrimSpace(c.VersionFormat)) == 0 -} - -// IsEventName0Empty returns true if event name 0 is empty. -func (c *Config) IsEventName0Empty() bool { - return len(strings.TrimSpace(c.EventName0)) == 0 -} - -// IsEventName1Empty returns true if event name 1 is empty. -func (c *Config) IsEventName1Empty() bool { - return len(strings.TrimSpace(c.EventName1)) == 0 -} - -// ComputeEventsCount returns the count of events to generate. -func (c *Config) ComputeEventsCount() int { - count := 0 - if c.IsVersionFormatEmpty() { - return count - } - if !c.IsEventName0Empty() { - count += c.GenerateCount0 - } - if !c.IsEventName1Empty() { - count += c.GenerateCount1 - } - return count -} - func (c *Config) String() string { return fmt.Sprintf( - "ServerAddress: %v PublishEndpoint: %v MaxInflightMessages0: %v MaxInflightMessages1: %v EventFormat0: %v EventFormat1: %v GenerateCount0: %v GenerateCount1: %v EpsStart0: %v EpsStart1: %v EpsIncrement0: %v EpsIncrement1: %v EpsLimit: %v Workers: %v MaxIdleConns: %v MaxConnsPerHost : %v MaxIdleConnsPerHost: %v IdleConnTimeout: %v", - c.ServerAddress, c.PublishEndpoint, c.MaxInflightMessages0, c.MaxInflightMessages1, c.EventName0, c.EventName1, c.GenerateCount0, c.GenerateCount1, c.EpsStart0, c.EpsStart1, c.EpsIncrement0, c.EpsIncrement1, c.EpsLimit, c.Workers, c.MaxIdleConns, c.MaxConnsPerHost, c.MaxIdleConnsPerHost, c.IdleConnTimeout, + "ServerAddress: %v PublishHost: %v MaxIdleConns: %v MaxConnsPerHost : %v MaxIdleConnsPerHost: %v IdleConnTimeout: %v", + c.ServerAddress, c.PublishHost, c.MaxIdleConns, c.MaxConnsPerHost, c.MaxIdleConnsPerHost, c.IdleConnTimeout, ) } diff --git a/internal/loadtest/config/notifiable.go b/internal/loadtest/config/notifiable.go index e4999ad..9f2e2bf 100644 --- a/internal/loadtest/config/notifiable.go +++ b/internal/loadtest/config/notifiable.go @@ -2,6 +2,12 @@ package config import corev1 "k8s.io/api/core/v1" +type Notifiable interface { + AddNotifiable + UpdateNotifiable + DeleteNotifiable +} + type AddNotifiable interface { NotifyAdd(*corev1.ConfigMap) } diff --git a/internal/loadtest/events/GenericEvent/events.go b/internal/loadtest/events/GenericEvent/events.go deleted file mode 100644 index dfb147b..0000000 --- a/internal/loadtest/events/GenericEvent/events.go +++ /dev/null @@ -1,203 +0,0 @@ -package GenericEvent - -import ( - "context" - "fmt" - "log" - "sync" - "time" - - cev2 "github.com/cloudevents/sdk-go/v2" - - "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" - "github.com/kyma-project/eventing-tools/internal/tree" -) - -type Event struct { - source string - version string - name string - eps int - starttime string - feedback chan int - counter chan int - success chan int - events chan *Event - cancel context.CancelFunc - successes *tree.Node - eventtype string - wg *sync.WaitGroup - running bool - stopper sync.Mutex -} - -func (e *Event) Events() <-chan *Event { - return e.events -} - -func (e *Event) Source() string { - return e.source -} - -func (e *Event) Feedback() chan<- int { - return e.feedback -} - -func (e *Event) Success() chan<- int { - return e.success -} - -func (e *Event) Eps() int { - return e.eps -} - -func (e *Event) Counter() <-chan int { - return e.counter - -} - -func NewEvent(format, name, source string, eps int) *Event { - e := Event{ - version: format, - name: name, - eps: eps, - starttime: time.Now().Format("2006-01-02T15:04:05"), - source: source, - eventtype: fmt.Sprintf("%s.%s", name, format), - wg: &sync.WaitGroup{}, - } - return &e -} - -func (e *Event) handleSuccess(ctx context.Context) { - defer e.wg.Done() - for { - select { - case <-ctx.Done(): - fmt.Printf("%v.%v: %v\n", e.starttime, e.name, e.successes) - fmt.Printf("DONE success %v.%v\n", e.source, e.eventtype) - return - case val := <-e.success: - e.successes = tree.InsertInt(e.successes, val) - } - } -} - -func (e *Event) PrintStats() string { - return fmt.Sprintf("%v.%v.%v.%v: %v\n", e.starttime, e.source, e.name, e.version, e.successes) -} - -func (e *Event) fillCounter(ctx context.Context) { - defer e.wg.Done() - var c int - var next int - for { - select { - case next = <-e.feedback: - break - default: - next = c - c++ - } - select { - case <-ctx.Done(): - fmt.Printf("DONE counter %v.%v\n", e.source, e.eventtype) - return - case e.counter <- next: - break - } - } -} - -func (e *Event) queueEvent(ctx context.Context) { - defer e.wg.Done() - defer func() { - if r := recover(); r != nil { - log.Println("recovered from in queueEvent: ", r) - } - }() - - t := time.NewTicker(time.Second) - defer t.Stop() - - // queue event immediately - for { - select { - case <-t.C: - for i := 0; i < e.eps; i++ { - select { - case <-ctx.Done(): - close(e.events) - fmt.Printf("DONE queue %v.%v\n", e.source, e.eventtype) - return - case e.events <- e: - continue - } - } - case <-ctx.Done(): - close(e.events) - fmt.Printf("DONE queue %v.%v\n", e.source, e.eventtype) - return - } - } -} - -func (e *Event) Stop() { - e.stopper.Lock() - if !e.running { - return - } - e.cancel() - fmt.Printf("waiting for %v.%v\n", e.source, e.eventtype) - e.wg.Wait() - fmt.Printf("DONE waiting for %v.%v\n", e.source, e.eventtype) - e.running = false - e.stopper.Unlock() -} - -func (e *Event) Start() { - if e.running { - return - } - e.running = true - e.events = make(chan *Event, e.eps) - e.counter = make(chan int, e.eps*4) - e.feedback = make(chan int, e.eps*4) - e.success = make(chan int, e.eps*4) - ctx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel - e.successes = nil - e.wg.Add(1) - go e.fillCounter(ctx) - e.wg.Add(1) - go e.handleSuccess(ctx) - e.wg.Add(1) - go e.queueEvent(ctx) -} - -func (e *Event) ToLegacyEvent(seq int) payload.LegacyEvent { - d := payload.DTO{ - Start: e.starttime, - Value: seq, - } - return payload.LegacyEvent{ - Data: d, - EventType: e.name, - EventTypeVersion: e.version, - EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), - EventTracing: true, - } -} - -func (e *Event) ToCloudEvent(seq int) (cev2.Event, error) { - - ce := cev2.NewEvent() - ce.SetType(e.eventtype) - ce.SetSource(e.source) - d := payload.DTO{ - Start: e.starttime, - Value: seq, - } - err := ce.SetData(cev2.ApplicationJSON, d) - return ce, err -} diff --git a/internal/loadtest/events/GenericFactory/factory.go b/internal/loadtest/events/GenericFactory/factory.go deleted file mode 100644 index a11a17a..0000000 --- a/internal/loadtest/events/GenericFactory/factory.go +++ /dev/null @@ -1,58 +0,0 @@ -package GenericFactory - -import ( - "fmt" - "regexp" - "strconv" - "strings" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" -) - -type GenericEventFactory struct { -} - -const ( - formatLabel = "eventing-loadtest" -) - -func New() *GenericEventFactory { - return &GenericEventFactory{} -} - -func (g *GenericEventFactory) FromSubscription(subscription *unstructured.Unstructured, eventFormat string) []*GenericEvent.Event { - sub, err := v1alpha2.ToSubscription(subscription) - if err != nil { - return nil - } - events := []*GenericEvent.Event{} - - // for now we support only type matching standard - if sub.Spec.TypeMatching != v1alpha2.Standard { - return events - } - - if sub.GetLabels()[formatLabel] != eventFormat { - return events - } - - re := regexp.MustCompile(`.v(\d+)$`) - for _, et := range sub.Spec.Types { - if !re.MatchString(et) { - continue - } - rss := re.FindStringSubmatch(et) - for _, rs := range rss { - rate, err := strconv.Atoi(rs) - if err != nil { - continue - } - name := strings.TrimSuffix(et, fmt.Sprintf(".v%v", rs)) // trim the `.v` to get a clean event type without trailing `.`. - events = append(events, GenericEvent.NewEvent(fmt.Sprintf("v%v", rs), name, sub.Spec.Source, rate)) // create a clean `v` version indicator. - } - } - return events -} diff --git a/internal/loadtest/events/events.go b/internal/loadtest/events/events.go deleted file mode 100644 index 5b960b8..0000000 --- a/internal/loadtest/events/events.go +++ /dev/null @@ -1,30 +0,0 @@ -package events - -import ( - cev2 "github.com/cloudevents/sdk-go/v2" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" -) - -const ( - LegacyFormat = "legacy" - CloudeventFormat = "cloudevent" -) - -type Event interface { - ToCloudEvent(int, string) (cev2.Event, error) - ToLegacyEvent(int) payload.LegacyEvent - Stop() - Eps() int - Counter() <-chan int - Source() string - Feedback() chan<- int - Success() chan<- int - Events() <-chan Event -} - -type EventFactory interface { - FromSubscription(*unstructured.Unstructured, string) []*GenericEvent.Event -} diff --git a/internal/loadtest/events/factory.go b/internal/loadtest/events/factory.go new file mode 100644 index 0000000..846c15e --- /dev/null +++ b/internal/loadtest/events/factory.go @@ -0,0 +1,145 @@ +package events + +import ( + "regexp" + "strconv" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" + "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" +) + +type eventGenerator map[string]*Generator + +type Factory struct { + generators map[NamespaceName]eventGenerator + senderC chan<- Event +} + +func (f *Factory) OnNewSubscription(subscription *unstructured.Unstructured) { + f.StartReconcile(subscription) +} + +func (f *Factory) OnChangedSubscription(subscription *unstructured.Unstructured) { + f.StartReconcile(subscription) +} + +func (f *Factory) OnDeleteSubscription(subscription *unstructured.Unstructured) { + f.StartReconcile(subscription) +} + +func NewGeneratorFactory(senderC chan<- Event) *Factory { + return &Factory{ + generators: map[NamespaceName]eventGenerator{}, + senderC: senderC, + } +} + +type NamespaceName struct { + Name, Namespace string +} + +const ( + formatLabel = "eventing-loadtest" +) + +var _ subscription.Notifiable = &Factory{} + +func (f *Factory) StartReconcile(subscription *unstructured.Unstructured) { + sub, err := v1alpha2.ToSubscription(subscription) + if err != nil { + // // TODO[k15r]: log + return + } + //nolint:errcheck // TODO[k15r]: get rid of these errors + f.reconcile(sub) +} + +func (f *Factory) reconcile(sub *v1alpha2.Subscription) error { + // for now we support only type matching standard + if sub.Spec.TypeMatching != v1alpha2.Standard { + return nil + } + nn := NamespaceName{ + Name: sub.Name, + Namespace: sub.Namespace, + } + + // delete subscription + if sub.DeletionTimestamp != nil { + if g, ok := f.generators[nn]; ok { + f.stopGenerators(g) + delete(f.generators, nn) + return nil + } + } + + // create default eventGenerator (empty) + if _, ok := f.generators[nn]; !ok { + f.generators[nn] = eventGenerator{} + } + + eg := f.generators[nn] + + // check if eventtypes have been removed +currentGenerator: + for etgen, gen := range eg { + for _, et := range sub.Spec.Types { + if et == etgen { + continue currentGenerator + } + } + // remove generators for removed eventType + gen.Stop() + delete(eg, etgen) + } + + // handle adding EventTypes +currentEventType: + for _, et := range sub.Spec.Types { + //for etgen, gen := range eg { + for etgen, gen := range eg { + if et == etgen { + // update the eventFormat to the one specified in the subscription + updateGeneratorFormat(sub, gen) + continue currentEventType + } + } + // create a new EventGenerator for the found eventType and start it + gen := f.ConfigureAndStartGenerator(sub, et) + if gen != nil { + eg[et] = gen + } + } + return nil +} + +func (f *Factory) Stop() { + for _, gens := range f.generators { + for _, gen := range gens { + gen.Stop() + } + } +} + +func (f *Factory) ConfigureAndStartGenerator(sub *v1alpha2.Subscription, eventType string) *Generator { + re := regexp.MustCompile(`.v(\d+)$`) + if !re.MatchString(eventType) { + return nil + } + rs := re.FindStringSubmatch(eventType)[1] + rate, err := strconv.Atoi(rs) + if err != nil { + return nil + } + gen := NewGenerator(eventType, sub.Spec.Source, rate, sub.GetLabels()[formatLabel], f.senderC) // create a clean `v` version indicator. + gen.Start() + return gen +} + +func (f *Factory) stopGenerators(generators eventGenerator) { + for _, g := range generators { + g.Stop() + } +} diff --git a/internal/loadtest/events/factory_test.go b/internal/loadtest/events/factory_test.go new file mode 100644 index 0000000..c8c7f0a --- /dev/null +++ b/internal/loadtest/events/factory_test.go @@ -0,0 +1,83 @@ +package events + +import ( + "testing" + "time" + + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" +) + +func TestFactory_reconcile(t *testing.T) { + type fields struct { + generators map[NamespaceName]eventGenerator + senderC chan Event + } + type args struct { + sub *v1alpha2.Subscription + } + tests := []struct { + name string + fields fields + args args + wantErr bool + checkEventsFunc func([]Event) bool + }{ + {name: "new subscription - starts sending events", + fields: fields{ + generators: map[NamespaceName]eventGenerator{}, + senderC: make(chan Event), + }, + args: args{ + sub: &v1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: "new", + Namespace: "new", + }, + Spec: v1alpha2.SubscriptionSpec{ + Sink: "", + TypeMatching: v1alpha2.Standard, + Source: "Source", + Types: []string{"foo.bar.v1", "bar.foo.v1"}, + Config: nil, + }, + }, + }, + wantErr: false, + checkEventsFunc: func(events []Event) bool { + return len(events) > 0 + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + var receivedEvents []Event + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case e := <-tt.fields.senderC: + receivedEvents = append(receivedEvents, e) + } + } + }(ctx) + f := &Factory{ + generators: tt.fields.generators, + senderC: tt.fields.senderC, + } + if err := f.reconcile(tt.args.sub); (err != nil) != tt.wantErr { + t.Errorf("reconcile() error = %v, wantErr %v", err, tt.wantErr) + } + time.Sleep(1 * time.Second) + f.Stop() + cancel() + if !tt.checkEventsFunc(receivedEvents) { + t.Fail() + } + }) + } +} diff --git a/internal/loadtest/events/generator.go b/internal/loadtest/events/generator.go new file mode 100644 index 0000000..0edc4a3 --- /dev/null +++ b/internal/loadtest/events/generator.go @@ -0,0 +1,130 @@ +package events + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/kyma-project/eventing-tools/internal/loadtest/api/subscription/v1alpha2" +) + +type Generator struct { + source string + eps int + starttime string + cancel context.CancelFunc + eventtype string + id int + sink string + c chan<- Event + wg sync.WaitGroup + format EventFormat + lock sync.Mutex +} + +type Event struct { + EventType string + Source string + Sink string + ID int + StartTime string + Format EventFormat +} + +func NewGenerator(eventType, source string, eps int, format string, senderC chan<- Event) *Generator { + e := Generator{ + eps: eps, + starttime: time.Now().Format("2006-01-02T15:04:05"), + source: source, + eventtype: eventType, + format: EventFormatFromString(format), + c: senderC, + } + return &e +} + +func updateGeneratorFormat(sub *v1alpha2.Subscription, gen *Generator) { + f := EventFormatFromString(sub.GetLabels()[formatLabel]) + if gen.format != f { + gen.Update(f) + } +} + +func (e *Generator) fillChan(ctx context.Context, c chan<- Event) { + t := time.NewTicker(time.Second) + defer t.Stop() + remaining := e.eps + for { + select { + case <-ctx.Done(): + fmt.Printf("DONE counter %v.%v\n", e.source, e.eventtype) + return + case <-t.C: + remaining = e.eps + default: + if remaining > 0 { + // ensure nowone resets the id atm + e.lock.Lock() + id := e.id + e.id++ + e.lock.Unlock() + + c <- Event{ + EventType: e.eventtype, + Source: e.source, + Sink: e.sink, + ID: id, + StartTime: e.starttime, + Format: e.format, + } + remaining-- + } + } + } +} + +func (e *Generator) Update(format EventFormat) { + // let's stop all concurrency for a while + e.lock.Lock() + e.format = format + e.id = 0 + e.starttime = time.Now().Format("2006-01-02T15:04:05") + e.lock.Unlock() +} + +func (e *Generator) Start() { + ctx, cancel := context.WithCancel(context.Background()) + e.cancel = cancel + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.fillChan(ctx, e.c) + }() +} + +func (e *Generator) Stop() { + e.cancel() + e.wg.Wait() +} + +func EventFormatFromString(format string) EventFormat { + if format == "legacy" { + return Legacy + } + return CloudEvent +} + +type EventFormat int + +func (e EventFormat) String() string { + if e == Legacy { + return "legacy" + } + return "cloudevent" +} + +const ( + Legacy EventFormat = iota + CloudEvent +) diff --git a/internal/loadtest/publisher/publisher.go b/internal/loadtest/publisher/publisher.go index 1b3a704..946aca1 100644 --- a/internal/loadtest/publisher/publisher.go +++ b/internal/loadtest/publisher/publisher.go @@ -8,9 +8,8 @@ import ( "github.com/kyma-project/eventing-tools/internal/k8s" "github.com/kyma-project/eventing-tools/internal/loadtest/config" + "github.com/kyma-project/eventing-tools/internal/loadtest/events" sender2 "github.com/kyma-project/eventing-tools/internal/loadtest/sender" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender/cloudevent" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender/legacyevent" "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" "github.com/kyma-project/eventing-tools/internal/logger" "github.com/kyma-project/eventing-tools/internal/probes" @@ -23,34 +22,26 @@ const ( ) func Start(port int) { - appConfig := config.New() + //appConfig := config.New() k8sConfig := k8s.ConfigOrDie() k8sClient := k8s.ClientOrDie(k8sConfig) dynamicClient := dynamic.NewForConfigOrDie(k8sConfig) - legacySender := legacyevent.NewSender(appConfig) - legacyEventSender := sender2.NewSender(appConfig, legacySender) - - ceSender := cloudevent.NewSender(appConfig) - ceEventSender := sender2.NewSender(appConfig, ceSender) + sender, senderC := sender2.NewSender() + factory := events.NewGeneratorFactory(senderC) config.NewWatcher(k8sClient, Namespace, ConfigMapName). - OnAddNotify(legacyEventSender). - OnUpdateNotify(legacyEventSender). - OnDeleteNotify(legacyEventSender). - OnAddNotify(ceEventSender). - OnUpdateNotify(ceEventSender). - OnDeleteNotify(ceEventSender). - OnDeleteNotifyMe(). + OnAddNotify(sender). + OnUpdateNotify(sender). + OnDeleteNotify(sender). Watch() + sender.Start() + subscription.NewWatcher(dynamicClient). - OnAddNotify(ceEventSender). - OnUpdateNotify(ceEventSender). - OnDeleteNotify(ceEventSender). - OnAddNotify(legacyEventSender). - OnUpdateNotify(legacyEventSender). - OnDeleteNotify(legacyEventSender). + OnAddNotify(factory). + OnUpdateNotify(factory). + OnDeleteNotify(factory). Watch() http.HandleFunc(probes.EndpointReadyz, probes.DefaultHandler) diff --git a/internal/loadtest/sender/cloudevent/sender.go b/internal/loadtest/sender/cloudevent/sender.go index dea14b2..2334ff4 100644 --- a/internal/loadtest/sender/cloudevent/sender.go +++ b/internal/loadtest/sender/cloudevent/sender.go @@ -3,68 +3,77 @@ package cloudevent import ( "context" "fmt" - "net/http" cev2 "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/client" "github.com/kyma-project/eventing-tools/internal/client/cloudevents" + "github.com/kyma-project/eventing-tools/internal/client/transport" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender" + "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" ) // compile-time check for interfaces implementation. -var _ sender.Sender = &Sender{} +var _ _interface.Sender = &Sender{} // Sender sends cloud events. type Sender struct { - client client.Client - config *config.Config + ackC, nackC, undeliveredC chan<- events.Event + client client.Client + config config.Config } -func (s *Sender) Format() string { - return events.CloudeventFormat +func (s *Sender) Format() events.EventFormat { + return events.CloudEvent } -func (s *Sender) Init(t *http.Transport, cfg *config.Config) { - s.config = cfg +func NewSender(conf config.Config, ackC, nackC, undeliveredC chan<- events.Event) *Sender { + t := transport.New(conf.MaxIdleConns, conf.MaxConnsPerHost, conf.MaxIdleConnsPerHost, conf.IdleConnTimeout) + s := &Sender{ + config: conf, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, + } s.client = cloudevents.NewClientOrDie(t) -} - -func NewSender(conf *config.Config) *Sender { - s := &Sender{config: conf} return s } -func (s *Sender) SendEvent(evt *GenericEvent.Event, ack chan<- int, nack chan<- int, undelivered chan<- int) { - - seq := <-evt.Counter() +func ToCloudEvent(event events.Event) (cev2.Event, error) { + ce := cev2.NewEvent() + ce.SetType(event.EventType) + ce.SetSource(event.Source) + d := payload.DTO{ + Start: event.StartTime, + Value: event.ID, + } + err := ce.SetData(cev2.ApplicationJSON, d) + return ce, err +} - ce, err := evt.ToCloudEvent(seq) +func (s *Sender) SendEvent(event events.Event) { + ce, err := ToCloudEvent(event) if err != nil { return } - endpoint := fmt.Sprintf("%v/publish", s.config.PublishEndpoint) + endpoint := fmt.Sprintf("%v/publish", s.config.PublishHost) ctx := cev2.WithEncodingStructured(cev2.ContextWithTarget(context.Background(), endpoint)) resp := s.client.Send(ctx, ce) switch { case cev2.IsUndelivered(resp): { - undelivered <- 1 - evt.Feedback() <- seq + s.undeliveredC <- event } case cev2.IsACK(resp): { - ack <- 1 - evt.Success() <- seq + s.ackC <- event } case cev2.IsNACK(resp): { - nack <- 1 - evt.Feedback() <- seq + s.nackC <- event } } } diff --git a/internal/loadtest/sender/interface.go b/internal/loadtest/sender/interface/interface.go similarity index 58% rename from internal/loadtest/sender/interface.go rename to internal/loadtest/sender/interface/interface.go index 1247d1c..d9fb537 100644 --- a/internal/loadtest/sender/interface.go +++ b/internal/loadtest/sender/interface/interface.go @@ -1,10 +1,8 @@ -package sender +package _interface import ( - "net/http" - "github.com/kyma-project/eventing-tools/internal/loadtest/config" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" + "github.com/kyma-project/eventing-tools/internal/loadtest/events" ) type ConfigHandler interface { @@ -14,7 +12,6 @@ type ConfigHandler interface { } type Sender interface { - SendEvent(e *GenericEvent.Event, ack chan<- int, nack chan<- int, undelivered chan<- int) - Format() string - Init(t *http.Transport, cfg *config.Config) + SendEvent(event events.Event) + Format() events.EventFormat } diff --git a/internal/loadtest/sender/legacyevent/sender.go b/internal/loadtest/sender/legacyevent/sender.go index 53188cb..3056057 100644 --- a/internal/loadtest/sender/legacyevent/sender.go +++ b/internal/loadtest/sender/legacyevent/sender.go @@ -7,51 +7,73 @@ import ( "fmt" "io" "net/http" + "strings" + "time" + "github.com/kyma-project/eventing-tools/internal/client/transport" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/sender" + "github.com/kyma-project/eventing-tools/internal/loadtest/events/payload" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" ) -var _ sender.Sender = &Sender{} +var _ _interface.Sender = &Sender{} // Sender sends legacy events. type Sender struct { - client *http.Client - config *config.Config + ackC, nackC, undeliveredC chan<- events.Event + client *http.Client + config config.Config } -func (s *Sender) Init(t *http.Transport, cfg *config.Config) { - s.config = cfg - s.client = &http.Client{ - Transport: t, - } -} - -func (s *Sender) Format() string { - return events.LegacyFormat +func (s *Sender) Format() events.EventFormat { + return events.Legacy } -func NewSender(conf *config.Config) *Sender { - s := &Sender{config: conf} +func NewSender(cfg config.Config, ackC, nackC, undeliveredC chan<- events.Event) *Sender { + s := &Sender{ + config: cfg, + client: &http.Client{ + Transport: transport.New(cfg.MaxIdleConns, cfg.MaxConnsPerHost, cfg.MaxIdleConnsPerHost, cfg.IdleConnTimeout), + }, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, + } return s } -func (s *Sender) SendEvent(evt *GenericEvent.Event, ack, nack, undelivered chan<- int) { +func ToLegacyEvent(event events.Event) payload.LegacyEvent { + d := payload.DTO{ + Start: event.StartTime, + Value: event.ID, + } + eventtype, version := splitEventType(event.EventType) + return payload.LegacyEvent{ + Data: d, + EventType: eventtype, + EventTypeVersion: version, + EventTime: time.Now().Format("2006-01-02T15:04:05.000Z"), + EventTracing: true, + } +} - seq := <-evt.Counter() +func splitEventType(eventType string) (string, string) { + i := strings.LastIndex(eventType, ".") + return eventType[0:i], eventType[i+1:] +} +func (s *Sender) SendEvent(event events.Event) { // Build a http request out of the legacy event. - le := evt.ToLegacyEvent(seq) + le := ToLegacyEvent(event) b, err := json.Marshal(le) if err != nil { return } r := bytes.NewReader(b) - legacyEndpoint := fmt.Sprintf("%s/%s/v1/events", s.config.PublishEndpoint, evt.Source()) + legacyEndpoint := fmt.Sprintf("%s/%s/v1/events", s.config.PublishHost, event.Source) rq, err := http.NewRequestWithContext(context.TODO(), http.MethodPost, legacyEndpoint, r) if err != nil { return @@ -60,13 +82,12 @@ func (s *Sender) SendEvent(evt *GenericEvent.Event, ack, nack, undelivered chan< // Send the http request. if s.client == nil { - undelivered <- 1 + s.undeliveredC <- event return } resp, err := s.client.Do(rq) if err != nil { - undelivered <- 1 - evt.Feedback() <- seq + s.undeliveredC <- event return } @@ -77,13 +98,11 @@ func (s *Sender) SendEvent(evt *GenericEvent.Event, ack, nack, undelivered chan< switch { case resp.StatusCode/100 == 2: { - ack <- 1 - evt.Success() <- seq + s.ackC <- event } default: { - nack <- 1 - evt.Feedback() <- seq + s.nackC <- event } } } diff --git a/internal/loadtest/sender/sender.go b/internal/loadtest/sender/sender.go index 0712bb5..5bdd1d9 100644 --- a/internal/loadtest/sender/sender.go +++ b/internal/loadtest/sender/sender.go @@ -3,308 +3,185 @@ package sender import ( "context" "fmt" - "log" - "reflect" - "strings" + "os" + "sort" "sync" - "sync/atomic" + "text/tabwriter" "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "github.com/kyma-project/eventing-tools/internal/client/transport" "github.com/kyma-project/eventing-tools/internal/loadtest/config" "github.com/kyma-project/eventing-tools/internal/loadtest/events" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericEvent" - "github.com/kyma-project/eventing-tools/internal/loadtest/events/GenericFactory" - "github.com/kyma-project/eventing-tools/internal/loadtest/subscription" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/cloudevent" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/interface" + "github.com/kyma-project/eventing-tools/internal/loadtest/sender/legacyevent" ) // compile-time check for interfaces implementation. -var _ subscription.Notifiable = &EventSender{} +var _ config.Notifiable = &EventSender{} -// Sender sends cloud events. +// Sender sends cloud factories. type EventSender struct { + cfg config.Config ctx context.Context cancel context.CancelFunc - config *config.Config - events map[string][]*GenericEvent.Event - factory events.EventFactory - process chan bool - running bool - undelivered int32 - ack int32 - nack int32 - mapLock sync.RWMutex + ackC, nackC, undeliveredC chan events.Event wg sync.WaitGroup - stopper sync.Mutex - sender Sender - acks, nacks, undelivereds chan int + events chan events.Event + senders []_interface.Sender + limitC chan any + writer *tabwriter.Writer } -func (s *EventSender) FormatName() string { - // TODO implement me - panic("implement me") -} - -func NewSender(conf *config.Config, sender Sender) *EventSender { - s := &EventSender{config: conf} - s.undelivered = 0 - s.ack = 0 - s.nack = 0 - s.events = make(map[string][]*GenericEvent.Event) - s.factory = GenericFactory.New() - s.sender = sender - s.acks = make(chan int) - s.nacks = make(chan int) - s.undelivereds = make(chan int) - - return s -} - -func (s *EventSender) NotifyAdd(cm *corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() - config.Map(cm, s.config) - t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) - s.sender.Init(t, s.config) - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.process = make(chan bool, s.config.EpsLimit) - s.start() -} - -func (s *EventSender) NotifyUpdate(cm *corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() - config.Map(cm, s.config) - t := transport.New(s.config.MaxIdleConns, s.config.MaxConnsPerHost, s.config.MaxIdleConnsPerHost, s.config.IdleConnTimeout) - s.sender.Init(t, s.config) - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.process = make(chan bool, s.config.EpsLimit) - s.start() -} +func (s *EventSender) NotifyAdd(configMap *corev1.ConfigMap) { + // TODO update config + config.Map(configMap, &s.cfg) + s.senders = append(s.senders, cloudevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) + s.senders = append(s.senders, legacyevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) -func (s *EventSender) NotifyDelete(*corev1.ConfigMap) { - s.stopper.Lock() - defer s.stopper.Unlock() - s.stop() } -func (s *EventSender) OnNewSubscription(subscription *unstructured.Unstructured) { - ne := s.factory.FromSubscription(subscription, s.sender.Format()) - if len(ne) == 0 { - return - } - s.stopper.Lock() - defer s.stopper.Unlock() - - // s.queue = make(chan events.Event, buffer) - for _, e := range ne { - e.Start() - } - s.mapLock.Lock() - defer s.mapLock.Unlock() - s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] = ne +func (s *EventSender) NotifyUpdate(configMap *corev1.ConfigMap) { + //TODO implement me + config.Map(configMap, &s.cfg) + s.senders = make([]_interface.Sender, 0) + s.senders = append(s.senders, cloudevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) + s.senders = append(s.senders, legacyevent.NewSender(s.cfg, s.ackC, s.nackC, s.undeliveredC)) } -func (s *EventSender) OnChangedSubscription(subscription *unstructured.Unstructured) { - if subscription.GetDeletionTimestamp() != nil { - return - } - s.stopper.Lock() - defer s.stopper.Unlock() - for _, e := range s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] { - e.Stop() - } - s.mapLock.Lock() - defer s.mapLock.Unlock() - delete(s.events, fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())) - - ne := s.factory.FromSubscription(subscription, s.sender.Format()) - if len(ne) == 0 { - return - } - - for _, e := range ne { - e.Start() - } - s.events[fmt.Sprintf("%v/%v", subscription.GetNamespace(), subscription.GetName())] = ne +func (s *EventSender) NotifyDelete(_ *corev1.ConfigMap) { + s.senders = make([]_interface.Sender, 0) } -func (s *EventSender) OnDeleteSubscription(sub *unstructured.Unstructured) { - s.stopper.Lock() - s.mapLock.Lock() - defer s.mapLock.Unlock() - defer s.stopper.Unlock() - for _, e := range s.events[fmt.Sprintf("%v/%v", sub.GetNamespace(), sub.GetName())] { - e.Stop() +func NewSender() (*EventSender, chan<- events.Event) { + eventsC := make(chan events.Event) + ackC := make(chan events.Event) + nackC := make(chan events.Event) + undeliveredC := make(chan events.Event) + s := &EventSender{ + writer: new(tabwriter.Writer), + events: eventsC, + ackC: ackC, + nackC: nackC, + undeliveredC: undeliveredC, } - delete(s.events, fmt.Sprintf("%v/%v", sub.GetNamespace(), sub.GetName())) -} - -func (s *EventSender) init() { + s.writer.Init(os.Stdout, 8, 8, 0, '\t', tabwriter.AlignRight) + return s, eventsC } -func (s *EventSender) start() { - s.ctx, s.cancel = context.WithCancel(context.TODO()) - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - e.Start() - } - } - s.mapLock.RUnlock() - s.sendEventsAsync() +func (s *EventSender) Start() { + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.limitC = make(chan any, 3000) s.wg.Add(1) - go s.refillMaxEps(time.Second) + go func() { + defer s.wg.Done() + s.doAccounting() + }() s.wg.Add(1) - go s.reportUsageAsync(time.Second, 20*time.Second) -} - -func (s *EventSender) stop() { - // recover from closing already closed channels - defer func() { - if r := recover(); r != nil { - log.Println("recovered from: ", r) - } + go func() { + defer s.wg.Done() + s.sendEvents() }() - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - e.Stop() - } - } - s.mapLock.RUnlock() +} - s.running = false +func (s *EventSender) Stop() { s.cancel() - close(s.process) s.wg.Wait() } -func (s *EventSender) sendEventsAsync() { - for i := 0; i < s.config.Workers; i++ { - s.wg.Add(1) - go s.sendEvents() +func (s *EventSender) sendEvents() { + for { + select { + case e := <-s.events: + // here we have to actually send messages to the sink + for _, es := range s.senders { + if es.Format() == e.Format { + s.limitC <- struct{}{} + go func() { + es.SendEvent(e) + <-s.limitC + }() + break + } + } + case <-s.ctx.Done(): + return + } } } -func (s *EventSender) reportUsageAsync(send, success time.Duration) { +type stat struct { + acks, nacks, undelivered int +} - defer func() { - s.wg.Done() - }() +type stats map[string]stat - sendt := time.NewTicker(send) - defer sendt.Stop() - succt := time.NewTicker(success) - defer succt.Stop() +func (s *EventSender) doAccounting() { + dur := 10 * time.Second + tickOne := time.NewTicker(1 * time.Second) + tickTen := time.NewTicker(dur) + defer tickOne.Stop() + defer tickTen.Stop() + cs := make(stats) + var all stat + fmt.Fprintf(s.writer, "%s\t%s\t%s", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s", "----", "----", "----") + s.writer.Flush() for { select { - case na := <-s.acks: - atomic.AddInt32(&s.ack, int32(na)) - case nn := <-s.nacks: - atomic.AddInt32(&s.nack, int32(nn)) - case nu := <-s.undelivereds: - atomic.AddInt32(&s.undelivered, int32(nu)) - + case <-tickOne.C: + fmt.Fprintf(s.writer, "\n%v\t%v\t%v", all.acks, all.nacks, all.undelivered) + s.writer.Flush() + all = stat{} + case <-tickTen.C: + s.printStats(cs, dur) + cs = make(stats) + case e := <-s.ackC: + all.acks++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.acks++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st + case e := <-s.nackC: + all.nacks++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.nacks++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st + case e := <-s.undeliveredC: + all.undelivered++ + st := cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] + st.undelivered++ + cs["["+e.Format.String()+"]"+e.Source+"/"+e.EventType] = st case <-s.ctx.Done(): - targetEPS := s.ComputeTotalEventsPerSecond() - log.Printf( - "%v: | target_eps:% 4d (% 4d)| undelivered:% 4d | ack:% 4d | nack:% 4d | sum:% 4d |", - s.sender.Format(), targetEPS, s.config.EpsLimit, s.undelivered, s.ack, s.nack, s.undelivered+s.ack+s.nack, - ) + fmt.Fprintf(s.writer, "\n%v\t%v\t%v\n", all.acks, all.nacks, all.undelivered) + s.writer.Flush() + s.printStats(cs, dur) return - case <-sendt.C: - targetEPS := s.ComputeTotalEventsPerSecond() - if targetEPS == 0 { - continue - } - log.Printf( - "%v: | target_eps:% 4d (% 4d)| undelivered:% 4d | ack:% 4d | nack:% 4d | sum:% 4d |", - s.sender.Format(), targetEPS, s.config.EpsLimit, s.undelivered, s.ack, s.nack, s.undelivered+s.ack+s.nack, - ) - // reset counts for last report - atomic.StoreInt32(&s.undelivered, 0) - atomic.StoreInt32(&s.ack, 0) - atomic.StoreInt32(&s.nack, 0) - case <-succt.C: - s.mapLock.RLock() - stats := []string{fmt.Sprintf("%v:", s.sender.Format())} - for _, subs := range s.events { - for _, e := range subs { - stats = append(stats, e.PrintStats()) - } - } - fmt.Println(strings.Join(stats, "\n\t")) - s.mapLock.RUnlock() } } } -func (s *EventSender) sendEvents() { - for { - var cases []reflect.SelectCase - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - if e.Events() != nil { - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(e.Events()), - }) - } - } - } - s.mapLock.RUnlock() - if len(cases) == 0 { - time.Sleep(500 * time.Millisecond) - continue - } +func (s *EventSender) printStats(cs stats, interval time.Duration) { + // initialize tabwriter - _, value, ok := reflect.Select(cases) - if !ok { - continue - } + fmt.Fprint(s.writer, "\n--------------------------------------------") + s.writer.Flush() - e := value.Interface().(*GenericEvent.Event) - <-s.process - go s.sender.SendEvent(e, s.acks, s.nacks, s.undelivereds) - } -} + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t%s", "TYPE", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t%s\t", "----", "----", "----", "----") -func (s *EventSender) ComputeTotalEventsPerSecond() int { - eps := 0 - s.mapLock.RLock() - for _, subs := range s.events { - for _, e := range subs { - eps += e.Eps() - } + var ak []string + for k := range cs { + ak = append(ak, k) } - s.mapLock.RUnlock() - return eps -} - -func (s *EventSender) refillMaxEps(d time.Duration) { - defer func() { - if r := recover(); r != nil { - fmt.Println("Recovered in refillMaxEps", r) - } - }() - - t := time.NewTicker(d) - for { - select { - case <-t.C: - for i := 0; i < s.config.EpsLimit; i++ { - s.process <- true - } - case <-s.ctx.Done(): - return - } + sort.Strings(ak) + for _, k := range ak { + fmt.Fprintf(s.writer, "\n%v\t%v\t%v\t%v", k, float64(cs[k].acks)/interval.Seconds(), float64(cs[k].nacks)/interval.Seconds(), float64(cs[k].undelivered)/interval.Seconds()) } + s.writer.Flush() + fmt.Fprintln(s.writer, "\n--------------------------------------------") + s.writer.Flush() + fmt.Fprintf(s.writer, "%s\t%s\t%s\t", "ACK", "NACK", "UNDELIVERED") + fmt.Fprintf(s.writer, "\n%s\t%s\t%s\t", "----", "----", "----") } diff --git a/resources/loadtest/base/configmap.yaml b/resources/loadtest/base/configmap.yaml index 382e9f3..d1415df 100644 --- a/resources/loadtest/base/configmap.yaml +++ b/resources/loadtest/base/configmap.yaml @@ -4,7 +4,7 @@ metadata: name: loadtest-publisher namespace: eventing-test data: - publish_endpoint: "http://eventing-publisher-proxy.kyma-system" + publish_host: "http://eventing-publisher-proxy.kyma-system" use_legacy_events: "false" event_source: "noapp" version_format: "v%04d" diff --git a/resources/loadtest/base/kustomization.yaml b/resources/loadtest/base/kustomization.yaml index 1573173..f4216b7 100644 --- a/resources/loadtest/base/kustomization.yaml +++ b/resources/loadtest/base/kustomization.yaml @@ -17,3 +17,4 @@ resources: - rbac.yaml - service.yaml - subscriber.yaml +- subscriptions.yaml diff --git a/resources/loadtest/base/subscriptions.yaml b/resources/loadtest/base/subscriptions.yaml new file mode 100644 index 0000000..fd2f52c --- /dev/null +++ b/resources/loadtest/base/subscriptions.yaml @@ -0,0 +1,47 @@ +apiVersion: v1 +kind: List +items: + - apiVersion: eventing.kyma-project.io/v1alpha2 + kind: Subscription + metadata: + labels: + app: partner-handler-queue + eventing-loadtest: cloudevent + name: cloudevents + namespace: eventing-test + spec: + sink: http://loadtest-subscriber-0.eventing-test.svc.cluster.local + config: + maxInFlightMessages: "200" + source: "cloudevent" + typeMatching: standard + types: + - CE.v8 + - CE.v16 + - CE.v32 + - CE.v64 + - CE.v128 + - CE.v256 + - CE.v512 + - apiVersion: eventing.kyma-project.io/v1alpha2 + kind: Subscription + metadata: + labels: + app: partner-handler-queue + eventing-loadtest: legacy + name: legacy + namespace: eventing-test + spec: + sink: http://loadtest-subscriber-1.eventing-test.svc.cluster.local + config: + maxInFlightMessages: "200" + source: "legacy" + typeMatching: standard + types: + - LEGACY.v8 + - LEGACY.v16 + - LEGACY.v32 + - LEGACY.v64 + - LEGACY.v128 + - LEGACY.v256 + - LEGACY.v512