Skip to content

Commit

Permalink
bpf/proxy: conntrack cleaner put outside
Browse files Browse the repository at this point in the history
Conntrack scanner is a standalone component that can be ues to various
tasks when periodically interating over the conntrack map and it is
handy to have it detached from the kubeproxy.
  • Loading branch information
tomastigera committed Dec 11, 2020
1 parent f7878a5 commit f1eabe8
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 266 deletions.
276 changes: 109 additions & 167 deletions bpf/conntrack/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,88 +49,6 @@ func DefaultTimeouts() Timeouts {
}
}

// ScanVerdict represents the set of values returned by EntryScan
type ScanVerdict int

const (
// ScanVerdictOK means entry is fine and should remain
ScanVerdictOK ScanVerdict = iota
// ScanVerdictDelete means entry should be deleted
ScanVerdictDelete
)

// EntryGet is a function prototype provided to EntryScanner in case it needs to
// evaluate other entries to make a verdict
type EntryGet func(Key) (Value, error)

// EntryScanner is a function prototype to be called on every entry by the scanner
type EntryScanner func(Key, Value, EntryGet) ScanVerdict

// Scanner iterates over a provided conntrack map and call a set of EntryScanner
// functions on each entry in the order as they were passed to NewScanner. If
// any of the EntryScanner returns ScanVerdictDelete, it deletes the entry, does
// not call any other EntryScanner and continues the iteration.
//
// It provides a delete-save iteration over the conntrack table for multiple
// evaluation functions, to keep their implementation simpler.
type Scanner struct {
ctMap bpf.Map
scanners []EntryScanner
}

// NewScanner returns a scanner for the given conntrack map and the set of
// EntryScanner. They are executed in the provided order on each entry.
func NewScanner(ctMap bpf.Map, scanners ...EntryScanner) *Scanner {
return &Scanner{
ctMap: ctMap,
scanners: scanners,
}
}

// Scan executes a scanning iteration
func (s *Scanner) Scan() {
debug := log.GetLevel() >= log.DebugLevel

var ctKey Key
var ctVal Value

err := s.ctMap.Iter(func(k, v []byte) bpf.IteratorAction {
copy(ctKey[:], k[:])
copy(ctVal[:], v[:])

if debug {
log.WithFields(log.Fields{
"key": ctKey,
"entry": ctVal,
}).Debug("Examining conntrack entry")
}

for _, scanner := range s.scanners {
if verdict := scanner(ctKey, ctVal, s.get); verdict == ScanVerdictDelete {
if debug {
log.Debug("Deleting conntrack entry.")
}
return bpf.IterDelete
}
}
return bpf.IterNone
})

if err != nil {
log.WithError(err).Warn("Failed to iterate over conntrack map")
}
}

func (s *Scanner) get(k Key) (Value, error) {
v, err := s.ctMap.Get(k.AsBytes())

if err != nil {
return Value{}, err
}

return ValueFromBytes(v), nil
}

type LivenessScanner struct {
timeouts Timeouts
dsr bool
Expand Down Expand Up @@ -163,7 +81,7 @@ func WithTimeShim(shim timeshim.Interface) LivenessScannerOpt {
}
}

