Skip to content

Commit

Permalink
Merge tag '1.22.3' into tetrate-release-1.22
Browse files Browse the repository at this point in the history
Istio release 1.22.3
  • Loading branch information
github-actions committed Jul 17, 2024
2 parents e298fde + 1e0dc8d commit 6ae7557
Show file tree
Hide file tree
Showing 194 changed files with 3,772 additions and 1,651 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "istio build-tools",
"image": "gcr.io/istio-testing/build-tools:release-1.22-90c1573ac8a673ef69c7d0587232efa748243fac",
"image": "gcr.io/istio-testing/build-tools:release-1.22-46fce460ef8547fb88a20de8494683bfb3bfa8e5",
"privileged": true,
"remoteEnv": {
"USE_GKE_GCLOUD_AUTH_PLUGIN": "True",
Expand Down
2 changes: 1 addition & 1 deletion Makefile.core.mk
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ endif
export VERSION

# Base version of Istio image to use
BASE_VERSION ?= master-2024-04-19T19-01-19
BASE_VERSION ?= 1.22-2024-06-27T19-01-41
ISTIO_BASE_REGISTRY ?= gcr.io/istio-release

export GO111MODULE ?= on
Expand Down
11 changes: 5 additions & 6 deletions cni/pkg/iptables/iptables_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func forEachInpodMarkIPRule(cfg *Config, f func(*netlink.Rule) error) error {
}

func AddLoopbackRoutes(cfg *Config) error {
return forEachLoopbackRoute(cfg, netlink.RouteReplace)
return forEachLoopbackRoute(cfg, "add", netlink.RouteReplace)
}

func DelLoopbackRoutes(cfg *Config) error {
return forEachLoopbackRoute(cfg, netlink.RouteDel)
return forEachLoopbackRoute(cfg, "remove", netlink.RouteDel)
}

func forEachLoopbackRoute(cfg *Config, f func(*netlink.Route) error) error {
func forEachLoopbackRoute(cfg *Config, operation string, f func(*netlink.Route) error) error {
loopbackLink, err := netlink.LinkByName("lo")
if err != nil {
return fmt.Errorf("failed to find 'lo' link: %v", err)
Expand Down Expand Up @@ -108,10 +108,9 @@ func forEachLoopbackRoute(cfg *Config, f func(*netlink.Route) error) error {
}

for _, route := range netlinkRoutes {
log.Debugf("Iterating netlink route : %+v", route)
log.Debugf("Iterating netlink route: %+v", route)
if err := f(route); err != nil {
log.Errorf("Failed to add netlink route : %+v", route)
return fmt.Errorf("failed to add route: %v", err)
return fmt.Errorf("failed to %v route (%+v): %v", operation, route, err)
}
}
}
Expand Down
62 changes: 51 additions & 11 deletions cni/pkg/log/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ type UDSLogger struct {
}

type cniLog struct {
Level string `json:"level"`
Time time.Time `json:"time"`
Msg string `json:"msg"`
Level string `json:"level"`
Time time.Time `json:"time"`
Msg string `json:"msg"`
Arbitrary map[string]any `json:"-"`
}

func NewUDSLogger() *UDSLogger {
Expand Down Expand Up @@ -103,29 +104,68 @@ func (l *UDSLogger) processLog(body []byte) {
}
messages := make([]cniLog, 0, len(cniLogs))
for _, l := range cniLogs {
var msg cniLog
if err := json.Unmarshal([]byte(l), &msg); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
msg, ok := parseCniLog(l)
if !ok {
continue
}
msg.Msg = strings.TrimSpace(msg.Msg)
messages = append(messages, msg)
}
// Lock log message printing to prevent log messages from different CNI
// processes interleave.
l.mu.Lock()
defer l.mu.Unlock()
for _, m := range messages {
logger := pluginLog
// For any k/v pairs, add them back
for k, v := range m.Arbitrary {
logger = logger.WithLabels(k, v)
}
// There is no fatal log from CNI plugin
switch m.Level {
case "debug":
pluginLog.LogWithTime(log.DebugLevel, m.Msg, m.Time)
logger.LogWithTime(log.DebugLevel, m.Msg, m.Time)
case "info":
pluginLog.LogWithTime(log.InfoLevel, m.Msg, m.Time)
logger.LogWithTime(log.InfoLevel, m.Msg, m.Time)
case "warn":
pluginLog.LogWithTime(log.WarnLevel, m.Msg, m.Time)
logger.LogWithTime(log.WarnLevel, m.Msg, m.Time)
case "error":
pluginLog.LogWithTime(log.ErrorLevel, m.Msg, m.Time)
logger.LogWithTime(log.ErrorLevel, m.Msg, m.Time)
}
}
}

// parseCniLog is tricky because we have known and arbitrary fields in the log, and Go doesn't make that very easy
func parseCniLog(l string) (cniLog, bool) {
var raw map[string]json.RawMessage
if err := json.Unmarshal([]byte(l), &raw); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
return cniLog{}, false
}
var msg cniLog
if err := json.Unmarshal(raw["msg"], &msg.Msg); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
return cniLog{}, false
}
if err := json.Unmarshal(raw["level"], &msg.Level); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
return cniLog{}, false
}
if err := json.Unmarshal(raw["time"], &msg.Time); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
return cniLog{}, false
}
delete(raw, "msg")
delete(raw, "level")
delete(raw, "time")
msg.Arbitrary = make(map[string]any, len(raw))
for k, v := range raw {
var res any
if err := json.Unmarshal(v, &res); err != nil {
log.Debugf("Failed to unmarshal CNI plugin log entry: %v", err)
continue
}
msg.Arbitrary[k] = res
}
msg.Msg = strings.TrimSpace(msg.Msg)
return msg, true
}
89 changes: 81 additions & 8 deletions cni/pkg/log/uds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package log

