Skip to content

Commit

Permalink
Configure Allocator Status Code (#3782)
Browse files Browse the repository at this point in the history
* Configure Allocator Status Code
* Flags added in cmd/main.go
  • Loading branch information
Kalaiselvi84 authored Apr 23, 2024
1 parent dd18cb9 commit 3ae6e18
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 37 deletions.
77 changes: 61 additions & 16 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ import (
"sync"
"time"

"agones.dev/agones/pkg"
"agones.dev/agones/pkg/allocation/converters"
pb "agones.dev/agones/pkg/allocation/go"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/client/clientset/versioned"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"github.com/heptiolabs/healthcheck"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -53,6 +42,18 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"agones.dev/agones/pkg"
"agones.dev/agones/pkg/allocation/converters"
pb "agones.dev/agones/pkg/allocation/go"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/client/clientset/versioned"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)

var (
Expand Down Expand Up @@ -81,6 +82,7 @@ const (
logLevelFlag = "log-level"
allocationBatchWaitTime = "allocation-batch-wait-time"
readinessShutdownDuration = "readiness-shutdown-duration"
httpUnallocatedStatusCode = "http-unallocated-status-code"
)

func parseEnvFlags() config {
Expand All @@ -98,6 +100,7 @@ func parseEnvFlags() config {
viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second)
viper.SetDefault(logLevelFlag, "Info")
viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)
viper.SetDefault(httpUnallocatedStatusCode, http.StatusTooManyRequests)

pflag.Int32(httpPortFlag, viper.GetInt32(httpPortFlag), "Port to listen on for REST requests")
pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests")
Expand All @@ -114,6 +117,7 @@ func parseEnvFlags() config {
pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
pflag.Duration(readinessShutdownDuration, viper.GetDuration(readinessShutdownDuration), "Time in seconds for SIGTERM/SIGINT handler to sleep for.")
pflag.Int32(httpUnallocatedStatusCode, viper.GetInt32(httpUnallocatedStatusCode), "HTTP status code to return when no GameServer is available")
runtime.FeaturesBindFlags()
pflag.Parse()

Expand All @@ -133,6 +137,7 @@ func parseEnvFlags() config {
runtime.Must(viper.BindEnv(logLevelFlag))
runtime.Must(viper.BindEnv(allocationBatchWaitTime))
runtime.Must(viper.BindEnv(readinessShutdownDuration))
runtime.Must(viper.BindEnv(httpUnallocatedStatusCode))
runtime.Must(viper.BindPFlags(pflag.CommandLine))
runtime.Must(runtime.FeaturesBindEnv())

Expand All @@ -154,6 +159,7 @@ func parseEnvFlags() config {
totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
ReadinessShutdownDuration: viper.GetDuration(readinessShutdownDuration),
httpUnallocatedStatusCode: int(viper.GetInt32(httpUnallocatedStatusCode)),
}
}

Expand All @@ -173,6 +179,7 @@ type config struct {
remoteAllocationTimeout time.Duration
allocationBatchWaitTime time.Duration
ReadinessShutdownDuration time.Duration
httpUnallocatedStatusCode int
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
Expand Down Expand Up @@ -244,7 +251,9 @@ func main() {
os.Exit(0)
})

h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)
grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode)

h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
Expand Down Expand Up @@ -378,7 +387,7 @@ func runGRPC(h *serviceHandler, grpcPort int) {
}()
}

func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration, grpcUnallocatedStatusCode codes.Code) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
Expand All @@ -398,8 +407,9 @@ func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, ago
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return allocator.Allocate(ctx, gsa)
},
mTLSDisabled: mTLSDisabled,
tlsDisabled: tlsDisabled,
mTLSDisabled: mTLSDisabled,
tlsDisabled: tlsDisabled,
grpcUnallocatedStatusCode: grpcUnallocatedStatusCode,
}

kubeInformerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -599,6 +609,8 @@ type serviceHandler struct {

mTLSDisabled bool
tlsDisabled bool

grpcUnallocatedStatusCode codes.Code
}

// Allocate implements the Allocate gRPC method definition
Expand All @@ -621,8 +633,41 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest)
logger.Errorf("internal server error - Bad GSA format %v", resultObj)
return nil, status.Errorf(codes.Internal, "internal server error- Bad GSA format %v", resultObj)
}
response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa)
response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode)
logger.WithField("response", response).WithError(err).Infof("allocation response is being sent")

return response, err
}

// grpcCodeFromHTTPStatus converts an HTTP status code to the corresponding gRPC status code.
func grpcCodeFromHTTPStatus(httpUnallocatedStatusCode int) codes.Code {
switch httpUnallocatedStatusCode {
case http.StatusOK:
return codes.OK
case 499:
return codes.Canceled
case http.StatusInternalServerError:
return codes.Internal
case http.StatusBadRequest:
return codes.InvalidArgument
case http.StatusGatewayTimeout:
return codes.DeadlineExceeded
case http.StatusNotFound:
return codes.NotFound
case http.StatusConflict:
return codes.AlreadyExists
case http.StatusForbidden:
return codes.PermissionDenied
case http.StatusUnauthorized:
return codes.Unauthenticated
case http.StatusTooManyRequests:
return codes.ResourceExhausted
case http.StatusNotImplemented:
return codes.Unimplemented
case http.StatusServiceUnavailable:
return codes.Unavailable
default:
logger.WithField("httpStatusCode", httpUnallocatedStatusCode).Warnf("received unknown http status code, defaulting to codes.ResourceExhausted / 429")
return codes.ResourceExhausted
}
}
2 changes: 2 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ spec:
- name: GRPC_PORT
value: {{ .Values.agones.allocator.service.grpc.targetPort | quote }}
{{- end }}
- name: HTTP_UNALLOCATED_STATUS_CODE
value: {{ .Values.agones.allocator.service.http.unallocatedStatusCode | quote }}
- name: API_SERVER_QPS
value: {{ .Values.agones.allocator.apiServerQPS | quote }}
- name: API_SERVER_QPS_BURST
Expand Down
1 change: 1 addition & 0 deletions install/helm/agones/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ agones:
portName: https
targetPort: 8443
nodePort: 0 # nodePort will be used if the serviceType is set to NodePort
unallocatedStatusCode: 429
grpc:
enabled: true
appProtocol: ""
Expand Down
2 changes: 2 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17437,6 +17437,8 @@ spec:
value: "8443"
- name: GRPC_PORT
value: "8443"
- name: HTTP_UNALLOCATED_STATUS_CODE
value: "429"
- name: API_SERVER_QPS
value: "400"
- name: API_SERVER_QPS_BURST
Expand Down
20 changes: 11 additions & 9 deletions pkg/allocation/converters/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
package converters

import (
pb "agones.dev/agones/pkg/allocation/go"
"agones.dev/agones/pkg/apis"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/util/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

pb "agones.dev/agones/pkg/allocation/go"
"agones.dev/agones/pkg/apis"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/util/runtime"
)

// ConvertAllocationRequestToGSA converts AllocationRequest to GameServerAllocation V1 (GSA)
Expand Down Expand Up @@ -303,12 +304,12 @@ func convertGameServerSelectorsToInternalGameServerSelectors(in []*pb.GameServer
}