func (l *LivenessScanner) ScanEntry(ctKey Key, ctVal Value, get EntryGet) ScanVerdict {
func (l *LivenessScanner) Check(ctKey Key, ctVal Value, get EntryGet) ScanVerdict {
if l.cachedKTime == 0 || l.time.Since(l.goTimeOfLastKTimeLookup) > time.Second {
l.cachedKTime = l.time.KTimeNanos()
l.goTimeOfLastKTimeLookup = l.time.Now()
Expand Down Expand Up @@ -264,103 +182,127 @@ func (l *LivenessScanner) EntryExpired(nowNanos int64, proto uint8, entry Value)
}

// NATChecker returns true a given combination of frontend-backend exists
type NATChecker func(frontIP net.IP, frontPort uint16, backIP net.IP, backPort uint16, proto uint8) bool
type NATChecker interface {
ConntrackScanStart()
ConntrackScanEnd()
ConntrackFrontendHasBackend(ip net.IP, port uint16, backendIP net.IP, backendPort uint16, proto uint8) bool
}

// StaleNATScanner removes any entries to frontend that do not have the backend anymore.
type StaleNATScanner struct {
natChecker NATChecker
}

// NewStaleNATScanner returns an EntryScanner that checks if entries have
// exisitng NAT entries using the provided NATChecker and if not, it deletes
// them.
func NewStaleNATScanner(frontendHasBackend NATChecker) EntryScanner {
func NewStaleNATScanner(frontendHasBackend NATChecker) *StaleNATScanner {
return &StaleNATScanner{
natChecker: frontendHasBackend,
}
}

// Check checks the conntrack entry
func (sns *StaleNATScanner) Check(k Key, v Value, _ EntryGet) ScanVerdict {
debug := log.GetLevel() >= log.DebugLevel

return func(k Key, v Value, _ EntryGet) ScanVerdict {
switch v.Type() {
case TypeNormal:
// skip non-NAT entry

case TypeNATReverse:
proto := k.Proto()
ipA := k.AddrA()
ipB := k.AddrB()

portA := k.PortA()
portB := k.PortB()

svcIP := v.OrigIP()
svcPort := v.OrigPort()

// We cannot tell which leg is EP and which is the client, we must
// try both. If there is a record for one of them, it is still most
// likely an active entry.
if !frontendHasBackend(svcIP, svcPort, ipA, portA, proto) &&
!frontendHasBackend(svcIP, svcPort, ipB, portB, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATReverse is stale")
}
return ScanVerdictDelete
}
switch v.Type() {
case TypeNormal:
// skip non-NAT entry

case TypeNATReverse:
proto := k.Proto()
ipA := k.AddrA()
ipB := k.AddrB()

portA := k.PortA()
portB := k.PortB()

svcIP := v.OrigIP()
svcPort := v.OrigPort()

// We cannot tell which leg is EP and which is the client, we must
// try both. If there is a record for one of them, it is still most
// likely an active entry.
if !sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, ipA, portA, proto) &&
!sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, ipB, portB, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATReverse still active")
log.WithField("key", k).Debugf("TypeNATReverse is stale")
}
return ScanVerdictDelete
}
if debug {
log.WithField("key", k).Debugf("TypeNATReverse still active")
}

case TypeNATForward:
proto := k.Proto()
kA := k.AddrA()
kAport := k.PortA()
kB := k.AddrB()
kBport := k.PortB()
revKey := v.ReverseNATKey()
revA := revKey.AddrA()
revAport := revKey.PortA()
revB := revKey.AddrB()
revBport := revKey.PortB()

var (
svcIP, epIP net.IP
svcPort, epPort uint16
)

// Because client IP/Port are both in fwd key and rev key, we can
// can tell which one it is and thus determine exactly meaning of
// the other values.
if kA.Equal(revA) && kAport == revAport {
epIP = revB
epPort = revBport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revA) && kBport == revAport {
epIP = revB
epPort = revBport
svcIP = kA
svcPort = kAport
} else if kA.Equal(revB) && kAport == revBport {
epIP = revA
epPort = revAport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revB) && kBport == revBport {
epIP = revA
epPort = revAport
svcIP = kA
svcPort = kAport
} else {
log.WithFields(log.Fields{"key": k, "value": v}).Error("Mismatch between key and rev key")
return ScanVerdictOK // don't touch, will get deleted when expired
}
case TypeNATForward:
proto := k.Proto()
kA := k.AddrA()
kAport := k.PortA()
kB := k.AddrB()
kBport := k.PortB()
revKey := v.ReverseNATKey()
revA := revKey.AddrA()
revAport := revKey.PortA()
revB := revKey.AddrB()
revBport := revKey.PortB()

var (
svcIP, epIP net.IP
svcPort, epPort uint16
)

// Because client IP/Port are both in fwd key and rev key, we can
// can tell which one it is and thus determine exactly meaning of
// the other values.
if kA.Equal(revA) && kAport == revAport {
epIP = revB
epPort = revBport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revA) && kBport == revAport {
epIP = revB
epPort = revBport
svcIP = kA
svcPort = kAport
} else if kA.Equal(revB) && kAport == revBport {
epIP = revA
epPort = revAport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revB) && kBport == revBport {
epIP = revA
epPort = revAport
svcIP = kA
svcPort = kAport
} else {
log.WithFields(log.Fields{"key": k, "value": v}).Error("Mismatch between key and rev key")
return ScanVerdictOK // don't touch, will get deleted when expired
}

if !frontendHasBackend(svcIP, svcPort, epIP, epPort, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATForward is stale")
}
return ScanVerdictDelete
}
if !sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, epIP, epPort, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATForward still active")
log.WithField("key", k).Debugf("TypeNATForward is stale")
}

default:
log.WithField("conntrack.Value.Type()", v.Type()).Warn("Unknown type")
return ScanVerdictDelete
}
if debug {
log.WithField("key", k).Debugf("TypeNATForward still active")
}

