Skip to content

Commit

Permalink
Merge pull request projectcalico#2632 from tomastigera/tomas-conntrac…
Browse files Browse the repository at this point in the history
…k-scanner

BPF conntrack scanner moved outside of kube-proxy
  • Loading branch information
tomastigera authored Dec 14, 2020
2 parents 74f2816 + 35a646b commit 0d79f37
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 279 deletions.
300 changes: 122 additions & 178 deletions bpf/conntrack/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,88 +49,6 @@ func DefaultTimeouts() Timeouts {
}
}

// ScanVerdict represents the set of values returned by EntryScan
type ScanVerdict int

const (
// ScanVerdictOK means entry is fine and should remain
ScanVerdictOK ScanVerdict = iota
// ScanVerdictDelete means entry should be deleted
ScanVerdictDelete
)

// EntryGet is a function prototype provided to EntryScanner in case it needs to
// evaluate other entries to make a verdict
type EntryGet func(Key) (Value, error)

// EntryScanner is a function prototype to be called on every entry by the scanner
type EntryScanner func(Key, Value, EntryGet) ScanVerdict

// Scanner iterates over a provided conntrack map and call a set of EntryScanner
// functions on each entry in the order as they were passed to NewScanner. If
// any of the EntryScanner returns ScanVerdictDelete, it deletes the entry, does
// not call any other EntryScanner and continues the iteration.
//
// It provides a delete-save iteration over the conntrack table for multiple
// evaluation functions, to keep their implementation simpler.
type Scanner struct {
ctMap bpf.Map
scanners []EntryScanner
}

// NewScanner returns a scanner for the given conntrack map and the set of
// EntryScanner. They are executed in the provided order on each entry.
func NewScanner(ctMap bpf.Map, scanners ...EntryScanner) *Scanner {
return &Scanner{
ctMap: ctMap,
scanners: scanners,
}
}

// Scan executes a scanning iteration
func (s *Scanner) Scan() {
debug := log.GetLevel() >= log.DebugLevel

var ctKey Key
var ctVal Value

err := s.ctMap.Iter(func(k, v []byte) bpf.IteratorAction {
copy(ctKey[:], k[:])
copy(ctVal[:], v[:])

if debug {
log.WithFields(log.Fields{
"key": ctKey,
"entry": ctVal,
}).Debug("Examining conntrack entry")
}

for _, scanner := range s.scanners {
if verdict := scanner(ctKey, ctVal, s.get); verdict == ScanVerdictDelete {
if debug {
log.Debug("Deleting conntrack entry.")
}
return bpf.IterDelete
}
}
return bpf.IterNone
})

if err != nil {
log.WithError(err).Warn("Failed to iterate over conntrack map")
}
}

func (s *Scanner) get(k Key) (Value, error) {
v, err := s.ctMap.Get(k.AsBytes())

if err != nil {
return Value{}, err
}

return ValueFromBytes(v), nil
}

type LivenessScanner struct {
timeouts Timeouts
dsr bool
Expand Down Expand Up @@ -163,7 +81,7 @@ func WithTimeShim(shim timeshim.Interface) LivenessScannerOpt {
}
}

