From 3ae6e185b61af0ace7307f7807176252c195a830 Mon Sep 17 00:00:00 2001 From: Kalaiselvim <117940852+Kalaiselvi84@users.noreply.github.com> Date: Tue, 23 Apr 2024 23:53:50 +0000 Subject: [PATCH] Configure Allocator Status Code (#3782) * Configure Allocator Status Code * Flags added in cmd/main.go --- cmd/allocator/main.go | 77 +++++++++++++++---- .../agones/templates/service/allocation.yaml | 2 + install/helm/agones/values.yaml | 1 + install/yaml/install.yaml | 2 + pkg/allocation/converters/converter.go | 20 ++--- pkg/allocation/converters/converter_test.go | 57 +++++++++++--- 6 files changed, 122 insertions(+), 37 deletions(-) diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 97e56b894e..3c22d9b813 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -26,17 +26,6 @@ import ( "sync" "time" - "agones.dev/agones/pkg" - "agones.dev/agones/pkg/allocation/converters" - pb "agones.dev/agones/pkg/allocation/go" - allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" - "agones.dev/agones/pkg/client/clientset/versioned" - "agones.dev/agones/pkg/client/informers/externalversions" - "agones.dev/agones/pkg/gameserverallocations" - "agones.dev/agones/pkg/gameservers" - "agones.dev/agones/pkg/util/fswatch" - "agones.dev/agones/pkg/util/runtime" - "agones.dev/agones/pkg/util/signals" "github.com/heptiolabs/healthcheck" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -53,6 +42,18 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + + "agones.dev/agones/pkg" + "agones.dev/agones/pkg/allocation/converters" + pb "agones.dev/agones/pkg/allocation/go" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/gameserverallocations" + "agones.dev/agones/pkg/gameservers" + "agones.dev/agones/pkg/util/fswatch" + "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/signals" ) var ( @@ -81,6 +82,7 @@ const ( logLevelFlag = "log-level" allocationBatchWaitTime = "allocation-batch-wait-time" readinessShutdownDuration = "readiness-shutdown-duration" + httpUnallocatedStatusCode = "http-unallocated-status-code" ) func parseEnvFlags() config { @@ -98,6 +100,7 @@ func parseEnvFlags() config { viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second) viper.SetDefault(logLevelFlag, "Info") viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond) + viper.SetDefault(httpUnallocatedStatusCode, http.StatusTooManyRequests) pflag.Int32(httpPortFlag, viper.GetInt32(httpPortFlag), "Port to listen on for REST requests") pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests") @@ -114,6 +117,7 @@ func parseEnvFlags() config { pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level") pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches") pflag.Duration(readinessShutdownDuration, viper.GetDuration(readinessShutdownDuration), "Time in seconds for SIGTERM/SIGINT handler to sleep for.") + pflag.Int32(httpUnallocatedStatusCode, viper.GetInt32(httpUnallocatedStatusCode), "HTTP status code to return when no GameServer is available") runtime.FeaturesBindFlags() pflag.Parse() @@ -133,6 +137,7 @@ func parseEnvFlags() config { runtime.Must(viper.BindEnv(logLevelFlag)) runtime.Must(viper.BindEnv(allocationBatchWaitTime)) runtime.Must(viper.BindEnv(readinessShutdownDuration)) + runtime.Must(viper.BindEnv(httpUnallocatedStatusCode)) runtime.Must(viper.BindPFlags(pflag.CommandLine)) runtime.Must(runtime.FeaturesBindEnv()) @@ -154,6 +159,7 @@ func parseEnvFlags() config { totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag), allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime), ReadinessShutdownDuration: viper.GetDuration(readinessShutdownDuration), + httpUnallocatedStatusCode: int(viper.GetInt32(httpUnallocatedStatusCode)), } } @@ -173,6 +179,7 @@ type config struct { remoteAllocationTimeout time.Duration allocationBatchWaitTime time.Duration ReadinessShutdownDuration time.Duration + httpUnallocatedStatusCode int } // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC @@ -244,7 +251,9 @@ func main() { os.Exit(0) }) - h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime) + grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode) + + h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) if !h.tlsDisabled { cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() { @@ -378,7 +387,7 @@ func runGRPC(h *serviceHandler, grpcPort int) { }() } -func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler { +func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration, grpcUnallocatedStatusCode codes.Code) *serviceHandler { defaultResync := 30 * time.Second agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) @@ -398,8 +407,9 @@ func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, ago allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { return allocator.Allocate(ctx, gsa) }, - mTLSDisabled: mTLSDisabled, - tlsDisabled: tlsDisabled, + mTLSDisabled: mTLSDisabled, + tlsDisabled: tlsDisabled, + grpcUnallocatedStatusCode: grpcUnallocatedStatusCode, } kubeInformerFactory.Start(ctx.Done()) @@ -599,6 +609,8 @@ type serviceHandler struct { mTLSDisabled bool tlsDisabled bool + + grpcUnallocatedStatusCode codes.Code } // Allocate implements the Allocate gRPC method definition @@ -621,8 +633,41 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) logger.Errorf("internal server error - Bad GSA format %v", resultObj) return nil, status.Errorf(codes.Internal, "internal server error- Bad GSA format %v", resultObj) } - response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa) + response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode) logger.WithField("response", response).WithError(err).Infof("allocation response is being sent") return response, err } + +// grpcCodeFromHTTPStatus converts an HTTP status code to the corresponding gRPC status code. +func grpcCodeFromHTTPStatus(httpUnallocatedStatusCode int) codes.Code { + switch httpUnallocatedStatusCode { + case http.StatusOK: + return codes.OK + case 499: + return codes.Canceled + case http.StatusInternalServerError: + return codes.Internal + case http.StatusBadRequest: + return codes.InvalidArgument + case http.StatusGatewayTimeout: + return codes.DeadlineExceeded + case http.StatusNotFound: + return codes.NotFound + case http.StatusConflict: + return codes.AlreadyExists + case http.StatusForbidden: + return codes.PermissionDenied + case http.StatusUnauthorized: + return codes.Unauthenticated + case http.StatusTooManyRequests: + return codes.ResourceExhausted + case http.StatusNotImplemented: + return codes.Unimplemented + case http.StatusServiceUnavailable: + return codes.Unavailable + default: + logger.WithField("httpStatusCode", httpUnallocatedStatusCode).Warnf("received unknown http status code, defaulting to codes.ResourceExhausted / 429") + return codes.ResourceExhausted + } +} diff --git a/install/helm/agones/templates/service/allocation.yaml b/install/helm/agones/templates/service/allocation.yaml index 98718983f3..cd3ae400e3 100644 --- a/install/helm/agones/templates/service/allocation.yaml +++ b/install/helm/agones/templates/service/allocation.yaml @@ -253,6 +253,8 @@ spec: - name: GRPC_PORT value: {{ .Values.agones.allocator.service.grpc.targetPort | quote }} {{- end }} + - name: HTTP_UNALLOCATED_STATUS_CODE + value: {{ .Values.agones.allocator.service.http.unallocatedStatusCode | quote }} - name: API_SERVER_QPS value: {{ .Values.agones.allocator.apiServerQPS | quote }} - name: API_SERVER_QPS_BURST diff --git a/install/helm/agones/values.yaml b/install/helm/agones/values.yaml index 1070d30065..b5eba37650 100644 --- a/install/helm/agones/values.yaml +++ b/install/helm/agones/values.yaml @@ -226,6 +226,7 @@ agones: portName: https targetPort: 8443 nodePort: 0 # nodePort will be used if the serviceType is set to NodePort + unallocatedStatusCode: 429 grpc: enabled: true appProtocol: "" diff --git a/install/yaml/install.yaml b/install/yaml/install.yaml index 6428db250a..f9f90ec984 100644 --- a/install/yaml/install.yaml +++ b/install/yaml/install.yaml @@ -17437,6 +17437,8 @@ spec: value: "8443" - name: GRPC_PORT value: "8443" + - name: HTTP_UNALLOCATED_STATUS_CODE + value: "429" - name: API_SERVER_QPS value: "400" - name: API_SERVER_QPS_BURST diff --git a/pkg/allocation/converters/converter.go b/pkg/allocation/converters/converter.go index 64ad2c443a..3673ebcd6d 100644 --- a/pkg/allocation/converters/converter.go +++ b/pkg/allocation/converters/converter.go @@ -16,16 +16,17 @@ package converters import ( - pb "agones.dev/agones/pkg/allocation/go" - "agones.dev/agones/pkg/apis" - agonesv1 "agones.dev/agones/pkg/apis/agones/v1" - allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" - "agones.dev/agones/pkg/util/runtime" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/wrapperspb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + pb "agones.dev/agones/pkg/allocation/go" + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" ) // ConvertAllocationRequestToGSA converts AllocationRequest to GameServerAllocation V1 (GSA) @@ -303,12 +304,12 @@ func convertGameServerSelectorsToInternalGameServerSelectors(in []*pb.GameServer } // ConvertGSAToAllocationResponse converts GameServerAllocation V1 (GSA) to AllocationResponse -func ConvertGSAToAllocationResponse(in *allocationv1.GameServerAllocation) (*pb.AllocationResponse, error) { +func ConvertGSAToAllocationResponse(in *allocationv1.GameServerAllocation, grpcUnallocatedStatusCode codes.Code) (*pb.AllocationResponse, error) { if in == nil { return nil, nil } - if err := convertStateV1ToError(in.Status.State); err != nil { + if err := convertStateV1ToError(in.Status.State, grpcUnallocatedStatusCode); err != nil { return nil, err } @@ -481,12 +482,13 @@ func convertAllocationListsToGSALists(in map[string]*pb.AllocationResponse_ListS } // convertStateV1ToError converts GameServerAllocationState V1 (GSA) to AllocationResponse_GameServerAllocationState -func convertStateV1ToError(in allocationv1.GameServerAllocationState) error { +func convertStateV1ToError(in allocationv1.GameServerAllocationState, grpcUnallocatedStatusCode codes.Code) error { + switch in { case allocationv1.GameServerAllocationAllocated: return nil case allocationv1.GameServerAllocationUnAllocated: - return status.Error(codes.ResourceExhausted, "there is no available GameServer to allocate") + return status.Error(grpcUnallocatedStatusCode, "there is no available GameServer to allocate") case allocationv1.GameServerAllocationContention: return status.Error(codes.Aborted, "too many concurrent requests have overwhelmed the system") } diff --git a/pkg/allocation/converters/converter_test.go b/pkg/allocation/converters/converter_test.go index 8c98873c05..0de964807a 100644 --- a/pkg/allocation/converters/converter_test.go +++ b/pkg/allocation/converters/converter_test.go @@ -18,11 +18,6 @@ import ( "fmt" "testing" - pb "agones.dev/agones/pkg/allocation/go" - "agones.dev/agones/pkg/apis" - agonesv1 "agones.dev/agones/pkg/apis/agones/v1" - allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" - "agones.dev/agones/pkg/util/runtime" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" @@ -30,6 +25,12 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + pb "agones.dev/agones/pkg/allocation/go" + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" ) func TestConvertAllocationRequestToGameServerAllocation(t *testing.T) { @@ -843,12 +844,13 @@ func TestConvertGSAToAllocationRequest(t *testing.T) { func TestConvertGSAToAllocationResponse(t *testing.T) { tests := []struct { - name string - features string - in *allocationv1.GameServerAllocation - want *pb.AllocationResponse - wantErrCode codes.Code - skipConvertToGSA bool + name string + features string + in *allocationv1.GameServerAllocation + grpcUnallocatedStatusCode codes.Code + want *pb.AllocationResponse + wantErrCode codes.Code + skipConvertToGSA bool }{ { name: "status state is set to allocated", @@ -1332,6 +1334,32 @@ func TestConvertGSAToAllocationResponse(t *testing.T) { }, }, }, + { + name: "status field is set to unallocated, non-default unallocated", + in: &allocationv1.GameServerAllocation{ + TypeMeta: metav1.TypeMeta{ + Kind: "GameServerAllocation", + APIVersion: "allocation.agones.dev/v1", + }, + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationUnAllocated, + GameServerName: "GSN", + Ports: []agonesv1.GameServerStatusPort{ + { + Port: 123, + }, + { + Name: "port-name", + }, + }, + Address: "address", + NodeName: "node-name", + }, + }, + grpcUnallocatedStatusCode: codes.Unimplemented, + wantErrCode: codes.Unimplemented, + skipConvertToGSA: true, + }, } for _, tc := range tests { tc := tc @@ -1341,7 +1369,12 @@ func TestConvertGSAToAllocationResponse(t *testing.T) { defer runtime.FeatureTestMutex.Unlock() require.NoError(t, runtime.ParseFeatures(tc.features)) - out, err := ConvertGSAToAllocationResponse(tc.in) + grpcUnallocatedStatusCode := tc.grpcUnallocatedStatusCode + if grpcUnallocatedStatusCode == codes.OK { + grpcUnallocatedStatusCode = codes.ResourceExhausted + } + + out, err := ConvertGSAToAllocationResponse(tc.in, grpcUnallocatedStatusCode) if tc.wantErrCode != 0 { st, ok := status.FromError(err) if !assert.True(t, ok) {