// ConvertGSAToAllocationResponse converts GameServerAllocation V1 (GSA) to AllocationResponse
func ConvertGSAToAllocationResponse(in *allocationv1.GameServerAllocation) (*pb.AllocationResponse, error) {
func ConvertGSAToAllocationResponse(in *allocationv1.GameServerAllocation, grpcUnallocatedStatusCode codes.Code) (*pb.AllocationResponse, error) {
if in == nil {
return nil, nil
}

if err := convertStateV1ToError(in.Status.State); err != nil {
if err := convertStateV1ToError(in.Status.State, grpcUnallocatedStatusCode); err != nil {
return nil, err
}

Expand Down Expand Up @@ -481,12 +482,13 @@ func convertAllocationListsToGSALists(in map[string]*pb.AllocationResponse_ListS
}

// convertStateV1ToError converts GameServerAllocationState V1 (GSA) to AllocationResponse_GameServerAllocationState
func convertStateV1ToError(in allocationv1.GameServerAllocationState) error {
func convertStateV1ToError(in allocationv1.GameServerAllocationState, grpcUnallocatedStatusCode codes.Code) error {

switch in {
case allocationv1.GameServerAllocationAllocated:
return nil
case allocationv1.GameServerAllocationUnAllocated:
return status.Error(codes.ResourceExhausted, "there is no available GameServer to allocate")
return status.Error(grpcUnallocatedStatusCode, "there is no available GameServer to allocate")
case allocationv1.GameServerAllocationContention:
return status.Error(codes.Aborted, "too many concurrent requests have overwhelmed the system")
}
Expand Down
57 changes: 45 additions & 12 deletions pkg/allocation/converters/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ import (
"fmt"
"testing"

pb "agones.dev/agones/pkg/allocation/go"
"agones.dev/agones/pkg/apis"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/util/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

pb "agones.dev/agones/pkg/allocation/go"
"agones.dev/agones/pkg/apis"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/util/runtime"
)

func TestConvertAllocationRequestToGameServerAllocation(t *testing.T) {
Expand Down Expand Up @@ -843,12 +844,13 @@ func TestConvertGSAToAllocationRequest(t *testing.T) {

func TestConvertGSAToAllocationResponse(t *testing.T) {
tests := []struct {
name string
features string
in *allocationv1.GameServerAllocation
want *pb.AllocationResponse
wantErrCode codes.Code
skipConvertToGSA bool
name string
features string
in *allocationv1.GameServerAllocation
grpcUnallocatedStatusCode codes.Code
want *pb.AllocationResponse
wantErrCode codes.Code
skipConvertToGSA bool
}{
{
name: "status state is set to allocated",
Expand Down Expand Up @@ -1332,6 +1334,32 @@ func TestConvertGSAToAllocationResponse(t *testing.T) {
},
},
},
{
name: "status field is set to unallocated, non-default unallocated",
in: &allocationv1.GameServerAllocation{
TypeMeta: metav1.TypeMeta{
Kind: "GameServerAllocation",
APIVersion: "allocation.agones.dev/v1",
},
Status: allocationv1.GameServerAllocationStatus{
State: allocationv1.GameServerAllocationUnAllocated,
GameServerName: "GSN",
Ports: []agonesv1.GameServerStatusPort{
{
Port: 123,
},
{
Name: "port-name",
},
},
Address: "address",
NodeName: "node-name",
},
},
grpcUnallocatedStatusCode: codes.Unimplemented,
wantErrCode: codes.Unimplemented,
skipConvertToGSA: true,
},
}
for _, tc := range tests {
tc := tc
Expand All @@ -1341,7 +1369,12 @@ func TestConvertGSAToAllocationResponse(t *testing.T) {
defer runtime.FeatureTestMutex.Unlock()
require.NoError(t, runtime.ParseFeatures(tc.features))

out, err := ConvertGSAToAllocationResponse(tc.in)
grpcUnallocatedStatusCode := tc.grpcUnallocatedStatusCode
if grpcUnallocatedStatusCode == codes.OK {
grpcUnallocatedStatusCode = codes.ResourceExhausted
}

out, err := ConvertGSAToAllocationResponse(tc.in, grpcUnallocatedStatusCode)
if tc.wantErrCode != 0 {
st, ok := status.FromError(err)
if !assert.True(t, ok) {
Expand Down

0 comments on commit 3ae6e18

Please sign in to comment.