Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Hongxin Liang <[email protected]>
  • Loading branch information
honnix committed Jul 5, 2023
1 parent 8b27579 commit 42a2565
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
12 changes: 7 additions & 5 deletions go/tasks/plugins/webapi/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
)

var (
defaultTimeout = config.Duration{Duration: 10 * time.Second}

defaultConfig = Config{
WebAPI: webapi.PluginConfig{
ResourceQuotas: map[core.ResourceNamespace]int{
Expand Down Expand Up @@ -42,8 +40,9 @@ var (
},
},
DefaultGrpcEndpoint: GrpcEndpoint{
Endpoint: "dns:///flyte-agent.flyte.svc.cluster.local:80",
Insecure: true,
Endpoint: "dns:///flyte-agent.flyte.svc.cluster.local:80",
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
},
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
}
Expand Down Expand Up @@ -78,8 +77,11 @@ type GrpcEndpoint struct {
// DefaultServiceConfig sets default gRPC service config; check https://github.com/grpc/grpc/blob/master/doc/service_config.md for more details
DefaultServiceConfig string `json:"defaultServiceConfig"`

// Timeouts defines various RPC timeout values for different plugin operations: CreateTask, GetTask, DeleteTask; if not configured, defaults to 10s
// Timeouts defines various RPC timeout values for different plugin operations: CreateTask, GetTask, DeleteTask; if not configured, defaults to DefaultTimeout
Timeouts map[string]config.Duration `json:"timeouts"`

// DefaultTimeout gives the default RPC timeout if a more specific one is not defined in Timeouts
DefaultTimeout config.Duration `json:"defaultTimeout"`
}

func GetConfig() *Config {
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/webapi/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestGetAndSetConfig(t *testing.T) {
Duration: 3 * time.Millisecond,
},
}
cfg.DefaultGrpcEndpoint.DefaultTimeout = config.Duration{Duration: 10 * time.Second}
err := SetConfig(&cfg)
assert.NoError(t, err)
assert.Equal(t, &cfg, GetConfig())
Expand Down
14 changes: 7 additions & 7 deletions go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR
return nil, nil, fmt.Errorf("failed to connect to agent with error: %v", err)
}

newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("CreateTask", endpoint.Timeouts).Duration)
newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("CreateTask", endpoint).Duration)
defer cancel()

res, err := client.CreateTask(newCtx, &admin.CreateTaskRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix})
Expand All @@ -101,7 +101,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
return nil, fmt.Errorf("failed to connect to agent with error: %v", err)
}

newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("GetTask", endpoint.Timeouts).Duration)
newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("GetTask", endpoint).Duration)
defer cancel()

res, err := client.GetTask(newCtx, &admin.GetTaskRequest{TaskType: metadata.TaskType, ResourceMeta: metadata.AgentResourceMeta})
Expand All @@ -127,7 +127,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
return fmt.Errorf("failed to connect to agent with error: %v", err)
}

newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("DeleteTask", endpoint.Timeouts).Duration)
newCtx, cancel := context.WithTimeout(ctx, getFinalTimeout("DeleteTask", endpoint).Duration)
defer cancel()

_, err = client.DeleteTask(newCtx, &admin.DeleteTaskRequest{TaskType: metadata.TaskType, ResourceMeta: metadata.AgentResourceMeta})

Check warning on line 133 in go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/webapi/agent/plugin.go#L130-L133

Added lines #L130 - L133 were not covered by tests
Expand Down Expand Up @@ -185,7 +185,7 @@ func getClientFunc(ctx context.Context, endpoint GrpcEndpoint, connectionCache m
opts = append(opts, grpc.WithTransportCredentials(creds))
}

if endpoint.DefaultServiceConfig != "" {
if len(endpoint.DefaultServiceConfig) != 0 {
opts = append(opts, grpc.WithDefaultServiceConfig(endpoint.DefaultServiceConfig))
}

Expand All @@ -212,12 +212,12 @@ func getClientFunc(ctx context.Context, endpoint GrpcEndpoint, connectionCache m
return service.NewAsyncAgentServiceClient(conn), nil
}

func getFinalTimeout(operation string, timeouts map[string]config.Duration) config.Duration {
if t, exists := timeouts[operation]; exists {
func getFinalTimeout(operation string, endpoint GrpcEndpoint) config.Duration {
if t, exists := endpoint.Timeouts[operation]; exists {
return t
}

return defaultTimeout
return endpoint.DefaultTimeout
}

func newAgentPlugin() webapi.PluginEntry {
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func TestPlugin(t *testing.T) {
})

t.Run("test getFinalTimeout", func(t *testing.T) {
timeout := getFinalTimeout("CreateTask", map[string]config.Duration{"CreateTask": {Duration: 1 * time.Millisecond}})
timeout := getFinalTimeout("CreateTask", GrpcEndpoint{Endpoint: "localhost:8080", Timeouts: map[string]config.Duration{"CreateTask": {Duration: 1 * time.Millisecond}}})
assert.Equal(t, timeout.Duration, 1*time.Millisecond)
timeout = getFinalTimeout("DeleteTask", map[string]config.Duration{})
timeout = getFinalTimeout("DeleteTask", GrpcEndpoint{Endpoint: "localhost:8080", DefaultTimeout: config.Duration{Duration: 10 * time.Second}})
assert.Equal(t, timeout.Duration, 10*time.Second)
})
}

0 comments on commit 42a2565

Please sign in to comment.