Skip to content

Commit

Permalink
Merge pull request #1094 from openziti/event-updates
Browse files Browse the repository at this point in the history
Update for changes to fabric events model
  • Loading branch information
plorenz authored Jul 19, 2022
2 parents ca9be53 + 1fb3339 commit 5135777
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 53 deletions.
4 changes: 2 additions & 2 deletions controller/env/appenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ import (
"github.com/openziti/fabric/controller/network"
"github.com/openziti/fabric/controller/xctrl"
"github.com/openziti/fabric/controller/xmgmt"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/identity"
"github.com/openziti/metrics"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/sdk-golang/ziti/constants"
"github.com/openziti/storage/boltz"
Expand Down Expand Up @@ -465,7 +465,7 @@ func (ae *AppEnv) InitPersistence() error {
})

ae.Managers = model.InitEntityManagers(ae)
events.Init(ae.GetDbProvider(), ae.BoltStores, ae.GetHostController().GetCloseNotifyChannel())
events.Init(ae.GetHostController().GetNetwork(), ae.GetDbProvider(), ae.BoltStores, ae.GetHostController().GetCloseNotifyChannel())

persistence.ServiceEvents.AddServiceEventHandler(ae.HandleServiceEvent)
ae.BoltStores.Identity.AddListener(boltz.EventDelete, func(i ...interface{}) {
Expand Down
22 changes: 15 additions & 7 deletions controller/persistence/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/openziti/fabric/controller/network"
"github.com/openziti/fabric/controller/xt"
"github.com/openziti/fabric/controller/xt_smartrouting"
"github.com/openziti/fabric/event"
"github.com/openziti/fabric/events"
tests "github.com/openziti/fabric/tests"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/identity"
Expand All @@ -34,13 +36,6 @@ import (
"testing"
)

type testConfig struct {
ctx *TestContext
options *network.Options
metricsRegistry metrics.Registry
versionProvider versions.VersionProvider
}

func newTestConfig(ctx *TestContext) *testConfig {
options := network.DefaultOptions()
options.MinRouterCost = 0
Expand All @@ -50,9 +45,22 @@ func newTestConfig(ctx *TestContext) *testConfig {
options: options,
metricsRegistry: metrics.NewRegistry("test", nil),
versionProvider: tests.NewVersionProviderTest(),
eventDispatcher: events.NewDispatcher(ctx.closeNotify),
}
}

type testConfig struct {
ctx *TestContext
options *network.Options
metricsRegistry metrics.Registry
versionProvider versions.VersionProvider
eventDispatcher *events.Dispatcher
}

func (self *testConfig) GetEventDispatcher() event.Dispatcher {
return self.eventDispatcher
}

func (self *testConfig) GetId() *identity.TokenId {
return &identity.TokenId{Token: "test"}
}
Expand Down
5 changes: 0 additions & 5 deletions events/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"strings"
)

func init() {
events.RegisterEventHandlerType("file", edgeFileEventLoggerFactory{})
events.RegisterEventHandlerType("stdout", edgeStdOutLoggerFactory{})
}

type edgeFormatterFactory struct{}

func (f edgeFormatterFactory) NewLoggingHandler(format string, buffer int, out io.WriteCloser) (interface{}, error) {
Expand Down
19 changes: 10 additions & 9 deletions events/registrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package events

import (
"github.com/openziti/edge/controller/persistence"
"github.com/openziti/fabric/events"
"github.com/openziti/storage/boltz"
"github.com/openziti/fabric/controller/network"
"github.com/openziti/foundation/v2/cowslice"
"github.com/openziti/storage/boltz"
"time"
)

func init() {
events.RegisterEventType(ApiSessionEventNS, registerApiSessionEventHandler)
events.RegisterEventType(SessionEventNS, registerSessionEventHandler)
events.RegisterEventType(EntityCountEventNS, registerEntityCountEventHandler)
}

func AddSessionEventHandler(handler SessionEventHandler) {
cowslice.Append(sessionEventHandlerRegistry, handler)
}
Expand All @@ -30,7 +24,14 @@ func RemoveApiSessionEventHandler(handler ApiSessionEventHandler) {
cowslice.Delete(apiSessionEventHandlerRegistry, handler)
}

func Init(dbProvider persistence.DbProvider, stores *persistence.Stores, closeNotify <-chan struct{}) {
func Init(n *network.Network, dbProvider persistence.DbProvider, stores *persistence.Stores, closeNotify <-chan struct{}) {
n.GetEventDispatcher().RegisterEventType(ApiSessionEventNS, registerApiSessionEventHandler)
n.GetEventDispatcher().RegisterEventType(SessionEventNS, registerSessionEventHandler)
n.GetEventDispatcher().RegisterEventType(EntityCountEventNS, registerEntityCountEventHandler)

n.GetEventDispatcher().RegisterEventHandlerFactory("file", edgeFileEventLoggerFactory{})
n.GetEventDispatcher().RegisterEventHandlerFactory("stdout", edgeStdOutLoggerFactory{})

stores.ApiSession.AddListener(boltz.EventCreate, apiSessionCreated)
stores.ApiSession.AddListener(boltz.EventDelete, apiSessionDeleted)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/netfoundry/secretstream v0.1.2
github.com/openziti/channel v0.18.57
github.com/openziti/fabric v0.19.27
github.com/openziti/fabric v0.19.28
github.com/openziti/foundation/v2 v2.0.1
github.com/openziti/identity v1.0.5
github.com/openziti/jwks v1.0.1
Expand All @@ -46,7 +46,7 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.0
github.com/pkg/errors v0.9.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/sirupsen/logrus v1.8.1
github.com/sirupsen/logrus v1.9.0
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spf13/cobra v1.5.0
github.com/spf13/pflag v1.0.5
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/openziti/channel v0.18.57 h1:6Kp0kDMr18Ra2FZ+XR+GjmdOaGoP2NghgI8x0oWvXhs=
github.com/openziti/channel v0.18.57/go.mod h1:Q5iXnIvs9nWoCcx+S+yQ9BPiAhnRshbVP8zoKviztOw=
github.com/openziti/fabric v0.19.27 h1:xy++igCRCdoQYiSh8/AKHBJxuex9G7/O7hCy9+ueHWk=
github.com/openziti/fabric v0.19.27/go.mod h1:4oeFKVqCQan1MjXH7v6je8gO4dCSYbZOW9Ljfi9F8Mw=
github.com/openziti/fabric v0.19.28 h1:uBh4VeaMEvgm3JMDuLPvg9kq1Am6csXECvbrvrJaxxo=
github.com/openziti/fabric v0.19.28/go.mod h1:8zgOd4JZGtC+tgscMPJV2WBQLVHLtMZZUAHvfs5eNyk=
github.com/openziti/foundation/v2 v2.0.1 h1:HKYwAyxlZ4zmlt0t9Y/v6SWso0qUaeWapkWOlQDKPEk=
github.com/openziti/foundation/v2 v2.0.1/go.mod h1:L75kwCC5WTUPqxuAd3G+WMBompaElMb/nYlJjR1sJ9Q=
github.com/openziti/identity v1.0.5 h1:PKW1tj0ctOWgscL3P4gKiVFfKrP6dYeP0U6R8Aw6DKM=
Expand Down Expand Up @@ -508,8 +508,9 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDqSt+GTGFMVlhk3ULuV0y9ZmzeVGR4mloJI3M=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
Expand Down
8 changes: 8 additions & 0 deletions router/xgress_edge/perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type mirrorLink struct {
acks chan *xgress.Acknowledgement
}

func (link *mirrorLink) DialAddress() string {
return "tcp:localhost:1234"
}

func (link *mirrorLink) GetAddresses() []*ctrl_pb.LinkConn {
return nil
}

func (link *mirrorLink) IsClosed() bool {
return false
}
Expand Down
52 changes: 27 additions & 25 deletions tests/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"fmt"
events2 "github.com/openziti/edge/events"
"github.com/openziti/fabric/controller/xt_smartrouting"
"github.com/openziti/fabric/events"
"github.com/openziti/fabric/event"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/stringz"
"github.com/pkg/errors"
Expand Down Expand Up @@ -57,7 +57,7 @@ func (self *eventsCollector) acceptEvent(event interface{}) {
logrus.Warnf("%v: %v %+v\n", reflect.TypeOf(event), event, event)
}

func (self *eventsCollector) AcceptUsageEvent(event *events.UsageEvent) {
func (self *eventsCollector) AcceptUsageEvent(event *event.UsageEvent) {
self.acceptEvent(event)
if self.notified.CompareAndSwap(false, true) {
close(self.usageEventNotify)
Expand All @@ -68,7 +68,7 @@ func (self *eventsCollector) AcceptSessionEvent(event *events2.SessionEvent) {
self.acceptEvent(event)
}

func (self *eventsCollector) AcceptCircuitEvent(event *events.CircuitEvent) {
func (self *eventsCollector) AcceptCircuitEvent(event *event.CircuitEvent) {
self.acceptEvent(event)
}

Expand All @@ -83,20 +83,22 @@ func Test_EventsTest(t *testing.T) {
ctx := NewTestContext(t)
defer ctx.Teardown()

ctx.StartServer()

ec := &eventsCollector{
usageEventNotify: make(chan struct{}),
}

unregisterFabricSessionEventsHandler := events.RegisterCircuitEventHandler(ec)
defer unregisterFabricSessionEventsHandler()
dispatcher := ctx.fabricController.GetEventDispatcher()

dispatcher.AddCircuitEventHandler(ec)
defer dispatcher.RemoveCircuitEventHandler(ec)

events2.AddSessionEventHandler(ec)
defer events2.RemoveSessionEventHandler(ec)

unregisterUsageEventsHandler := events.RegisterUsageEventHandler(ec)
defer unregisterUsageEventsHandler()

ctx.StartServer()
dispatcher.AddUsageEventHandler(ec)
defer dispatcher.RemoveUsageEventHandler(ec)

ctx.RequireAdminManagementApiLogin()
ctx.RequireAdminClientApiLogin()
Expand Down Expand Up @@ -135,48 +137,48 @@ func Test_EventsTest(t *testing.T) {

ctx.Teardown()

for _, event := range ec.events {
fmt.Printf("%v: %v %+v\n", reflect.TypeOf(event), event, event)
for _, evt := range ec.events {
fmt.Printf("%v: %v %+v\n", reflect.TypeOf(evt), evt, evt)
}

event := ec.PopNextEvent(ctx)
edgeSession, ok := event.(*events2.SessionEvent)
evt := ec.PopNextEvent(ctx)
edgeSession, ok := evt.(*events2.SessionEvent)
ctx.Req.True(ok)
ctx.Req.Equal("edge.sessions", edgeSession.Namespace)
ctx.Req.Equal("created", edgeSession.EventType)
ctx.Req.Equal(hostIdentity.Id, edgeSession.IdentityId)

event = ec.PopNextEvent(ctx)
edgeSession, ok = event.(*events2.SessionEvent)
evt = ec.PopNextEvent(ctx)
edgeSession, ok = evt.(*events2.SessionEvent)
ctx.Req.True(ok)
ctx.Req.Equal("edge.sessions", edgeSession.Namespace)
ctx.Req.Equal("created", edgeSession.EventType)
ctx.Req.Equal(clientIdentity.Id, edgeSession.IdentityId)

event = ec.PopNextEvent(ctx)
circuitEvent, ok := event.(*events.CircuitEvent)
evt = ec.PopNextEvent(ctx)
circuitEvent, ok := evt.(*event.CircuitEvent)
ctx.Req.True(ok)
ctx.Req.Equal("fabric.circuits", circuitEvent.Namespace)
ctx.Req.Equal("created", circuitEvent.EventType)
ctx.Req.Equal("created", string(circuitEvent.EventType))
ctx.Req.Equal(service.Id, circuitEvent.ServiceId)
ctx.Req.Equal(edgeSession.Id, circuitEvent.ClientId)

for i := 0; i < 3; i++ {
event = ec.PopNextEvent(ctx)
if usage, ok := event.(*events.UsageEvent); ok {
evt = ec.PopNextEvent(ctx)
if usage, ok := evt.(*event.UsageEvent); ok {
ctx.Req.Equal("fabric.usage", usage.Namespace)
ctx.Req.Equal(uint32(2), usage.Version)
ctx.Req.Equal(circuitEvent.CircuitId, usage.CircuitId)
expected := []string{"usage.ingress.rx", "usage.egress.tx"}
ctx.Req.True(stringz.Contains(expected, usage.EventType), "was %v, expected one of %+v", usage.EventType, expected)
ctx.Req.Equal(ctx.edgeRouterEntity.id, usage.SourceId)
ctx.Req.Equal(uint64(26), usage.Usage)
} else if fabricSession, ok := event.(*events.CircuitEvent); ok {
ctx.Req.Equal("fabric.circuits", fabricSession.Namespace)
ctx.Req.Equal("deleted", fabricSession.EventType)
ctx.Req.Equal(edgeSession.Id, fabricSession.ClientId)
} else if circuitEvent, ok := evt.(*event.CircuitEvent); ok {
ctx.Req.Equal("fabric.circuits", circuitEvent.Namespace)
ctx.Req.Equal("deleted", string(circuitEvent.EventType))
ctx.Req.Equal(edgeSession.Id, circuitEvent.ClientId)
} else {
ctx.Req.Fail("unexpected event type: %v", reflect.TypeOf(event))
ctx.Req.Fail("unexpected event type: %v", reflect.TypeOf(evt))
}
}
}

0 comments on commit 5135777

Please sign in to comment.