diff --git a/build/test/client.go b/build/test/client.go index 1d357ec463..3b96d0cd2a 100644 --- a/build/test/client.go +++ b/build/test/client.go @@ -48,7 +48,7 @@ func createResource(yamlFile string) error { cmd := egctlCmd("create", "-f", "-") cmd.Stdin = strings.NewReader(yamlFile) _, stderr, err := runCmd(cmd) - if err != nil { + if err != nil || stderr != "" { return fmt.Errorf("create resource failed\nstderr: %v\nerr: %v", stderr, err) } return nil @@ -58,7 +58,7 @@ func applyResource(yamlFile string) error { cmd := egctlCmd("apply", "-f", "-") cmd.Stdin = strings.NewReader(yamlFile) _, stderr, err := runCmd(cmd) - if err != nil { + if err != nil || stderr != "" { return fmt.Errorf("apply resource failed\nstderr: %v\nerr: %v", stderr, err) } return nil @@ -70,7 +70,7 @@ func deleteResource(kind string, args ...string) error { cmd.Args = append(cmd.Args, args...) } _, stderr, err := runCmd(cmd) - if err != nil { + if err != nil || stderr != "" { return fmt.Errorf("delete resource failed\nstderr: %v\nerr: %v", stderr, err) } return nil @@ -82,7 +82,7 @@ func describeResource(kind string, args ...string) (string, error) { cmd.Args = append(cmd.Args, args...) } stdout, stderr, err := runCmd(cmd) - if err != nil { + if err != nil || stderr != "" { return "", fmt.Errorf("describe resource failed\nstderr: %v\nerr: %v", stderr, err) } return stdout, nil @@ -94,7 +94,7 @@ func getResource(kind string, args ...string) (string, error) { cmd.Args = append(cmd.Args, args...) } stdout, stderr, err := runCmd(cmd) - if err != nil { + if err != nil || stderr != "" { return "", fmt.Errorf("describe resource failed\nstderr: %v\nerr: %v", stderr, err) } return stdout, nil diff --git a/build/test/integration_test.go b/build/test/integration_test.go index 0fb954c1bd..6b96d41e19 100644 --- a/build/test/integration_test.go +++ b/build/test/integration_test.go @@ -26,10 +26,12 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "testing" "time" paho "github.com/eclipse/paho.mqtt.golang" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -234,15 +236,12 @@ filters: mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "hello from backend") }) - server := startServer(8888, mux) - defer server.Shutdown(context.Background()) - // check 8888 server is started - started := checkServerStart(t, func() *http.Request { + server := mustStartServer(8888, mux, func() *http.Request { req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8888", nil) require.Nil(t, err) return req }) - require.True(t, started) + defer server.Shutdown(context.Background()) // send request to 10081 HTTPServer req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10081/", nil) @@ -540,22 +539,25 @@ list: func TestCreateHTTPProxy(t *testing.T) { assert := assert.New(t) - for i, port := range []int{9096, 9097, 9098} { + servers := make([]*http.Server, 0) + for _, port := range []int{9096, 9097, 9098} { currentPort := port mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "hello from backend %d", currentPort) }) - - server := startServer(currentPort, mux) - defer server.Shutdown(context.Background()) - started := checkServerStart(t, func() *http.Request { + server := mustStartServer(currentPort, mux, func() *http.Request { req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d", currentPort), nil) require.Nil(t, err) return req }) - require.True(t, started, i) + servers = append(servers, server) } + defer func() { + for _, s := range servers { + s.Shutdown(context.Background()) + } + }() cmd := egctlCmd( "create", @@ -572,6 +574,9 @@ func TestCreateHTTPProxy(t *testing.T) { _, stderr, err := runCmd(cmd) assert.NoError(err) assert.Empty(stderr) + defer func() { + deleteResource("httpserver", "http-proxy-test") + }() output, err := getResource("httpserver") assert.NoError(err) @@ -635,6 +640,7 @@ rules: time.Sleep(1 * time.Second) cmd.Process.Kill() assert.Contains(stdout.String(), "test-egctl-logs") + deleteResource("httpserver", "test-egctl-logs") } { @@ -668,3 +674,405 @@ func TestMetrics(t *testing.T) { assert.Contains(output, "etcd_server_has_leader") } } + +func TestHealthCheck(t *testing.T) { + assert := assert.New(t) + + // unhealthy server return 503 as status code + var invalidCode int32 = 503 + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Server", "unhealthy") + w.WriteHeader(http.StatusOK) + }) + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + code := atomic.LoadInt32(&invalidCode) + w.WriteHeader(int(code)) + }) + unhealthy := mustStartServer(12345, mux, func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:12345", nil) + require.Nil(t, err) + return req + }) + defer unhealthy.Shutdown(context.Background()) + + // healthy server return 200 as status code + mux2 := http.NewServeMux() + mux2.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Server", "healthy") + w.WriteHeader(http.StatusOK) + }) + mux2.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + healthy := mustStartServer(12346, mux2, func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:12346", nil) + require.Nil(t, err) + return req + }) + defer healthy.Shutdown(context.Background()) + + httpSeverYaml := ` +name: httpserver-hc +kind: HTTPServer +port: 9099 +rules: +- paths: + - pathPrefix: / + backend: pipeline-hc +` + pipelineYaml := ` +name: pipeline-hc +kind: Pipeline +flow: +- filter: proxy +filters: +- name: proxy + kind: Proxy + pools: + - servers: + - url: http://127.0.0.1:12345 + - url: http://127.0.0.1:12346 + loadBalance: + policy: roundRobin + healthCheck: + interval: 200ms + fails: 2 + pass: 2 + path: /health +` + err := createResource(httpSeverYaml) + assert.Nil(err) + defer deleteResource("httpserver", "httpserver-hc") + err = createResource(pipelineYaml) + defer deleteResource("pipeline", "pipeline-hc") + assert.Nil(err) + started := checkServerStart(func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9099", nil) + assert.Nil(err) + return req + }) + assert.True(started) + + time.Sleep(1 * time.Second) + // unhealthy server not passed health check. + for i := 0; i < 50; i++ { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9099", nil) + assert.Nil(err, i) + resp, err := http.DefaultClient.Do(req) + assert.Nil(err, i) + assert.Equal(http.StatusOK, resp.StatusCode, i) + assert.Equal("healthy", resp.Header.Get("X-Server"), i) + resp.Body.Close() + } + + atomic.StoreInt32(&invalidCode, 200) + time.Sleep(1 * time.Second) + last := "" + // unhealthy server passed health check. + // based on round robin, the response should be unhealthy, healthy, unhealthy, healthy... + for i := 0; i < 50; i++ { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9099", nil) + assert.Nil(err, i) + resp, err := http.DefaultClient.Do(req) + assert.Nil(err, i) + assert.Equal(http.StatusOK, resp.StatusCode, i) + value := resp.Header.Get("X-Server") + if last != "" { + if last == "healthy" { + assert.Equal("unhealthy", value, i) + } else { + assert.Equal("healthy", value, i) + } + } + last = value + resp.Body.Close() + } +} + +func TestHealthCheck2(t *testing.T) { + assert := assert.New(t) + + httpSeverYaml := ` +name: httpserver-hc +kind: HTTPServer +port: 9099 +rules: +- paths: + - pathPrefix: / + backend: pipeline-hc +` + pipelineYaml := ` +name: pipeline-hc +kind: Pipeline +flow: +- filter: proxy +filters: +- name: proxy + kind: Proxy + pools: + - servers: + - url: http://127.0.0.1:12345 + healthCheck: + interval: 200ms + fails: 2 + pass: 2 + uri: /health?proxy=easegress + method: POST + headers: + X-Health: easegress + body: "easegress" + username: admin + password: test-health-check + match: + statusCodes: + - [200, 399] + headers: + - name: X-Status + value: healthy + body: + value: "healthy" +` + // check if health check set request correctly + requestChecker := func(req *http.Request) error { + if req.URL.Path != "/health" || req.URL.Query().Get("proxy") != "easegress" { + return fmt.Errorf("invalid request url: %s", req.URL.String()) + } + if req.Method != http.MethodPost { + return fmt.Errorf("invalid request method: %s", req.Method) + } + if req.Header.Get("X-Health") != "easegress" { + return fmt.Errorf("invalid request header: %s", req.Header.Get("X-Health")) + } + body, err := io.ReadAll(req.Body) + if err != nil { + return err + } + if string(body) != "easegress" { + return fmt.Errorf("invalid request body: %s", string(body)) + } + username, password, ok := req.BasicAuth() + if !ok { + return fmt.Errorf("failed to get basic auth") + } + if username != "admin" || password != "test-health-check" { + return fmt.Errorf("invalid basic auth: %s:%s", username, password) + } + return nil + } + + healthCheckHandler := atomic.Value{} + healthCheckHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Status", "healthy") + w.WriteHeader(http.StatusOK) + w.Write([]byte("healthy")) + }) + callHealthCheck := atomic.Bool{} + callHealthCheck.Store(false) + + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Server", "server") + w.WriteHeader(http.StatusOK) + }) + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + callHealthCheck.Store(true) + err := requestChecker(r) + assert.Nil(err) + handler := healthCheckHandler.Load().(func(w http.ResponseWriter, r *http.Request)) + handler(w, r) + }) + server := mustStartServer(12345, mux, func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:12345", nil) + require.Nil(t, err) + return req + }) + defer server.Shutdown(context.Background()) + + err := createResource(httpSeverYaml) + assert.Nil(err) + defer deleteResource("httpserver", "httpserver-hc") + err = createResource(pipelineYaml) + defer deleteResource("pipeline", "pipeline-hc") + assert.Nil(err) + started := checkServerStart(func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9099", nil) + assert.Nil(err) + return req + }) + assert.True(started) + + doReq := func() *http.Response { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9099", nil) + assert.Nil(err) + resp, err := http.DefaultClient.Do(req) + assert.Nil(err) + return resp + } + + // health check passed + time.Sleep(1 * time.Second) + resp := doReq() + resp.Body.Close() + assert.Equal(http.StatusOK, resp.StatusCode) + assert.Equal("server", resp.Header.Get("X-Server")) + + // health check failed, wrong status code + healthCheckHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Status", "healthy") + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("healthy")) + }) + time.Sleep(1 * time.Second) + resp = doReq() + resp.Body.Close() + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + + // health check failed, wrong body + healthCheckHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Status", "healthy") + w.WriteHeader(http.StatusOK) + w.Write([]byte("not ok")) + }) + time.Sleep(1 * time.Second) + resp = doReq() + resp.Body.Close() + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + + // health check failed, wrong header + healthCheckHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Status", "unhealthy") + w.WriteHeader(http.StatusOK) + w.Write([]byte("healthy")) + }) + time.Sleep(800 * time.Millisecond) + resp = doReq() + resp.Body.Close() + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + + // health check passed + healthCheckHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Status", "healthy") + w.WriteHeader(http.StatusFound) + w.Write([]byte("very healthy")) + }) + time.Sleep(1 * time.Second) + resp = doReq() + resp.Body.Close() + assert.Equal(http.StatusOK, resp.StatusCode) + assert.Equal("server", resp.Header.Get("X-Server")) + + called := callHealthCheck.Load() + assert.True(called) +} + +func TestWebSocketHealthCheck(t *testing.T) { + assert := assert.New(t) + + httpSeverYaml := ` +name: httpserver-hc +kind: HTTPServer +port: 9099 +rules: +- paths: + - headers: + - key: Upgrade + values: + - websocket + backend: pipeline-ws + clientMaxBodySize: -1 +` + wsYaml := ` +name: pipeline-ws +kind: Pipeline +filters: +- name: websocket + kind: WebSocketProxy + pools: + - servers: + - url: ws://127.0.0.1:12345 + healthCheck: + interval: 200ms + timeout: 200ms + http: + uri: /health + ws: + uri: /ws +` + + upgrader := &websocket.Upgrader{} + + httpHandler := atomic.Value{} + httpHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + wsHandler := atomic.Value{} + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + _, err := upgrader.Upgrade(w, r, nil) + assert.Nil(err) + }) + + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + // for websocket proxy filter to access + conn, err := upgrader.Upgrade(w, r, nil) + assert.Nil(err) + defer conn.Close() + conn.WriteMessage(websocket.TextMessage, []byte("hello from websocket")) + }) + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + // for http health check of websocket proxy filter + handler := httpHandler.Load().(func(w http.ResponseWriter, r *http.Request)) + handler(w, r) + }) + mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + // for ws health check of websocket proxy filter + handler := wsHandler.Load().(func(w http.ResponseWriter, r *http.Request)) + handler(w, r) + }) + server := mustStartServer(12345, mux, func() *http.Request { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:12345/health", nil) + require.Nil(t, err) + return req + }) + defer server.Shutdown(context.Background()) + + err := createResource(httpSeverYaml) + assert.Nil(err) + defer deleteResource("httpserver", "httpserver-hc") + err = createResource(wsYaml) + assert.Nil(err) + defer deleteResource("pipeline", "pipeline-ws") + time.Sleep(1 * time.Second) + + // health check passed + time.Sleep(1 * time.Second) + conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:9099", nil) + assert.Nil(err) + _, data, err := conn.ReadMessage() + assert.Nil(err) + assert.Equal("hello from websocket", string(data)) + conn.Close() + + // health check failed + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + time.Sleep(1 * time.Second) + _, resp, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:9099", nil) + assert.NotNil(err) + assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) + + // health check passed again + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + _, err := upgrader.Upgrade(w, r, nil) + assert.Nil(err) + }) + time.Sleep(1 * time.Second) + conn, resp, err = websocket.DefaultDialer.Dial("ws://127.0.0.1:9099", nil) + assert.Nil(err) + assert.Equal(http.StatusSwitchingProtocols, resp.StatusCode) + _, data, err = conn.ReadMessage() + assert.Nil(err) + assert.Equal("hello from websocket", string(data)) +} diff --git a/build/test/server.go b/build/test/server.go index ffb40349d5..e173c54384 100644 --- a/build/test/server.go +++ b/build/test/server.go @@ -21,7 +21,6 @@ package test import ( "fmt" "net/http" - "testing" "time" ) @@ -34,11 +33,11 @@ func startServer(port int, handler http.Handler) *http.Server { return server } -func checkServerStart(t *testing.T, checkReq func() *http.Request) bool { - for i := 0; i < 10; i++ { +func checkServerStart(checkReq func() *http.Request) bool { + for i := 0; i < 100; i++ { req := checkReq() resp, err := http.DefaultClient.Do(req) - if err == nil { + if err == nil && resp.StatusCode == http.StatusOK { resp.Body.Close() return true } @@ -46,3 +45,11 @@ func checkServerStart(t *testing.T, checkReq func() *http.Request) bool { } return false } + +func mustStartServer(port int, hanlder http.Handler, checkReq func() *http.Request) *http.Server { + server := startServer(port, hanlder) + if !checkServerStart(checkReq) { + panic(fmt.Sprintf("failed to start server on port %v", port)) + } + return server +} diff --git a/cmd/builder/command/add.go b/cmd/builder/command/add.go index 73c7841e0f..74c43a545f 100644 --- a/cmd/builder/command/add.go +++ b/cmd/builder/command/add.go @@ -30,6 +30,7 @@ import ( var addConfig = &gen.Config{} +// AddCmd adds filter or controllers to the project. func AddCmd() *cobra.Command { cmd := &cobra.Command{ Use: "add", @@ -44,7 +45,7 @@ func AddCmd() *cobra.Command { return cmd } -func addArgs(cmd *cobra.Command, args []string) error { +func addArgs(_ *cobra.Command, args []string) error { if len(args) != 0 { return errors.New("add takes no arguments") } @@ -65,7 +66,7 @@ func addArgs(cmd *cobra.Command, args []string) error { return nil } -func addRun(cmd *cobra.Command, args []string) { +func addRun(_ *cobra.Command, _ []string) { cwd, err := os.Getwd() if err != nil { utils.ExitWithError(err) diff --git a/cmd/builder/command/build.go b/cmd/builder/command/build.go index ab3f57bcaf..2af3a23a11 100644 --- a/cmd/builder/command/build.go +++ b/cmd/builder/command/build.go @@ -28,6 +28,7 @@ import ( var buildConfig string +// BuildCmd builds Easegress with custom plugins. func BuildCmd() *cobra.Command { cmd := &cobra.Command{ Use: "build", @@ -40,14 +41,14 @@ func BuildCmd() *cobra.Command { return cmd } -func buildArgs(cmd *cobra.Command, args []string) error { +func buildArgs(_ *cobra.Command, _ []string) error { if len(buildConfig) == 0 { return errors.New("config file is required") } return nil } -func buildRun(cmd *cobra.Command, args []string) { +func buildRun(_ *cobra.Command, _ []string) { ctx, stop := utils.WithInterrupt(context.Background()) defer stop() diff --git a/docs/02.Tutorials/2.3.Pipeline-Explained.md b/docs/02.Tutorials/2.3.Pipeline-Explained.md index 441590cded..f9736d6d05 100644 --- a/docs/02.Tutorials/2.3.Pipeline-Explained.md +++ b/docs/02.Tutorials/2.3.Pipeline-Explained.md @@ -17,6 +17,7 @@ Easegress offers a rich set of filters tailored for diverse use cases, including - [GlobalFilter](#globalfilter) - [Load Balancer](#load-balancer) - [Traffic Adaptor: Change Something of Two-Way Traffic](#traffic-adaptor-change-something-of-two-way-traffic) + - [Health Check](#health-check) - [API Aggregation](#api-aggregation) - [Example 1: Simple aggregation](#example-1-simple-aggregation) - [Example 2: Merge response body](#example-2-merge-response-body) @@ -374,6 +375,49 @@ filters: loadBalance: policy: roundRobin ``` + +### Health Check +Perform a health check on the servers in the pool. If a server fails the check, it will be marked as unhealthy, and requests will be rerouted to other healthy servers until it regains health. + +More details config in [Proxy](../07.Reference/7.02.Filters.md#health-check). + +```yaml +name: pipeline-reverse-proxy +kind: Pipeline +flow: + - filter: proxy +filters: + - name: proxy + kind: Proxy + pools: + - servers: + - url: http://127.0.0.1:9095 + - url: http://127.0.0.1:9096 + - url: http://127.0.0.1:9097 + loadBalance: + policy: roundRobin + healthCheck: + # interval between health checks (default: 60s) + interval: 60s + + # uri for health check http request + uri: /health + # http request headers for health check + headers: + X-Health-Check: easegress + # username for basic authentication + username: admin + # password for basic authentication + password: xxxxxx + + # response validation criteria (default: 2xx and 3xx status codes) + match: + # acceptable status code ranges + statusCodes: + - [200, 299] # 2xx + - [300, 399] # 3xx +``` + ### API Aggregation API aggregation is a pattern to aggregate multiple individual requests into a single request. This pattern is useful when a client must make multiple calls to different backend systems to operate. Easegress provides filters [RequestBuilder](../07.Reference/7.02.Filters.md#requestbuilder) & [ResponseBuilder](../07.Reference/7.02.Filters.md#responsebuilder) for this powerful feature. diff --git a/docs/02.Tutorials/2.6.Websocket.md b/docs/02.Tutorials/2.6.Websocket.md index 8cad755212..e0128ec125 100644 --- a/docs/02.Tutorials/2.6.Websocket.md +++ b/docs/02.Tutorials/2.6.Websocket.md @@ -2,6 +2,8 @@ - [Background](#background) - [Design](#design) +- [Health Check](#health-check) + - [Health Check](#health-check-1) - [Example](#example) - [References](#references) @@ -79,6 +81,58 @@ headers that send to websocket backend. `Sec-Websocket-Version`, `Sec-Websocket-Extensions` and `Sec-Websocket-Protocol` in http headers to set connection. +## Health Check + +### Health Check +Perform a health check on the servers in the pool. If a server fails the check, it will be marked as unhealthy, and requests will be rerouted to other healthy servers until it regains health. Health check for websocket proxy contains both http way or websocket way. The HTTP check involves a request-response evaluation similar to a Proxy filter. In the WebSocket method, a successful connection yields a 101 status code. Additional headers can be set and evaluated in both methods. +If you send both two ways of health check, then a server passes both HTTP and WebSocket health checks, it will be considered healthy. + +More details in [WebSocketProxy](../07.Reference/7.02.Filters.md#health-check-1). + +```yaml +kind: WebSocketProxy +name: proxy-example-1 +pools: +- servers: + - url: ws://127.0.0.1:9095 + - url: ws://127.0.0.1:9096 + - url: ws://127.0.0.1:9097 + healthCheck: + # interval between health checks (default: 60s) + interval: 60s + # timeout for health check response (default: 3s) + timeout: 3s + # fail threshold to mark server as unhealthy (default: 1) + fails: 1 + # success threshold to mark server as healthy (default: 1) + pass: 1 + + ws: + # health check request port (defaults to server's port, e.g., 9095) + port: 10080 + # uri for health check + uri: /health/ws + # http request headers for health check + headers: + X-Health-Check: easegress + + # response validation criteria (default: 101 Switching Protocols) + match: + # acceptable status code ranges + statusCodes: + - [101, 101] # 101 + # response header validation + # name is header key. + # value is header value, can be empty. + # type is type of value, can be "exact" or "regexp". + headers: + - name: X-Status + value: healthy + type: exact + http: + uri: /health +``` + ## Example 1. Send request diff --git a/docs/07.Reference/7.02.Filters.md b/docs/07.Reference/7.02.Filters.md index 2f31024e46..4c86983bcc 100644 --- a/docs/07.Reference/7.02.Filters.md +++ b/docs/07.Reference/7.02.Filters.md @@ -2,12 +2,14 @@ - [Proxy](#proxy) + - [Health Check](#health-check) - [Configuration](#configuration) - [Results](#results) - [SimpleHTTPProxy](#simplehttpproxy) - [Configuration](#configuration-1) - [Results](#results-1) - [WebSocketProxy](#websocketproxy) + - [Health Check](#health-check-1) - [Configuration](#configuration-2) - [Results](#results-2) - [CORSAdaptor](#corsadaptor) @@ -127,7 +129,7 @@ or preceding filter needs to take extra action. The Proxy filter is a proxy of the backend service. Below is one of the simplest Proxy configurations, it forward requests -to `http://127.0.0.1:9095`. +to `http://127.0.0.1:9095` or `http://127.0.0.1:9096` or `http://127.0.0.1:9097` based on `roundRobin`. ```yaml kind: Proxy @@ -135,6 +137,10 @@ name: proxy-example-1 pools: - servers: - url: http://127.0.0.1:9095 + - url: http://127.0.0.1:9096 + - url: http://127.0.0.1:9097 + loadBalance: + policy: roundRobin maxRedirection: 10 ``` @@ -177,22 +183,68 @@ pools: - serverTags: ["v2"] serviceName: service-001 serviceRegistry: eureka-service-registry-example + loadBalance: + policy: roundRobin maxRedirection: 10 ``` -When there are multiple servers in a pool, the Proxy can do a load balance -between them: +### Health Check + +Perform a health check on the servers in the pool. If a server fails the check, it will be marked as unhealthy, and requests will be rerouted to other healthy servers until it regains health. ```yaml +name: proxy kind: Proxy -name: proxy-example-4 pools: -- serverTags: ["v2"] - serviceName: service-001 - serviceRegistry: eureka-service-registry-example - loadBalance: - policy: roundRobin -maxRedirection: 10 +- servers: + - url: http://127.0.0.1:9095 + - url: http://127.0.0.1:9096 + - url: http://127.0.0.1:9097 + healthCheck: + # interval between health checks (default: 60s) + interval: 60s + # timeout for health check response (default: 3s) + timeout: 3s + # fail threshold to mark server as unhealthy (default: 1) + fails: 1 + # success threshold to mark server as healthy (default: 1) + pass: 1 + + # health check request port (defaults to server's port, e.g., 9095) + port: 10080 + # uri for health check http request + uri: /health + # http method for health check + method: GET + # http request headers for health check + headers: + X-Health-Check: easegress + # http request body for health check + body: "you-body-here" + # username for basic authentication + username: admin + # password for basic authentication + password: xxxxxx + + # response validation criteria (default: 2xx and 3xx status codes) + match: + # acceptable status code ranges + statusCodes: + - [200, 299] # 2xx + - [300, 399] # 3xx + # response header validation + # name is header key. + # value is header value, can be empty. + # type is type of value, can be "exact" or "regexp". + headers: + - name: X-Status + value: healthy + type: exact + # response body validation + # type can be "contains" or "regexp". + body: + value: "healthy" + type: contains ``` ### Configuration @@ -299,7 +351,7 @@ https_proxy=http://127.0.0.1:8088 http_proxy=http://127.0.0.1:8088 curl https:// The WebSocketProxy filter is a proxy of the websocket backend service. Below is one of the simplest WebSocketProxy configurations, it forwards -the websocket connection to `ws://127.0.0.1:9095`. +the websocket connection to `ws://127.0.0.1:9095` or `ws://127.0.0.1:9096` or `ws://127.0.0.1:9097`. ```yaml kind: WebSocketProxy @@ -307,6 +359,8 @@ name: proxy-example-1 pools: - servers: - url: ws://127.0.0.1:9095 + - url: ws://127.0.0.1:9096 + - url: ws://127.0.0.1:9097 ``` Same as the `Proxy` filter: @@ -330,6 +384,88 @@ rules: backend: websocket-pipeline ``` +### Health Check +Perform a health check on the servers in the pool. If a server fails the check, it will be marked as unhealthy, and requests will be rerouted to other healthy servers until it regains health. Health check for websocket proxy contains both http way or websocket way. The HTTP check involves a request-response evaluation similar to a Proxy filter. In the WebSocket method, a successful connection yields a 101 status code. Additional headers can be set and evaluated in both methods. +If you send both two ways of health check, then a server passes both HTTP and WebSocket health checks, it will be considered healthy. + +```yaml +kind: WebSocketProxy +name: proxy-example-1 +pools: +- servers: + - url: ws://127.0.0.1:9095 + - url: ws://127.0.0.1:9096 + - url: ws://127.0.0.1:9097 + healthCheck: + # interval between health checks (default: 60s) + interval: 60s + # timeout for health check response (default: 3s) + timeout: 3s + # fail threshold to mark server as unhealthy (default: 1) + fails: 1 + # success threshold to mark server as healthy (default: 1) + pass: 1 + + ws: + # health check request port (defaults to server's port, e.g., 9095) + port: 10080 + # uri for health check + uri: /health/ws + # http request headers for health check + headers: + X-Health-Check: easegress + + # response validation criteria (default: 101 Switching Protocols) + match: + # acceptable status code ranges + statusCodes: + - [101, 101] # 101 + # response header validation + # name is header key. + # value is header value, can be empty. + # type is type of value, can be "exact" or "regexp". + headers: + - name: X-Status + value: healthy + type: exact + http: + # health check request port (defaults to server's port, e.g., 9095) + port: 10080 + # uri for health check http request + uri: /health + # http method for health check + method: GET + # http request headers for health check + headers: + X-Health-Check: easegress + # http request body for health check + body: "you-body-here" + # username for basic authentication + username: admin + # password for basic authentication + password: xxxxxx + + # response validation criteria (default: 2xx and 3xx status codes) + match: + # acceptable status code ranges + statusCodes: + - [200, 299] # 2xx + - [300, 399] # 3xx + # response header validation + # name is header key. + # value is header value, can be empty. + # type is type of value, can be "exact" or "regexp". + headers: + - name: X-Status + value: healthy + type: exact + # response body validation + # type can be "contains" or "regexp". + body: + value: "healthy" + type: contains +``` + ### Configuration | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | @@ -1585,6 +1721,7 @@ Rules to revise request header. | retryPolicy | string | Retry policy name | No | | circuitBreakerPolicy | string | CircuitBreaker policy name | No | | failureCodes | []int | Proxy return result of failureCode when backend resposne's status code in failureCodes. The default value is 5xx | No | +| healthCheck | ProxyHealthCheckSpec | Health check. Full example with details in [Proxy Health Check](#health-check) | No | ### proxy.Server @@ -1603,7 +1740,7 @@ Rules to revise request header. | policy | string | Load balance policy, valid values are `roundRobin`, `random`, `weightedRandom`, `ipHash`, `headerHash`, `cookieHash` and `forward`, the last one is only used in `GRPCProxy` | Yes | | headerHashKey | string | When `policy` is `headerHash`, this option is the name of a header whose value is used for hash calculation | No | | stickySession | [proxy.StickySession](#proxyStickySessionSpec) | Sticky session spec | No | -| healthCheck | [proxy.HealthCheck](#proxyHealthCheckSpec) | Health check spec, note that healthCheck is not needed if you are using service registry | No | +| healthCheck | [proxy.HealthCheck](#proxyHealthCheckSpec) | (Deprecated) Use [Proxy](#health-check) or [WebSocketProxy](#health-check-1) instead. | No | | forwardKey | string | The value of this field is a header name of the incoming request, the value of this header is address of the target server (host:port), and the request will be sent to this address | No | ### proxy.StickySessionSpec @@ -1617,6 +1754,8 @@ Rules to revise request header. ### proxy.HealthCheckSpec +(Deprecated) Use [Proxy](#health-check) or [WebSocketProxy](#health-check-1) instead. + | Name | Type | Description | Required | | ------------- | ------ | ----------------------------------------------------------------------------------------------------------- | -------- | | interval | string | Interval duration for health check, default is 60s | Yes | @@ -1743,6 +1882,7 @@ The relationship between `methods` and `url` is `AND`. | filter | [proxy.RequestMatcherSpec](#proxyrequestmatcherspec) | Filter options for candidate pools | No | | insecureSkipVerify | bool | Disable origin verification when accepting client connections, default is `false`. | No | | originPatterns | []string | Host patterns for authorized origins, used to enable cross origin WebSockets. | No | +| healthCheck | WSProxyHealthCheckSpec | Health check for Websocket. Full example with details in [WebSocketProxy Health Check](#health-check-1) | No | ### mock.Rule diff --git a/go.mod b/go.mod index 31332fd426..be841f9dc5 100644 --- a/go.mod +++ b/go.mod @@ -195,7 +195,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/gorilla/websocket v1.5.0 // indirect + github.com/gorilla/websocket v1.5.0 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/pkg/filters/proxies/healthcheck.go b/pkg/filters/proxies/healthcheck.go index 1bc281d7e1..773b2407f6 100644 --- a/pkg/filters/proxies/healthcheck.go +++ b/pkg/filters/proxies/healthcheck.go @@ -17,58 +17,46 @@ package proxies -import ( - "net/http" - "time" -) +import "time" // HealthCheckSpec is the spec for health check. type HealthCheckSpec struct { // Interval is the interval duration for health check. Interval string `json:"interval,omitempty" jsonschema:"format=duration"` - // Path is the health check path for server - Path string `json:"path,omitempty"` // Timeout is the timeout duration for health check, default is 3. Timeout string `json:"timeout,omitempty" jsonschema:"format=duration"` // Fails is the consecutive fails count for assert fail, default is 1. Fails int `json:"fails,omitempty" jsonschema:"minimum=1"` // Passes is the consecutive passes count for assert pass, default is 1. Passes int `json:"passes,omitempty" jsonschema:"minimum=1"` -} - -// HealthChecker checks whether a server is healthy or not. -type HealthChecker interface { - Check(svr *Server) bool - Close() -} -// HTTPHealthChecker is a health checker for HTTP protocol. -type HTTPHealthChecker struct { - path string - client *http.Client + // Deprecated: other fields in this struct are general, but this field is + // specific to HTTP health check. It should be moved to HTTP health check. + // In HTTP health check, we should use URI instead of path. + Path string `json:"path,omitempty"` } -// NewHTTPHealthChecker creates a new HTTPHealthChecker. -func NewHTTPHealthChecker(spec *HealthCheckSpec) HealthChecker { - timeout, _ := time.ParseDuration(spec.Timeout) +// GetTimeout returns the timeout duration. +func (s *HealthCheckSpec) GetTimeout() time.Duration { + timeout, _ := time.ParseDuration(s.Timeout) if timeout <= 0 { timeout = 3 * time.Second } - - return &HTTPHealthChecker{ - path: spec.Path, - client: &http.Client{Timeout: timeout}, - } + return timeout } -// Check checks whether a server is healthy or not. -func (hc *HTTPHealthChecker) Check(svr *Server) bool { - // TODO: should use url.JoinPath? - url := svr.URL + hc.path - resp, err := hc.client.Get(url) - return err == nil && resp.StatusCode < 500 +// GetInterval returns the interval duration. +func (s *HealthCheckSpec) GetInterval() time.Duration { + interval, _ := time.ParseDuration(s.Interval) + if interval <= 0 { + interval = time.Minute + } + return interval } -// Close closes the health checker -func (hc *HTTPHealthChecker) Close() { +// HealthChecker checks whether a server is healthy or not. +type HealthChecker interface { + BaseSpec() HealthCheckSpec + Check(svr *Server) bool + Close() } diff --git a/pkg/filters/proxies/healthcheck_test.go b/pkg/filters/proxies/healthcheck_test.go index c5db9cdbc4..669b666917 100644 --- a/pkg/filters/proxies/healthcheck_test.go +++ b/pkg/filters/proxies/healthcheck_test.go @@ -20,36 +20,27 @@ package proxies import ( "sync" "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" ) type MockHealthChecker struct { - expect int32 + Expect int32 + Result bool + WG *sync.WaitGroup + + spec HealthCheckSpec counter int32 - result bool - wg *sync.WaitGroup +} + +func (c *MockHealthChecker) BaseSpec() HealthCheckSpec { + return c.spec } func (c *MockHealthChecker) Check(svr *Server) bool { - if c.wg != nil && atomic.AddInt32(&c.counter, 1) <= c.expect { - c.wg.Done() + if c.WG != nil && atomic.AddInt32(&c.counter, 1) <= c.Expect { + c.WG.Done() } - return c.result + return c.Result } func (c *MockHealthChecker) Close() { } - -func TestHTTPHealthChecker(t *testing.T) { - spec := &HealthCheckSpec{} - c := NewHTTPHealthChecker(spec) - assert.NotNil(t, c) - - spec = &HealthCheckSpec{Timeout: "100ms"} - c = NewHTTPHealthChecker(spec) - c.Check(&Server{URL: "https://www.megaease.com"}) - - c.Close() -} diff --git a/pkg/filters/proxies/httpproxy/healthcheck.go b/pkg/filters/proxies/httpproxy/healthcheck.go new file mode 100644 index 0000000000..d4ebad1262 --- /dev/null +++ b/pkg/filters/proxies/httpproxy/healthcheck.go @@ -0,0 +1,484 @@ +/* + * Copyright (c) 2017, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package httpproxy + +import ( + "bytes" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + "strings" + + "github.com/gorilla/websocket" + "github.com/megaease/easegress/v2/pkg/filters/proxies" + "github.com/megaease/easegress/v2/pkg/logger" +) + +const ( + regexpType = "regexp" + exactType = "exact" + containsType = "contains" +) + +// ProxyHealthCheckSpec is the spec of http proxy health check. +type ProxyHealthCheckSpec struct { + proxies.HealthCheckSpec `json:",inline"` + HTTPHealthCheckSpec `json:",inline"` +} + +// HTTPHealthCheckSpec is the spec of HTTP health check. +type HTTPHealthCheckSpec struct { + Port int `json:"port,omitempty"` + URI string `json:"uri,omitempty"` + Method string `json:"method,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Body string `json:"body,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + + Match *HealthCheckMatch `json:"match,omitempty"` +} + +// HealthCheckMatch is the match spec of health check. +type HealthCheckMatch struct { + StatusCodes [][]int `json:"statusCodes,omitempty"` + Headers []HealthCheckHeaderMatch `json:"headers,omitempty"` + Body *HealthCheckBodyMatch `json:"body,omitempty"` +} + +// HealthCheckHeaderMatch is the match spec of health check header. +type HealthCheckHeaderMatch struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` + // Type is the match type, exact or regex + Type string `json:"type,omitempty"` + re *regexp.Regexp +} + +// HealthCheckBodyMatch is the match spec of health check body. +type HealthCheckBodyMatch struct { + Value string `json:"value,omitempty"` + // Type is the match type, contains or regex + Type string `json:"type,omitempty"` + re *regexp.Regexp +} + +func (match *HealthCheckMatch) Match(resp *http.Response) error { + var valid bool + for _, r := range match.StatusCodes { + if resp.StatusCode >= r[0] && resp.StatusCode <= r[1] { + valid = true + break + } + } + if !valid { + return fmt.Errorf("invalid status code %d", resp.StatusCode) + } + for _, h := range match.Headers { + v := resp.Header.Get(h.Name) + if h.re != nil { + if !h.re.MatchString(v) { + return fmt.Errorf("invalid header %s: %s", h.Name, v) + } + } else { + if v != h.Value { + return fmt.Errorf("invalid header %s: %s", h.Name, v) + } + } + } + if match.Body != nil { + body, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + if match.Body.re != nil { + if !match.Body.re.MatchString(string(body)) { + return fmt.Errorf("invalid body: %s", string(body)) + } + } else { + if !strings.Contains(string(body), match.Body.Value) { + return fmt.Errorf("invalid body: %s", string(body)) + } + } + } + return nil +} + +func (match *HealthCheckMatch) Validate() error { + if len(match.StatusCodes) != 0 { + for _, s := range match.StatusCodes { + if len(s) != 2 { + return fmt.Errorf("invalid status code range: %v", s) + } + if s[0] > s[1] { + return fmt.Errorf("invalid status code range: %v", s) + } + } + } + if len(match.Headers) != 0 { + for _, h := range match.Headers { + if h.Name == "" { + return fmt.Errorf("empty match header name") + } + if h.Type != "" { + if h.Type != exactType && h.Type != regexpType { + return fmt.Errorf("invalid match header type: %s", h.Type) + } + } + if h.Type == regexpType { + _, err := regexp.Compile(h.Value) + if err != nil { + return fmt.Errorf("invalid match header regex: %s", h.Value) + } + } + } + } + if match.Body != nil { + if match.Body.Type != "" { + if match.Body.Type != containsType && match.Body.Type != regexpType { + return fmt.Errorf("invalid match body type: %s", match.Body.Type) + } + } + if match.Body.Type == regexpType { + _, err := regexp.Compile(match.Body.Value) + if err != nil { + return fmt.Errorf("invalid match body regex: %s", match.Body.Value) + } + } + } + return nil +} + +// Validate validates HTTPHealthCheckSpec. +func (spec *HTTPHealthCheckSpec) Validate() error { + if spec.Port < 0 { + return fmt.Errorf("invalid port: %d", spec.Port) + } + if spec.URI != "" { + _, err := url.Parse(spec.URI) + if err != nil { + return err + } + } + if spec.Method != "" { + if _, ok := httpMethods[spec.Method]; !ok { + return fmt.Errorf("invalid method: %s", spec.Method) + } + } + if spec.Username != "" && spec.Password == "" { + return fmt.Errorf("empty password") + } + if spec.Username == "" && spec.Password != "" { + return fmt.Errorf("empty username") + } + if spec.Match != nil { + err := spec.Match.Validate() + if err != nil { + return err + } + } + return nil +} + +// Validate validates HealthCheckSpec. +func (spec *ProxyHealthCheckSpec) Validate() error { + return spec.HTTPHealthCheckSpec.Validate() +} + +type httpHealthChecker struct { + spec *ProxyHealthCheckSpec + uri *url.URL + client *http.Client +} + +// BaseSpec returns the base spec. +func (hc *httpHealthChecker) BaseSpec() proxies.HealthCheckSpec { + return hc.spec.HealthCheckSpec +} + +// Check checks the health of the server. +func (hc *httpHealthChecker) Check(server *proxies.Server) bool { + s := hc.spec + + url := getURL(server, hc.uri, hc.spec.Port, false) + req, err := http.NewRequest(s.Method, url, bytes.NewReader([]byte(s.Body))) + if err != nil { + logger.Errorf("create health check request %s %s failed: %v", s.Method, url, err) + return false + } + if len(s.Username) > 0 { + req.SetBasicAuth(s.Username, s.Password) + } + for k, v := range s.Headers { + if strings.EqualFold(k, "host") { + req.Host = v + } else { + req.Header.Set(k, v) + } + } + // client close the connection + req.Close = true + resp, err := hc.client.Do(req) + if err != nil { + logger.Warnf("health check %s %s failed: %v", s.Method, url, err) + return false + } + defer resp.Body.Close() + + err = s.Match.Match(resp) + if err != nil { + logger.Warnf("health check %s %s failed: %v", s.Method, url, err) + return false + } + return true +} + +// Close closes the health checker. +func (hc *httpHealthChecker) Close() {} + +// NewHTTPHealthChecker creates a new HTTP health checker. +func NewHTTPHealthChecker(tlsConfig *tls.Config, spec *ProxyHealthCheckSpec) proxies.HealthChecker { + if spec == nil { + return nil + } + if spec.Method == "" { + spec.Method = http.MethodGet + } + if spec.URI == "" && spec.Path != "" { + spec.URI = spec.Path + } + var uri *url.URL + if spec.URI != "" { + uri, _ = url.Parse(spec.URI) + } + if spec.Match != nil { + if len(spec.Match.StatusCodes) == 0 { + spec.Match.StatusCodes = [][]int{{200, 399}} + } + for i := range spec.Match.Headers { + h := &spec.Match.Headers[i] + if h.Type == regexpType { + h.re = regexp.MustCompile(h.Value) + } + } + if spec.Match.Body != nil { + if spec.Match.Body.Type == regexpType { + spec.Match.Body.re = regexp.MustCompile(spec.Match.Body.Value) + } + } + } else { + spec.Match = &HealthCheckMatch{ + StatusCodes: [][]int{{200, 399}}, + } + } + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + DisableKeepAlives: true, + Proxy: http.ProxyFromEnvironment, // use proxy from environment variables + } + return &httpHealthChecker{ + spec: spec, + uri: uri, + client: &http.Client{ + Timeout: spec.GetTimeout(), + Transport: transport, + }, + } +} + +// WSProxyHealthCheckSpec is the spec of ws proxy health check. +type WSProxyHealthCheckSpec struct { + proxies.HealthCheckSpec `json:",inline"` + HTTP *HTTPHealthCheckSpec `json:"http,omitempty"` + WS *WSHealthCheckSpec `json:"ws,omitempty"` +} + +type WSHealthCheckSpec struct { + Port int `json:"port,omitempty"` + URI string `json:"uri,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Match *HealthCheckMatch `json:"match,omitempty"` +} + +func (spec *WSProxyHealthCheckSpec) Validate() error { + if spec.HTTP == nil && spec.WS == nil { + return fmt.Errorf("empty health check spec") + } + if spec.HTTP != nil { + err := spec.HTTP.Validate() + if err != nil { + return err + } + } + if spec.WS != nil { + ws := spec.WS + if ws.Port < 0 { + return fmt.Errorf("invalid port: %d", ws.Port) + } + if ws.URI != "" { + _, err := url.Parse(ws.URI) + if err != nil { + return err + } + } + if ws.Match != nil { + err := ws.Match.Validate() + if err != nil { + return err + } + } + } + return nil +} + +type wsHealthChecker struct { + spec *WSProxyHealthCheckSpec + + httpHealthChecker proxies.HealthChecker + + uri *url.URL + dialer *websocket.Dialer +} + +// BaseSpec returns the base spec. +func (ws *wsHealthChecker) BaseSpec() proxies.HealthCheckSpec { + return ws.spec.HealthCheckSpec +} + +// Check checks the health of the server. +func (ws *wsHealthChecker) Check(server *proxies.Server) bool { + var res bool + if ws.httpHealthChecker != nil { + res = ws.httpHealthChecker.Check(server) + if !res { + return false + } + } + if ws.dialer != nil { + header := http.Header{} + for k, v := range ws.spec.WS.Headers { + header.Set(k, v) + } + conn, resp, err := ws.dialer.Dial(getURL(server, ws.uri, ws.spec.WS.Port, true), header) + if err != nil { + return false + } + defer func() { + conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + conn.Close() + }() + defer resp.Body.Close() + err = ws.spec.WS.Match.Match(resp) + if err != nil { + logger.Warnf("health check %s failed: %v", server.URL, err) + return false + } + } + return true +} + +// Close closes the health checker. +func (ws *wsHealthChecker) Close() {} + +func NewWebSocketHealthChecker(spec *WSProxyHealthCheckSpec) proxies.HealthChecker { + if spec == nil { + return nil + } + + res := &wsHealthChecker{spec: spec} + if spec.HTTP != nil { + httpSpec := &ProxyHealthCheckSpec{ + HealthCheckSpec: spec.HealthCheckSpec, + HTTPHealthCheckSpec: *spec.HTTP, + } + res.httpHealthChecker = NewHTTPHealthChecker(nil, httpSpec) + } + if spec.WS == nil { + return res + } + + ws := spec.WS + dialer := &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: spec.GetTimeout(), + } + res.dialer = dialer + + if ws.URI != "" { + uri, _ := url.Parse(ws.URI) + res.uri = uri + } + if ws.Match != nil { + if len(ws.Match.StatusCodes) == 0 { + // default status code for websocket is 101 + ws.Match.StatusCodes = [][]int{{101, 101}} + } + for i := range ws.Match.Headers { + h := &ws.Match.Headers[i] + if h.Type == regexpType { + h.re = regexp.MustCompile(h.Value) + } + } + if ws.Match.Body != nil { + if ws.Match.Body.Type == regexpType { + ws.Match.Body.re = regexp.MustCompile(ws.Match.Body.Value) + } + } + } else { + ws.Match = &HealthCheckMatch{ + StatusCodes: [][]int{{101, 101}}, + } + } + return res +} + +func getURL(server *proxies.Server, uri *url.URL, port int, ws bool) string { + serverURL, err := url.Parse(server.URL) + if err != nil { + return server.URL + } + target := &url.URL{} + if uri != nil { + target.Host = serverURL.Host + target.Scheme = serverURL.Scheme + target.Path = uri.Path + target.RawQuery = uri.RawQuery + } else { + target = serverURL + } + if port != 0 { + target.Host = fmt.Sprintf("%s:%d", target.Hostname(), port) + } + if ws { + if target.Scheme == "http" { + target.Scheme = "ws" + } else if target.Scheme == "https" { + target.Scheme = "wss" + } + } else { + if target.Scheme == "ws" { + target.Scheme = "http" + } else if target.Scheme == "wss" { + target.Scheme = "https" + } + } + return target.String() +} diff --git a/pkg/filters/proxies/httpproxy/healthcheck_test.go b/pkg/filters/proxies/httpproxy/healthcheck_test.go new file mode 100644 index 0000000000..5f68767a23 --- /dev/null +++ b/pkg/filters/proxies/httpproxy/healthcheck_test.go @@ -0,0 +1,630 @@ +/* + * Copyright (c) 2017, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package httpproxy + +import ( + "context" + "fmt" + "io" + "net/http" + "sync/atomic" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/megaease/easegress/v2/pkg/filters/proxies" + "github.com/stretchr/testify/assert" +) + +func TestHTTPHealthCheckSpec(t *testing.T) { + testCases := []struct { + spec *ProxyHealthCheckSpec + failed bool + msg string + }{ + { + spec: &ProxyHealthCheckSpec{}, + failed: false, + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + URI: "::::::::", + }, + }, + failed: true, + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Method: "not-a-method", + }, + }, + failed: true, + msg: "invalid method", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Username: "123", + }, + }, + failed: true, + msg: "empty password", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Password: "123", + }, + }, + failed: true, + msg: "empty username", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Port: -100, + }, + }, + failed: true, + msg: "invalid port", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{2, 1}}, + }, + }, + }, + failed: true, + msg: "invalid status code range", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{1, 2, 3}}, + }, + }, + }, + failed: true, + msg: "invalid status code range", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: ""}, + }, + }, + }, + }, + failed: true, + msg: "empty match header name", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: "X-Test", Type: "not-a-type"}, + }, + }, + }, + }, + failed: true, + msg: "invalid match header type", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: "X-Test", Type: "regexp", Value: "["}, + }, + }, + }, + }, + failed: true, + msg: "invalid match header regex", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + Body: &HealthCheckBodyMatch{ + Type: "not-a-type", + Value: "123", + }, + }, + }, + }, + failed: true, + msg: "invalid match body type", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Match: &HealthCheckMatch{ + Body: &HealthCheckBodyMatch{ + Type: "regexp", + Value: "[", + }, + }, + }, + }, + failed: true, + msg: "invalid match body regex", + }, + { + spec: &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Port: 10080, + URI: "/healthz", + Method: "GET", + Headers: map[string]string{ + "X-Test": "easegress", + }, + Body: "easegress", + Username: "admin", + Password: "test", + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{200, 300}}, + Headers: []HealthCheckHeaderMatch{ + {Name: "X-Test", Type: "exact", Value: "easegress"}, + {Name: "X-Test-Re", Type: "regexp", Value: ".*"}, + }, + Body: &HealthCheckBodyMatch{ + Value: ".*", + Type: "regexp", + }, + }, + }, + }, + failed: false, + }, + } + for _, tc := range testCases { + err := tc.spec.Validate() + if tc.failed { + assert.NotNil(t, err, tc) + assert.Contains(t, err.Error(), tc.msg, tc) + } else { + assert.Nil(t, err, tc) + } + } + + hc := NewHTTPHealthChecker(nil, &ProxyHealthCheckSpec{}).(*httpHealthChecker) + expected := &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Method: "GET", + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{200, 399}}, + }, + }, + } + assert.Equal(t, expected, hc.spec, "default spec") +} + +func startServer(port int, handler http.Handler, checkReq func() *http.Request) (*http.Server, error) { + server := &http.Server{ + Addr: fmt.Sprintf(":%v", port), + Handler: handler, + } + go server.ListenAndServe() + for i := 0; i < 100; i++ { + req := checkReq() + resp, err := http.DefaultClient.Do(req) + if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() + return server, nil + } + time.Sleep(100 * time.Millisecond) + } + return nil, fmt.Errorf("failed to start server") +} + +func TestHTTPHealthCheck(t *testing.T) { + assert := assert.New(t) + + handleFunc := atomic.Value{} + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) {}) + mux := http.ServeMux{} + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + handleFunc.Load().(func(w http.ResponseWriter, r *http.Request))(w, r) + }) + server, err := startServer(10080, &mux, func() *http.Request { + req, _ := http.NewRequest("GET", "http://127.0.0.1:10080/healthz", nil) + return req + }) + assert.Nil(err) + defer server.Shutdown(context.Background()) + + type respConfig struct { + header map[string]string + status int + body string + } + + respHandler := func(w http.ResponseWriter, cfg respConfig) { + for k, v := range cfg.header { + w.Header().Set(k, v) + } + w.WriteHeader(cfg.status) + w.Write([]byte(cfg.body)) + } + + { + spec := &ProxyHealthCheckSpec{ + HTTPHealthCheckSpec: HTTPHealthCheckSpec{ + Port: 10080, + URI: "/healthz?test=easegress", + Method: http.MethodPost, + Headers: map[string]string{ + "X-Test": "easegress", + }, + Body: "easegress", + Username: "admin", + Password: "test", + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{200, 299}, {400, 499}}, + Headers: []HealthCheckHeaderMatch{ + {Name: "H-One", Type: "exact", Value: "V-One"}, + {Name: "H-Prefix", Type: "regexp", Value: "^V-"}, + }, + Body: &HealthCheckBodyMatch{ + Value: "success", + Type: "contains", + }, + }, + }, + } + succConfig := func() respConfig { + return respConfig{ + header: map[string]string{ + "H-One": "V-One", + "H-Prefix": "V-Two", + }, + status: http.StatusOK, + body: "success", + } + } + + hc := NewHTTPHealthChecker(nil, spec).(*httpHealthChecker) + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + assert.Equal("POST", r.Method) + assert.Equal("/healthz", r.URL.Path) + assert.Equal("test=easegress", r.URL.RawQuery) + assert.Equal("easegress", r.Header.Get("X-Test")) + + username, password, ok := r.BasicAuth() + assert.True(ok) + assert.Equal("admin", username) + assert.Equal("test", password) + + body, err := io.ReadAll(r.Body) + assert.Nil(err) + assert.Equal("easegress", string(body)) + + respHandler(w, succConfig()) + }) + s := &proxies.Server{URL: "http://127.0.0.1:8080?test=megaease"} + assert.True(hc.Check(s)) + + // wrong status code + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + cfg := succConfig() + cfg.status = http.StatusFound + respHandler(w, cfg) + }) + assert.False(hc.Check(s)) + + // wrong header + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + cfg := succConfig() + cfg.header["H-One"] = "V-Two" + respHandler(w, cfg) + }) + assert.False(hc.Check(s)) + + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + cfg := succConfig() + cfg.header["H-Prefix"] = "wrong" + respHandler(w, cfg) + }) + assert.False(hc.Check(s)) + + // wrong body + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + cfg := succConfig() + cfg.body = "failed!!!" + respHandler(w, cfg) + }) + assert.False(hc.Check(s)) + + handleFunc.Store(func(w http.ResponseWriter, r *http.Request) { + cfg := succConfig() + cfg.status = http.StatusNotFound + cfg.body = "big success" + respHandler(w, cfg) + }) + assert.True(hc.Check(s)) + } +} + +func TestWebSocketHealthCheckSpec(t *testing.T) { + testCases := []struct { + spec *WSProxyHealthCheckSpec + failed bool + msg string + }{ + { + spec: &WSProxyHealthCheckSpec{}, + failed: true, + msg: "empty health check spec", + }, + { + spec: &WSProxyHealthCheckSpec{ + HTTP: &HTTPHealthCheckSpec{}, + WS: &WSHealthCheckSpec{}, + }, + failed: false, + }, + { + spec: &WSProxyHealthCheckSpec{ + HTTP: &HTTPHealthCheckSpec{ + Method: "not-a-method", + }, + }, + failed: true, + msg: "invalid method", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Port: -100, + }, + }, + failed: true, + msg: "invalid port", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + URI: "::::::", + }, + }, + failed: true, + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{2, 1}}, + }, + }, + }, + failed: true, + msg: "invalid status code range", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{1, 2, 3}}, + }, + }, + }, + failed: true, + msg: "invalid status code range", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: ""}, + }, + }, + }, + }, + failed: true, + msg: "empty match header name", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: "X-Test", Type: "not-a-type"}, + }, + }, + }, + }, + failed: true, + msg: "invalid match header type", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: "X-Test", Type: "regexp", Value: "["}, + }, + }, + }, + }, + failed: true, + msg: "invalid match header regex", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + Body: &HealthCheckBodyMatch{ + Type: "not-a-type", + Value: "123", + }, + }, + }, + }, + failed: true, + msg: "invalid match body type", + }, + { + spec: &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + Body: &HealthCheckBodyMatch{ + Type: "regexp", + Value: "[", + }, + }, + }, + }, + failed: true, + msg: "invalid match body regex", + }, + } + for _, tc := range testCases { + err := tc.spec.Validate() + if tc.failed { + assert.NotNil(t, err, tc) + assert.Contains(t, err.Error(), tc.msg, tc) + } else { + assert.Nil(t, err, tc) + } + } + + hc := NewWebSocketHealthChecker(&WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{}, + }).(*wsHealthChecker) + + expected := &WSProxyHealthCheckSpec{ + WS: &WSHealthCheckSpec{ + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{101, 101}}, + }, + }, + } + assert.Equal(t, expected, hc.spec, "default spec") +} + +func TestWebSocketHealthCheck(t *testing.T) { + assert := assert.New(t) + + mux := http.ServeMux{} + + httpHandler := atomic.Value{} + httpHandler.Store(func(w http.ResponseWriter, r *http.Request) {}) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + httpHandler.Load().(func(w http.ResponseWriter, r *http.Request))(w, r) + }) + + wsHandler := atomic.Value{} + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) {}) + mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + wsHandler.Load().(func(w http.ResponseWriter, r *http.Request))(w, r) + }) + + server, err := startServer(10080, &mux, func() *http.Request { + req, _ := http.NewRequest("GET", "http://127.0.0.1:10080/healthz", nil) + return req + }) + assert.Nil(err) + defer server.Shutdown(context.Background()) + + upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + { + spec := &WSProxyHealthCheckSpec{ + HTTP: &HTTPHealthCheckSpec{ + Port: 10080, + URI: "/healthz", + Match: &HealthCheckMatch{ + StatusCodes: [][]int{{200, 299}, {400, 499}}, + }, + }, + WS: &WSHealthCheckSpec{ + Port: 10080, + URI: "/ws", + Headers: map[string]string{ + "X-Test": "easegress", + }, + Match: &HealthCheckMatch{ + Headers: []HealthCheckHeaderMatch{ + {Name: "H-One", Type: "exact", Value: "V-One"}, + {Name: "H-Prefix", Type: "regexp", Value: "^V-"}, + }, + }, + }, + } + + hc := NewWebSocketHealthChecker(spec).(*wsHealthChecker) + + httpHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + assert.Equal("easegress", r.Header.Get("X-Test")) + header := http.Header{} + header.Set("H-One", "V-One") + header.Set("H-Prefix", "V-Two") + conn, err := upgrader.Upgrade(w, r, header) + assert.Nil(err) + defer conn.Close() + }) + s := &proxies.Server{URL: "http://127.0.0.1:8080?test=megaease"} + assert.True(hc.Check(s)) + + s = &proxies.Server{URL: "ws://127.0.0.1:8080?test=megaease"} + assert.True(hc.Check(s)) + + // wrong http status code + httpHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusFound) + }) + assert.False(hc.Check(s)) + + // wrong websocket header + httpHandler.Store(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + assert.Nil(err) + defer conn.Close() + }) + assert.False(hc.Check(s)) + + // wrong websocket status code + wsHandler.Store(func(w http.ResponseWriter, r *http.Request) { + header := http.Header{} + header.Set("H-One", "V-One") + header.Set("H-Prefix", "V-Two") + w.WriteHeader(http.StatusOK) + }) + assert.False(hc.Check(s)) + } +} diff --git a/pkg/filters/proxies/httpproxy/pool.go b/pkg/filters/proxies/httpproxy/pool.go index 1a293c8926..e3ee7b7f3f 100644 --- a/pkg/filters/proxies/httpproxy/pool.go +++ b/pkg/filters/proxies/httpproxy/pool.go @@ -41,6 +41,18 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +var httpMethods = map[string]struct{}{ + http.MethodGet: {}, + http.MethodHead: {}, + http.MethodPost: {}, + http.MethodPut: {}, + http.MethodPatch: {}, + http.MethodDelete: {}, + http.MethodConnect: {}, + http.MethodOptions: {}, + http.MethodTrace: {}, +} + // serverPoolError is the error returned by handler function of // a server pool. type serverPoolError struct { @@ -172,27 +184,42 @@ type ServerPool struct { retryWrapper resilience.Wrapper circuitBreakerWrapper resilience.Wrapper - httpStat *httpstat.HTTPStat - memoryCache *MemoryCache - metrics *metrics + httpStat *httpstat.HTTPStat + memoryCache *MemoryCache + metrics *metrics + healthChecker proxies.HealthChecker } // ServerPoolSpec is the spec for a server pool. type ServerPoolSpec struct { BaseServerPoolSpec `json:",inline"` - Filter *RequestMatcherSpec `json:"filter,omitempty"` - SpanName string `json:"spanName,omitempty"` - ServerMaxBodySize int64 `json:"serverMaxBodySize,omitempty"` - Timeout string `json:"timeout,omitempty" jsonschema:"format=duration"` - RetryPolicy string `json:"retryPolicy,omitempty"` - CircuitBreakerPolicy string `json:"circuitBreakerPolicy,omitempty"` - MemoryCache *MemoryCacheSpec `json:"memoryCache,omitempty"` + Filter *RequestMatcherSpec `json:"filter,omitempty"` + SpanName string `json:"spanName,omitempty"` + ServerMaxBodySize int64 `json:"serverMaxBodySize,omitempty"` + Timeout string `json:"timeout,omitempty" jsonschema:"format=duration"` + RetryPolicy string `json:"retryPolicy,omitempty"` + CircuitBreakerPolicy string `json:"circuitBreakerPolicy,omitempty"` + MemoryCache *MemoryCacheSpec `json:"memoryCache,omitempty"` + HealthCheck *ProxyHealthCheckSpec `json:"healthCheck,omitempty"` // FailureCodes would be 5xx if it isn't assigned any value. FailureCodes []int `json:"failureCodes,omitempty" jsonschema:"uniqueItems=true"` } +func (spec *ServerPoolSpec) Validate() error { + if err := spec.BaseServerPoolSpec.Validate(); err != nil { + return err + } + if spec.ServiceName != "" && spec.HealthCheck != nil { + return fmt.Errorf("serviceName and healthCheck can't be set at the same time") + } + if spec.HealthCheck != nil { + return spec.HealthCheck.Validate() + } + return nil +} + // ServerPoolStatus is the status of Pool. type ServerPoolStatus struct { Stat *httpstat.Status `json:"stat"` @@ -200,10 +227,18 @@ type ServerPoolStatus struct { // NewServerPool creates a new server pool according to spec. func NewServerPool(proxy *Proxy, spec *ServerPoolSpec, name string) *ServerPool { + tlsConfig, _ := proxy.tlsConfig() + // backward compatibility, if healthCheck is not set, but loadBalance's healthCheck is set, use it. + if spec.HealthCheck == nil && spec.LoadBalance != nil && spec.LoadBalance.HealthCheck != nil { + spec.HealthCheck = &ProxyHealthCheckSpec{ + HealthCheckSpec: *spec.LoadBalance.HealthCheck, + } + } sp := &ServerPool{ - proxy: proxy, - spec: spec, - httpStat: httpstat.New(), + proxy: proxy, + spec: spec, + httpStat: httpstat.New(), + healthChecker: NewHTTPHealthChecker(tlsConfig, spec.HealthCheck), } if spec.Filter != nil { sp.filter = NewRequestMatcher(spec.Filter) @@ -231,7 +266,7 @@ func NewServerPool(proxy *Proxy, spec *ServerPoolSpec, name string) *ServerPool // CreateLoadBalancer creates a load balancer according to spec. func (sp *ServerPool) CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer { lb := proxies.NewGeneralLoadBalancer(spec, servers) - lb.Init(proxies.NewHTTPSessionSticker, proxies.NewHTTPHealthChecker, nil) + lb.Init(proxies.NewHTTPSessionSticker, sp.healthChecker, nil) return lb } diff --git a/pkg/filters/proxies/httpproxy/pool_test.go b/pkg/filters/proxies/httpproxy/pool_test.go index 125e5516fe..64ebc1b38e 100644 --- a/pkg/filters/proxies/httpproxy/pool_test.go +++ b/pkg/filters/proxies/httpproxy/pool_test.go @@ -91,7 +91,7 @@ servers: assert.NoError(err) assert.NoError(spec.Validate()) - p := &Proxy{} + p := kind.CreateInstance(kind.DefaultSpec()).(*Proxy) p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, spec, "test") @@ -133,7 +133,7 @@ servers: assert.NoError(err) assert.NoError(spec.Validate()) - p := &Proxy{} + p := kind.CreateInstance(kind.DefaultSpec()).(*Proxy) p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, spec, "test") @@ -176,7 +176,7 @@ func TestCopyCORSHeaders(t *testing.T) { src.Add("X-Src", "src") dst.Add("X-Dst", "dst") - p := &Proxy{} + p := kind.CreateInstance(kind.DefaultSpec()).(*Proxy) p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, &ServerPoolSpec{}, "test") diff --git a/pkg/filters/proxies/httpproxy/wspool.go b/pkg/filters/proxies/httpproxy/wspool.go index 883d0a8449..c240bfc32a 100644 --- a/pkg/filters/proxies/httpproxy/wspool.go +++ b/pkg/filters/proxies/httpproxy/wspool.go @@ -38,10 +38,11 @@ import ( type WebSocketServerPool struct { BaseServerPool - filter RequestMatcher - proxy *WebSocketProxy - spec *WebSocketServerPoolSpec - httpStat *httpstat.HTTPStat + filter RequestMatcher + proxy *WebSocketProxy + spec *WebSocketServerPoolSpec + httpStat *httpstat.HTTPStat + healthChecker proxies.HealthChecker } // WebSocketServerPoolSpec is the spec for a server pool. @@ -52,14 +53,17 @@ type WebSocketServerPoolSpec struct { Filter *RequestMatcherSpec `json:"filter,omitempty"` InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` OriginPatterns []string `json:"originPatterns,omitempty"` + + HealthCheck *WSProxyHealthCheckSpec `json:"healthCheck,omitempty"` } // NewWebSocketServerPool creates a new server pool according to spec. func NewWebSocketServerPool(proxy *WebSocketProxy, spec *WebSocketServerPoolSpec, name string) *WebSocketServerPool { sp := &WebSocketServerPool{ - proxy: proxy, - spec: spec, - httpStat: httpstat.New(), + proxy: proxy, + spec: spec, + httpStat: httpstat.New(), + healthChecker: NewWebSocketHealthChecker(spec.HealthCheck), } if spec.Filter != nil { sp.filter = NewRequestMatcher(spec.Filter) @@ -71,7 +75,7 @@ func NewWebSocketServerPool(proxy *WebSocketProxy, spec *WebSocketServerPoolSpec // CreateLoadBalancer creates a load balancer according to spec. func (sp *WebSocketServerPool) CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer { lb := proxies.NewGeneralLoadBalancer(spec, servers) - lb.Init(proxies.NewHTTPSessionSticker, proxies.NewHTTPHealthChecker, nil) + lb.Init(proxies.NewHTTPSessionSticker, sp.healthChecker, nil) return lb } diff --git a/pkg/filters/proxies/loadbalance.go b/pkg/filters/proxies/loadbalance.go index f85d7f6a07..e800aed488 100644 --- a/pkg/filters/proxies/loadbalance.go +++ b/pkg/filters/proxies/loadbalance.go @@ -61,7 +61,9 @@ type LoadBalanceSpec struct { HeaderHashKey string `json:"headerHashKey,omitempty"` ForwardKey string `json:"forwardKey,omitempty"` StickySession *StickySessionSpec `json:"stickySession,omitempty"` - HealthCheck *HealthCheckSpec `json:"healthCheck,omitempty"` + // Deprecated: HealthCheck is protocol related. It should be moved to protocol spec. + // This one is kept for backward compatibility. + HealthCheck *HealthCheckSpec `json:"healthCheck,omitempty"` } // LoadBalancePolicy is the interface of a load balance policy. @@ -77,9 +79,10 @@ type GeneralLoadBalancer struct { done chan struct{} - lbp LoadBalancePolicy - ss SessionSticker - hc HealthChecker + lbp LoadBalancePolicy + ss SessionSticker + hc HealthChecker + hcSpec *HealthCheckSpec } // NewGeneralLoadBalancer creates a new GeneralLoadBalancer. @@ -95,7 +98,7 @@ func NewGeneralLoadBalancer(spec *LoadBalanceSpec, servers []*Server) *GeneralLo // Init initializes the load balancer. func (glb *GeneralLoadBalancer) Init( fnNewSessionSticker func(*StickySessionSpec) SessionSticker, - fnNewHealthChecker func(*HealthCheckSpec) HealthChecker, + hc HealthChecker, lbp LoadBalancePolicy, ) { // load balance policy @@ -127,28 +130,23 @@ func (glb *GeneralLoadBalancer) Init( glb.ss = ss } - // health check - if glb.spec.HealthCheck == nil { + if hc == nil { return } - if glb.spec.HealthCheck.Fails <= 0 { - glb.spec.HealthCheck.Fails = 1 + spec := hc.BaseSpec() + if spec.Fails <= 0 { + spec.Fails = 1 } - - if glb.spec.HealthCheck.Passes <= 0 { - glb.spec.HealthCheck.Passes = 1 - } - - glb.hc = fnNewHealthChecker(glb.spec.HealthCheck) - - interval, _ := time.ParseDuration(glb.spec.HealthCheck.Interval) - if interval <= 0 { - interval = time.Minute + if spec.Passes <= 0 { + spec.Passes = 1 } + glb.hc = hc + glb.hcSpec = &spec - ticker := time.NewTicker(interval) + ticker := time.NewTicker(spec.GetInterval()) glb.done = make(chan struct{}) + glb.checkServers() go func() { for { select { @@ -173,7 +171,7 @@ func (glb *GeneralLoadBalancer) checkServers() { svr.HealthCounter = 0 } svr.HealthCounter++ - if svr.Unhealth && svr.HealthCounter >= glb.spec.HealthCheck.Passes { + if svr.Unhealth && svr.HealthCounter >= glb.hcSpec.Passes { logger.Warnf("server:%v becomes healthy.", svr.ID()) svr.Unhealth = false changed = true @@ -183,7 +181,7 @@ func (glb *GeneralLoadBalancer) checkServers() { svr.HealthCounter = 0 } svr.HealthCounter-- - if svr.Healthy() && svr.HealthCounter <= -glb.spec.HealthCheck.Fails { + if svr.Healthy() && svr.HealthCounter <= -glb.hcSpec.Fails { logger.Warnf("server:%v becomes unhealthy.", svr.ID()) svr.Unhealth = true changed = true diff --git a/pkg/filters/proxies/loadbalance_test.go b/pkg/filters/proxies/loadbalance_test.go index 25ea3f0904..9da4b5dca3 100644 --- a/pkg/filters/proxies/loadbalance_test.go +++ b/pkg/filters/proxies/loadbalance_test.go @@ -61,9 +61,8 @@ func TestGeneralLoadBalancer(t *testing.T) { lb := NewGeneralLoadBalancer(spec, servers) wg := &sync.WaitGroup{} wg.Add(serverCount) - lb.Init(NewHTTPSessionSticker, func(hcs *HealthCheckSpec) HealthChecker { - return &MockHealthChecker{expect: int32(serverCount), wg: wg, result: false} - }, nil) + hc := &MockHealthChecker{Expect: int32(serverCount), WG: wg, Result: false} + lb.Init(NewHTTPSessionSticker, hc, nil) wg.Wait() time.Sleep(20 * time.Millisecond) assert.Equal(t, len(lb.healthyServers.Load().Servers), 0) @@ -74,9 +73,8 @@ func TestGeneralLoadBalancer(t *testing.T) { lb = NewGeneralLoadBalancer(spec, servers) wg.Add(serverCount) - lb.Init(NewHTTPSessionSticker, func(hcs *HealthCheckSpec) HealthChecker { - return &MockHealthChecker{expect: int32(serverCount), wg: wg, result: true} - }, nil) + hc = &MockHealthChecker{Expect: int32(serverCount), WG: wg, Result: true} + lb.Init(NewHTTPSessionSticker, hc, nil) wg.Wait() time.Sleep(20 * time.Millisecond) assert.Equal(t, len(lb.healthyServers.Load().Servers), 10) diff --git a/pkg/filters/proxies/serverpool_test.go b/pkg/filters/proxies/serverpool_test.go index 303aa19511..048202de58 100644 --- a/pkg/filters/proxies/serverpool_test.go +++ b/pkg/filters/proxies/serverpool_test.go @@ -53,7 +53,7 @@ type MockServerPoolImpl struct { func (m *MockServerPoolImpl) CreateLoadBalancer(spec *LoadBalanceSpec, servers []*Server) LoadBalancer { lb := NewGeneralLoadBalancer(spec, servers) - lb.Init(NewHTTPSessionSticker, NewHTTPHealthChecker, nil) + lb.Init(NewHTTPSessionSticker, nil, nil) return lb } diff --git a/pkg/filters/proxies/stickysession_test.go b/pkg/filters/proxies/stickysession_test.go index 9f09b7c96e..a246eeb3fa 100644 --- a/pkg/filters/proxies/stickysession_test.go +++ b/pkg/filters/proxies/stickysession_test.go @@ -47,7 +47,7 @@ func TestStickySession_ConsistentHash(t *testing.T) { } lb := NewGeneralLoadBalancer(spec, servers) - lb.Init(NewHTTPSessionSticker, NewHTTPHealthChecker, nil) + lb.Init(NewHTTPSessionSticker, nil, nil) req := &http.Request{Header: http.Header{}} req.AddCookie(&http.Cookie{Name: "AppCookie", Value: "abcd-1"}) @@ -72,7 +72,7 @@ func TestStickySession_DurationBased(t *testing.T) { } lb := NewGeneralLoadBalancer(spec, servers) - lb.Init(NewHTTPSessionSticker, NewHTTPHealthChecker, nil) + lb.Init(NewHTTPSessionSticker, nil, nil) r, _ := httpprot.NewRequest(&http.Request{Header: http.Header{}}) svr1 := lb.ChooseServer(r) @@ -106,7 +106,7 @@ func TestStickySession_ApplicationBased(t *testing.T) { }, } lb := NewGeneralLoadBalancer(spec, servers) - lb.Init(NewHTTPSessionSticker, NewHTTPHealthChecker, nil) + lb.Init(NewHTTPSessionSticker, nil, nil) r, _ := httpprot.NewRequest(&http.Request{Header: http.Header{}}) svr1 := lb.ChooseServer(r)