diff --git a/Makefile b/Makefile
index f7bd28b7a..ca0bba62b 100644
--- a/Makefile
+++ b/Makefile
@@ -200,11 +200,11 @@ docker.push: docker
 
 .PHONY: e2e
 e2e:
-	./test/e2e/run_test.sh
+	./test/e2e/run_test.sh --skip-cleanup-apps
 
 .PHONY: e2e-ipv6
 e2e-ipv6:
-	./test/e2e/run_test.sh --ipv6
+	./test/e2e/run_test.sh --ipv6 --skip-cleanup-apps
 
 .PHONY: format
 format:
diff --git a/bpf/include/bpf_log.h b/bpf/include/bpf_log.h
index 8734e8b85..8d985bcae 100644
--- a/bpf/include/bpf_log.h
+++ b/bpf/include/bpf_log.h
@@ -77,7 +77,6 @@ volatile __u32 bpf_log_level = BPF_LOG_INFO;
 
 #define BPF_LOG(l, t, f, ...)                                             \
     do {                                                                  \
-        BPF_LOG_U("bpf log level %u %u", BPF_LOG_##l, bpf_log_level);     \
         if ((int)(BPF_LOG_##l) <= bpf_log_level) {                        \
             static const char fmt[] = "[" #t "] " #l ": " f "";           \
             if (!KERNEL_VERSION_HIGHER_5_13_0)                            \
diff --git a/bpf/kmesh/workload/xdp.c b/bpf/kmesh/workload/xdp.c
index 259b5cda4..6a738bc93 100644
--- a/bpf/kmesh/workload/xdp.c
+++ b/bpf/kmesh/workload/xdp.c
@@ -19,7 +19,7 @@ static inline void shutdown_tuple(struct xdp_info *info)
 {
     info->tcph->fin = 0;
     info->tcph->syn = 0;
-    info->tcph->rst = 1;
+    info->tcph->rst = 0;
     info->tcph->psh = 0;
     info->tcph->ack = 0;
 }
diff --git a/pkg/auth/rbac.go b/pkg/auth/rbac.go
index b5c21e169..8df2f526f 100644
--- a/pkg/auth/rbac.go
+++ b/pkg/auth/rbac.go
@@ -181,7 +181,7 @@ func (r *Rbac) doRbac(conn *rbacConnection) bool {
 	// If no workload found, deny
 	if dstWorkload == nil {
 		log.Debugf("denied for connection: %v because destination workload not found", conn)
-		return false
+		return true
 	}
 
 	// TODO: maybe cache them for performance issue
diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go
index aac364160..7ef646675 100644
--- a/pkg/bpf/bpf.go
+++ b/pkg/bpf/bpf.go
@@ -294,20 +294,16 @@ func (l *BpfLoader) setBpfProgOptions() {
 
 	// Kmesh reboot updates only the nodeIP and pod sub gateway
 	if restart.GetStartType() == restart.Normal {
-		if err := l.NodeIP.Set(nodeIP); err != nil {
-			log.Error("set NodeIP failed ", err)
-			return
-		}
-		if err := l.PodGateway.Set(gateway); err != nil {
-			log.Error("set PodGateway failed ", err)
-			return
-		}
-		if err := l.AuthzOffload.Set(constants.ENABLED); err != nil {
-			log.Error("set AuthzOffload failed ", err)
-			return
-		}
-		if err := l.EnableMonitoring.Set(constants.ENABLED); err != nil {
-			log.Error("set EnableMonitoring failed ", err)
+		// Kmesh is starting for the first time, so initialize the kmeshConfigMap.
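+		// Debugging overrides below: BpfLogLevel is raised to DEBUG (the INFO
+		// default is kept commented out) and AuthzOffload is set to DISABLED.
+		// All fields are then written to the kmesh config map in one update.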
+		// valueOfKmeshBpfConfig.BpfLogLevel = constants.BPF_LOG_INFO
+		valueOfKmeshBpfConfig.BpfLogLevel = constants.BPF_LOG_DEBUG
+		valueOfKmeshBpfConfig.NodeIP = nodeIP
+		valueOfKmeshBpfConfig.PodGateway = gateway
+		valueOfKmeshBpfConfig.AuthzOffload = constants.DISABLED
+		valueOfKmeshBpfConfig.EnableMonitoring = constants.ENABLED
+
+		if err := UpdateKmeshConfigMap(l.kmeshConfig, valueOfKmeshBpfConfig); err != nil {
+			log.Errorf("update kmeshConfig map failed: %v", err)
 			return
 		}
 	}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index a3f8ec2fa..9cab4a0a8 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -43,7 +43,8 @@ var (
 	defaultLogger  = initDefaultLogger()
 	fileOnlyLogger = initFileLogger()
 
-	defaultLogLevel = logrus.InfoLevel
+	// defaultLogLevel = logrus.InfoLevel
+	defaultLogLevel = logrus.DebugLevel
 	defaultLogFile  = "/var/run/kmesh/daemon.log"
 
 	defaultLogFormat = &logrus.TextFormatter{
diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go
index 98522c73c..2d60edd43 100644
--- a/test/e2e/main_test.go
+++ b/test/e2e/main_test.go
@@ -28,6 +28,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"runtime"
 	"testing"
@@ -319,6 +320,18 @@ func newWaypointProxy(ctx resource.Context, ns namespace.Instance, name string,
 		return nil, err
 	}
 	pod := pods[0]
+
+	// Adjust the waypoint proxy's log level to trace.
+	cmd := exec.Command("istioctl", "pc", "log", fmt.Sprintf("%s.%s", pod.Name, pod.Namespace), "--level", "trace")
+
+	output, err := cmd.Output()
+	if err != nil {
+		fmt.Printf("execute istioctl command failed: %v\n", err)
+		return nil, err
+	}
+
+	fmt.Print(string(output))
+
 	inbound, err := cls.NewPortForwarder(pod.Name, pod.Namespace, "", 0, 15008)
 	if err != nil {
 		return nil, err
diff --git a/test/e2e/restart_test.go b/test/e2e/restart_test.go
index 987c3d095..69d92f3b0 100644
--- a/test/e2e/restart_test.go
+++ b/test/e2e/restart_test.go
@@ -24,14 +24,17 @@
 package kmesh
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"testing"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
+	"istio.io/istio/pkg/test"
 	"istio.io/istio/pkg/test/framework"
 	"istio.io/istio/pkg/test/framework/components/echo"
-	"istio.io/istio/pkg/test/framework/components/echo/util/traffic"
+	"istio.io/istio/pkg/test/framework/components/echo/check"
 	kubetest "istio.io/istio/pkg/test/kube"
 	"istio.io/istio/pkg/test/util/retry"
 	appsv1 "k8s.io/api/apps/v1"
@@ -45,7 +48,7 @@ func TestKmeshRestart(t *testing.T) {
 		dst := apps.ServiceWithWaypointAtServiceGranularity
 		options := echo.CallOptions{
 			To:    dst,
-			Count: 1,
+			Count: 10,
 			// Determine whether traffic is managed by Kmesh: it must pass through the waypoint.
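+			// NOTE: raising Count from 1 to 10 makes each generator tick issue
+			// ten calls, so every Kmesh restart is probed by several requests.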
 			Check: httpValidator,
 			Port: echo.Port{
@@ -54,13 +57,22 @@
 		Retry: echo.Retry{NoRetry: true},
 	}
 
-		g := traffic.NewGenerator(t, traffic.Config{
+		g := NewGenerator(t, Config{
 			Source:   src,
 			Options:  options,
-			Interval: 50 * time.Millisecond,
-		}).Start()
+			Interval: 5 * time.Millisecond,
+		})
+		g.Start()
 
-		restartKmesh(t)
+		for i := 0; i < 1; i++ {
+			select {
+			case <-g.stopIter:
+				// The generator hit an error; skip further Kmesh restarts.
+				fmt.Printf("-- break restart iterator\n")
+			default:
+				restartKmesh(t)
+			}
+		}
 
 		g.Stop().CheckSuccessRate(t, 1)
 	})
@@ -105,3 +117,173 @@ func restartKmesh(t framework.TestContext) {
 func daemonsetsetComplete(ds *appsv1.DaemonSet) bool {
 	return ds.Status.UpdatedNumberScheduled == ds.Status.DesiredNumberScheduled && ds.Status.NumberReady == ds.Status.DesiredNumberScheduled && ds.Status.ObservedGeneration >= ds.Generation
 }
+
+const (
+	defaultInterval = 1 * time.Second
+	defaultTimeout  = 15 * time.Second
+)
+
+// Config for a traffic Generator.
+type Config struct {
+	// Source of the traffic.
+	Source echo.Caller
+
+	// Options for generating traffic from the Source to the target.
+	Options echo.CallOptions
+
+	// Interval between successive call operations. If not set, defaults to 1 second.
+	Interval time.Duration
+
+	// Maximum time to wait for traffic to complete after stopping. If not set, defaults to 15 seconds.
+	StopTimeout time.Duration
+}
+
+// Generator of traffic between echo instances. Every time interval
+// (as defined by Config.Interval), a grpc request is sent to the source pod,
+// causing it to send a request to the destination echo server. Results are
+// captured for each request for later processing.
+type Generator interface {
+	// Start sending traffic.
+	Start() Generator
+
+	// Stop sending traffic and wait for any in-flight requests to complete.
+	// Returns the Result.
+	Stop() Result
+}
+
+// NewGenerator returns a new Generator with the given configuration.
+func NewGenerator(t test.Failer, cfg Config) *generator {
+	fillInDefaults(&cfg)
+	return &generator{
+		Config:   cfg,
+		t:        t,
+		stop:     make(chan struct{}),
+		stopped:  make(chan struct{}),
+		stopIter: make(chan struct{}),
+	}
+}
+
+var _ Generator = &generator{}
+
+type generator struct {
+	Config
+	t       test.Failer
+	result  Result
+	stop    chan struct{}
+	stopped chan struct{}
+
+	// stopIter is closed after the first failed call, signaling the test to
+	// stop triggering further Kmesh restarts.
+	stopIter chan struct{}
+}
+
+func (g *generator) Start() Generator {
+	go func() {
+		t := time.NewTimer(g.Interval)
+		for {
+			select {
+			case <-g.stop:
+				t.Stop()
+				close(g.stopped)
+				return
+			case <-t.C:
+				result, err := g.Source.Call(g.Options)
+				g.result.add(result, err)
+				if err != nil {
+					g.t.Logf("-- %v encountered an error", time.Now())
+					// Stop generating traffic after the first error: the timer
+					// is deliberately not reset, so the loop now only waits
+					// for g.stop.
+					close(g.stopIter)
+				} else {
+					t.Reset(g.Interval)
+				}
+			}
+		}
+	}()
+	return g
+}
+
+func (g *generator) Stop() Result {
+	// Trigger the generator to stop.
+	close(g.stop)
+
+	// Wait for the generator to exit.
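+	// StopTimeout bounds the wait: either the Start() goroutine closes
+	// g.stopped and the accumulated Result is returned, or the timer fires
+	// and the test fails.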
+	t := time.NewTimer(g.StopTimeout)
+	select {
+	case <-g.stopped:
+		t.Stop()
+		if g.result.TotalRequests == 0 {
+			g.t.Fatal("no requests completed before stopping the traffic generator")
+		}
+		return g.result
+	case <-t.C:
+		g.t.Fatal("timed out waiting for result")
+	}
+	// Can never happen, but the compiler doesn't know that Fatal terminates.
+	return Result{}
+}
+
+func fillInDefaults(cfg *Config) {
+	if cfg.Interval == 0 {
+		cfg.Interval = defaultInterval
+	}
+	if cfg.StopTimeout == 0 {
+		cfg.StopTimeout = defaultTimeout
+	}
+	if cfg.Options.Check == nil {
+		cfg.Options.Check = check.OK()
+	}
+}
+
+// Result of a traffic generation operation.
+type Result struct {
+	TotalRequests      int
+	SuccessfulRequests int
+	Error              error
+}
+
+func (r Result) String() string {
+	buf := &bytes.Buffer{}
+
+	_, _ = fmt.Fprintf(buf, "TotalRequests:      %d\n", r.TotalRequests)
+	_, _ = fmt.Fprintf(buf, "SuccessfulRequests: %d\n", r.SuccessfulRequests)
+	_, _ = fmt.Fprintf(buf, "PercentSuccess:     %f\n", r.PercentSuccess())
+	_, _ = fmt.Fprintf(buf, "Errors:             %v\n", r.Error)
+
+	return buf.String()
+}
+
+func (r *Result) add(result echo.CallResult, err error) {
+	count := result.Responses.Len()
+	if count == 0 {
+		count = 1
+	}
+
+	r.TotalRequests += count
+	if err != nil {
+		r.Error = multierror.Append(r.Error, fmt.Errorf("request %d: %v", r.TotalRequests, err))
+	} else {
+		r.SuccessfulRequests += count
+	}
+}
+
+func (r Result) PercentSuccess() float64 {
+	return float64(r.SuccessfulRequests) / float64(r.TotalRequests)
+}
+
+// CheckSuccessRate asserts that a minimum success threshold was met.
+func (r Result) CheckSuccessRate(t test.Failer, minimumPercent float64) {
+	t.Helper()
+	if r.PercentSuccess() < minimumPercent {
+		t.Fatalf("Minimum success threshold, %f, was not met. Only %d/%d (%f) requests succeeded: %v",
+			minimumPercent, r.SuccessfulRequests, r.TotalRequests, r.PercentSuccess(), r.Error)
+	}
+	if r.SuccessfulRequests == r.TotalRequests {
+		t.Logf("traffic checker succeeded with all successful requests (%d/%d)", r.SuccessfulRequests, r.TotalRequests)
+	} else {
+		t.Logf("traffic checker met minimum threshold, with %d/%d successes, but encountered some failures: %v", r.SuccessfulRequests, r.TotalRequests, r.Error)
+	}
+}
diff --git a/test/e2e/run_test.sh b/test/e2e/run_test.sh
index a05e1014a..92cbd237c 100755
--- a/test/e2e/run_test.sh
+++ b/test/e2e/run_test.sh
@@ -5,7 +5,7 @@
 # AND ADAPTED FOR KMESH.
 
 # Exit immediately for non zero status
-set -e
+# set -e
 
 DEFAULT_KIND_IMAGE="kindest/node:v1.30.0@sha256:047357ac0cfea04663786a612ba1eaba9702bef25227a794b52890dd8bcd692e"
@@ -137,6 +137,16 @@ function setup_kmesh() {
         echo "Waiting for pods of Kmesh daemon to enter Running state..."
         sleep 1
     done
+
+    # Set the log level of each Kmesh pod.
+    #PODS=$(kubectl get pods -n kmesh-system -l app=kmesh -o jsonpath='{.items[*].metadata.name}')
+
+    #sleep 10
+
+    #for POD in $PODS; do
+    #    echo $POD
+    #    kmeshctl log $POD --set bpf:debug
+    #done
 }
 
 export KIND_REGISTRY_NAME="kind-registry"
@@ -161,6 +171,11 @@ function build_and_push_images() {
     HUB="${KIND_REGISTRY}" TAG="latest" make docker.push
 }
 
+function install_kmeshctl() {
+    # Install kmeshctl
+    cp kmeshctl $TMPBIN
+}
+
 function install_dependencies() {
     # 1. Install kind.
     if ! which kind &> /dev/null
@@ -274,6 +289,7 @@ fi
 if [[ -z "${SKIP_BUILD:-}" ]]; then
     setup_kind_registry
     build_and_push_images
+    install_kmeshctl
 fi
 
 kubectl config use-context "kind-$NAME"
@@ -286,9 +302,71 @@ if [[ -z "${SKIP_SETUP:-}" ]]; then
     setup_kmesh
 fi
 
-cmd="go test -v -tags=integ $ROOT_DIR/test/e2e/... -istio.test.kube.loadbalancer=false ${PARAMS[*]}"
+uname -a
+
+cmd="go test -v -tags=integ $ROOT_DIR/test/e2e/... -istio.test.kube.loadbalancer=false ${PARAMS[*]} -failfast"
 
-bash -c "$cmd"
+exit_code=0
+
+for i in {1..100}; do
+    echo "Iteration $i"
+
+    # Delete the k8s namespaces prefixed with "echo" or "another".
+    kubectl get namespaces --no-headers | awk '/^(echo|another)/{print $1}' | xargs -r kubectl delete namespace
+
+    # Run the test command; break out of the loop if it fails.
+    if ! bash -c "$cmd"; then
+        echo "Command failed, exiting loop."
+        exit_code=1
+        break
+    fi
+done
+
+#bash -c "$cmd"
+
+#exit_code=$?
+
+# Define variables.
+NAMESPACE="kmesh-system"
+NODE_NAME="kmesh-testing-worker"
+
+# Get the names of all Pods on the specified node.
+PODS=$(kubectl get pods -n $NAMESPACE --field-selector spec.nodeName=$NODE_NAME -o jsonpath='{.items[*].metadata.name}')
+
+# Check whether any Pods were found.
+if [ -z "$PODS" ]; then
+    echo "No pods found on node $NODE_NAME in namespace $NAMESPACE."
+    exit 1
+fi
+
+# Iterate over each Pod and print its logs.
+for POD in $PODS; do
+    echo "Logs for Pod: $POD"
+    #kubectl logs -n $NAMESPACE $POD
+    echo "----------------------------------------"
+done
+
+# Define the namespace prefix and Pod prefix.
+#NAMESPACE_PREFIX="echo"
+#POD_PREFIX="waypoint"
+#
+## Get all namespaces matching the prefix.
+#NAMESPACES=$(kubectl get namespaces --no-headers | awk '{print $1}' | grep "^$NAMESPACE_PREFIX")
+#
+## Iterate over each matching namespace.
+#for NAMESPACE in $NAMESPACES; do
+#    # Get the Pods prefixed with waypoint.
+#    PODS=$(kubectl get pods -n $NAMESPACE --no-headers | awk '{print $1}' | grep "^$POD_PREFIX")
+#
+#    # Iterate over each Pod and print its logs.
+#    for POD in $PODS; do
+#        echo "Fetching logs for Pod: $POD in Namespace: $NAMESPACE"
+#        kubectl logs -n $NAMESPACE $POD --tail=10000
+#        echo "----------------------------------------"
+#    done
+#done
+
+sleep 10
 
 if [[ -n "${CLEANUP_KIND}" ]]; then
     cleanup_kind_cluster
@@ -299,3 +377,5 @@ if [[ -n "${CLEANUP_REGISTRY}" ]]; then
 fi
 
 rm -rf "${TMP}"
+
+exit $exit_code