From d01d78ebd2433f2e65b087aa3ccf711fbc5de5e6 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 01:06:55 +0000 Subject: [PATCH 1/4] implement GetFlyteKitMetrics Signed-off-by: Yicheng-Lu-llll --- pkg/manager/impl/metrics_manager.go | 57 ++++++++++++++++++- pkg/manager/interfaces/metrics.go | 3 + .../transformers/node_execution.go | 1 + pkg/rpc/adminservice/base.go | 2 +- pkg/rpc/adminservice/execution.go | 18 ++++++ pkg/rpc/adminservice/metrics.go | 44 +++++++------- 6 files changed, 102 insertions(+), 23 deletions(-) diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index a6d010b1e..1837a2548 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -20,6 +20,9 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" "google.golang.org/protobuf/types/known/timestamppb" + + dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces" + "github.com/flyteorg/flytestdlib/storage" ) const ( @@ -59,6 +62,8 @@ type MetricsManager struct { nodeExecutionManager interfaces.NodeExecutionInterface taskExecutionManager interfaces.TaskExecutionInterface metrics metrics + urlData dataInterfaces.RemoteURLInterface + storageClient *storage.DataStore } // createOperationSpan returns a Span defined by the provided arguments. @@ -643,6 +648,23 @@ func (m *MetricsManager) parseTaskNodeExecution(ctx context.Context, nodeExecuti return nil } +func printSpans(span *core.Span, prefix string) { + switch id := span.Id.(type) { + case *core.Span_WorkflowId: + fmt.Println(prefix, "Workflow ID:", id.WorkflowId) + case *core.Span_NodeId: + fmt.Println(prefix, "Node ID:", id.NodeId) + case *core.Span_TaskId: + fmt.Println(prefix, "Task ID:", id.TaskId) + case *core.Span_OperationId: + fmt.Println(prefix, "Operation ID:", id.OperationId) + } + + for _, childSpan := range span.Spans { + printSpans(childSpan, prefix+"-") + } +} + // GetExecutionMetrics returns a Span hierarchically breaking down the workflow execution into a collection of // Categorical and Reference Spans. func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, @@ -663,13 +685,44 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil } +func (m *MetricsManager) getFlyteKitSpans(ctx context.Context, spanUri string) (*core.Span, error) { + fmt.Println("For test only, Span URI:", spanUri) + blob, err := m.urlData.Get(ctx, spanUri) + if err != nil { + return nil, err + } + + var flyteKitSpan core.Span + err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(blob.Url), &flyteKitSpan) + if err != nil { + return nil, err + } + return &flyteKitSpan, nil +} + +func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, + request admin.NodeExecutionGetRequest,) (*admin.WorkflowExecutionGetMetricsResponse, error) { + nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, request) + if err != nil { + return nil, err + } + + flyteKitSpan, err := m.getFlyteKitSpans(ctx, nodeExecution.Closure.SpanUri) + if err != nil { + return nil, err + } + + return &admin.WorkflowExecutionGetMetricsResponse{Span: flyteKitSpan}, nil +} + // NewMetricsManager returns a new MetricsManager constructed with the provided arguments. func NewMetricsManager( workflowManager interfaces.WorkflowInterface, executionManager interfaces.ExecutionInterface, nodeExecutionManager interfaces.NodeExecutionInterface, taskExecutionManager interfaces.TaskExecutionInterface, - scope promutils.Scope) interfaces.MetricsInterface { + scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, storageClient *storage.DataStore, +) interfaces.MetricsInterface { metrics := metrics{ Scope: scope, } @@ -680,5 +733,7 @@ func NewMetricsManager( nodeExecutionManager: nodeExecutionManager, taskExecutionManager: taskExecutionManager, metrics: metrics, + urlData: urlData, + storageClient: storageClient, } } diff --git a/pkg/manager/interfaces/metrics.go b/pkg/manager/interfaces/metrics.go index d726cdc99..bbd58a880 100644 --- a/pkg/manager/interfaces/metrics.go +++ b/pkg/manager/interfaces/metrics.go @@ -12,4 +12,7 @@ import ( type MetricsInterface interface { GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) ( *admin.WorkflowExecutionGetMetricsResponse, error) + + GetFlyteKitMetrics(ctx context.Context,request admin.NodeExecutionGetRequest) ( + *admin.WorkflowExecutionGetMetricsResponse, error) } diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index f1d90361f..4da95907d 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -98,6 +98,7 @@ func addTerminalState( nodeExecutionModel.ErrorCode = &request.Event.GetError().Code } closure.DeckUri = request.Event.DeckUri + closure.SpanUri = request.Event.SpanUri return nil } diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 77c78f480..abca0b8e5 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -178,7 +178,7 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi ProjectManager: manager.NewProjectManager(repo, configuration), ResourceManager: resources.NewResourceManager(repo, configuration.ApplicationConfiguration()), MetricsManager: manager.NewMetricsManager(workflowManager, executionManager, nodeExecutionManager, - taskExecutionManager, adminScope.NewSubScope("metrics_manager")), + taskExecutionManager, adminScope.NewSubScope("metrics_manager"), urlData, dataStorageClient), Metrics: InitMetrics(adminScope), } } diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index 87e2fe487..e956eeaee 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -159,6 +159,24 @@ func (m *AdminService) GetExecutionMetrics( return response, nil } +func (m *AdminService) GetFlyteKitMetrics( + ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { + defer m.interceptPanic(ctx, request) + if request == nil { + return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") + } + var response *admin.WorkflowExecutionGetMetricsResponse + var err error + m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics.Time(func() { + response, err = m.MetricsManager.GetFlyteKitMetrics(ctx, *request) + }) + if err != nil { + return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics) + } + m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics.Success() + return response, nil +} + func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { defer m.interceptPanic(ctx, request) diff --git a/pkg/rpc/adminservice/metrics.go b/pkg/rpc/adminservice/metrics.go index b2bab4514..f30180bac 100644 --- a/pkg/rpc/adminservice/metrics.go +++ b/pkg/rpc/adminservice/metrics.go @@ -10,16 +10,17 @@ import ( type executionEndpointMetrics struct { scope promutils.Scope - create util.RequestMetrics - relaunch util.RequestMetrics - recover util.RequestMetrics - createEvent util.RequestMetrics - get util.RequestMetrics - update util.RequestMetrics - getData util.RequestMetrics - getMetrics util.RequestMetrics - list util.RequestMetrics - terminate util.RequestMetrics + create util.RequestMetrics + relaunch util.RequestMetrics + recover util.RequestMetrics + createEvent util.RequestMetrics + get util.RequestMetrics + update util.RequestMetrics + getData util.RequestMetrics + getMetrics util.RequestMetrics + GetFlyteKitMetrics util.RequestMetrics + list util.RequestMetrics + terminate util.RequestMetrics } type launchPlanEndpointMetrics struct { @@ -131,17 +132,18 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics { "panics encountered while handling requests to the admin service"), executionEndpointMetrics: executionEndpointMetrics{ - scope: adminScope, - create: util.NewRequestMetrics(adminScope, "create_execution"), - relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), - recover: util.NewRequestMetrics(adminScope, "recover_execution"), - createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), - get: util.NewRequestMetrics(adminScope, "get_execution"), - update: util.NewRequestMetrics(adminScope, "update_execution"), - getData: util.NewRequestMetrics(adminScope, "get_execution_data"), - getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), - list: util.NewRequestMetrics(adminScope, "list_execution"), - terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), + scope: adminScope, + create: util.NewRequestMetrics(adminScope, "create_execution"), + relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), + recover: util.NewRequestMetrics(adminScope, "recover_execution"), + createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), + get: util.NewRequestMetrics(adminScope, "get_execution"), + update: util.NewRequestMetrics(adminScope, "update_execution"), + getData: util.NewRequestMetrics(adminScope, "get_execution_data"), + getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), + GetFlyteKitMetrics: util.NewRequestMetrics(adminScope, "get_flytekit_metrics"), + list: util.NewRequestMetrics(adminScope, "list_execution"), + terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), }, launchPlanEndpointMetrics: launchPlanEndpointMetrics{ scope: adminScope, From 3e988f2d6a0a263781802fa698758a72ca189252 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 06:07:02 +0000 Subject: [PATCH 2/4] fix lint error Signed-off-by: Yicheng-Lu-llll --- pkg/manager/impl/metrics_manager.go | 25 ++++--------------------- pkg/manager/interfaces/metrics.go | 2 +- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index 1837a2548..42847620c 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -648,23 +648,6 @@ func (m *MetricsManager) parseTaskNodeExecution(ctx context.Context, nodeExecuti return nil } -func printSpans(span *core.Span, prefix string) { - switch id := span.Id.(type) { - case *core.Span_WorkflowId: - fmt.Println(prefix, "Workflow ID:", id.WorkflowId) - case *core.Span_NodeId: - fmt.Println(prefix, "Node ID:", id.NodeId) - case *core.Span_TaskId: - fmt.Println(prefix, "Task ID:", id.TaskId) - case *core.Span_OperationId: - fmt.Println(prefix, "Operation ID:", id.OperationId) - } - - for _, childSpan := range span.Spans { - printSpans(childSpan, prefix+"-") - } -} - // GetExecutionMetrics returns a Span hierarchically breaking down the workflow execution into a collection of // Categorical and Reference Spans. func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, @@ -685,9 +668,9 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil } -func (m *MetricsManager) getFlyteKitSpans(ctx context.Context, spanUri string) (*core.Span, error) { - fmt.Println("For test only, Span URI:", spanUri) - blob, err := m.urlData.Get(ctx, spanUri) +func (m *MetricsManager) getFlyteKitSpans(ctx context.Context, spanURI string) (*core.Span, error) { + fmt.Println("For test only, Span URI:", spanURI) + blob, err := m.urlData.Get(ctx, spanURI) if err != nil { return nil, err } @@ -701,7 +684,7 @@ func (m *MetricsManager) getFlyteKitSpans(ctx context.Context, spanUri string) ( } func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, - request admin.NodeExecutionGetRequest,) (*admin.WorkflowExecutionGetMetricsResponse, error) { + request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, request) if err != nil { return nil, err diff --git a/pkg/manager/interfaces/metrics.go b/pkg/manager/interfaces/metrics.go index bbd58a880..e88d61dcb 100644 --- a/pkg/manager/interfaces/metrics.go +++ b/pkg/manager/interfaces/metrics.go @@ -13,6 +13,6 @@ type MetricsInterface interface { GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) ( *admin.WorkflowExecutionGetMetricsResponse, error) - GetFlyteKitMetrics(ctx context.Context,request admin.NodeExecutionGetRequest) ( + GetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) ( *admin.WorkflowExecutionGetMetricsResponse, error) } From 33c18bd86f36930257b538324e21f6fa552cb5ed Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 18:57:26 +0000 Subject: [PATCH 3/4] nit Signed-off-by: Yicheng-Lu-llll --- pkg/manager/impl/metrics_manager.go | 22 +++++--------- pkg/manager/mocks/metrics_interface.go | 41 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index 42847620c..c7b584dbd 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -668,34 +668,26 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil } -func (m *MetricsManager) getFlyteKitSpans(ctx context.Context, spanURI string) (*core.Span, error) { - fmt.Println("For test only, Span URI:", spanURI) - blob, err := m.urlData.Get(ctx, spanURI) - if err != nil { - return nil, err - } +func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, + request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { - var flyteKitSpan core.Span - err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(blob.Url), &flyteKitSpan) + nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, request) if err != nil { return nil, err } - return &flyteKitSpan, nil -} -func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, - request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { - nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, request) + blob, err := m.urlData.Get(ctx, nodeExecution.Closure.SpanUri) if err != nil { return nil, err } - flyteKitSpan, err := m.getFlyteKitSpans(ctx, nodeExecution.Closure.SpanUri) + var flyteKitSpan core.Span + err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(blob.Url), &flyteKitSpan) if err != nil { return nil, err } - return &admin.WorkflowExecutionGetMetricsResponse{Span: flyteKitSpan}, nil + return &admin.WorkflowExecutionGetMetricsResponse{Span: &flyteKitSpan}, nil } // NewMetricsManager returns a new MetricsManager constructed with the provided arguments. diff --git a/pkg/manager/mocks/metrics_interface.go b/pkg/manager/mocks/metrics_interface.go index 2e292593e..bb37d355f 100644 --- a/pkg/manager/mocks/metrics_interface.go +++ b/pkg/manager/mocks/metrics_interface.go @@ -55,3 +55,44 @@ func (_m *MetricsInterface) GetExecutionMetrics(ctx context.Context, request adm return r0, r1 } + +type MetricsInterface_GetFlyteKitMetrics struct { + *mock.Call +} + +func (_m MetricsInterface_GetFlyteKitMetrics) Return(_a0 *admin.WorkflowExecutionGetMetricsResponse, _a1 error) *MetricsInterface_GetFlyteKitMetrics { + return &MetricsInterface_GetFlyteKitMetrics{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *MetricsInterface) OnGetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) *MetricsInterface_GetFlyteKitMetrics { + c_call := _m.On("GetFlyteKitMetrics", ctx, request) + return &MetricsInterface_GetFlyteKitMetrics{Call: c_call} +} + +func (_m *MetricsInterface) OnGetFlyteKitMetricsMatch(matchers ...interface{}) *MetricsInterface_GetFlyteKitMetrics { + c_call := _m.On("GetFlyteKitMetrics", matchers...) + return &MetricsInterface_GetFlyteKitMetrics{Call: c_call} +} + +// GetFlyteKitMetrics provides a mock function with given fields: ctx, request +func (_m *MetricsInterface) GetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *admin.WorkflowExecutionGetMetricsResponse + if rf, ok := ret.Get(0).(func(context.Context, admin.NodeExecutionGetRequest) *admin.WorkflowExecutionGetMetricsResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.WorkflowExecutionGetMetricsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, admin.NodeExecutionGetRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} From 12ac25b17b1f73d0ae37339d2ba0ef5a4f2c812c Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Sat, 17 Jun 2023 00:43:46 +0000 Subject: [PATCH 4/4] improve Signed-off-by: Yicheng-Lu-llll --- pkg/manager/impl/metrics_manager.go | 11 +++--- pkg/manager/interfaces/metrics.go | 4 +-- pkg/manager/mocks/metrics_interface.go | 30 ++++++++--------- pkg/rpc/adminservice/execution.go | 14 ++++---- pkg/rpc/adminservice/metrics.go | 46 +++++++++++++------------- 5 files changed, 54 insertions(+), 51 deletions(-) diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index c7b584dbd..5c0a09684 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -668,10 +668,13 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil } -func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, - request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { +func (m *MetricsManager) GetTaskMetrics(ctx context.Context, + request admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) { - nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, request) + nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, admin.NodeExecutionGetRequest{ + Id: request.Id, + }, + ) if err != nil { return nil, err } @@ -687,7 +690,7 @@ func (m *MetricsManager) GetFlyteKitMetrics(ctx context.Context, return nil, err } - return &admin.WorkflowExecutionGetMetricsResponse{Span: &flyteKitSpan}, nil + return &admin.GetTaskMetricsResponse{Span: &flyteKitSpan}, nil } // NewMetricsManager returns a new MetricsManager constructed with the provided arguments. diff --git a/pkg/manager/interfaces/metrics.go b/pkg/manager/interfaces/metrics.go index e88d61dcb..c07d8adb9 100644 --- a/pkg/manager/interfaces/metrics.go +++ b/pkg/manager/interfaces/metrics.go @@ -13,6 +13,6 @@ type MetricsInterface interface { GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) ( *admin.WorkflowExecutionGetMetricsResponse, error) - GetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) ( - *admin.WorkflowExecutionGetMetricsResponse, error) + GetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) ( + *admin.GetTaskMetricsResponse, error) } diff --git a/pkg/manager/mocks/metrics_interface.go b/pkg/manager/mocks/metrics_interface.go index bb37d355f..e9c95a62b 100644 --- a/pkg/manager/mocks/metrics_interface.go +++ b/pkg/manager/mocks/metrics_interface.go @@ -56,39 +56,39 @@ func (_m *MetricsInterface) GetExecutionMetrics(ctx context.Context, request adm return r0, r1 } -type MetricsInterface_GetFlyteKitMetrics struct { +type MetricsInterface_GetTaskMetrics struct { *mock.Call } -func (_m MetricsInterface_GetFlyteKitMetrics) Return(_a0 *admin.WorkflowExecutionGetMetricsResponse, _a1 error) *MetricsInterface_GetFlyteKitMetrics { - return &MetricsInterface_GetFlyteKitMetrics{Call: _m.Call.Return(_a0, _a1)} +func (_m MetricsInterface_GetTaskMetrics) Return(_a0 *admin.GetTaskMetricsRequest, _a1 error) *MetricsInterface_GetTaskMetrics { + return &MetricsInterface_GetTaskMetrics{Call: _m.Call.Return(_a0, _a1)} } -func (_m *MetricsInterface) OnGetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) *MetricsInterface_GetFlyteKitMetrics { - c_call := _m.On("GetFlyteKitMetrics", ctx, request) - return &MetricsInterface_GetFlyteKitMetrics{Call: c_call} +func (_m *MetricsInterface) OnGetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) *MetricsInterface_GetTaskMetrics { + c_call := _m.On("GetTaskMetrics", ctx, request) + return &MetricsInterface_GetTaskMetrics{Call: c_call} } -func (_m *MetricsInterface) OnGetFlyteKitMetricsMatch(matchers ...interface{}) *MetricsInterface_GetFlyteKitMetrics { - c_call := _m.On("GetFlyteKitMetrics", matchers...) - return &MetricsInterface_GetFlyteKitMetrics{Call: c_call} +func (_m *MetricsInterface) OnGetTaskMetricsMatch(matchers ...interface{}) *MetricsInterface_GetTaskMetrics { + c_call := _m.On("GetTaskMetrics", matchers...) + return &MetricsInterface_GetTaskMetrics{Call: c_call} } -// GetFlyteKitMetrics provides a mock function with given fields: ctx, request -func (_m *MetricsInterface) GetFlyteKitMetrics(ctx context.Context, request admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { +// GetTaskMetrics provides a mock function with given fields: ctx, request +func (_m *MetricsInterface) GetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsRequest, error) { ret := _m.Called(ctx, request) - var r0 *admin.WorkflowExecutionGetMetricsResponse - if rf, ok := ret.Get(0).(func(context.Context, admin.NodeExecutionGetRequest) *admin.WorkflowExecutionGetMetricsResponse); ok { + var r0 *admin.GetTaskMetricsRequest + if rf, ok := ret.Get(0).(func(context.Context, admin.GetTaskMetricsRequest) *admin.GetTaskMetricsRequest); ok { r0 = rf(ctx, request) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*admin.WorkflowExecutionGetMetricsResponse) + r0 = ret.Get(0).(*admin.GetTaskMetricsRequest) } } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, admin.NodeExecutionGetRequest) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, admin.GetTaskMetricsRequest) error); ok { r1 = rf(ctx, request) } else { r1 = ret.Error(1) diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index e956eeaee..2db3d1766 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -159,21 +159,21 @@ func (m *AdminService) GetExecutionMetrics( return response, nil } -func (m *AdminService) GetFlyteKitMetrics( - ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { +func (m *AdminService) GetTaskMetrics( + ctx context.Context, request *admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) { defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } - var response *admin.WorkflowExecutionGetMetricsResponse + var response *admin.GetTaskMetricsResponse var err error - m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics.Time(func() { - response, err = m.MetricsManager.GetFlyteKitMetrics(ctx, *request) + m.Metrics.executionEndpointMetrics.GetTaskMetrics.Time(func() { + response, err = m.MetricsManager.GetTaskMetrics(ctx, *request) }) if err != nil { - return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics) + return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.GetTaskMetrics) } - m.Metrics.executionEndpointMetrics.GetFlyteKitMetrics.Success() + m.Metrics.executionEndpointMetrics.GetTaskMetrics.Success() return response, nil } diff --git a/pkg/rpc/adminservice/metrics.go b/pkg/rpc/adminservice/metrics.go index f30180bac..68d9296cc 100644 --- a/pkg/rpc/adminservice/metrics.go +++ b/pkg/rpc/adminservice/metrics.go @@ -10,17 +10,17 @@ import ( type executionEndpointMetrics struct { scope promutils.Scope - create util.RequestMetrics - relaunch util.RequestMetrics - recover util.RequestMetrics - createEvent util.RequestMetrics - get util.RequestMetrics - update util.RequestMetrics - getData util.RequestMetrics - getMetrics util.RequestMetrics - GetFlyteKitMetrics util.RequestMetrics - list util.RequestMetrics - terminate util.RequestMetrics + create util.RequestMetrics + relaunch util.RequestMetrics + recover util.RequestMetrics + createEvent util.RequestMetrics + get util.RequestMetrics + update util.RequestMetrics + getData util.RequestMetrics + getMetrics util.RequestMetrics + GetTaskMetrics util.RequestMetrics + list util.RequestMetrics + terminate util.RequestMetrics } type launchPlanEndpointMetrics struct { @@ -132,18 +132,18 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics { "panics encountered while handling requests to the admin service"), executionEndpointMetrics: executionEndpointMetrics{ - scope: adminScope, - create: util.NewRequestMetrics(adminScope, "create_execution"), - relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), - recover: util.NewRequestMetrics(adminScope, "recover_execution"), - createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), - get: util.NewRequestMetrics(adminScope, "get_execution"), - update: util.NewRequestMetrics(adminScope, "update_execution"), - getData: util.NewRequestMetrics(adminScope, "get_execution_data"), - getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), - GetFlyteKitMetrics: util.NewRequestMetrics(adminScope, "get_flytekit_metrics"), - list: util.NewRequestMetrics(adminScope, "list_execution"), - terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), + scope: adminScope, + create: util.NewRequestMetrics(adminScope, "create_execution"), + relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), + recover: util.NewRequestMetrics(adminScope, "recover_execution"), + createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), + get: util.NewRequestMetrics(adminScope, "get_execution"), + update: util.NewRequestMetrics(adminScope, "update_execution"), + getData: util.NewRequestMetrics(adminScope, "get_execution_data"), + getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), + GetTaskMetrics: util.NewRequestMetrics(adminScope, "get_flytekit_metrics"), + list: util.NewRequestMetrics(adminScope, "list_execution"), + terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), }, launchPlanEndpointMetrics: launchPlanEndpointMetrics{ scope: adminScope,