func (l *LivenessScanner) ScanEntry(ctKey Key, ctVal Value, get EntryGet) ScanVerdict {
func (l *LivenessScanner) Check(ctKey Key, ctVal Value, get EntryGet) ScanVerdict {
if l.cachedKTime == 0 || l.time.Since(l.goTimeOfLastKTimeLookup) > time.Second {
l.cachedKTime = l.time.KTimeNanos()
l.goTimeOfLastKTimeLookup = l.time.Now()
Expand All @@ -190,7 +108,7 @@ func (l *LivenessScanner) ScanEntry(ctKey Key, ctVal Value, get EntryGet) ScanVe
log.WithError(err).Warn("Failed to look up conntrack entry.")
return ScanVerdictOK
}
if reason, expired := l.EntryExpired(now, ctKey.Proto(), revEntry); expired {
if reason, expired := l.timeouts.EntryExpired(now, ctKey.Proto(), revEntry); expired {
if debug {
log.WithField("reason", reason).Debug("Deleting expired conntrack forward-NAT entry")
}
Expand All @@ -200,14 +118,14 @@ func (l *LivenessScanner) ScanEntry(ctKey Key, ctVal Value, get EntryGet) ScanVe
// it once we come across it again.
}
case TypeNATReverse:
if reason, expired := l.EntryExpired(now, ctKey.Proto(), ctVal); expired {
if reason, expired := l.timeouts.EntryExpired(now, ctKey.Proto(), ctVal); expired {
if debug {
log.WithField("reason", reason).Debug("Deleting expired conntrack reverse-NAT entry")
}
return ScanVerdictDelete
}
case TypeNormal:
if reason, expired := l.EntryExpired(now, ctKey.Proto(), ctVal); expired {
if reason, expired := l.timeouts.EntryExpired(now, ctKey.Proto(), ctVal); expired {
if debug {
log.WithField("reason", reason).Debug("Deleting expired normal conntrack entry")
}
Expand All @@ -220,9 +138,11 @@ func (l *LivenessScanner) ScanEntry(ctKey Key, ctVal Value, get EntryGet) ScanVe
return ScanVerdictOK
}

func (l *LivenessScanner) EntryExpired(nowNanos int64, proto uint8, entry Value) (reason string, expired bool) {
// EntryExpired checks whether a given conntrack table entry for a given
// protocol and time, is expired.
func (t *Timeouts) EntryExpired(nowNanos int64, proto uint8, entry Value) (reason string, expired bool) {
sinceCreation := time.Duration(nowNanos - entry.Created())
if sinceCreation < l.timeouts.CreationGracePeriod {
if sinceCreation < t.CreationGracePeriod {
log.Debug("Conntrack entry in creation grace period. Ignoring.")
return
}
Expand All @@ -232,135 +152,159 @@ func (l *LivenessScanner) EntryExpired(nowNanos int64, proto uint8, entry Value)
dsr := entry.IsForwardDSR()
data := entry.Data()
rstSeen := data.RSTSeen()
if rstSeen && age > l.timeouts.TCPResetSeen {
if rstSeen && age > t.TCPResetSeen {
return "RST seen", true
}
finsSeen := (dsr && data.FINsSeenDSR()) || data.FINsSeen()
if finsSeen && age > l.timeouts.TCPFinsSeen {
if finsSeen && age > t.TCPFinsSeen {
// Both legs have been finished, tear down.
return "FINs seen", true
}
if data.Established() || dsr {
if age > l.timeouts.TCPEstablished {
if age > t.TCPEstablished {
return "no traffic on established flow for too long", true
}
} else {
if age > l.timeouts.TCPPreEstablished {
if age > t.TCPPreEstablished {
return "no traffic on pre-established flow for too long", true
}
}
return "", false
case ProtoICMP:
if age > l.timeouts.ICMPLastSeen {
if age > t.ICMPLastSeen {
return "no traffic on ICMP flow for too long", true
}
default:
// FIXME separate timeouts for non-UDP IP traffic?
if age > l.timeouts.UDPLastSeen {
if age > t.UDPLastSeen {
return "no traffic on UDP flow for too long", true
}
}
return "", false
}

// NATChecker returns true a given combination of frontend-backend exists
type NATChecker func(frontIP net.IP, frontPort uint16, backIP net.IP, backPort uint16, proto uint8) bool
type NATChecker interface {
ConntrackScanStart()
ConntrackScanEnd()
ConntrackFrontendHasBackend(ip net.IP, port uint16, backendIP net.IP, backendPort uint16, proto uint8) bool
}

// StaleNATScanner removes any entries to frontend that do not have the backend anymore.
type StaleNATScanner struct {
natChecker NATChecker
}

// NewStaleNATScanner returns an EntryScanner that checks if entries have
// exisitng NAT entries using the provided NATChecker and if not, it deletes
// them.
func NewStaleNATScanner(frontendHasBackend NATChecker) EntryScanner {
func NewStaleNATScanner(frontendHasBackend NATChecker) *StaleNATScanner {
return &StaleNATScanner{
natChecker: frontendHasBackend,
}
}

// Check checks the conntrack entry
func (sns *StaleNATScanner) Check(k Key, v Value, _ EntryGet) ScanVerdict {
debug := log.GetLevel() >= log.DebugLevel

return func(k Key, v Value, _ EntryGet) ScanVerdict {
switch v.Type() {
case TypeNormal:
// skip non-NAT entry

case TypeNATReverse:
proto := k.Proto()
ipA := k.AddrA()
ipB := k.AddrB()

portA := k.PortA()
portB := k.PortB()

svcIP := v.OrigIP()
svcPort := v.OrigPort()

// We cannot tell which leg is EP and which is the client, we must
// try both. If there is a record for one of them, it is still most
// likely an active entry.
if !frontendHasBackend(svcIP, svcPort, ipA, portA, proto) &&
!frontendHasBackend(svcIP, svcPort, ipB, portB, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATReverse is stale")
}
return ScanVerdictDelete
}
switch v.Type() {
case TypeNormal:
// skip non-NAT entry

case TypeNATReverse:
proto := k.Proto()
ipA := k.AddrA()
ipB := k.AddrB()

portA := k.PortA()
portB := k.PortB()

svcIP := v.OrigIP()
svcPort := v.OrigPort()

// We cannot tell which leg is EP and which is the client, we must
// try both. If there is a record for one of them, it is still most
// likely an active entry.
if !sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, ipA, portA, proto) &&
!sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, ipB, portB, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATReverse still active")
log.WithField("key", k).Debugf("TypeNATReverse is stale")
}
return ScanVerdictDelete
}
if debug {
log.WithField("key", k).Debugf("TypeNATReverse still active")
}

case TypeNATForward:
proto := k.Proto()
kA := k.AddrA()
kAport := k.PortA()
kB := k.AddrB()
kBport := k.PortB()
revKey := v.ReverseNATKey()
revA := revKey.AddrA()
revAport := revKey.PortA()
revB := revKey.AddrB()
revBport := revKey.PortB()

var (
svcIP, epIP net.IP
svcPort, epPort uint16
)

// Because client IP/Port are both in fwd key and rev key, we can
// can tell which one it is and thus determine exactly meaning of
// the other values.
if kA.Equal(revA) && kAport == revAport {
epIP = revB
epPort = revBport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revA) && kBport == revAport {
epIP = revB
epPort = revBport
svcIP = kA
svcPort = kAport
} else if kA.Equal(revB) && kAport == revBport {
epIP = revA
epPort = revAport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revB) && kBport == revBport {
epIP = revA
epPort = revAport
svcIP = kA
svcPort = kAport
} else {
log.WithFields(log.Fields{"key": k, "value": v}).Error("Mismatch between key and rev key")
return ScanVerdictOK // don't touch, will get deleted when expired
}
case TypeNATForward:
proto := k.Proto()
kA := k.AddrA()
kAport := k.PortA()
kB := k.AddrB()
kBport := k.PortB()
revKey := v.ReverseNATKey()
revA := revKey.AddrA()
revAport := revKey.PortA()
revB := revKey.AddrB()
revBport := revKey.PortB()

var (
svcIP, epIP net.IP
svcPort, epPort uint16
)

// Because client IP/Port are both in fwd key and rev key, we can
// can tell which one it is and thus determine exactly meaning of
// the other values.
if kA.Equal(revA) && kAport == revAport {
epIP = revB
epPort = revBport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revA) && kBport == revAport {
epIP = revB
epPort = revBport
svcIP = kA
svcPort = kAport
} else if kA.Equal(revB) && kAport == revBport {
epIP = revA
epPort = revAport
svcIP = kB
svcPort = kBport
} else if kB.Equal(revB) && kBport == revBport {
epIP = revA
epPort = revAport
svcIP = kA
svcPort = kAport
} else {
log.WithFields(log.Fields{"key": k, "value": v}).Error("Mismatch between key and rev key")
return ScanVerdictOK // don't touch, will get deleted when expired
}

if !frontendHasBackend(svcIP, svcPort, epIP, epPort, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATForward is stale")
}
return ScanVerdictDelete
}
if !sns.natChecker.ConntrackFrontendHasBackend(svcIP, svcPort, epIP, epPort, proto) {
if debug {
log.WithField("key", k).Debugf("TypeNATForward still active")
log.WithField("key", k).Debugf("TypeNATForward is stale")
}

default:
log.WithField("conntrack.Value.Type()", v.Type()).Warn("Unknown type")
return ScanVerdictDelete
}
if debug {
log.WithField("key", k).Debugf("TypeNATForward still active")
}

return ScanVerdictOK
default:
log.WithField("conntrack.Value.Type()", v.Type()).Warn("Unknown type")
}

return ScanVerdictOK
}

// IterationStart satisfies EntryScannerSynced
func (sns *StaleNATScanner) IterationStart() {
sns.natChecker.ConntrackScanStart()
}

// IterationEnd satisfies EntryScannerSynced
func (sns *StaleNATScanner) IterationEnd() {
sns.natChecker.ConntrackScanEnd()
}
Loading

0 comments on commit 0d79f37

Please sign in to comment.