Skip to content

Commit

Permalink
Add summarisation to the logs from the main dataplane loop.
Browse files Browse the repository at this point in the history
* Pass around a log accumulator.
* Record operations done on each iteration of the loop.
* Record duration of each apply.
* Log out the average and worst apply every 60s.
  • Loading branch information
fasaxc committed Nov 25, 2020
1 parent 51cdd68 commit 9fabc0b
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 24 deletions.
30 changes: 25 additions & 5 deletions dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/projectcalico/felix/bpf/state"
"github.com/projectcalico/felix/bpf/tc"

"github.com/projectcalico/felix/idalloc"

"k8s.io/client-go/kubernetes"
Expand All @@ -43,6 +42,8 @@ import (
cprometheus "github.com/projectcalico/libcalico-go/lib/prometheus"
"github.com/projectcalico/libcalico-go/lib/set"

lclogutils "github.com/projectcalico/libcalico-go/lib/logutils"

"github.com/projectcalico/felix/bpf"
"github.com/projectcalico/felix/bpf/arp"
"github.com/projectcalico/felix/bpf/conntrack"
Expand All @@ -60,7 +61,6 @@ import (
"github.com/projectcalico/felix/rules"
"github.com/projectcalico/felix/throttle"
"github.com/projectcalico/felix/wireguard"
lclogutils "github.com/projectcalico/libcalico-go/lib/logutils"
)

const (
Expand Down Expand Up @@ -271,6 +271,8 @@ type InternalDataplane struct {
endpointsSourceV4 endpointsSource
ipsetsSourceV4 ipsetsSource
callbacks *callbacks

logAccumulator *LogAccumulator
}

const (
Expand Down Expand Up @@ -323,6 +325,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
ifaceAddrUpdates: make(chan *ifaceAddrsUpdate, 100),
config: config,
applyThrottle: throttle.New(10),
logAccumulator: NewLogAccumulator(),
}
dp.applyThrottle.Refill() // Allow the first apply() immediately.
dp.ifaceMonitor.StateCallback = dp.onIfaceStateChange
Expand Down Expand Up @@ -683,7 +686,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.fromDataplane <- &proto.WireguardStatusUpdate{PublicKey: publicKey.String()}
}
return nil
})
},
dp.logAccumulator)
dp.wireguardManager = newWireguardManager(cryptoRouteTableWireguard)
dp.RegisterManager(dp.wireguardManager) // IPv4-only

Expand Down Expand Up @@ -767,6 +771,19 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.allIptablesTables = append(dp.allIptablesTables, dp.iptablesFilterTables...)
dp.allIptablesTables = append(dp.allIptablesTables, dp.iptablesRawTables...)

for _, t := range dp.allIptablesTables {
t.OpReporter = dp.logAccumulator
}
for _, ips := range dp.ipSets {
ips.OpReporter = dp.logAccumulator
}
for _, r := range dp.routeTableSyncers() {
switch r := r.(type) {
case *routetable.RouteTable:
r.OpReporter = dp.logAccumulator
}
}