return ScanVerdictOK
default:
log.WithField("conntrack.Value.Type()", v.Type()).Warn("Unknown type")
}

return ScanVerdictOK
}

// IterationStart satisfies EntryScannerSynced
func (sns *StaleNATScanner) IterationStart() {
sns.natChecker.ConntrackScanStart()
}

// IterationEnd satisfies EntryScannerSynced
func (sns *StaleNATScanner) IterationEnd() {
sns.natChecker.ConntrackScanEnd()
}
22 changes: 18 additions & 4 deletions bpf/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var _ = Describe("BPF Conntrack LivenessCalculator", func() {
Expect(mockTime.KTimeNanos()).To(BeNumerically("==", now))
ctMap = mock.NewMockMap(conntrack.MapParams)
lc = conntrack.NewLivenessScanner(timeouts, false, conntrack.WithTimeShim(mockTime))
scanner = conntrack.NewScanner(ctMap, lc.ScanEntry)
scanner = conntrack.NewScanner(ctMap, lc)
})

DescribeTable(
Expand Down Expand Up @@ -145,6 +145,19 @@ var _ = Describe("BPF Conntrack LivenessCalculator", func() {
)
})

type dummyNATChecker struct {
check func(fIP net.IP, fPort uint16, bIP net.IP, bPort uint16, proto uint8) bool
}

func (d dummyNATChecker) ConntrackFrontendHasBackend(fIP net.IP, fPort uint16, bIP net.IP,
bPort uint16, proto uint8) bool {

return d.check(fIP, fPort, bIP, bPort, proto)
}

func (dummyNATChecker) ConntrackScanStart() {}
func (dummyNATChecker) ConntrackScanEnd() {}

var _ = Describe("BPF Conntrack StaleNATScanner", func() {

clientIP := net.IPv4(1, 1, 1, 1)
Expand All @@ -158,18 +171,19 @@ var _ = Describe("BPF Conntrack StaleNATScanner", func() {

DescribeTable("forward entries",
func(k conntrack.Key, v conntrack.Value, verdict conntrack.ScanVerdict) {
staleNATScanner := conntrack.NewStaleNATScanner(
func(fIP net.IP, fPort uint16, bIP net.IP, bPort uint16, proto uint8) bool {
staleNATScanner := conntrack.NewStaleNATScanner(dummyNATChecker{
check: func(fIP net.IP, fPort uint16, bIP net.IP, bPort uint16, proto uint8) bool {
Expect(proto).To(Equal(uint8(123)))
Expect(fIP.Equal(svcIP)).To(BeTrue())
Expect(fPort).To(Equal(svcPort))
Expect(bIP.Equal(backendIP)).To(BeTrue())
Expect(bPort).To(Equal(backendPort))
return false
},
},
)

Expect(verdict).To(Equal(staleNATScanner(k, v, nil)))
Expect(verdict).To(Equal(staleNATScanner.Check(k, v, nil)))
},
Entry("keyA - revA",
conntrack.NewKey(123, clientIP, clientPort, svcIP, svcPort),
Expand Down
Loading

0 comments on commit f1eabe8

Please sign in to comment.