import (
"encoding/json"
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"

"istio.io/istio/cni/pkg/constants"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/test/util/assert"
)

Expand All @@ -41,13 +44,15 @@ func TestUDSLog(t *testing.T) {
r, w, _ := os.Pipe()
os.Stdout = w
loggingOptions := log.DefaultOptions()
loggingOptions.JSONEncoding = true
loggingOptions.WithTeeToUDS(udsSock, constants.UDSLogPath)
assert.NoError(t, log.Configure(loggingOptions))
log.FindScope("default").SetOutputLevel(log.DebugLevel)
log.Debug("debug log")
log.Info("info log")
log.Warn("warn log")
log.Error("error log")
log.WithLabels("key", 2).Infof("with labels")
// This will error because stdout cannot sync, but the UDS part should sync
// Ideally we would fail if the UDS part fails but the error library makes it kind of tricky
_ = log.Sync()
Expand All @@ -60,20 +65,88 @@ func TestUDSLog(t *testing.T) {
out, err := io.ReadAll(r)
assert.NoError(t, err)

cases := []struct {
level string
msg string
key *float64
}{
{"debug", "debug log", nil},
{"info", "info log", nil},
{"warn", "warn log", nil},
{"error", "error log", nil},
{"info", "with labels", ptr.Of(float64(2))},
}
// For each level, there should be two lines, one from direct log,
// the other one from UDS server
wantLevels := []string{"debug", "info", "warn", "error", "debug", "info", "warn", "error"}
gotLogs := strings.Split(
strings.TrimSuffix(string(out), "\n"), "\n")
if want, got := len(wantLevels), len(gotLogs); want != got {
if want, got := len(cases)*2, len(gotLogs); want != got {
t.Fatalf("Number of logs want %v, got %v logs: %v", want, got, gotLogs)
}

for i, l := range gotLogs {
// For each line, there should be two level string, e.g.
// "2021-07-09T03:26:08.984951Z debug debug log"
if got, want := strings.Count(l, wantLevels[i]), 2; want != got {
t.Errorf("Number of log level string want %v, got %v", want, got)
i := 0
for _, l := range gotLogs {
var parsedLog map[string]any
assert.NoError(t, json.Unmarshal([]byte(l), &parsedLog))
if parsedLog["scope"] != "cni-plugin" {
// Each log is 2x: one direct, and one over UDS. Just test the UDS one
continue
}
// remove scope since it is constant and not needed to test
delete(parsedLog, "scope")
// check time is there
if _, f := parsedLog["time"]; !f {
t.Fatalf("log %v did not have time", i)
}
// but remove time since it changes on each test
delete(parsedLog, "time")
want := map[string]any{
"level": cases[i].level,
"msg": cases[i].msg,
}
if k := cases[i].key; k != nil {
want["key"] = *k
}
assert.Equal(t, want, parsedLog)
i++
}
}

func TestParseCniLog(t *testing.T) {
wantT := &time.Time{}
assert.NoError(t, wantT.UnmarshalText([]byte("2020-01-01T00:00:00.356374Z")))
cases := []struct {
name string
in string
out cniLog
}{
{
"without keys",
`{"level":"info","time":"2020-01-01T00:00:00.356374Z","msg":"my message"}`,
cniLog{
Level: "info",
Time: *wantT,
Msg: "my message",
Arbitrary: nil,
},
},
{
"with keys",
`{"level":"info","time":"2020-01-01T00:00:00.356374Z","msg":"my message","key":"string value","bar":2}`,
cniLog{
Level: "info",
Time: *wantT,
Msg: "my message",
Arbitrary: map[string]any{
"key": "string value",
"bar": float64(2),
},
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
got, _ := parseCniLog(tt.in)
assert.Equal(t, got, tt.out)
})
}
}
59 changes: 37 additions & 22 deletions cni/pkg/nodeagent/cni-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

pconstants "istio.io/istio/cni/pkg/constants"
"istio.io/istio/cni/pkg/pluginlistener"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/sleep"
)

// Just a composite of the CNI plugin add event struct + some extracted "args"
Expand Down Expand Up @@ -124,19 +126,19 @@ func (s *CniPluginServer) handleAddEvent(w http.ResponseWriter, req *http.Reques
defer req.Body.Close()
data, err := io.ReadAll(req.Body)
if err != nil {
log.Errorf("Failed to read event report from cni plugin: %v", err)
log.Errorf("failed to read event report from cni plugin: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
msg, err := processAddEvent(data)
if err != nil {
log.Errorf("Failed to process CNI event payload: %v", err)
log.Errorf("failed to process CNI event payload: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if err := s.ReconcileCNIAddEvent(req.Context(), msg); err != nil {
log.Errorf("Failed to handle add event: %v", err)
log.WithLabels("ns", msg.PodNamespace, "name", msg.PodName).Errorf("failed to handle add event: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -161,25 +163,9 @@ func (s *CniPluginServer) ReconcileCNIAddEvent(ctx context.Context, addCmd CNIPl

// The CNI node plugin should have already checked the pod against the k8s API before forwarding us the event,
// but we have to invoke the K8S client anyway, so to be safe we check it again here to make sure we get the same result.
maxStaleRetries := 10
msInterval := 10
retries := 0
var ambientPod *corev1.Pod
var err error

log.Debugf("Checking pod: %s in ns: %s is enabled for ambient", addCmd.PodName, addCmd.PodNamespace)
// The plugin already consulted the k8s API - but on this end handler caches may be stale, so retry a few times if we get no pod.
for ambientPod, err = s.handlers.GetPodIfAmbient(addCmd.PodName, addCmd.PodNamespace); (ambientPod == nil) && (retries < maxStaleRetries); retries++ {
if err != nil {
return err
}
log.Warnf("got an event for pod %s in namespace %s not found in current pod cache, retry %d of %d",
addCmd.PodName, addCmd.PodNamespace, retries, maxStaleRetries)
time.Sleep(time.Duration(msInterval) * time.Millisecond)
}

if ambientPod == nil {
return fmt.Errorf("got event for pod %s in namespace %s but could not find in pod cache after retries", addCmd.PodName, addCmd.PodNamespace)
ambientPod, err := s.getPodWithRetry(log, addCmd.PodName, addCmd.PodNamespace)
if err != nil {
return err
}
log.Debugf("Pod: %s in ns: %s is enabled for ambient, adding to mesh.", addCmd.PodName, addCmd.PodNamespace)

Expand All @@ -200,3 +186,32 @@ func (s *CniPluginServer) ReconcileCNIAddEvent(ctx context.Context, addCmd CNIPl

return nil
}

func (s *CniPluginServer) getPodWithRetry(log *istiolog.Scope, name, namespace string) (*corev1.Pod, error) {
log.Debugf("Checking if pod %s/%s is enabled for ambient", namespace, name)
const maxStaleRetries = 10
const msInterval = 10
retries := 0
var ambientPod *corev1.Pod
var err error

// The plugin already consulted the k8s API - but on this end handler caches may be stale, so retry a few times if we get no pod.
// if err is returned, we couldn't find the pod
// if nil is returned, we found it but ambient is not enabled
for ambientPod, err = s.handlers.GetPodIfAmbient(name, namespace); (err != nil) && (retries < maxStaleRetries); retries++ {
log.Warnf("got an event for pod %s in namespace %s not found in current pod cache, retry %d of %d",
name, namespace, retries, maxStaleRetries)
if !sleep.UntilContext(s.ctx, time.Duration(msInterval)*time.Millisecond) {
return nil, fmt.Errorf("aborted")
}
}
if err != nil {
return nil, fmt.Errorf("failed to get pod %s/%s: %v", namespace, name, err)
}

// This shouldn't happen - we only trigger this when a pod is added to ambient.
if ambientPod == nil {
return nil, fmt.Errorf("pod %s/%s is unexpectedly not enrolled in ambient", namespace, name)
}
return ambientPod, nil
}
Loading

0 comments on commit 6ae7557

Please sign in to comment.