Skip to content

Commit

Permalink
Merge pull request #1483 from openziti/instant-sync-tidy
Browse files Browse the repository at this point in the history
Add additional logging. Remove unnecessary router lookups. May addres…
  • Loading branch information
plorenz authored Oct 31, 2023
2 parents c968338 + b270d18 commit 63cb63f
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 92 deletions.
99 changes: 55 additions & 44 deletions controller/handler_ctrl/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,64 +59,75 @@ func (self *CtrlAccepter) Bind(binding channel.Binding) error {
ch := binding.GetChannel()

log := pfxlog.Logger().WithField("routerId", ch.Id())

// Use a new copy of the router instance each time we connect. That way we can tell on disconnect
// if we're working with the right connection, in case connects and disconnects happen quickly.
// It also means that the channel and connected time fields don't change and we don't have to protect them
if r, err := self.network.GetReloadedRouter(ch.Id()); err == nil {
if ch.Underlay().Headers() != nil {
if versionValue, found := ch.Underlay().Headers()[channel.HelloVersionHeader]; found {
if versionInfo, err := self.network.VersionProvider.EncoderDecoder().Decode(versionValue); err == nil {
r.VersionInfo = versionInfo
} else {
return errors.Wrap(err, "could not parse version info from router hello, closing router connection")
}
r, err := self.network.GetReloadedRouter(ch.Id())
if err != nil {
return err
}
if r == nil {
return errors.Errorf("no router with id [%v] found, closing connection", ch.Id())
}

if ch.Underlay().Headers() != nil {
if versionValue, found := ch.Underlay().Headers()[channel.HelloVersionHeader]; found {
if versionInfo, err := self.network.VersionProvider.EncoderDecoder().Decode(versionValue); err == nil {
r.VersionInfo = versionInfo
log = log.WithField("version", r.VersionInfo.Version).
WithField("revision", r.VersionInfo.Revision).
WithField("buildDate", r.VersionInfo.BuildDate).
WithField("os", r.VersionInfo.OS).
WithField("arch", r.VersionInfo.Arch)
} else {
return errors.New("no version info header, closing router connection")
return errors.Wrap(err, "could not parse version info from router hello, not accepting router connection")
}
r.Listeners = nil
if val, found := ch.Underlay().Headers()[int32(ctrl_pb.ContentType_ListenersHeader)]; found {
log.Debug("router reported listeners using listeners header")
listeners := &ctrl_pb.Listeners{}
if err := proto.Unmarshal(val, listeners); err != nil {
log.WithError(err).Error("unable to unmarshall listeners value")
} else {
r.SetLinkListeners(listeners.Listeners)
for _, listener := range listeners.Listeners {
log.WithField("address", listener.GetAddress()).
WithField("protocol", listener.GetProtocol()).
WithField("costTags", listener.GetCostTags()).
Debug("router listener")
}
}
} else {
return errors.New("no version info header, not accepting router connection")
}

r.Listeners = nil
if val, found := ch.Underlay().Headers()[int32(ctrl_pb.ContentType_ListenersHeader)]; found {
listeners := &ctrl_pb.Listeners{}
if err = proto.Unmarshal(val, listeners); err != nil {
log.WithError(err).Error("unable to unmarshall listeners value")
} else {
log.Warn("no advertised listeners")
}
if val, found := ch.Underlay().Headers()[int32(ctrl_pb.ContentType_RouterMetadataHeader)]; found {
log.Debug("router reported listeners using listeners header")
routerMetadata := &ctrl_pb.RouterMetadata{}
if err = proto.Unmarshal(val, routerMetadata); err != nil {
log.WithError(err).Error("unable to unmarshall router metadata value")
r.SetLinkListeners(listeners.Listeners)
for _, listener := range listeners.Listeners {
log.WithField("address", listener.GetAddress()).
WithField("protocol", listener.GetProtocol()).
WithField("costTags", listener.GetCostTags()).
Debug("router listener")
}
r.SetMetadata(routerMetadata)
}
} else {
return errors.New("no version info header, closing router connection")
}

r.Control = ch
r.ConnectTime = time.Now()
if err := binding.Bind(newBindHandler(self.heartbeatOptions, r, self.network, self.xctrls)); err != nil {
return errors.Wrap(err, "error binding router")
log.Debug("no advertised listeners")
}

if self.traceHandler != nil {
binding.AddPeekHandler(self.traceHandler)
if val, found := ch.Underlay().Headers()[int32(ctrl_pb.ContentType_RouterMetadataHeader)]; found {
routerMetadata := &ctrl_pb.RouterMetadata{}
if err = proto.Unmarshal(val, routerMetadata); err != nil {
log.WithError(err).Error("unable to unmarshall router metadata value")
}
r.SetMetadata(routerMetadata)
}
} else {
return errors.New("channel provided no headers, not accepting router connection as version info not provided")
}

log.Infof("accepted new router connection [r/%s]", r.Id)
r.Control = ch
r.ConnectTime = time.Now()
if err := binding.Bind(newBindHandler(self.heartbeatOptions, r, self.network, self.xctrls)); err != nil {
return errors.Wrap(err, "error binding router")
}

self.network.ConnectRouter(r)
if self.traceHandler != nil {
binding.AddPeekHandler(self.traceHandler)
}

log.Info("accepted new router connection")

self.network.ConnectRouter(r)

return nil
}
14 changes: 3 additions & 11 deletions controller/handler_edge_ctrl/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ import (
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/network"
"google.golang.org/protobuf/proto"
)

type helloHandler struct {
appEnv *env.AppEnv
callback func(r *network.Router, respHello *edge_ctrl_pb.ClientHello)
callback func(routerId string, respHello *edge_ctrl_pb.ClientHello)
}

func NewHelloHandler(appEnv *env.AppEnv, callback func(r *network.Router, respHello *edge_ctrl_pb.ClientHello)) *helloHandler {
func NewHelloHandler(appEnv *env.AppEnv, callback func(routerId string, respHello *edge_ctrl_pb.ClientHello)) *helloHandler {
return &helloHandler{
appEnv: appEnv,
callback: callback,
Expand All @@ -48,12 +47,5 @@ func (h *helloHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
return
}

r := h.appEnv.GetHostController().GetNetwork().GetConnectedRouter(ch.Id())
if r == nil {
pfxlog.Logger().Errorf("could not find router %v, closing channel", ch.Id())
_ = ch.Close()
return
}

h.callback(r, respHello)
h.callback(ch.Id(), respHello)
}
14 changes: 3 additions & 11 deletions controller/handler_edge_ctrl/resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ import (
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/network"
"google.golang.org/protobuf/proto"
)

type resyncHandler struct {
appEnv *env.AppEnv
callback func(r *network.Router, respHello *edge_ctrl_pb.RequestClientReSync)
callback func(routerId string, respHello *edge_ctrl_pb.RequestClientReSync)
}

func NewResyncHandler(appEnv *env.AppEnv, callback func(r *network.Router, respHello *edge_ctrl_pb.RequestClientReSync)) *resyncHandler {
func NewResyncHandler(appEnv *env.AppEnv, callback func(routerId string, respHello *edge_ctrl_pb.RequestClientReSync)) *resyncHandler {
return &resyncHandler{
appEnv: appEnv,
callback: callback,
Expand All @@ -48,12 +47,5 @@ func (h *resyncHandler) HandleReceive(msg *channel.Message, ch channel.Channel)
return
}

r, err := h.appEnv.GetHostController().GetNetwork().GetRouter(ch.Id())
if err != nil {
pfxlog.Logger().WithError(err).Errorf("could not find router %v, closing channel", ch.Id())
_ = ch.Close()
return
}

h.callback(r, resyncReq)
h.callback(ch.Id(), resyncReq)
}
56 changes: 30 additions & 26 deletions controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ import (
"github.com/lucsky/cuid"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/foundation/v2/debugz"
"github.com/openziti/foundation/v2/genext"
"github.com/openziti/storage/ast"
"github.com/openziti/ziti/common/build"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/event"
"github.com/openziti/ziti/controller/handler_edge_ctrl"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/persistence"
"github.com/openziti/ziti/common/build"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/event"
"github.com/openziti/foundation/v2/debugz"
"github.com/openziti/foundation/v2/genext"
"github.com/openziti/storage/ast"
"github.com/openziti/ziti/controller/persistence"
cmap "github.com/orcaman/concurrent-map/v2"
"go.etcd.io/bbolt"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -431,14 +431,18 @@ func (strategy *InstantStrategy) sendHello(rtx *RouterSender) {
}
}

func (strategy *InstantStrategy) ReceiveResync(r *network.Router, _ *edge_ctrl_pb.RequestClientReSync) {
rtx := strategy.rtxMap.Get(r.Id)
func (strategy *InstantStrategy) ReceiveResync(routerId string, _ *edge_ctrl_pb.RequestClientReSync) {
rtx := strategy.rtxMap.Get(routerId)

if rtx == nil {
routerName := "<unable to retrieve>"
if router, _ := strategy.ae.Managers.Router.Read(routerId); router != nil {
routerName = router.Name
}
pfxlog.Logger().
WithField("strategy", strategy.Type()).
WithField("routerId", r.Id).
WithField("routerName", r.Name).
WithField("routerId", routerId).
WithField("routerName", routerName).
Error("received resync from router that is currently not tracked by the strategy, dropping resync")
return
}
Expand All @@ -450,14 +454,18 @@ func (strategy *InstantStrategy) ReceiveResync(r *network.Router, _ *edge_ctrl_p
strategy.receivedClientHelloQueue <- rtx
}

func (strategy *InstantStrategy) ReceiveClientHello(r *network.Router, respHello *edge_ctrl_pb.ClientHello) {
rtx := strategy.rtxMap.Get(r.Id)
func (strategy *InstantStrategy) ReceiveClientHello(routerId string, respHello *edge_ctrl_pb.ClientHello) {
rtx := strategy.rtxMap.Get(routerId)

if rtx == nil {
routerName := "<unable to retrieve>"
if router, _ := strategy.ae.Managers.Router.Read(routerId); router != nil {
routerName = router.Name
}
pfxlog.Logger().
WithField("strategy", strategy.Type()).
WithField("routerId", r.Id).
WithField("routerName", r.Name).
WithField("routerId", routerId).
WithField("routerName", routerName).
Error("received hello from router that is currently not tracked by the strategy, dropping hello")
return
}
Expand All @@ -467,17 +475,12 @@ func (strategy *InstantStrategy) ReceiveClientHello(r *network.Router, respHello
WithField("protocols", respHello.Protocols).
WithField("protocolPorts", respHello.ProtocolPorts).
WithField("listeners", respHello.Listeners).
WithField("data", respHello.Data)

serverVersion := build.GetBuildInfo().Version()

if r.VersionInfo != nil {
logger = logger.WithField("version", r.VersionInfo.Version).
WithField("revision", r.VersionInfo.Revision).
WithField("buildDate", r.VersionInfo.BuildDate).
WithField("os", r.VersionInfo.OS).
WithField("arch", r.VersionInfo.Arch)
}
WithField("data", respHello.Data).
WithField("version", rtx.Router.VersionInfo.Version).
WithField("revision", rtx.Router.VersionInfo.Revision).
WithField("buildDate", rtx.Router.VersionInfo.BuildDate).
WithField("os", rtx.Router.VersionInfo.OS).
WithField("arch", rtx.Router.VersionInfo.Arch)

protocols := map[string]string{}

Expand All @@ -497,8 +500,9 @@ func (strategy *InstantStrategy) ReceiveClientHello(r *network.Router, respHello

rtx.SetHostname(respHello.Hostname)
rtx.SetProtocols(protocols)
rtx.SetVersionInfo(*r.VersionInfo)
rtx.SetVersionInfo(*rtx.Router.VersionInfo)

serverVersion := build.GetBuildInfo().Version()
logger.Infof("edge router sent hello with version [%s] to controller with version [%s]", respHello.Version, serverVersion)
strategy.receivedClientHelloQueue <- rtx
}
Expand Down

0 comments on commit 63cb63f

Please sign in to comment.