diff --git a/.github/workflows/lava.yml b/.github/workflows/lava.yml index 6ee6658571..8d114b5941 100644 --- a/.github/workflows/lava.yml +++ b/.github/workflows/lava.yml @@ -305,6 +305,47 @@ jobs: # name: SDK E2E Logs # path: "testutil/e2e/sdkLogs/*" +# This part came from lava_sdk_tests.yml that was removed. just not to lose functionality it moved here. +# name: Lava SDK Tests + +# on: +# pull_request + +# jobs: +# main: +# runs-on: ubuntu-latest +# steps: +# - name: Checkout code +# uses: actions/checkout@v4 + +# - name: Cache dependencies +# uses: actions/cache@v4 +# with: +# path: ~/.yarn +# key: yarn-${{ hashFiles('yarn.lock') }} +# restore-keys: yarn- + +# - uses: actions/setup-go@v5 +# with: +# go-version: "1.20.5" + +# - uses: actions/setup-node@v4 +# with: +# node-version: "21.2.0" + +# - name: Init the SDK +# run: GOPATH=~/go ./scripts/init_sdk.sh -s +# working-directory: ./ecosystem/lava-sdk + +# - name: ESLint +# run: ./node_modules/.bin/eslint '**/*.ts' +# working-directory: ./ecosystem/lava-sdk + +# - name: Test +# run: ./node_modules/.bin/jest ./src --ci +# working-directory: ./ecosystem/lava-sdk + + test-payment-e2e: runs-on: ubuntu-latest steps: diff --git a/.github/workflows/lava_sdk_tests.yml b/.github/workflows/lava_sdk_tests.yml deleted file mode 100644 index a13c83c348..0000000000 --- a/.github/workflows/lava_sdk_tests.yml +++ /dev/null @@ -1,38 +0,0 @@ -name: Lava SDK Tests - -on: - pull_request - -jobs: - main: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Cache dependencies - uses: actions/cache@v4 - with: - path: ~/.yarn - key: yarn-${{ hashFiles('yarn.lock') }} - restore-keys: yarn- - - - uses: actions/setup-go@v5 - with: - go-version: "1.20.5" - - - uses: actions/setup-node@v4 - with: - node-version: "21.2.0" - - - name: Init the SDK - run: GOPATH=~/go ./scripts/init_sdk.sh -s - working-directory: ./ecosystem/lava-sdk - - - name: ESLint - run: ./node_modules/.bin/eslint '**/*.ts' - working-directory: ./ecosystem/lava-sdk - - - name: Test - run: ./node_modules/.bin/jest ./src --ci - working-directory: ./ecosystem/lava-sdk diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 6fbf4ba536..18916afa7d 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -124,7 +124,6 @@ type RelaySender interface { connectionType string, dappID string, consumerIp string, - analytics *metrics.RelayMetrics, metadata []pairingtypes.Metadata, ) (ProtocolMessage, error) SendParsedRelay( diff --git a/protocol/chainlib/chainlib_mock.go b/protocol/chainlib/chainlib_mock.go index fec033fe68..0891fd906a 100644 --- a/protocol/chainlib/chainlib_mock.go +++ b/protocol/chainlib/chainlib_mock.go @@ -714,18 +714,18 @@ func (mr *MockRelaySenderMockRecorder) CreateDappKey(userData interface{}) *gomo } // ParseRelay mocks base method. 
-func (m *MockRelaySender) ParseRelay(ctx context.Context, url, req, connectionType, dappID, consumerIp string, analytics *metrics.RelayMetrics, metadata []types.Metadata) (ProtocolMessage, error) { +func (m *MockRelaySender) ParseRelay(ctx context.Context, url, req, connectionType, dappID, consumerIp string, metadata []types.Metadata) (ProtocolMessage, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseRelay", ctx, url, req, connectionType, dappID, consumerIp, analytics, metadata) + ret := m.ctrl.Call(m, "ParseRelay", ctx, url, req, connectionType, dappID, consumerIp, metadata) ret0, _ := ret[0].(ProtocolMessage) ret1, _ := ret[1].(error) return ret0, ret1 } // ParseRelay indicates an expected call of ParseRelay. -func (mr *MockRelaySenderMockRecorder) ParseRelay(ctx, url, req, connectionType, dappID, consumerIp, analytics, metadata interface{}) *gomock.Call { +func (mr *MockRelaySenderMockRecorder) ParseRelay(ctx, url, req, connectionType, dappID, consumerIp, metadata interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseRelay", reflect.TypeOf((*MockRelaySender)(nil).ParseRelay), ctx, url, req, connectionType, dappID, consumerIp, analytics, metadata) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseRelay", reflect.TypeOf((*MockRelaySender)(nil).ParseRelay), ctx, url, req, connectionType, dappID, consumerIp, metadata) } // SendParsedRelay mocks base method. diff --git a/protocol/chainlib/consumer_websocket_manager.go b/protocol/chainlib/consumer_websocket_manager.go index 6bf645cf4a..ee8ab6319a 100644 --- a/protocol/chainlib/consumer_websocket_manager.go +++ b/protocol/chainlib/consumer_websocket_manager.go @@ -218,7 +218,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() { metricsData := metrics.NewRelayAnalytics(dappID, cwm.chainId, cwm.apiInterface) - protocolMessage, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil) + protocolMessage, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, nil) if err != nil { utils.LavaFormatDebug("ws manager could not parse message", utils.LogAttr("message", msg), utils.LogAttr("err", err)) formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), err, msgSeed, msg, cwm.apiInterface, time.Since(startTime)) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index 72ed94e3ee..88c2fc3aac 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -697,7 +697,7 @@ func (cwsm *ConsumerWSSubscriptionManager) craftUnsubscribeMessage(hashedParams, // Craft the unsubscribe chain message ctx := context.Background() - protocolMessage, err := cwsm.relaySender.ParseRelay(ctx, "", unsubscribeRequestData, cwsm.connectionType, dappID, consumerIp, metricsData, nil) + protocolMessage, err := cwsm.relaySender.ParseRelay(ctx, "", unsubscribeRequestData, cwsm.connectionType, dappID, consumerIp, nil) if err != nil { return nil, utils.LavaFormatError("could not craft unsubscribe chain message", err, utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 08015c239f..4683eac50e 100644 --- 
a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -88,7 +88,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes relaySender. EXPECT(). - ParseRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ParseRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(protocolMessage1, nil). AnyTimes() @@ -244,7 +244,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { relaySender. EXPECT(). - ParseRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ParseRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(protocolMessage1, nil). AnyTimes() @@ -484,7 +484,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { require.True(t, ok) areEqual := reqData == string(play.unsubscribeMessage1) return areEqual - }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(unsubscribeProtocolMessage1, nil). AnyTimes() @@ -495,7 +495,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { require.True(t, ok) areEqual := reqData == string(play.subscriptionRequestData1) return areEqual - }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(subscribeProtocolMessage1, nil). AnyTimes() @@ -600,7 +600,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { require.True(t, ok) areEqual := reqData == string(play.unsubscribeMessage2) return areEqual - }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(unsubscribeProtocolMessage2, nil). AnyTimes() @@ -611,7 +611,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { require.True(t, ok) areEqual := reqData == string(play.subscriptionRequestData2) return areEqual - }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + }), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(subscribeProtocolMessage2, nil). 
AnyTimes() diff --git a/protocol/chainlib/extensionslib/extension_parser.go b/protocol/chainlib/extensionslib/extension_parser.go index c8fc38db90..ae371bab1a 100644 --- a/protocol/chainlib/extensionslib/extension_parser.go +++ b/protocol/chainlib/extensionslib/extension_parser.go @@ -4,6 +4,8 @@ import ( spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +const ArchiveExtension = "archive" + type ExtensionInfo struct { ExtensionOverride []string LatestBlock uint64 @@ -84,7 +86,7 @@ func (ep *ExtensionParser) ExtensionParsing(addon string, extensionsChainMessage func NewExtensionParserRule(extension *spectypes.Extension) ExtensionParserRule { switch extension.Name { - case "archive": + case ArchiveExtension: return ArchiveParserRule{extension: extension} default: // unsupported rule diff --git a/protocol/rpcconsumer/consumer_relay_state_machine.go b/protocol/rpcconsumer/consumer_relay_state_machine.go index ca0df4c4b9..484fb0bc36 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine.go @@ -2,11 +2,15 @@ package rpcconsumer import ( context "context" + "sync" "sync/atomic" "time" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" + "github.com/lavanet/lava/v4/protocol/chainlib" common "github.com/lavanet/lava/v4/protocol/common" + "github.com/lavanet/lava/v4/protocol/lavaprotocol" lavasession "github.com/lavanet/lava/v4/protocol/lavasession" "github.com/lavanet/lava/v4/protocol/metrics" "github.com/lavanet/lava/v4/utils" @@ -15,16 +19,31 @@ import ( type RelayStateMachine interface { GetProtocolMessage() chainlib.ProtocolMessage GetDebugState() bool - GetRelayTaskChannel() chan RelayStateSendInstructions + GetRelayTaskChannel() (chan RelayStateSendInstructions, error) UpdateBatch(err error) GetSelection() Selection GetUsedProviders() *lavasession.UsedProviders - SetRelayProcessor(relayProcessor *RelayProcessor) + SetResultsChecker(resultsChecker ResultsCheckerInf) + SetRelayRetriesManager(relayRetriesManager *lavaprotocol.RelayRetriesManager) +} + +type ResultsCheckerInf interface { + WaitForResults(ctx context.Context) error + HasRequiredNodeResults() (bool, int) } type ConsumerRelaySender interface { getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) GetChainIdAndApiInterface() (string, string) + ParseRelay( + ctx context.Context, + url string, + req string, + connectionType string, + dappID string, + consumerIp string, + metadata []pairingtypes.Metadata, + ) (protocolMessage chainlib.ProtocolMessage, err error) } type tickerMetricSetterInf interface { @@ -32,16 +51,19 @@ type tickerMetricSetterInf interface { } type ConsumerRelayStateMachine struct { - ctx context.Context // same context as user context. - relaySender ConsumerRelaySender - parentRelayProcessor *RelayProcessor - protocolMessage chainlib.ProtocolMessage // only one should make changes to protocol message is ConsumerRelayStateMachine. - analytics *metrics.RelayMetrics // first relay metrics - selection Selection - debugRelays bool - tickerMetricSetter tickerMetricSetterInf - batchUpdate chan error - usedProviders *lavasession.UsedProviders + ctx context.Context // same context as user context. 
+ relaySender ConsumerRelaySender + resultsChecker ResultsCheckerInf + analytics *metrics.RelayMetrics // first relay metrics + selection Selection + debugRelays bool + tickerMetricSetter tickerMetricSetterInf + batchUpdate chan error + usedProviders *lavasession.UsedProviders + relayRetriesManager *lavaprotocol.RelayRetriesManager + relayState []*RelayState + protocolMessage chainlib.ProtocolMessage + relayStateLock sync.RWMutex } func NewRelayStateMachine( @@ -68,11 +90,20 @@ func NewRelayStateMachine( debugRelays: debugRelays, tickerMetricSetter: tickerMetricSetter, batchUpdate: make(chan error, MaximumNumberOfTickerRelayRetries), + relayState: make([]*RelayState, 0), } } -func (crsm *ConsumerRelayStateMachine) SetRelayProcessor(relayProcessor *RelayProcessor) { - crsm.parentRelayProcessor = relayProcessor +func (crsm *ConsumerRelayStateMachine) Initialized() bool { + return crsm.relayRetriesManager != nil && crsm.resultsChecker != nil +} + +func (crsm *ConsumerRelayStateMachine) SetRelayRetriesManager(relayRetriesManager *lavaprotocol.RelayRetriesManager) { + crsm.relayRetriesManager = relayRetriesManager +} + +func (crsm *ConsumerRelayStateMachine) SetResultsChecker(resultsChecker ResultsCheckerInf) { + crsm.resultsChecker = resultsChecker } func (crsm *ConsumerRelayStateMachine) GetUsedProviders() *lavasession.UsedProviders { @@ -83,13 +114,48 @@ func (crsm *ConsumerRelayStateMachine) GetSelection() Selection { return crsm.selection } -func (crsm *ConsumerRelayStateMachine) shouldRetryOnResult(numberOfRetriesLaunched int, numberOfNodeErrors uint64) bool { - shouldRetry := crsm.shouldRetryInner(numberOfRetriesLaunched) - // archive functionality will be added here. +func (crsm *ConsumerRelayStateMachine) appendRelayState(nextState *RelayState) { + crsm.relayStateLock.Lock() + defer crsm.relayStateLock.Unlock() + crsm.relayState = append(crsm.relayState, nextState) +} + +func (crsm *ConsumerRelayStateMachine) getLatestState() *RelayState { + crsm.relayStateLock.RLock() + defer crsm.relayStateLock.RUnlock() + if len(crsm.relayState) == 0 { + return nil + } + return crsm.relayState[len(crsm.relayState)-1] +} + +func (crsm *ConsumerRelayStateMachine) stateTransition(relayState *RelayState) *RelayState { + var nextState *RelayState + if relayState == nil { // initial state + nextState = NewRelayState(crsm.ctx, crsm.protocolMessage, 0, crsm.relayRetriesManager, crsm.relaySender, ArchiveStatus{}) + } else { + nextState = NewRelayState(crsm.ctx, relayState.GetProtocolMessage(), relayState.GetStateNumber()+1, crsm.relayRetriesManager, crsm.relaySender, relayState.archiveStatus) + } + crsm.appendRelayState(nextState) + return nextState +} + +// Should retry implements the logic for when to send another relay. 
+// As well as the decision of changing the protocol message, +// into different extensions or addons based on certain conditions +func (crsm *ConsumerRelayStateMachine) shouldRetry(numberOfNodeErrors uint64) bool { + batchNumber := crsm.usedProviders.BatchNumber() + shouldRetry := crsm.retryCondition(batchNumber) + if shouldRetry { + lastState := crsm.getLatestState() + nextState := crsm.stateTransition(lastState) + // Retry archive logic + return nextState.upgradeToArchiveIfNeeded(batchNumber, numberOfNodeErrors) + } return shouldRetry } -func (crsm *ConsumerRelayStateMachine) shouldRetryInner(numberOfRetriesLaunched int) bool { +func (crsm *ConsumerRelayStateMachine) retryCondition(numberOfRetriesLaunched int) bool { if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries { return false } @@ -97,16 +163,16 @@ func (crsm *ConsumerRelayStateMachine) shouldRetryInner(numberOfRetriesLaunched return crsm.selection != BestResult } -func (crsm *ConsumerRelayStateMachine) shouldRetryTicker(numberOfRetriesLaunched int) bool { - return crsm.shouldRetryInner(numberOfRetriesLaunched) -} - func (crsm *ConsumerRelayStateMachine) GetDebugState() bool { return crsm.debugRelays } func (crsm *ConsumerRelayStateMachine) GetProtocolMessage() chainlib.ProtocolMessage { - return crsm.protocolMessage + latestState := crsm.getLatestState() + if latestState == nil { // failed fetching latest state + return crsm.protocolMessage + } + return latestState.GetProtocolMessage() } type RelayStateSendInstructions struct { @@ -120,7 +186,10 @@ func (rssi *RelayStateSendInstructions) IsDone() bool { return rssi.done || rssi.err != nil } -func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSendInstructions { +func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() (chan RelayStateSendInstructions, error) { + if !crsm.Initialized() { + return nil, utils.LavaFormatError("ConsumerRelayStateMachine was not initialized properly", nil) + } relayTaskChannel := make(chan RelayStateSendInstructions) go func() { // A channel to be notified processing was done, true means we have results and can return @@ -137,9 +206,9 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend readResultsFromProcessor := func() { // ProcessResults is reading responses while blocking until the conditions are met utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) - crsm.parentRelayProcessor.WaitForResults(processingCtx) + crsm.resultsChecker.WaitForResults(processingCtx) // Decide if we need to resend or not - metRequiredNodeResults, numberOfNodeErrors := crsm.parentRelayProcessor.HasRequiredNodeResults() + metRequiredNodeResults, numberOfNodeErrors := crsm.resultsChecker.HasRequiredNodeResults() numberOfNodeErrorsAtomic.Store(uint64(numberOfNodeErrors)) if metRequiredNodeResults { gotResults <- true @@ -160,6 +229,8 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend } } + // initialize relay state + crsm.stateTransition(nil) // Send First Message, with analytics and without waiting for batch update. 
relayTaskChannel <- RelayStateSendInstructions{ protocolMessage: crsm.GetProtocolMessage(), @@ -187,9 +258,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend } else { utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors)) // Failed sending message, but we still want to attempt sending more. - relayTaskChannel <- RelayStateSendInstructions{ - protocolMessage: crsm.GetProtocolMessage(), - } + relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} } continue } @@ -205,7 +274,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend return } // If should retry == true, send a new batch. (success == false) - if crsm.shouldRetryOnResult(crsm.usedProviders.BatchNumber(), numberOfNodeErrorsAtomic.Load()) { + if crsm.shouldRetry(numberOfNodeErrorsAtomic.Load()) { utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} } else { @@ -214,7 +283,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend go readResultsFromProcessor() case <-startNewBatchTicker.C: // Only trigger another batch for non BestResult relays or if we didn't pass the retry limit. - if crsm.shouldRetryTicker(crsm.usedProviders.BatchNumber()) { + if crsm.shouldRetry(numberOfNodeErrorsAtomic.Load()) { utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} // Add ticker launch metrics @@ -249,7 +318,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend } } }() - return relayTaskChannel + return relayTaskChannel, nil } func (crsm *ConsumerRelayStateMachine) UpdateBatch(err error) { diff --git a/protocol/rpcconsumer/consumer_relay_state_machine_test.go b/protocol/rpcconsumer/consumer_relay_state_machine_test.go index dfd6eeb871..bd51cd5673 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine_test.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine_test.go @@ -10,7 +10,7 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib" "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" lavasession "github.com/lavanet/lava/v4/protocol/lavasession" - "github.com/lavanet/lava/v4/protocol/metrics" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" spectypes "github.com/lavanet/lava/v4/x/spec/types" "github.com/stretchr/testify/require" ) @@ -20,10 +20,6 @@ type ConsumerRelaySenderMock struct { tickerValue time.Duration } -func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) { - return crsm.retValue -} - func (crsm *ConsumerRelaySenderMock) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) { if crsm.tickerValue != 0 { return time.Second * 50000, crsm.tickerValue @@ -35,6 +31,18 @@ func (crsm *ConsumerRelaySenderMock) GetChainIdAndApiInterface() (string, string return "testUno", "testDos" } +func (crsm *ConsumerRelaySenderMock) ParseRelay( + ctx 
context.Context, + url string, + req string, + connectionType string, + dappID string, + consumerIp string, + metadata []pairingtypes.Metadata, +) (protocolMessage chainlib.ProtocolMessage, err error) { + return nil, fmt.Errorf("not implemented") +} + func TestConsumerStateMachineHappyFlow(t *testing.T) { t.Run("happy", func(t *testing.T) { ctx := context.Background() @@ -66,7 +74,8 @@ func TestConsumerStateMachineHappyFlow(t *testing.T) { require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - relayTaskChannel := relayProcessor.GetRelayTaskChannel() + relayTaskChannel, err := relayProcessor.GetRelayTaskChannel() + require.NoError(t, err) taskNumber := 0 for task := range relayTaskChannel { switch taskNumber { @@ -135,7 +144,8 @@ func TestConsumerStateMachineExhaustRetries(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) - relayTaskChannel := relayProcessor.GetRelayTaskChannel() + relayTaskChannel, err := relayProcessor.GetRelayTaskChannel() + require.NoError(t, err) taskNumber := 0 for task := range relayTaskChannel { switch taskNumber { diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 589c054fcc..d68520a8a8 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -86,7 +86,8 @@ func NewRelayProcessor( selection: relayStateMachine.GetSelection(), usedProviders: relayStateMachine.GetUsedProviders(), } - relayProcessor.RelayStateMachine.SetRelayProcessor(relayProcessor) + relayProcessor.RelayStateMachine.SetResultsChecker(relayProcessor) + relayProcessor.RelayStateMachine.SetRelayRetriesManager(relayRetriesManager) return relayProcessor } diff --git a/protocol/rpcconsumer/relay_state.go b/protocol/rpcconsumer/relay_state.go new file mode 100644 index 0000000000..8bee2eda25 --- /dev/null +++ b/protocol/rpcconsumer/relay_state.go @@ -0,0 +1,118 @@ +package rpcconsumer + +import ( + "context" + "strings" + + "github.com/lavanet/lava/v4/protocol/chainlib" + "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" + common "github.com/lavanet/lava/v4/protocol/common" + "github.com/lavanet/lava/v4/utils" + slices "github.com/lavanet/lava/v4/utils/lavaslices" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" +) + +type RetryHashCacheInf interface { + CheckHashInCache(hash string) bool + AddHashToCache(hash string) +} + +type RelayParserInf interface { + ParseRelay( + ctx context.Context, + url string, + req string, + connectionType string, + dappID string, + consumerIp string, + metadata []pairingtypes.Metadata, + ) (protocolMessage chainlib.ProtocolMessage, err error) +} + +type ArchiveStatus struct { + isArchive bool + isUpgraded bool + isHashCached bool +} + +type RelayState struct { + archiveStatus ArchiveStatus + stateNumber int + protocolMessage chainlib.ProtocolMessage + cache RetryHashCacheInf + relayParser RelayParserInf + ctx context.Context +} + +func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage, stateNumber int, cache RetryHashCacheInf, relayParser RelayParserInf, archiveInfo ArchiveStatus) *RelayState { + relayRequestData := protocolMessage.RelayPrivateData() + isArchive := false + if slices.Contains(relayRequestData.Extensions, extensionslib.ArchiveExtension) { + isArchive = true + } + return &RelayState{ctx: ctx, protocolMessage: 
protocolMessage, stateNumber: stateNumber, cache: cache, relayParser: relayParser, archiveStatus: ArchiveStatus{isArchive: isArchive, isUpgraded: archiveInfo.isUpgraded, isHashCached: archiveInfo.isHashCached}} +} + +func (rs *RelayState) GetStateNumber() int { + if rs == nil { + return 0 + } + return rs.stateNumber +} + +func (rs *RelayState) GetProtocolMessage() chainlib.ProtocolMessage { + return rs.protocolMessage +} + +func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numberOfNodeErrors uint64) bool { + hashes := rs.protocolMessage.GetRequestedBlocksHashes() + // If we got upgraded and we still got a node error (>= 2) we know upgrade didn't work + if rs.archiveStatus.isUpgraded && numberOfNodeErrors >= 2 { + // Validate the following. + // 1. That we have applied archive + // 2. That we had more than one node error (meaning the 2nd was a successful archive [node error] 100%) + // Now - + // We know we have applied archive and failed. + // 1. We can remove the archive, return to the original protocol message, + // 2. Set all hashes as irrelevant for future queries. + if !rs.archiveStatus.isHashCached { + for _, hash := range hashes { + rs.cache.AddHashToCache(hash) + } + rs.archiveStatus.isHashCached = true + } + // We do not want to send additional relays after archive attempt. return false. + return false + } + if !rs.archiveStatus.isArchive && len(hashes) > 0 && numberOfNodeErrors > 0 { + // Launch archive only on the second retry attempt. + if numberOfRetriesLaunched == 1 { + // Iterate over all hashes found in relay, if we don't have them in the cache we can try retry on archive. + // If we are familiar with all, we don't want to allow archive. + for _, hash := range hashes { + if !rs.cache.CheckHashInCache(hash) { + // If we didn't find the hash in the cache we can try archive relay. + relayRequestData := rs.protocolMessage.RelayPrivateData() + // We need to set archive. + // Create a new relay private data containing the extension. + userData := rs.protocolMessage.GetUserData() + // add all existing extensions including archive split by "," so the override will work + existingExtensionsPlusArchive := strings.Join(append(relayRequestData.Extensions, extensionslib.ArchiveExtension), ",") + metaDataForArchive := []pairingtypes.Metadata{{Name: common.EXTENSION_OVERRIDE_HEADER_NAME, Value: existingExtensionsPlusArchive}} + newProtocolMessage, err := rs.relayParser.ParseRelay(rs.ctx, relayRequestData.ApiUrl, string(relayRequestData.Data), relayRequestData.ConnectionType, userData.DappId, userData.ConsumerIp, metaDataForArchive) + if err != nil { + utils.LavaFormatError("Failed converting to archive message in shouldRetry", err, utils.LogAttr("relayRequestData", relayRequestData), utils.LogAttr("metadata", metaDataForArchive)) + } + // Creating an archive protocol message, and set it to current protocol message + rs.protocolMessage = newProtocolMessage + // for future batches. + rs.archiveStatus.isUpgraded = true + rs.archiveStatus.isArchive = true + break + } + } + // We had node error, and we have a hash parsed. 
+ } + } + return true +} diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 4b3612dd1d..468c536fe9 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -326,7 +326,7 @@ func (rpccs *RPCConsumerServer) SendRelay( analytics *metrics.RelayMetrics, metadata []pairingtypes.Metadata, ) (relayResult *common.RelayResult, errRet error) { - protocolMessage, err := rpccs.ParseRelay(ctx, url, req, connectionType, dappID, consumerIp, analytics, metadata) + protocolMessage, err := rpccs.ParseRelay(ctx, url, req, connectionType, dappID, consumerIp, metadata) if err != nil { return nil, err } @@ -341,7 +341,6 @@ func (rpccs *RPCConsumerServer) ParseRelay( connectionType string, dappID string, consumerIp string, - analytics *metrics.RelayMetrics, metadata []pairingtypes.Metadata, ) (protocolMessage chainlib.ProtocolMessage, err error) { // gets the relay request data from the ChainListener @@ -438,7 +437,10 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, analytics, rpccs.debugRelays, rpccs.rpcConsumerLogs), ) - relayTaskChannel := relayProcessor.GetRelayTaskChannel() + relayTaskChannel, err := relayProcessor.GetRelayTaskChannel() + if err != nil { + return relayProcessor, err + } for task := range relayTaskChannel { if task.IsDone() { return relayProcessor, task.err diff --git a/protocol/rpcprovider/rpcprovider_server_test.go b/protocol/rpcprovider/rpcprovider_server_test.go index a18228b9ec..912a368eb2 100644 --- a/protocol/rpcprovider/rpcprovider_server_test.go +++ b/protocol/rpcprovider/rpcprovider_server_test.go @@ -129,7 +129,7 @@ func TestHandleConsistency(t *testing.T) { requestBlock: spectypes.LATEST_BLOCK, specId: "LAV1", err: nil, - timeout: 20 * time.Millisecond, // 150 is one way travel time + timeout: 25 * time.Millisecond, // 150 is one way travel time chainTrackerBlocks: []int64{100, 101}, changeTime: 100 * time.Second, sleep: true,
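
Taken in isolation, the retry policy added in protocol/rpcconsumer/relay_state.go does two things: it upgrades a relay to the archive extension on the second batch when the request carries block hashes that are not yet known to fail, and it stops retrying (and caches those hashes) once an archive attempt still comes back with node errors. The sketch below is a minimal, self-contained illustration of that decision flow only, assuming hypothetical stand-in names (hashCache, relayAttempt, decideRetry) rather than the repository's RelayRetriesManager and RelayState types; it is not the actual implementation.

package main

import "fmt"

// hashCache is a hypothetical stand-in for the relay retries hash cache:
// hashes recorded here are treated as already known to fail even on archive.
type hashCache map[string]bool

// relayAttempt mirrors the flags the new RelayState tracks for the archive flow.
type relayAttempt struct {
	isArchive    bool // current message already carries the archive extension
	isUpgraded   bool // a previous retry already upgraded this relay to archive
	isHashCached bool // requested hashes were already written to the cache
}

// decideRetry is an illustrative version of the upgrade-to-archive policy:
//   - after an archive attempt that still returns node errors, cache the hashes
//     and stop retrying;
//   - on the second batch (batchNumber == 1), if the request has hashes that are
//     not in the cache and node errors were seen, upgrade to archive and retry;
//   - otherwise retry with the unchanged message.
func decideRetry(a *relayAttempt, cache hashCache, hashes []string, batchNumber int, nodeErrors uint64) bool {
	if a.isUpgraded && nodeErrors >= 2 {
		if !a.isHashCached {
			for _, h := range hashes {
				cache[h] = true
			}
			a.isHashCached = true
		}
		return false // archive did not help; do not send further retries
	}
	if !a.isArchive && len(hashes) > 0 && nodeErrors > 0 && batchNumber == 1 {
		for _, h := range hashes {
			if !cache[h] {
				// in the real code this is where ParseRelay is called with the
				// extension-override header to build an archive protocol message
				a.isArchive = true
				a.isUpgraded = true
				break
			}
		}
	}
	return true
}

func main() {
	cache := hashCache{}
	attempt := &relayAttempt{}
	hashes := []string{"0xabc"}

	fmt.Println(decideRetry(attempt, cache, hashes, 1, 1)) // true: upgraded to archive
	fmt.Println(attempt.isArchive)                         // true
	fmt.Println(decideRetry(attempt, cache, hashes, 2, 2)) // false: archive failed, stop retrying
	fmt.Println(cache["0xabc"])                            // true: hash cached for future relays
}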