Skip to content

Commit

Permalink
feat: route reconciliation phase 3
Browse files Browse the repository at this point in the history
  • Loading branch information
aauren committed Oct 25, 2024
1 parent 482ebec commit 5e23294
Show file tree
Hide file tree
Showing 13 changed files with 365 additions and 129 deletions.
4 changes: 2 additions & 2 deletions pkg/bgp/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"strings"

"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/cloudnativelabs/kube-router/v2/pkg"
gobgp "github.com/osrg/gobgp/v3/pkg/packet/bgp"
)

Expand All @@ -21,7 +21,7 @@ const (
// GenerateRouterID will generate a router ID based upon the user's configuration (or lack thereof) and the node's
// primary IP address if the user has not specified. If the user has configured the router ID as "generate" then we
// will generate a router ID based upon fnv hashing the node's primary IP address.
func GenerateRouterID(nodeIPAware utils.NodeIPAware, configRouterID string) (string, error) {
func GenerateRouterID(nodeIPAware pkg.NodeIPAware, configRouterID string) (string, error) {
switch {
case configRouterID == "generate":
h := fnv.New32a()
Expand Down
23 changes: 16 additions & 7 deletions pkg/bgp/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import (
"k8s.io/klog/v2"
)

func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg.TunnelCleaner) error {
// PathHandler bundles the collaborators that are used when reacting to a BGP path change (see Changed).
type PathHandler struct {
// PeerLister is passed to IsPeerEstablished to check whether the path's next hop is still an established peer
PeerLister PeerLister
// RouteInjector is asked to inject the destination / next hop pair for paths that are not withdrawals
RouteInjector pkg.RouteInjector
// RouteSyncer has the destination removed from its injected routes when a path is withdrawn
RouteSyncer pkg.RouteSyncer
// TunnelCleaner generates the tunnel name for the next hop and cleans up the tunnel when the peer is gone
TunnelCleaner pkg.TunnelCleaner
}

func (ph *PathHandler) Changed(path *gobgpapi.Path) error {
klog.V(2).Infof("Path Looks Like: %s", path.String())
dst, nextHop, err := ParsePath(path)
if err != nil {
return err
}
tunnelName := tc.GenerateTunnelName(nextHop.String())
tunnelName := ph.TunnelCleaner.GenerateTunnelName(nextHop.String())

// If we've made it this far, then it is likely that the node is holding a destination route for this path already.
// If the path we've received from GoBGP is a withdrawal, we should clean up any lingering routes that may exist
Expand All @@ -23,23 +30,25 @@ func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg.
// The path might be withdrawn because the peer became unestablished or it may be withdrawn because just the
// path was withdrawn. Check to see if the peer is still established before deciding whether to clean the
// tunnel and tunnel routes or whether to just delete the destination route.
peerEstablished, err := IsPeerEstablished(pl, nextHop.String())
peerEstablished, err := IsPeerEstablished(ph.PeerLister, nextHop.String())
if err != nil {
klog.Errorf("encountered error while checking peer status: %v", err)
}
if err == nil && !peerEstablished {
klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes",
nextHop.String())
// Also delete route from state map so that it doesn't get re-synced after deletion
rs.DelInjectedRoute(dst)
tc.CleanupTunnel(dst, tunnelName)
ph.RouteSyncer.DelInjectedRoute(dst)
ph.TunnelCleaner.CleanupTunnel(dst, tunnelName)
return nil
}

// Also delete route from state map so that it doesn't get re-synced after deletion
rs.DelInjectedRoute(dst)
ph.RouteSyncer.DelInjectedRoute(dst)
return nil
}

return nil
// If this is not a withdraw, then we need to process the route. This takes care of creating any necessary tunnels,
// and adding any necessary host routes depending on the user's config
return ph.RouteInjector.InjectRoute(dst, nextHop)
}
3 changes: 2 additions & 1 deletion pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
Expand Down Expand Up @@ -63,7 +64,7 @@ var (

// NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
krNode utils.NodeIPAndFamilyAware
krNode pkg.NodeIPAndFamilyAware
serviceClusterIPRanges []net.IPNet
serviceExternalIPRanges []net.IPNet
serviceLoadBalancerIPRanges []net.IPNet
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
Expand Down Expand Up @@ -108,7 +109,7 @@ const (

// NetworkServicesController struct stores information needed by the controller
type NetworkServicesController struct {
krNode utils.NodeAware
krNode pkg.NodeAware
syncPeriod time.Duration
mu sync.Mutex
serviceMap serviceInfoMap
Expand Down
137 changes: 107 additions & 30 deletions pkg/controllers/routing/host_route_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/bgp"
"github.com/cloudnativelabs/kube-router/v2/pkg/routes"

gobgpapi "github.com/osrg/gobgp/v3/api"
gobgp "github.com/osrg/gobgp/v3/pkg/server"
"github.com/vishvananda/netlink"
"k8s.io/klog/v2"
)

type BGPServerUnsetError struct{}
type BGPPathListerUnsetError struct{}
type BGPListError struct {
msg string
err error
}

func (b BGPServerUnsetError) Error() string {
func (b BGPPathListerUnsetError) Error() string {
return "BGP server not yet specified"
}

Expand All @@ -46,13 +46,14 @@ func (b BGPListError) Unwrap() error {

// RouteSync is a struct that holds all of the information needed for syncing routes to the kernel's routing table
type RouteSync struct {
routeTableStateMap map[string]*netlink.Route
injectedRoutesSyncPeriod time.Duration
mutex sync.Mutex
routeReplacer func(route *netlink.Route) error
routeDeleter func(destinationSubnet *net.IPNet) error
routeAdder func(route *netlink.Route) error
bgpServer *gobgp.BgpServer
routeTableStateMap map[string]*netlink.Route
routeSyncPeriod time.Duration
mutex sync.Mutex
routeReplacer func(route *netlink.Route) error
routeDeleter func(destinationSubnet *net.IPNet) error
routeAdder func(route *netlink.Route) error
routeInjector pkg.RouteInjector
pathLister pkg.BGPPathLister
}

// addInjectedRoute adds a route to the route map that is regularly synced to the kernel's routing table
Expand All @@ -77,51 +78,114 @@ func (rs *RouteSync) DelInjectedRoute(dst *net.IPNet) {
}
}

// checkState diffs the supplied authoritative routes (keyed by destination string) against rs.routeTableStateMap.
// The first returned slice holds the authoritative routes whose destination has no entry in the local state map, the
// second holds the local state routes whose destination has no entry in authoritativeState. Only the destination keys
// are compared; the routes themselves are not inspected.
func (rs *RouteSync) checkState(authoritativeState map[string]*netlink.Route) ([]*netlink.Route, []*netlink.Route) {
	// Keep the mutex for the whole comparison so that nothing else can touch the state map while we walk it
	rs.mutex.Lock()
	defer rs.mutex.Unlock()

	missingFromState := []*netlink.Route{}
	missingFromBGP := []*netlink.Route{}

	// Pass 1: anything BGP knows about that our state map does not have yet needs to be added
	for key, bgpRoute := range authoritativeState {
		if _, known := rs.routeTableStateMap[key]; !known {
			missingFromState = append(missingFromState, bgpRoute)
			continue
		}

		klog.V(3).Infof("Route already exists for destination: %s", key)
	}

	// Pass 2: anything in our state map that BGP no longer knows about needs to be deleted
	for key, stateRoute := range rs.routeTableStateMap {
		if _, known := authoritativeState[key]; !known {
			missingFromBGP = append(missingFromBGP, stateRoute)
			continue
		}

		klog.V(3).Infof("Route already exists for destination: %s", key)
	}

	return missingFromState, missingFromBGP
}

func (rs *RouteSync) checkCacheAgainstBGP() error {
// convertPathsToRouteMap turns a list of BGP paths into a map of routes keyed by the destination's string form.
// Withdrawn paths and paths that cannot be parsed are left out of the result.
convertPathsToRouteMap := func(path []*gobgpapi.Path) map[string]*netlink.Route {
	// At most one entry per path, so size the map for that upper bound
	routeMap := make(map[string]*netlink.Route, len(path))
	for _, p := range path {
		klog.V(3).Infof("Path: %v", p)

		// Leave out withdraw paths from the map, we don't need to worry about tracking them because we are going to
		// delete any routes not found in the map we're returning anyway
		if p.IsWithdraw {
			klog.V(3).Infof("Path is a withdrawal, skipping")
			continue
		}

		// Seems like a valid path, let's parse it
		dst, nh, err := bgp.ParsePath(p)
		if err != nil {
			klog.Warningf("Failed to parse BGP path, not failing so as to not block updating paths that are "+
				"valid: %v", err)
			// dst and nh are not usable when parsing failed. Skip this path instead of recording a route built from
			// them, otherwise a bogus entry would end up in the map that is treated as the source of truth.
			continue
		}

		// Add path to our map
		routeMap[dst.String()] = &netlink.Route{
			Dst:      dst,
			Gw:       nh,
			Protocol: routes.ZebraOriginator,
		}
	}

	return routeMap
}

if rs.bgpServer == nil {
return BGPServerUnsetError{}
// During startup, it is possible for this function to be called before the BGP path lister has been set on it; in
// this case, return BGPPathListerUnsetError
if rs.pathLister == nil {
return BGPPathListerUnsetError{}
}
rs.mutex.Lock()
defer rs.mutex.Unlock()
allPaths := make([]*gobgpapi.Path, 0)

// Create a var for tracking all of the paths we're about to get
allPaths := make([]*gobgpapi.Path, 0)
pathList := func(path *gobgpapi.Destination) {
allPaths = append(allPaths, path.Paths...)
}

// Call ListPath() for all families passing in our pathList function from above, to set allPaths
for _, family := range []*gobgpapi.Family{
{Afi: gobgpapi.Family_AFI_IP, Safi: gobgpapi.Family_SAFI_UNICAST},
{Afi: gobgpapi.Family_AFI_IP6, Safi: gobgpapi.Family_SAFI_UNICAST}} {
err := rs.bgpServer.ListPath(context.Background(), &gobgpapi.ListPathRequest{Family: family}, pathList)
err := rs.pathLister.ListPath(context.Background(), &gobgpapi.ListPathRequest{Family: family}, pathList)
if err != nil {
return newBGPListError("Failed to list BGP paths", err)
}
}

// Convert all paths to a map of routes, this serves as our authoritative source of truth for what routes should be
bgpRoutes := convertPathsToRouteMap(allPaths)

// REPLACE ME
for dst, route := range bgpRoutes {
if dst != "" && route != nil {
return nil
// Check the state of the routes against the authoritative source of truth
routesToAdd, routesToDelete := rs.checkState(bgpRoutes)

// Add missing routes
for _, route := range routesToAdd {
klog.Infof("Found route from BGP that did not exist in state, adding to state: %s", route)
err := rs.routeInjector.InjectRoute(route.Dst, route.Gw)
if err != nil {
klog.Errorf("Failed to inject route: %v", err)
}
}

// Delete routes that are no longer in the authoritative source of truth
for _, route := range routesToDelete {
klog.Infof("Found route in state that did not exist in BGP, deleting from state: %s", route)
err := rs.routeDeleter(route.Dst)
if err != nil {
klog.Errorf("Failed to delete route: %v", err)
}
}

Expand All @@ -148,12 +212,26 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
go func(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(rs.injectedRoutesSyncPeriod)
defer t.Stop()
t1 := time.NewTicker(rs.routeSyncPeriod)
// Check our local state against BGP once for every 5 route syncs
t2 := time.NewTicker(5 * rs.routeSyncPeriod)
defer t1.Stop()
for {
select {
case <-t.C:
case <-t1.C:
rs.SyncLocalRouteTable()
case <-t2.C:
err := rs.checkCacheAgainstBGP()
if err != nil {
switch err.(type) {
case BGPPathListerUnsetError:
klog.Warningf("BGP server not yet set, cannot check cache against BGP")
case BGPListError:
klog.Errorf("Failed to check cache against BGP due to BGP error: %v", err)
default:
klog.Errorf("Failed to check cache against BGP: %v", err)
}
}
case <-stopCh:
klog.Infof("Shutting down local route synchronization")
return
Expand All @@ -162,25 +240,24 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
}(stopCh, wg)
}

// addBGPServer adds a BGP server to the routeSyncer so that it can be used to advertise routes
//
//nolint:unused // we're going to implement this later
func (rs *RouteSync) addBGPServer(server *gobgp.BgpServer) {
rs.bgpServer = server
// AddBGPPathLister sets the BGP path lister on the routeSyncer so that it can be used to list BGP paths when
// checking the route cache against BGP
func (rs *RouteSync) AddBGPPathLister(pl pkg.BGPPathLister) {
rs.pathLister = pl
}

// NewRouteSyncer creates a new routeSyncer that, when run, will sync routes kept in its local state table every
// syncPeriod
func NewRouteSyncer(syncPeriod time.Duration) *RouteSync {
func NewRouteSyncer(ri pkg.RouteInjector, syncPeriod time.Duration) *RouteSync {
rs := RouteSync{}
rs.routeTableStateMap = make(map[string]*netlink.Route)
rs.injectedRoutesSyncPeriod = syncPeriod
rs.routeSyncPeriod = syncPeriod
rs.mutex = sync.Mutex{}

// We substitute the RouteR* functions here so that we can easily monkey patch it in our unit tests
rs.routeReplacer = netlink.RouteReplace
rs.routeDeleter = routes.DeleteByDestination
rs.routeAdder = netlink.RouteAdd
rs.routeInjector = ri

return &rs
}
Loading

0 comments on commit 5e23294

Please sign in to comment.