diff --git a/protocol/app/app.go b/protocol/app/app.go index 0f7cadde54..9effdb5169 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -288,7 +288,8 @@ type App struct { // can correctly operate. startDaemons func() - PriceFeedClient *pricefeedclient.Client + PriceFeedClient *pricefeedclient.Client + LiquidationsClient *liquidationclient.Client } // assertAppPreconditions assert invariants required for an application to start. @@ -602,14 +603,14 @@ func New( app.Server.ExpectLiquidationsDaemon( daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs), ) + app.LiquidationsClient = liquidationclient.NewClient(logger) go func() { - if err := liquidationclient.Start( + if err := app.LiquidationsClient.Start( // The client will use `context.Background` so that it can have a different context from // the main application. context.Background(), daemonFlags, appFlags, - logger, &daemontypes.GrpcClientImpl{}, ); err != nil { panic(err) diff --git a/protocol/app/app_test.go b/protocol/app/app_test.go index 6e1f0f0ce3..e706435341 100644 --- a/protocol/app/app_test.go +++ b/protocol/app/app_test.go @@ -97,11 +97,17 @@ func TestAppIsFullyInitialized(t *testing.T) { tApp.InitChain() uninitializedFields := getUninitializedStructFields(reflect.ValueOf(*tApp.App)) - // Note that the PriceFeedClient is currently hard coded as disabled in GetDefaultTestAppOptions. - // Normally it would be only disabled for non-validating full nodes or for nodes where the - // price feed client is explicitly disabled. - if idx := slices.Index(uninitializedFields, "PriceFeedClient"); idx >= 0 { - slices.Remove(&uninitializedFields, idx) + // Note that the daemon clients are currently hard coded as disabled in GetDefaultTestAppOptions. + // Normally they would be only disabled for non-validating full nodes or for nodes where any + // daemon is explicitly disabled. + expectedUninitializedFields := []string{ + "PriceFeedClient", + "LiquidationsClient", + } + for _, field := range expectedUninitializedFields { + if idx := slices.Index(uninitializedFields, field); idx >= 0 { + slices.Remove(&uninitializedFields, idx) + } } require.Len( diff --git a/protocol/daemons/liquidation/client/client.go b/protocol/daemons/liquidation/client/client.go index 6eee3cca6b..5d622f69a4 100644 --- a/protocol/daemons/liquidation/client/client.go +++ b/protocol/daemons/liquidation/client/client.go @@ -2,36 +2,55 @@ package client import ( "context" + "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types" + timelib "github.com/dydxprotocol/v4-chain/protocol/lib/time" "time" - "github.com/dydxprotocol/v4-chain/protocol/lib" - - gometrics "github.com/armon/go-metrics" "github.com/cometbft/cometbft/libs/log" - "github.com/cosmos/cosmos-sdk/telemetry" - "github.com/cosmos/cosmos-sdk/types/query" appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" - "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) +// Client implements a daemon service client that periodically calculates and reports liquidatable subaccounts +// to the protocol. +type Client struct { + // include HealthCheckable to track the health of the daemon. + daemontypes.HealthCheckable + + // logger is the logger for the daemon. + logger log.Logger +} + +// Ensure Client implements the HealthCheckable interface. +var _ daemontypes.HealthCheckable = (*Client)(nil) + +func NewClient(logger log.Logger) *Client { + return &Client{ + HealthCheckable: daemontypes.NewTimeBoundedHealthCheckable( + types.LiquidationsDaemonServiceName, + &timelib.TimeProviderImpl{}, + logger, + ), + logger: logger, + } +} + // Start begins a job that periodically: // 1) Queries a gRPC server for all subaccounts including their open positions. // 2) Checks collateralization statuses of subaccounts with at least one open position. // 3) Sends a list of subaccount ids that potentially need to be liquidated to the application. -func Start( +func (c *Client) Start( ctx context.Context, flags flags.DaemonFlags, appFlags appflags.Flags, - logger log.Logger, grpcClient daemontypes.GrpcClient, ) error { // Log the daemon flags. - logger.Info( + c.logger.Info( "Starting liquidations daemon with flags", "LiquidationFlags", flags.Liquidation, ) @@ -39,7 +58,7 @@ func Start( // Make a connection to the Cosmos gRPC query services. queryConn, err := grpcClient.NewTcpConnection(ctx, appFlags.GrpcAddress) if err != nil { - logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) + c.logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) return err } defer func() { @@ -51,7 +70,7 @@ func Start( // Make a connection to the private daemon gRPC server. daemonConn, err := grpcClient.NewGrpcConnection(ctx, flags.Shared.SocketAddress) if err != nil { - logger.Error("Failed to establish gRPC connection to socket address", "error", err) + c.logger.Error("Failed to establish gRPC connection to socket address", "error", err) return err } defer func() { @@ -65,262 +84,55 @@ func Start( liquidationServiceClient := api.NewLiquidationServiceClient(daemonConn) ticker := time.NewTicker(time.Duration(flags.Liquidation.LoopDelayMs) * time.Millisecond) - for ; true; <-ticker.C { - if err := RunLiquidationDaemonTaskLoop( - ctx, - flags.Liquidation, - subaccountQueryClient, - clobQueryClient, - liquidationServiceClient, - ); err != nil { - // TODO(DEC-947): Move daemon shutdown to application. - logger.Error("Liquidations daemon returned error", "error", err) - } - } - - return nil -} - -// RunLiquidationDaemonTaskLoop contains the logic to communicate with various gRPC services -// to find the liquidatable subaccount ids. -func RunLiquidationDaemonTaskLoop( - ctx context.Context, - liqFlags flags.LiquidationFlags, - subaccountQueryClient satypes.QueryClient, - clobQueryClient clobtypes.QueryClient, - liquidationServiceClient api.LiquidationServiceClient, -) error { - defer telemetry.ModuleMeasureSince( - metrics.LiquidationDaemon, - time.Now(), - metrics.MainTaskLoop, - metrics.Latency, - ) + stop := make(chan bool) - // 1. Fetch all subaccounts from query service. - subaccounts, err := GetAllSubaccounts( + s := &SubTaskRunnerImpl{} + StartLiquidationsDaemonTaskLoop( + c, ctx, + s, + flags, + ticker, + stop, subaccountQueryClient, - liqFlags.SubaccountPageLimit, - ) - if err != nil { - return err - } - - // 2. Check collateralization statuses of subaccounts with at least one open position. - liquidatableSubaccountIds, err := GetLiquidatableSubaccountIds( - ctx, clobQueryClient, - liqFlags, - subaccounts, - ) - if err != nil { - return err - } - - // 3. Send the list of liquidatable subaccount ids to the daemon server. - err = SendLiquidatableSubaccountIds( - ctx, liquidationServiceClient, - liquidatableSubaccountIds, ) - if err != nil { - return err - } return nil } -// GetAllSubaccounts queries a gRPC server and returns a list of subaccounts and -// their balances and open positions. -func GetAllSubaccounts( +// StartLiquidationsDaemonTaskLoop contains the logic to periodically run the liquidations daemon task. +func StartLiquidationsDaemonTaskLoop( + client *Client, ctx context.Context, - client satypes.QueryClient, - limit uint64, -) ( - subaccounts []satypes.Subaccount, - err error, + s SubTaskRunner, + flags flags.DaemonFlags, + ticker *time.Ticker, + stop <-chan bool, + subaccountQueryClient satypes.QueryClient, + clobQueryClient clobtypes.QueryClient, + liquidationServiceClient api.LiquidationServiceClient, ) { - defer telemetry.ModuleMeasureSince(metrics.LiquidationDaemon, time.Now(), metrics.GetAllSubaccounts, metrics.Latency) - subaccounts = make([]satypes.Subaccount, 0) - - var nextKey []byte for { - subaccountsFromKey, next, err := getSubaccountsFromKey( - ctx, - client, - limit, - nextKey, - ) - - if err != nil { - return nil, err - } - - subaccounts = append(subaccounts, subaccountsFromKey...) - nextKey = next - - if len(nextKey) == 0 { - break - } - } - - telemetry.ModuleSetGauge( - metrics.LiquidationDaemon, - float32(len(subaccounts)), - metrics.GetAllSubaccounts, - metrics.Count, - ) - - return subaccounts, nil -} - -// GetLiquidatableSubaccountIds verifies collateralization statuses of subaccounts with -// at least one open position and returns a list of unique and potentially liquidatable subaccount ids. -func GetLiquidatableSubaccountIds( - ctx context.Context, - client clobtypes.QueryClient, - liqFlags flags.LiquidationFlags, - subaccounts []satypes.Subaccount, -) ( - liquidatableSubaccountIds []satypes.SubaccountId, - err error, -) { - defer telemetry.ModuleMeasureSince( - metrics.LiquidationDaemon, - time.Now(), - metrics.GetLiquidatableSubaccountIds, - metrics.Latency, - ) - - // Filter out subaccounts with no open positions. - subaccountsToCheck := make([]satypes.SubaccountId, 0) - for _, subaccount := range subaccounts { - if len(subaccount.PerpetualPositions) > 0 { - subaccountsToCheck = append(subaccountsToCheck, *subaccount.Id) - } - } - - telemetry.ModuleSetGauge( - metrics.LiquidationDaemon, - float32(len(subaccountsToCheck)), - metrics.SubaccountsWithOpenPositions, - metrics.Count, - ) - - // Query the gRPC server in chunks of size `liqFlags.RequestChunkSize`. - liquidatableSubaccountIds = make([]satypes.SubaccountId, 0) - for start := 0; start < len(subaccountsToCheck); start += int(liqFlags.RequestChunkSize) { - end := lib.Min(start+int(liqFlags.RequestChunkSize), len(subaccountsToCheck)) - - results, err := CheckCollateralizationForSubaccounts( - ctx, - client, - subaccountsToCheck[start:end], - ) - if err != nil { - return nil, err - } - - for _, result := range results { - if result.IsLiquidatable { - liquidatableSubaccountIds = append(liquidatableSubaccountIds, result.SubaccountId) + select { + case <-ticker.C: + if err := s.RunLiquidationDaemonTaskLoop( + client, + ctx, + flags.Liquidation, + subaccountQueryClient, + clobQueryClient, + liquidationServiceClient, + ); err != nil { + // TODO(DEC-947): Move daemon shutdown to application. + client.logger.Error("Liquidations daemon returned error", "error", err) + client.ReportFailure(err) + } else { + client.ReportSuccess() } + case <-stop: + return } } - return liquidatableSubaccountIds, nil -} - -// CheckCollateralizationForSubaccounts queries a gRPC server using `AreSubaccountsLiquidatable` -// and returns a list of collateralization statuses for the given list of subaccount ids. -func CheckCollateralizationForSubaccounts( - ctx context.Context, - client clobtypes.QueryClient, - subaccountIds []satypes.SubaccountId, -) ( - results []clobtypes.AreSubaccountsLiquidatableResponse_Result, - err error, -) { - defer telemetry.ModuleMeasureSince( - metrics.LiquidationDaemon, - time.Now(), - metrics.CheckCollateralizationForSubaccounts, - metrics.Latency, - ) - - query := &clobtypes.AreSubaccountsLiquidatableRequest{ - SubaccountIds: subaccountIds, - } - response, err := client.AreSubaccountsLiquidatable(ctx, query) - if err != nil { - return nil, err - } - return response.Results, nil -} - -// SendLiquidatableSubaccountIds sends a list of unique and potentially liquidatable -// subaccount ids to a gRPC server via `LiquidateSubaccounts`. -func SendLiquidatableSubaccountIds( - ctx context.Context, - client api.LiquidationServiceClient, - subaccountIds []satypes.SubaccountId, -) error { - defer telemetry.ModuleMeasureSince( - metrics.LiquidationDaemon, - time.Now(), - metrics.SendLiquidatableSubaccountIds, - metrics.Latency, - ) - - telemetry.ModuleSetGauge( - metrics.LiquidationDaemon, - float32(len(subaccountIds)), - metrics.LiquidatableSubaccountIds, - metrics.Count, - ) - - request := &api.LiquidateSubaccountsRequest{ - SubaccountIds: subaccountIds, - } - - if _, err := client.LiquidateSubaccounts(ctx, request); err != nil { - return err - } - return nil -} - -func getSubaccountsFromKey( - ctx context.Context, - client satypes.QueryClient, - limit uint64, - pageRequestKey []byte, -) ( - subaccounts []satypes.Subaccount, - nextKey []byte, - err error, -) { - defer metrics.ModuleMeasureSinceWithLabels( - metrics.LiquidationDaemon, - []string{metrics.GetSubaccountsFromKey, metrics.Latency}, - time.Now(), - []gometrics.Label{ - metrics.GetLabelForIntValue(metrics.PageLimit, int(limit)), - }, - ) - - query := &satypes.QueryAllSubaccountRequest{ - Pagination: &query.PageRequest{ - Key: pageRequestKey, - Limit: limit, - }, - } - - response, err := client.SubaccountAll(ctx, query) - if err != nil { - return nil, nil, err - } - if response.Pagination != nil { - nextKey = response.Pagination.NextKey - } - return response.Subaccount, nextKey, nil } diff --git a/protocol/daemons/liquidation/client/client_test.go b/protocol/daemons/liquidation/client/client_test.go index 6475785344..1661434ef8 100644 --- a/protocol/daemons/liquidation/client/client_test.go +++ b/protocol/daemons/liquidation/client/client_test.go @@ -3,23 +3,24 @@ package client_test import ( "context" "errors" - appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" - "github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions" - "testing" - + "fmt" "github.com/cometbft/cometbft/libs/log" "github.com/cosmos/cosmos-sdk/types/query" + appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags" d_constants "github.com/dydxprotocol/v4-chain/protocol/daemons/constants" "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/client" "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions" "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" + daemontestutils "github.com/dydxprotocol/v4-chain/protocol/testutil/daemons" "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "testing" ) func TestStart_TcpConnectionFails(t *testing.T) { @@ -28,13 +29,13 @@ func TestStart_TcpConnectionFails(t *testing.T) { mockGrpcClient := &mocks.GrpcClient{} mockGrpcClient.On("NewTcpConnection", grpc.Ctx, d_constants.DefaultGrpcEndpoint).Return(nil, errors.New(errorMsg)) + liquidationsClient := client.NewClient(log.NewNopLogger()) require.EqualError( t, - client.Start( + liquidationsClient.Start( grpc.Ctx, flags.GetDefaultDaemonFlags(), appflags.GetFlagValuesFromOptions(appoptions.GetDefaultTestAppOptions("", nil)), - log.NewNopLogger(), mockGrpcClient, ), errorMsg, @@ -52,13 +53,13 @@ func TestStart_UnixSocketConnectionFails(t *testing.T) { mockGrpcClient.On("NewGrpcConnection", grpc.Ctx, grpc.SocketPath).Return(nil, errors.New(errorMsg)) mockGrpcClient.On("CloseConnection", grpc.GrpcConn).Return(nil) + liquidationsClient := client.NewClient(log.NewNopLogger()) require.EqualError( t, - client.Start( + liquidationsClient.Start( grpc.Ctx, flags.GetDefaultDaemonFlags(), appflags.GetFlagValuesFromOptions(appoptions.GetDefaultTestAppOptions("", nil)), - log.NewNopLogger(), mockGrpcClient, ), errorMsg, @@ -223,8 +224,12 @@ func TestRunLiquidationDaemonTaskLoop(t *testing.T) { t.Run(name, func(t *testing.T) { queryClientMock := &mocks.QueryClient{} tc.setupMocks(grpc.Ctx, queryClientMock) + s := client.SubTaskRunnerImpl{} + + c := client.NewClient(log.NewNopLogger()) - err := client.RunLiquidationDaemonTaskLoop( + err := s.RunLiquidationDaemonTaskLoop( + c, grpc.Ctx, flags.GetDefaultDaemonFlags().Liquidation, queryClientMock, @@ -241,245 +246,94 @@ func TestRunLiquidationDaemonTaskLoop(t *testing.T) { } } -func TestGetAllSubaccounts(t *testing.T) { - df := flags.GetDefaultDaemonFlags() - tests := map[string]struct { - // mocks - setupMocks func(ctx context.Context, mck *mocks.QueryClient) +// FakeSubTaskRunner is a mock implementation of the SubTaskRunner interface for testing. +type FakeSubTaskRunner struct { + err error + called bool +} - // expectations - expectedSubaccounts []satypes.Subaccount - expectedError error - }{ - "Success": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { - req := &satypes.QueryAllSubaccountRequest{ - Pagination: &query.PageRequest{ - Limit: df.Liquidation.SubaccountPageLimit, - }, - } - response := &satypes.QuerySubaccountAllResponse{ - Subaccount: []satypes.Subaccount{ - constants.Carl_Num0_599USD, - constants.Dave_Num0_599USD, - }, - } - mck.On("SubaccountAll", ctx, req).Return(response, nil) - }, - expectedSubaccounts: []satypes.Subaccount{ - constants.Carl_Num0_599USD, - constants.Dave_Num0_599USD, - }, - }, - "Success Paginated": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { - req := &satypes.QueryAllSubaccountRequest{ - Pagination: &query.PageRequest{ - Limit: df.Liquidation.SubaccountPageLimit, - }, - } - nextKey := []byte("next key") - response := &satypes.QuerySubaccountAllResponse{ - Subaccount: []satypes.Subaccount{ - constants.Carl_Num0_599USD, - }, - Pagination: &query.PageResponse{ - NextKey: nextKey, - }, - } - mck.On("SubaccountAll", ctx, req).Return(response, nil) - req2 := &satypes.QueryAllSubaccountRequest{ - Pagination: &query.PageRequest{ - Key: nextKey, - Limit: df.Liquidation.SubaccountPageLimit, - }, - } - response2 := &satypes.QuerySubaccountAllResponse{ - Subaccount: []satypes.Subaccount{ - constants.Dave_Num0_599USD, - }, - } - mck.On("SubaccountAll", ctx, req2).Return(response2, nil) - }, - expectedSubaccounts: []satypes.Subaccount{ - constants.Carl_Num0_599USD, - constants.Dave_Num0_599USD, - }, - }, - "Errors are propagated": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { - req := &satypes.QueryAllSubaccountRequest{ - Pagination: &query.PageRequest{ - Limit: df.Liquidation.SubaccountPageLimit, - }, - } - mck.On("SubaccountAll", ctx, req).Return(nil, errors.New("test error")) - }, - expectedError: errors.New("test error"), - }, +func NewFakeSubTaskRunnerWithError(err error) *FakeSubTaskRunner { + return &FakeSubTaskRunner{ + err: err, } +} - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - queryClientMock := &mocks.QueryClient{} - tc.setupMocks(grpc.Ctx, queryClientMock) - - actual, err := client.GetAllSubaccounts(grpc.Ctx, queryClientMock, df.Liquidation.SubaccountPageLimit) - if err != nil { - require.EqualError(t, err, tc.expectedError.Error()) - } else { - require.Equal(t, tc.expectedSubaccounts, actual) - } - }) - } +// RunLiquidationDaemonTaskLoop is a mock implementation of the SubTaskRunner interface. It records the +// call as a sanity check, and returns the error set by NewFakeSubTaskRunnerWithError. +func (f *FakeSubTaskRunner) RunLiquidationDaemonTaskLoop( + _ *client.Client, + _ context.Context, + _ flags.LiquidationFlags, + _ satypes.QueryClient, + _ clobtypes.QueryClient, + _ api.LiquidationServiceClient, +) error { + f.called = true + return f.err } -func TestCheckCollateralizationForSubaccounts(t *testing.T) { +func TestHealthCheck_Mixed(t *testing.T) { tests := map[string]struct { - // mocks - setupMocks func( - ctx context.Context, - mck *mocks.QueryClient, - results []clobtypes.AreSubaccountsLiquidatableResponse_Result, - ) - subaccountIds []satypes.SubaccountId - - // expectations - expectedResults []clobtypes.AreSubaccountsLiquidatableResponse_Result - expectedError error + // taskLoopResponses is a list of errors returned by the task loop. If the error is nil, the task loop is + // considered to have succeeded. + taskLoopResponses []error + expectedHealthStatus error }{ - "Success": { - setupMocks: func( - ctx context.Context, - mck *mocks.QueryClient, - results []clobtypes.AreSubaccountsLiquidatableResponse_Result, - ) { - query := &clobtypes.AreSubaccountsLiquidatableRequest{ - SubaccountIds: []satypes.SubaccountId{ - constants.Alice_Num0, - constants.Bob_Num0, - }, - } - response := &clobtypes.AreSubaccountsLiquidatableResponse{ - Results: results, - } - mck.On("AreSubaccountsLiquidatable", ctx, query).Return(response, nil) - }, - subaccountIds: []satypes.SubaccountId{ - constants.Alice_Num0, - constants.Bob_Num0, - }, - expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{ - { - SubaccountId: constants.Alice_Num0, - IsLiquidatable: true, - }, - { - SubaccountId: constants.Bob_Num0, - IsLiquidatable: false, - }, + "Healthy - successful update": { + taskLoopResponses: []error{ + nil, // 1 successful update }, + expectedHealthStatus: nil, // healthy status }, - "Success - Empty": { - setupMocks: func( - ctx context.Context, - mck *mocks.QueryClient, - results []clobtypes.AreSubaccountsLiquidatableResponse_Result, - ) { - query := &clobtypes.AreSubaccountsLiquidatableRequest{ - SubaccountIds: []satypes.SubaccountId{}, - } - response := &clobtypes.AreSubaccountsLiquidatableResponse{ - Results: results, - } - mck.On("AreSubaccountsLiquidatable", ctx, query).Return(response, nil) + "Unhealthy - failed update": { + taskLoopResponses: []error{ + fmt.Errorf("failed to update"), // 1 failed update }, - subaccountIds: []satypes.SubaccountId{}, - expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{}, + expectedHealthStatus: fmt.Errorf("no successful update has occurred"), }, - "Errors are propagated": { - setupMocks: func( - ctx context.Context, - mck *mocks.QueryClient, - results []clobtypes.AreSubaccountsLiquidatableResponse_Result, - ) { - query := &clobtypes.AreSubaccountsLiquidatableRequest{ - SubaccountIds: []satypes.SubaccountId{}, - } - mck.On("AreSubaccountsLiquidatable", ctx, query).Return(nil, errors.New("test error")) + "Unhealthy - failed update after successful update": { + taskLoopResponses: []error{ + nil, // 1 successful update + fmt.Errorf("failed to update"), // 1 failed update }, - subaccountIds: []satypes.SubaccountId{}, - expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{}, - expectedError: errors.New("test error"), + expectedHealthStatus: fmt.Errorf("last update failed"), }, } - for name, tc := range tests { t.Run(name, func(t *testing.T) { - queryClientMock := &mocks.QueryClient{} - tc.setupMocks(grpc.Ctx, queryClientMock, tc.expectedResults) + // Setup. + c := client.NewClient(log.NewNopLogger()) - actual, err := client.CheckCollateralizationForSubaccounts(grpc.Ctx, queryClientMock, tc.subaccountIds) - if err != nil { - require.EqualError(t, err, tc.expectedError.Error()) - } else { - require.Equal(t, tc.expectedResults, actual) - } - }) - } -} - -func TestSendLiquidatableSubaccountIds(t *testing.T) { - tests := map[string]struct { - // mocks - setupMocks func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) - subaccountIds []satypes.SubaccountId - - // expectations - expectedError error - }{ - "Success": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { - req := &api.LiquidateSubaccountsRequest{ - SubaccountIds: ids, - } - response := &api.LiquidateSubaccountsResponse{} - mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil) - }, - subaccountIds: []satypes.SubaccountId{ - constants.Alice_Num0, - constants.Bob_Num0, - }, - }, - "Success Empty": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { - req := &api.LiquidateSubaccountsRequest{ - SubaccountIds: ids, - } - response := &api.LiquidateSubaccountsResponse{} - mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil) - }, - subaccountIds: []satypes.SubaccountId{}, - }, - "Errors are propagated": { - setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { - req := &api.LiquidateSubaccountsRequest{ - SubaccountIds: ids, - } - mck.On("LiquidateSubaccounts", ctx, req).Return(nil, errors.New("test error")) - }, - subaccountIds: []satypes.SubaccountId{}, - expectedError: errors.New("test error"), - }, - } + // Sanity check - the client should be unhealthy before the first successful update. + require.ErrorContains( + t, + c.HealthCheck(), + "no successful update has occurred", + ) - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - queryClientMock := &mocks.QueryClient{} - tc.setupMocks(grpc.Ctx, queryClientMock, tc.subaccountIds) + // Run the sequence of task loop responses. + for _, taskLoopError := range tc.taskLoopResponses { + ticker, stop := daemontestutils.SingleTickTickerAndStop() + // Start the daemon task loop. Since we created a single-tick ticker, this will run for one iteration and + // return. + client.StartLiquidationsDaemonTaskLoop( + c, + grpc.Ctx, + NewFakeSubTaskRunnerWithError(taskLoopError), + flags.GetDefaultDaemonFlags(), + ticker, + stop, + &mocks.QueryClient{}, + &mocks.QueryClient{}, + &mocks.QueryClient{}, + ) + } - err := client.SendLiquidatableSubaccountIds(grpc.Ctx, queryClientMock, tc.subaccountIds) - require.Equal(t, tc.expectedError, err) + if tc.expectedHealthStatus == nil { + require.NoError(t, c.HealthCheck()) + } else { + require.ErrorContains(t, c.HealthCheck(), tc.expectedHealthStatus.Error()) + } }) } } diff --git a/protocol/daemons/liquidation/client/sub_task_runner.go b/protocol/daemons/liquidation/client/sub_task_runner.go new file mode 100644 index 0000000000..db8b92b321 --- /dev/null +++ b/protocol/daemons/liquidation/client/sub_task_runner.go @@ -0,0 +1,285 @@ +package client + +import ( + "context" + gometrics "github.com/armon/go-metrics" + "github.com/cosmos/cosmos-sdk/telemetry" + "github.com/cosmos/cosmos-sdk/types/query" + "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" + "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" + "github.com/dydxprotocol/v4-chain/protocol/lib" + "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "time" +) + +// SubTaskRunner provides an interface that encapsulates the liquidations daemon logic to gather and report +// potentially liquidatable subaccount ids. This interface is used to mock the daemon logic in tests. +type SubTaskRunner interface { + RunLiquidationDaemonTaskLoop( + client *Client, + ctx context.Context, + liqFlags flags.LiquidationFlags, + subaccountQueryClient satypes.QueryClient, + clobQueryClient clobtypes.QueryClient, + liquidationServiceClient api.LiquidationServiceClient, + ) error +} + +type SubTaskRunnerImpl struct{} + +// Ensure SubTaskRunnerImpl implements the SubTaskRunner interface. +var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil) + +// RunLiquidationDaemonTaskLoop contains the logic to communicate with various gRPC services +// to find the liquidatable subaccount ids. +func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop( + client *Client, + ctx context.Context, + liqFlags flags.LiquidationFlags, + subaccountQueryClient satypes.QueryClient, + clobQueryClient clobtypes.QueryClient, + liquidationServiceClient api.LiquidationServiceClient, +) error { + defer telemetry.ModuleMeasureSince( + metrics.LiquidationDaemon, + time.Now(), + metrics.MainTaskLoop, + metrics.Latency, + ) + + // 1. Fetch all subaccounts from query service. + subaccounts, err := GetAllSubaccounts( + client, + ctx, + subaccountQueryClient, + liqFlags.SubaccountPageLimit, + ) + if err != nil { + return err + } + + // 2. Check collateralization statuses of subaccounts with at least one open position. + liquidatableSubaccountIds, err := GetLiquidatableSubaccountIds( + client, + ctx, + clobQueryClient, + liqFlags, + subaccounts, + ) + if err != nil { + return err + } + + // 3. Send the list of liquidatable subaccount ids to the daemon server. + err = SendLiquidatableSubaccountIds( + ctx, + liquidationServiceClient, + liquidatableSubaccountIds, + ) + if err != nil { + return err + } + + return nil +} + +// CheckCollateralizationForSubaccounts queries a gRPC server using `AreSubaccountsLiquidatable` +// and returns a list of collateralization statuses for the given list of subaccount ids. +func CheckCollateralizationForSubaccounts( + daemon *Client, + ctx context.Context, + client clobtypes.QueryClient, + subaccountIds []satypes.SubaccountId, +) ( + results []clobtypes.AreSubaccountsLiquidatableResponse_Result, + err error, +) { + defer telemetry.ModuleMeasureSince( + metrics.LiquidationDaemon, + time.Now(), + metrics.CheckCollateralizationForSubaccounts, + metrics.Latency, + ) + + query := &clobtypes.AreSubaccountsLiquidatableRequest{ + SubaccountIds: subaccountIds, + } + response, err := client.AreSubaccountsLiquidatable(ctx, query) + if err != nil { + return nil, err + } + + return response.Results, nil +} + +// SendLiquidatableSubaccountIds sends a list of unique and potentially liquidatable +// subaccount ids to a gRPC server via `LiquidateSubaccounts`. +func SendLiquidatableSubaccountIds( + ctx context.Context, + client api.LiquidationServiceClient, + subaccountIds []satypes.SubaccountId, +) error { + defer telemetry.ModuleMeasureSince( + metrics.LiquidationDaemon, + time.Now(), + metrics.SendLiquidatableSubaccountIds, + metrics.Latency, + ) + + telemetry.ModuleSetGauge( + metrics.LiquidationDaemon, + float32(len(subaccountIds)), + metrics.LiquidatableSubaccountIds, + metrics.Count, + ) + + request := &api.LiquidateSubaccountsRequest{ + SubaccountIds: subaccountIds, + } + + if _, err := client.LiquidateSubaccounts(ctx, request); err != nil { + return err + } + return nil +} + +func getSubaccountsFromKey( + ctx context.Context, + client satypes.QueryClient, + limit uint64, + pageRequestKey []byte, +) ( + subaccounts []satypes.Subaccount, + nextKey []byte, + err error, +) { + defer metrics.ModuleMeasureSinceWithLabels( + metrics.LiquidationDaemon, + []string{metrics.GetSubaccountsFromKey, metrics.Latency}, + time.Now(), + []gometrics.Label{ + metrics.GetLabelForIntValue(metrics.PageLimit, int(limit)), + }, + ) + + query := &satypes.QueryAllSubaccountRequest{ + Pagination: &query.PageRequest{ + Key: pageRequestKey, + Limit: limit, + }, + } + + response, err := client.SubaccountAll(ctx, query) + if err != nil { + return nil, nil, err + } + if response.Pagination != nil { + nextKey = response.Pagination.NextKey + } + return response.Subaccount, nextKey, nil +} + +// GetAllSubaccounts queries a gRPC server and returns a list of subaccounts and +// their balances and open positions. +func GetAllSubaccounts( + daemon *Client, + ctx context.Context, + client satypes.QueryClient, + limit uint64, +) ( + subaccounts []satypes.Subaccount, + err error, +) { + defer telemetry.ModuleMeasureSince(metrics.LiquidationDaemon, time.Now(), metrics.GetAllSubaccounts, metrics.Latency) + subaccounts = make([]satypes.Subaccount, 0) + + var nextKey []byte + for { + subaccountsFromKey, next, err := getSubaccountsFromKey( + ctx, + client, + limit, + nextKey, + ) + + if err != nil { + return nil, err + } + + subaccounts = append(subaccounts, subaccountsFromKey...) + nextKey = next + + if len(nextKey) == 0 { + break + } + } + + telemetry.ModuleSetGauge( + metrics.LiquidationDaemon, + float32(len(subaccounts)), + metrics.GetAllSubaccounts, + metrics.Count, + ) + + return subaccounts, nil +} + +// GetLiquidatableSubaccountIds verifies collateralization statuses of subaccounts with +// at least one open position and returns a list of unique and potentially liquidatable subaccount ids. +func GetLiquidatableSubaccountIds( + daemon *Client, + ctx context.Context, + client clobtypes.QueryClient, + liqFlags flags.LiquidationFlags, + subaccounts []satypes.Subaccount, +) ( + liquidatableSubaccountIds []satypes.SubaccountId, + err error, +) { + defer telemetry.ModuleMeasureSince( + metrics.LiquidationDaemon, + time.Now(), + metrics.GetLiquidatableSubaccountIds, + metrics.Latency, + ) + + // Filter out subaccounts with no open positions. + subaccountsToCheck := make([]satypes.SubaccountId, 0) + for _, subaccount := range subaccounts { + if len(subaccount.PerpetualPositions) > 0 { + subaccountsToCheck = append(subaccountsToCheck, *subaccount.Id) + } + } + + telemetry.ModuleSetGauge( + metrics.LiquidationDaemon, + float32(len(subaccountsToCheck)), + metrics.SubaccountsWithOpenPositions, + metrics.Count, + ) + + // Query the gRPC server in chunks of size `liqFlags.RequestChunkSize`. + liquidatableSubaccountIds = make([]satypes.SubaccountId, 0) + for start := 0; start < len(subaccountsToCheck); start += int(liqFlags.RequestChunkSize) { + end := lib.Min(start+int(liqFlags.RequestChunkSize), len(subaccountsToCheck)) + + results, err := CheckCollateralizationForSubaccounts( + daemon, + ctx, + client, + subaccountsToCheck[start:end], + ) + if err != nil { + return nil, err + } + + for _, result := range results { + if result.IsLiquidatable { + liquidatableSubaccountIds = append(liquidatableSubaccountIds, result.SubaccountId) + } + } + } + return liquidatableSubaccountIds, nil +} diff --git a/protocol/daemons/liquidation/client/sub_task_runner_test.go b/protocol/daemons/liquidation/client/sub_task_runner_test.go new file mode 100644 index 0000000000..74dc257d20 --- /dev/null +++ b/protocol/daemons/liquidation/client/sub_task_runner_test.go @@ -0,0 +1,274 @@ +package client_test + +import ( + "context" + "errors" + "github.com/cometbft/cometbft/libs/log" + "github.com/cosmos/cosmos-sdk/types/query" + "github.com/dydxprotocol/v4-chain/protocol/daemons/flags" + "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api" + "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/client" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" + "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "github.com/stretchr/testify/require" + "testing" +) + +func TestGetAllSubaccounts(t *testing.T) { + df := flags.GetDefaultDaemonFlags() + tests := map[string]struct { + // mocks + setupMocks func(ctx context.Context, mck *mocks.QueryClient) + + // expectations + expectedSubaccounts []satypes.Subaccount + expectedError error + }{ + "Success": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { + req := &satypes.QueryAllSubaccountRequest{ + Pagination: &query.PageRequest{ + Limit: df.Liquidation.SubaccountPageLimit, + }, + } + response := &satypes.QuerySubaccountAllResponse{ + Subaccount: []satypes.Subaccount{ + constants.Carl_Num0_599USD, + constants.Dave_Num0_599USD, + }, + } + mck.On("SubaccountAll", ctx, req).Return(response, nil) + }, + expectedSubaccounts: []satypes.Subaccount{ + constants.Carl_Num0_599USD, + constants.Dave_Num0_599USD, + }, + }, + "Success Paginated": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { + req := &satypes.QueryAllSubaccountRequest{ + Pagination: &query.PageRequest{ + Limit: df.Liquidation.SubaccountPageLimit, + }, + } + nextKey := []byte("next key") + response := &satypes.QuerySubaccountAllResponse{ + Subaccount: []satypes.Subaccount{ + constants.Carl_Num0_599USD, + }, + Pagination: &query.PageResponse{ + NextKey: nextKey, + }, + } + mck.On("SubaccountAll", ctx, req).Return(response, nil) + req2 := &satypes.QueryAllSubaccountRequest{ + Pagination: &query.PageRequest{ + Key: nextKey, + Limit: df.Liquidation.SubaccountPageLimit, + }, + } + response2 := &satypes.QuerySubaccountAllResponse{ + Subaccount: []satypes.Subaccount{ + constants.Dave_Num0_599USD, + }, + } + mck.On("SubaccountAll", ctx, req2).Return(response2, nil) + }, + expectedSubaccounts: []satypes.Subaccount{ + constants.Carl_Num0_599USD, + constants.Dave_Num0_599USD, + }, + }, + "Errors are propagated": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient) { + req := &satypes.QueryAllSubaccountRequest{ + Pagination: &query.PageRequest{ + Limit: df.Liquidation.SubaccountPageLimit, + }, + } + mck.On("SubaccountAll", ctx, req).Return(nil, errors.New("test error")) + }, + expectedError: errors.New("test error"), + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + queryClientMock := &mocks.QueryClient{} + tc.setupMocks(grpc.Ctx, queryClientMock) + + daemonClient := client.NewClient(log.NewNopLogger()) + actual, err := client.GetAllSubaccounts( + daemonClient, + grpc.Ctx, + queryClientMock, + df.Liquidation.SubaccountPageLimit, + ) + if err != nil { + require.EqualError(t, err, tc.expectedError.Error()) + } else { + require.Equal(t, tc.expectedSubaccounts, actual) + } + }) + } +} + +func TestCheckCollateralizationForSubaccounts(t *testing.T) { + tests := map[string]struct { + // mocks + setupMocks func( + ctx context.Context, + mck *mocks.QueryClient, + results []clobtypes.AreSubaccountsLiquidatableResponse_Result, + ) + subaccountIds []satypes.SubaccountId + + // expectations + expectedResults []clobtypes.AreSubaccountsLiquidatableResponse_Result + expectedError error + }{ + "Success": { + setupMocks: func( + ctx context.Context, + mck *mocks.QueryClient, + results []clobtypes.AreSubaccountsLiquidatableResponse_Result, + ) { + query := &clobtypes.AreSubaccountsLiquidatableRequest{ + SubaccountIds: []satypes.SubaccountId{ + constants.Alice_Num0, + constants.Bob_Num0, + }, + } + response := &clobtypes.AreSubaccountsLiquidatableResponse{ + Results: results, + } + mck.On("AreSubaccountsLiquidatable", ctx, query).Return(response, nil) + }, + subaccountIds: []satypes.SubaccountId{ + constants.Alice_Num0, + constants.Bob_Num0, + }, + expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{ + { + SubaccountId: constants.Alice_Num0, + IsLiquidatable: true, + }, + { + SubaccountId: constants.Bob_Num0, + IsLiquidatable: false, + }, + }, + }, + "Success - Empty": { + setupMocks: func( + ctx context.Context, + mck *mocks.QueryClient, + results []clobtypes.AreSubaccountsLiquidatableResponse_Result, + ) { + query := &clobtypes.AreSubaccountsLiquidatableRequest{ + SubaccountIds: []satypes.SubaccountId{}, + } + response := &clobtypes.AreSubaccountsLiquidatableResponse{ + Results: results, + } + mck.On("AreSubaccountsLiquidatable", ctx, query).Return(response, nil) + }, + subaccountIds: []satypes.SubaccountId{}, + expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{}, + }, + "Errors are propagated": { + setupMocks: func( + ctx context.Context, + mck *mocks.QueryClient, + results []clobtypes.AreSubaccountsLiquidatableResponse_Result, + ) { + query := &clobtypes.AreSubaccountsLiquidatableRequest{ + SubaccountIds: []satypes.SubaccountId{}, + } + mck.On("AreSubaccountsLiquidatable", ctx, query).Return(nil, errors.New("test error")) + }, + subaccountIds: []satypes.SubaccountId{}, + expectedResults: []clobtypes.AreSubaccountsLiquidatableResponse_Result{}, + expectedError: errors.New("test error"), + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + queryClientMock := &mocks.QueryClient{} + tc.setupMocks(grpc.Ctx, queryClientMock, tc.expectedResults) + + daemon := client.NewClient(log.NewNopLogger()) + actual, err := client.CheckCollateralizationForSubaccounts( + daemon, + grpc.Ctx, + queryClientMock, + tc.subaccountIds, + ) + + if err != nil { + require.EqualError(t, err, tc.expectedError.Error()) + } else { + require.Equal(t, tc.expectedResults, actual) + } + }) + } +} + +func TestSendLiquidatableSubaccountIds(t *testing.T) { + tests := map[string]struct { + // mocks + setupMocks func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) + subaccountIds []satypes.SubaccountId + + // expectations + expectedError error + }{ + "Success": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { + req := &api.LiquidateSubaccountsRequest{ + SubaccountIds: ids, + } + response := &api.LiquidateSubaccountsResponse{} + mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil) + }, + subaccountIds: []satypes.SubaccountId{ + constants.Alice_Num0, + constants.Bob_Num0, + }, + }, + "Success Empty": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { + req := &api.LiquidateSubaccountsRequest{ + SubaccountIds: ids, + } + response := &api.LiquidateSubaccountsResponse{} + mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil) + }, + subaccountIds: []satypes.SubaccountId{}, + }, + "Errors are propagated": { + setupMocks: func(ctx context.Context, mck *mocks.QueryClient, ids []satypes.SubaccountId) { + req := &api.LiquidateSubaccountsRequest{ + SubaccountIds: ids, + } + mck.On("LiquidateSubaccounts", ctx, req).Return(nil, errors.New("test error")) + }, + subaccountIds: []satypes.SubaccountId{}, + expectedError: errors.New("test error"), + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + queryClientMock := &mocks.QueryClient{} + tc.setupMocks(grpc.Ctx, queryClientMock, tc.subaccountIds) + + err := client.SendLiquidatableSubaccountIds(grpc.Ctx, queryClientMock, tc.subaccountIds) + require.Equal(t, tc.expectedError, err) + }) + } +} diff --git a/protocol/daemons/pricefeed/client/client.go b/protocol/daemons/pricefeed/client/client.go index 96bcd4e30a..fc2b9d549a 100644 --- a/protocol/daemons/pricefeed/client/client.go +++ b/protocol/daemons/pricefeed/client/client.go @@ -52,19 +52,25 @@ type Client struct { // Ensure stop only executes one time. stopDaemon sync.Once + + // logger is the logger for the daemon. + logger log.Logger } // Ensure Client implements the HealthCheckable interface. var _ daemontypes.HealthCheckable = (*Client)(nil) -func newClient() *Client { +func newClient(logger log.Logger) *Client { + logger = logger.With(sdklog.ModuleKey, constants.PricefeedDaemonModuleName) client := &Client{ tickers: []*time.Ticker{}, stops: []chan bool{}, HealthCheckable: daemontypes.NewTimeBoundedHealthCheckable( constants.PricefeedDaemonModuleName, &libtime.TimeProviderImpl{}, + logger, ), + logger: logger, } // Set the client's daemonStartup state to indicate that the daemon has not finished starting up. @@ -126,7 +132,6 @@ func (c *Client) Stop() { func (c *Client) start(ctx context.Context, daemonFlags flags.DaemonFlags, appFlags appflags.Flags, - logger log.Logger, grpcClient daemontypes.GrpcClient, exchangeIdToQueryConfig map[types.ExchangeId]*types.ExchangeQueryConfig, exchangeIdToExchangeDetails map[types.ExchangeId]types.ExchangeQueryDetails, @@ -135,7 +140,7 @@ func (c *Client) start(ctx context.Context, // 1. Establish connections to gRPC servers. queryConn, err := grpcClient.NewTcpConnection(ctx, appFlags.GrpcAddress) if err != nil { - logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) + c.logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err) return err } // Defer closing gRPC connection until job completes. @@ -147,7 +152,7 @@ func (c *Client) start(ctx context.Context, daemonConn, err := grpcClient.NewGrpcConnection(ctx, daemonFlags.Shared.SocketAddress) if err != nil { - logger.Error("Failed to establish gRPC connection to socket address", "error", err) + c.logger.Error("Failed to establish gRPC connection to socket address", "error", err) return err } // Defer closing gRPC connection until job completes. @@ -207,7 +212,7 @@ func (c *Client) start(ctx context.Context, exchangeId, priceFeedMutableMarketConfigs, exchangeToMarketPrices, - logger, + c.logger, bCh, ) }() @@ -223,7 +228,7 @@ func (c *Client) start(ctx context.Context, *exchangeConfig, exchangeDetails, &handler.ExchangeQueryHandlerImpl{TimeProvider: timeProvider}, - logger, + c.logger, bCh, ) }() @@ -240,7 +245,7 @@ func (c *Client) start(ctx context.Context, marketParamUpdaterStop, priceFeedMutableMarketConfigs, pricesQueryClient, - logger, + c.logger, ) }() @@ -265,7 +270,7 @@ func (c *Client) start(ctx context.Context, priceUpdaterStop, exchangeToMarketPrices, pricefeedClient, - logger, + c.logger, ) return nil } @@ -291,7 +296,7 @@ func StartNewClient( "PriceFlags", daemonFlags.Price, ) - client = newClient() + client = newClient(logger) client.runningSubtasksWaitGroup.Add(1) go func() { defer client.runningSubtasksWaitGroup.Done() @@ -299,7 +304,6 @@ func StartNewClient( ctx, daemonFlags, appFlags, - logger.With(sdklog.ModuleKey, constants.PricefeedDaemonModuleName), grpcClient, exchangeIdToQueryConfig, exchangeIdToExchangeDetails, diff --git a/protocol/daemons/pricefeed/client/client_test.go b/protocol/daemons/pricefeed/client/client_test.go index 2ca7f46a72..310d3efa4d 100644 --- a/protocol/daemons/pricefeed/client/client_test.go +++ b/protocol/daemons/pricefeed/client/client_test.go @@ -237,12 +237,11 @@ func TestStart_InvalidConfig(t *testing.T) { faketaskRunner.WaitGroup.Add(tc.expectedNumExchangeTasks * 2) // Run Start. - client := newClient() + client := newClient(log.NewNopLogger()) err := client.start( grpc_util.Ctx, daemonflags.GetDefaultDaemonFlags(), appflags.GetFlagValuesFromOptions(appoptions.GetDefaultTestAppOptions("", nil)), - log.NewNopLogger(), tc.mockGrpcClient, tc.exchangeIdToQueryConfig, tc.exchangeIdToExchangeDetails, @@ -764,7 +763,7 @@ func TestHealthCheck_Mixed(t *testing.T) { Return(nil, tc.updateMarketPricesError).Once() ticker, stop := daemontestutils.SingleTickTickerAndStop() - client := newClient() + client := newClient(log.NewNopLogger()) // Act. // Run the price updater for a single tick. Expect the daemon to toggle health state based on @@ -785,9 +784,6 @@ func TestHealthCheck_Mixed(t *testing.T) { } else { require.ErrorContains(t, client.HealthCheck(), tc.expectedError.Error()) } - - // Cleanup. - close(stop) }) } } diff --git a/protocol/daemons/types/health_checkable.go b/protocol/daemons/types/health_checkable.go index 188f1d30af..da79405685 100644 --- a/protocol/daemons/types/health_checkable.go +++ b/protocol/daemons/types/health_checkable.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "github.com/cometbft/cometbft/libs/log" libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" "sync" "time" @@ -61,12 +62,20 @@ type timeBoundedHealthCheckable struct { // timeProvider is the time provider used to determine the current time. This is used for timestamping // creation and checking for update staleness during HealthCheck. timeProvider libtime.TimeProvider + + // logger is the logger used to log errors. + logger log.Logger } // NewTimeBoundedHealthCheckable creates a new HealthCheckable instance. -func NewTimeBoundedHealthCheckable(serviceName string, timeProvider libtime.TimeProvider) HealthCheckable { +func NewTimeBoundedHealthCheckable( + serviceName string, + timeProvider libtime.TimeProvider, + logger log.Logger, +) HealthCheckable { hc := &timeBoundedHealthCheckable{ timeProvider: timeProvider, + logger: logger, } // Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error. hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName)) @@ -117,14 +126,17 @@ func (h *timeBoundedHealthCheckable) HealthCheck() error { ) } - // If the last successful update was more than 5 minutes ago, report the specific error. + // If the last successful update was more than 5 minutes ago, log the specific error. if h.timeProvider.Now().Sub(h.lastSuccessfulUpdate) > MaxAcceptableUpdateDelay { - return fmt.Errorf( - "last successful update occurred at %v, which is more than %v ago. Last failure occurred at %v with error '%w'", - h.lastSuccessfulUpdate, - MaxAcceptableUpdateDelay, - h.lastFailedUpdate.Timestamp(), - h.lastFailedUpdate.Error(), + h.logger.Error( + fmt.Sprintf( + "last successful update occurred at %v, which is more than %v ago. "+ + "Last failure occurred at %v with error '%v'", + h.lastSuccessfulUpdate, + MaxAcceptableUpdateDelay, + h.lastFailedUpdate.Timestamp(), + h.lastFailedUpdate.Error(), + ), ) } diff --git a/protocol/daemons/types/health_checkable_test.go b/protocol/daemons/types/health_checkable_test.go index 4ec5ca9949..7059da5f35 100644 --- a/protocol/daemons/types/health_checkable_test.go +++ b/protocol/daemons/types/health_checkable_test.go @@ -2,6 +2,7 @@ package types_test import ( "fmt" + "github.com/cometbft/cometbft/libs/log" "github.com/dydxprotocol/v4-chain/protocol/daemons/types" libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" "github.com/dydxprotocol/v4-chain/protocol/mocks" @@ -100,23 +101,6 @@ func TestHealthCheckableImpl_Mixed(t *testing.T) { healthCheckTime: Time3, expectedHealthStatus: nil, // expect healthy }, - "unhealthy: last successful update was more than max delay": { - updates: []struct { - timestamp time.Time - err error - }{ - {Time1, nil}, // successful update - }, - healthCheckTime: Time_5Minutes_And_2Seconds, - expectedHealthStatus: fmt.Errorf( - "last successful update occurred at %v, which is more than %v ago. "+ - "Last failure occurred at %v with error '%w'", - Time1, - types.MaxAcceptableUpdateDelay, - Time0, - InitializingStatus, - ), - }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -138,6 +122,7 @@ func TestHealthCheckableImpl_Mixed(t *testing.T) { hci := types.NewTimeBoundedHealthCheckable( "test", mockTimeProviderWithTimestamps(timestamps), + log.NewNopLogger(), ) // Act. diff --git a/protocol/testutil/daemons/common.go b/protocol/testutil/daemons/common.go index 2039b4e2f7..870fafeda6 100644 --- a/protocol/testutil/daemons/common.go +++ b/protocol/testutil/daemons/common.go @@ -20,6 +20,7 @@ func SingleTickTickerAndStop() (*time.Ticker, chan bool) { // Once the single tick is consumed, stop the ticker and signal the stop channel. if len(ticker.C) == 0 { stop <- true + close(stop) ticker.Stop() return }