// Register that we will report liveness and readiness.
if config.HealthAggregator != nil {
log.Info("Registering to report health.")
Expand Down Expand Up @@ -1450,7 +1467,7 @@ func (d *InternalDataplane) loopUpdatingDataplane() {
log.Info("Dataplane updates no longer throttled")
beingThrottled = false
}
log.Info("Applying dataplane updates")
log.Debug("Applying dataplane updates")
applyStart := time.Now()

// Actually apply the changes to the dataplane.
Expand All @@ -1464,13 +1481,16 @@ func (d *InternalDataplane) loopUpdatingDataplane() {
// Dataplane is still dirty, record an error.
countDataplaneSyncErrors.Inc()
}
log.WithField("msecToApply", applyTime.Seconds()*1000.0).Info(

d.logAccumulator.EndOfIteration(applyTime)
log.WithField("msecToApply", applyTime.Seconds()*1000.0).Debug(
"Finished applying updates to dataplane.")

if !d.doneFirstApply {
log.WithField(
"secsSinceStart", time.Since(processStartTime).Seconds(),
).Info("Completed first update to dataplane.")
d.logAccumulator.RecordOperation("first-update")
d.doneFirstApply = true
if d.config.PostInSyncCallback != nil {
d.config.PostInSyncCallback()
Expand Down
106 changes: 106 additions & 0 deletions dataplane/linux/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2020 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package intdataplane

import (
"sort"
"strings"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/projectcalico/libcalico-go/lib/set"
)

type DurationStat struct {
Name string
LogIfAbove time.Duration
}

type LogAccumulator struct {
lock sync.Mutex
lastLogTime time.Time

currentIteration *iteration
iterations []*iteration
}

func (l *LogAccumulator) Reset() {
l.iterations = l.iterations[:0]
}

type iteration struct {
Operations []string
Duration time.Duration
}

func (i *iteration) RecordOperation(name string) {
i.Operations = append(i.Operations, name)
}

func NewLogAccumulator() *LogAccumulator {
return &LogAccumulator{
currentIteration: &iteration{},
lastLogTime: time.Now(),
}
}

func (l *LogAccumulator) RecordOperation(name string) {
l.lock.Lock()
defer l.lock.Unlock()

l.currentIteration.RecordOperation(name)
}

// EndOfIteration should be called at the end of the loop, it will trigger logging of noteworthy logs.
func (l *LogAccumulator) EndOfIteration(duration time.Duration) {
l.lock.Lock()
defer l.lock.Unlock()

lastIteration := l.currentIteration
lastIteration.Duration = duration
l.iterations = append(l.iterations, lastIteration)
l.currentIteration = &iteration{}
if time.Since(l.lastLogTime) > time.Minute {
l.DoLog()
l.Reset()
l.lastLogTime = time.Now()
}
}

func (l *LogAccumulator) DoLog() {
numUpdates := len(l.iterations)
allOps := set.New()
var longestIteration *iteration
var sumOfDurations time.Duration
for _, it := range l.iterations {
allOps.AddAll(it.Operations)
sumOfDurations += it.Duration
if longestIteration == nil || it.Duration > longestIteration.Duration {
longestIteration = it
}
}
if longestIteration == nil {
return
}
avgDuration := (sumOfDurations / time.Duration(numUpdates)).Round(time.Millisecond)
longestOps := longestIteration.Operations
sort.Strings(longestOps)
log.Infof("Summarising %d dataplane reconciliation loops over %v: avg=%v longest=%v (%v)",
numUpdates, time.Since(l.lastLogTime).Round(100*time.Millisecond), avgDuration,
longestIteration.Duration.Round(time.Millisecond),
strings.Join(longestOps, ","))
}
4 changes: 2 additions & 2 deletions dataplane/linux/xdp_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (x *xdpState) ResyncIfNeeded(ipsSourceV4 ipsetsSource) error {
if i > 0 {
log.Info("Retrying after an XDP update failure...")
}
log.Info("Resyncing XDP state with dataplane.")
log.Debug("Resyncing XDP state with dataplane.")
err = x.tryResync(newConvertingIPSetsSource(ipsSourceV4))
if err == nil {
success = true
Expand Down Expand Up @@ -558,7 +558,7 @@ func (s *xdpIPState) getIPSetMembers(setID string, ipsSource ipsetsSource) (set.
func (s *xdpIPState) tryResync(common *xdpStateCommon, ipsSource ipsetsSource) error {
resyncStart := time.Now()
defer func() {
s.logCxt.WithField("resyncDuration", time.Since(resyncStart)).Info("Finished XDP resync.")
s.logCxt.WithField("resyncDuration", time.Since(resyncStart)).Debug("Finished XDP resync.")
}()
s.ipsetIDsToMembers.Clear()
resyncState, err := s.newXDPResyncState(common.bpfLib, ipsSource, common.programTag, common.xdpModes)
Expand Down
1 change: 0 additions & 1 deletion fv/named_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func describeNamedPortTests(testSourcePorts bool, protocol string) {
})

AfterEach(func() {

if CurrentGinkgoTestDescription().Failed {
log.Warn("Test failed, dumping diags...")
utils.Run("docker", "logs", felix.Name)
Expand Down
9 changes: 9 additions & 0 deletions ipsets/ipset_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ func (f IPFamily) IsValid() bool {
}
return false
}
func (f IPFamily) Version() int {
switch f {
case IPFamilyV4:
return 4
case IPFamilyV6:
return 6
}
return 0
}

// IPSetMetadata contains the metadata for a particular IP set, such as its name, type and size.
type IPSetMetadata struct {
Expand Down
25 changes: 19 additions & 6 deletions ipsets/ipsets.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017-2019 Tigera, Inc. All rights reserved.
// Copyright (c) 2017-2020 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,12 @@ type IPSets struct {
// stderrCopy holds a copy of the the stderr emitted by ipset restore. It is reset after
// each use.
stderrCopy bytes.Buffer

OpReporter OpReporter
}

type OpReporter interface {
RecordOperation(name string)
}

func NewIPSets(ipVersionConfig *IPVersionConfig) *IPSets {
Expand Down Expand Up @@ -220,7 +226,7 @@ func (s *IPSets) RemoveMembers(setID string, removedMembers []string) {

// QueueResync forces a resync with the dataplane on the next ApplyUpdates() call.
func (s *IPSets) QueueResync() {
s.logCxt.Info("Asked to resync with the dataplane on next update.")
s.logCxt.Debug("Asked to resync with the dataplane on next update.")
s.resyncRequired = true
}

Expand Down Expand Up @@ -303,16 +309,19 @@ func (s *IPSets) ApplyUpdates() {
if s.resyncRequired {
// Compare our in-memory state against the dataplane and queue up
// modifications to fix any inconsistencies.
s.logCxt.Info("Resyncing ipsets with dataplane.")
s.logCxt.Debug("Resyncing ipsets with dataplane.")
if s.OpReporter != nil {
s.OpReporter.RecordOperation(fmt.Sprint("resync-ipsets-v", s.IPVersionConfig.Family.Version()))
}
numProblems, err := s.tryResync()
if err != nil {
s.logCxt.WithError(err).Warning("Failed to resync with dataplane")
backOff()
continue
}
if numProblems > 0 {
s.logCxt.WithField("numProblems", numProblems).Info(
"Found inconsistencies in dataplane")
s.logCxt.WithField("numProblems", numProblems).Warn(
"Found inconsistencies in IP sets in dataplane")
}
s.resyncRequired = false
}
Expand Down Expand Up @@ -353,7 +362,7 @@ func (s *IPSets) tryResync() (numProblems int, err error) {
s.logCxt.WithFields(log.Fields{
"resyncDuration": time.Since(resyncStart),
"numInconsistenciesFound": numProblems,
}).Info("Finished resync")
}).Debug("Finished IPSets resync")
}()

// Start an 'ipset list' child process, which will emit output of the following form:
Expand Down Expand Up @@ -622,6 +631,10 @@ func (s *IPSets) tryUpdates() error {
return nil
}

if s.OpReporter != nil {
s.OpReporter.RecordOperation(fmt.Sprint("update-ipsets-", s.IPVersionConfig.Family.Version()))
}

// Set up an ipset restore session.
countNumIPSetCalls.Inc()
cmd := s.newCmd("ipset", "restore")
Expand Down
15 changes: 13 additions & 2 deletions iptables/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
minPostWriteInterval = 50 * time.Millisecond
)

type OpReporter interface {
RecordOperation(name string)
}

var (
// List of all the top-level kernel-created chains by iptables table.
tableToKernelChains = map[string][]string{
Expand Down Expand Up @@ -275,6 +279,7 @@ type Table struct {
lookPath func(file string) (string, error)

onStillAlive func()
OpReporter OpReporter
}

type TableOptions struct {
Expand Down Expand Up @@ -593,7 +598,10 @@ func (t *Table) loadDataplaneState() {
t.featureDetector.RefreshFeatures()

// Load the hashes from the dataplane.
t.logCxt.Info("Loading current iptables state and checking it is correct.")
t.logCxt.Debug("Loading current iptables state and checking it is correct.")
if t.OpReporter != nil {
t.OpReporter.RecordOperation(fmt.Sprintf("resync-%v-v%d", t.Name, t.IPVersion))
}
t.lastReadTime = t.timeNow()
dataplaneHashes, dataplaneRules := t.getHashesAndRulesFromDataplane()

Expand Down Expand Up @@ -931,7 +939,7 @@ func (t *Table) InvalidateDataplaneCache(reason string) {
logCxt.Debug("Would invalidate dataplane cache but it was already invalid.")
return
}
logCxt.Info("Invalidating dataplane cache")
logCxt.Debug("Invalidating dataplane cache")
t.inSyncWithDataPlane = false
}

Expand Down Expand Up @@ -1259,6 +1267,9 @@ func (t *Table) applyUpdates() error {
} else {
// Get the contents of the buffer ready to send to iptables-restore. Warning: for perf, this is directly
// accessing the buffer's internal array; don't touch the buffer after this point.
if t.OpReporter != nil {
t.OpReporter.RecordOperation(fmt.Sprintf("update-%v-v%d", t.Name, t.IPVersion))
}
inputBytes := buf.GetBytesAndReset()

if log.GetLevel() >= log.DebugLevel {
Expand Down
4 changes: 2 additions & 2 deletions k8sfv/pod_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.
// Copyright (c) 2017,2020 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,8 +21,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

var _ = Context("with a k8s clientset", func() {
Expand Down
Loading

0 comments on commit 9fabc0b

Please sign in to comment.