Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log height every hour simulateneously #126

Merged
merged 6 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
metrics *processor.PrometheusMetrics

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Expand All @@ -74,9 +76,9 @@
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics) *IconChainProcessor {
func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {

Check warning on line 79 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L79

Added line #L79 was not covered by tests
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),

Check warning on line 81 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L81

Added line #L81 was not covered by tests
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand All @@ -84,6 +86,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 89 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L89

Added line #L89 was not covered by tests
}
}

Expand Down Expand Up @@ -149,7 +152,7 @@
}

// start_query_cycle
icp.log.Debug("Starting query cycle")

Check warning on line 155 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L155

Added line #L155 was not covered by tests
err := icp.monitoring(ctx, &persistence)
return err
}
Expand All @@ -168,12 +171,12 @@
return snapshotHeight
}

func (icp *IconChainProcessor) getLastSavedHeight() int {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0
}
return snapshotHeight

Check warning on line 179 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L174-L179

Added lines #L174 - L179 were not covered by tests
}

func (icp *IconChainProcessor) initializeConnectionState(ctx context.Context) error {
Expand All @@ -195,9 +198,9 @@
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

icp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),

Check warning on line 203 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L201-L203

Added lines #L201 - L203 were not covered by tests
)
}
return nil
Expand Down Expand Up @@ -229,11 +232,11 @@
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN

icp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 239 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L235-L239

Added lines #L235 - L239 were not covered by tests
}

return nil
Expand Down Expand Up @@ -284,7 +287,7 @@
}
// }

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))

Check warning on line 290 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L290

Added line #L290 was not covered by tests
// subscribe to monitor block
ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx)
reconnect()
Expand All @@ -306,13 +309,16 @@
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

Check warning on line 313 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L312-L313

Added lines #L312 - L313 were not covered by tests

case <-reconnectCh:
cancelMonitorBlock()
ctxMonitorBlock, cancelMonitorBlock = context.WithCancel(ctx)

go func(ctx context.Context, cancel context.CancelFunc) {
blockReq.Height = types.NewHexInt(processedheight)
icp.log.Debug("Try to reconnect from", zap.Int64("height", processedheight))

Check warning on line 321 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L321

Added line #L321 was not covered by tests
err := icp.chainProvider.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error {
if !errors.Is(ctx.Err(), context.Canceled) {
btpBlockNotifCh <- v
Expand All @@ -321,10 +327,10 @@
}, func(conn *websocket.Conn) {
}, func(conn *websocket.Conn, err error) {})
if err != nil {
ht := icp.getHeightToSave(processedheight)
if ht != icp.getLastSavedHeight() {
icp.SnapshotHeight(ht)
}

Check warning on line 333 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L330-L333

Added lines #L330 - L333 were not covered by tests
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -340,8 +346,8 @@
err := icp.verifyBlock(ctx, br.Header)
if err != nil {
reconnect()
icp.log.Warn("Failed to verify BTP Block",
zap.Int64("height", br.Height),

Check warning on line 350 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L349-L350

Added lines #L349 - L350 were not covered by tests
zap.Error(err),
)
break
Expand All @@ -359,7 +365,7 @@
}

ibcHeaderCache[uint64(br.Height)] = br.Header
icp.log.Debug("Queried block ",

Check warning on line 368 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L368

Added line #L368 was not covered by tests
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
Expand All @@ -374,10 +380,6 @@
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
}
ht, takeSnapshot := icp.shouldSnapshot(int(icp.latestBlock.Height))
if takeSnapshot {
icp.SnapshotHeight(ht)
}
}
// remove unprocessed blockResponses
for len(btpBlockRespCh) > 0 {
Expand Down Expand Up @@ -468,32 +470,20 @@
}
}

func (icp *IconChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := icp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := rlycommon.ONE_HOUR / int(blockInterval)

snapshotHeight := icp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {

Check warning on line 473 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L473

Added line #L473 was not covered by tests
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
if ht < 0 {
return 0
}
return ht

Check warning on line 479 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L475-L479

Added lines #L475 - L479 were not covered by tests
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {
icp.log.Info("Save height for snapshot", zap.Int("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))

Check warning on line 486 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L482-L486

Added lines #L482 - L486 were not covered by tests
}
}

Expand Down Expand Up @@ -552,8 +542,8 @@
icp.verifier.nextProofContext = header.Validators
icp.verifier.verifiedHeight = int64(header.Height())
icp.verifier.prevNetworkSectionHash = types.NewNetworkSection(header.Header).Hash()
icp.log.Debug("Verified block ",
zap.Uint64("height", header.Height()))

Check warning on line 546 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L545-L546

Added lines #L545 - L546 were not covered by tests
return nil
}

Expand Down Expand Up @@ -630,8 +620,8 @@
request.err = errors.Wrapf(err, "event.UnmarshalFromBytes: %v", err)
return
}
icp.log.Info("Detected eventlog ", zap.Int64("height", request.height),
zap.String("eventlog", IconCosmosEventMap[string(el.Indexed[0])]))

Check warning on line 624 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L623-L624

Added lines #L623 - L624 were not covered by tests
eventlogs = append(eventlogs, el)
}

Expand All @@ -657,7 +647,7 @@
request.response.IsProcessed = processed
return
}
request.err = errors.Wrapf(err, "Failed to get btp header: %v", err)

Check warning on line 650 in relayer/chains/icon/icon_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/icon_chain_processor.go#L650

Added line #L650 was not covered by tests
return
}
request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight))
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
if block != nil {
return block.Height, nil
}
return 0, fmt.Errorf("failed to query Block")
return 0, fmt.Errorf("failed to query latest block")

Check warning on line 108 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L108

Added line #L108 was not covered by tests
}

// legacy
Expand Down Expand Up @@ -615,12 +615,12 @@
"portId": portId,
}), &_channel)
if err != nil {
icp.log.Error("unable to fetch channel for ", zap.String("channel-id ", channelId), zap.Error(err))

Check warning on line 618 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L618

Added line #L618 was not covered by tests
continue
}

if _channel == "" {
icp.log.Debug("Channel not present for ", zap.String("channel-id ", channelId), zap.String("port-id ", portId))

Check warning on line 623 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L623

Added line #L623 was not covered by tests
continue
}

Expand All @@ -628,7 +628,7 @@
_, err = HexBytesToProtoUnmarshal(_channel, &channel)
if err != nil {
icp.log.Info("Unable to unmarshal channel for ",
zap.String("channel-id ", channelId), zap.Error(err))

Check warning on line 631 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L631

Added line #L631 was not covered by tests
continue
}

Expand Down Expand Up @@ -673,24 +673,24 @@
"portId": portid,
"channelId": channelid,
}, callParamsWithHeight(types.NewHexInt(height)))
var nextSeqRecv types.HexInt

Check warning on line 676 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L676

Added line #L676 was not covered by tests
if err := icp.client.Call(callParam, &nextSeqRecv); err != nil {
return nil, err
}
key := common.GetNextSequenceRecvCommitmentKey(portid, channelid)
keyHash := common.Sha3keccak256(key, []byte(nextSeqRecv))

Check warning on line 681 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L681

Added line #L681 was not covered by tests

proof, err := icp.QueryIconProof(ctx, height, keyHash)
if err != nil {
return nil, err
}

nextSeq, err := nextSeqRecv.Value()
if err != nil {
return nil, err
}

Check warning on line 691 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L688-L691

Added lines #L688 - L691 were not covered by tests
return &chantypes.QueryNextSequenceReceiveResponse{
NextSequenceReceive: uint64(nextSeq),

Check warning on line 693 in relayer/chains/icon/query.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/icon/query.go#L693

Added line #L693 was not covered by tests
Proof: proof,
ProofHeight: clienttypes.NewHeight(0, uint64(height)),
}, nil
Expand Down
27 changes: 8 additions & 19 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@
parsedGasPrices *sdk.DecCoins

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics) *WasmChainProcessor {
func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {

Check warning on line 69 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L69

Added line #L69 was not covered by tests
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -74,6 +76,7 @@
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,

Check warning on line 79 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L79

Added line #L79 was not covered by tests
}
}

Expand Down Expand Up @@ -218,7 +221,7 @@
func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanosecondRatio)),

Check warning on line 224 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L224

Added line #L224 was not covered by tests
lastBalanceUpdate: time.Unix(0, 0),
balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration,
}
Expand Down Expand Up @@ -290,6 +293,8 @@
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))

Check warning on line 297 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L296-L297

Added lines #L296 - L297 were not covered by tests
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand All @@ -312,11 +317,11 @@
CounterpartyConnID: c.Counterparty.ConnectionId,
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

ccp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),
)

Check warning on line 324 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L320-L324

Added lines #L320 - L324 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -345,11 +350,11 @@
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
ccp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))

Check warning on line 357 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L353-L357

Added lines #L353 - L357 were not covered by tests
}
return nil
}
Expand All @@ -363,15 +368,16 @@
zap.Uint("attempts", latestHeightQueryRetries),
zap.Error(err),
)

ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))
// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))

Check warning on line 373 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L371-L373

Added lines #L371 - L373 were not covered by tests
return nil
}

persistence.latestHeight = status.SyncInfo.LatestBlockHeight
// ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)

ccp.log.Debug("Queried latest height",

Check warning on line 380 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L380

Added line #L380 was not covered by tests
zap.Int64("latest_height", persistence.latestHeight),
)

Expand Down Expand Up @@ -404,11 +410,6 @@
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

ht, takeSnapshot := ccp.shouldSnapshot(int(persistence.latestHeight))
if takeSnapshot {
ccp.SnapshotHeight(ht)
}

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
Expand All @@ -433,7 +434,7 @@
}

if err := ccp.Verify(ctx, lightBlock); err != nil {
ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err))

Check warning on line 437 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L437

Added line #L437 was not covered by tests
return err
}

Expand All @@ -459,7 +460,7 @@
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType))

Check warning on line 463 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L463

Added line #L463 was not covered by tests
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
}
Expand Down Expand Up @@ -509,32 +510,20 @@
return nil
}

func (ccp *WasmChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := ccp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := common.ONE_HOUR / int(blockInterval)

snapshotHeight := ccp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {

Check warning on line 513 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L513

Added line #L513 was not covered by tests
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
if ht < 0 {
return 0
}
return ht

Check warning on line 519 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L515-L519

Added lines #L515 - L519 were not covered by tests
}

func (ccp *WasmChainProcessor) SnapshotHeight(height int) {
ccp.log.Info("Save height for snapshot", zap.Int("height", height))
err := common.SnapshotHeight(ccp.Provider().ChainId(), height)
if err != nil {
ccp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))

Check warning on line 526 in relayer/chains/wasm/wasm_chain_processor.go

View check run for this annotation

Codecov / codecov/patch

relayer/chains/wasm/wasm_chain_processor.go#L522-L526

Added lines #L522 - L526 were not covered by tests
}
}

Expand Down
12 changes: 6 additions & 6 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),

Check warning on line 75 in relayer/channel.go

View check run for this annotation

Codecov / codecov/patch

relayer/channel.go#L74-L75

Added lines #L74 - L75 were not covered by tests
).
WithPathProcessors(pp).
WithInitialBlockHistory(0).
Expand Down Expand Up @@ -121,8 +121,8 @@

flushProcessor := processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),

Check warning on line 125 in relayer/channel.go

View check run for this annotation

Codecov / codecov/patch

relayer/channel.go#L124-L125

Added lines #L124 - L125 were not covered by tests
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down Expand Up @@ -159,8 +159,8 @@

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),

Check warning on line 163 in relayer/channel.go

View check run for this annotation

Codecov / codecov/patch

relayer/channel.go#L162-L163

Added lines #L162 - L163 were not covered by tests
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down
4 changes: 2 additions & 2 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@

return connectionSrc, connectionDst, processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),

Check warning on line 65 in relayer/connection.go

View check run for this annotation

Codecov / codecov/patch

relayer/connection.go#L64-L65

Added lines #L64 - L65 were not covered by tests
).
WithPathProcessors(pp).
WithInitialBlockHistory(initialBlockHistory).
Expand Down
38 changes: 33 additions & 5 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@
TwoMB = 2 * 1024 * 1024
)

func timerChannel(ctx context.Context, log *zap.Logger, timerChan map[string]chan struct{}, chains map[string]*Chain) {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
NamedLoop:
select {
case <-ticker.C:
for _, c := range chains {
_, err := c.ChainProvider.QueryLatestHeight(ctx)
if err != nil {
log.Warn("Failed getting status of chain", zap.String("chain_id", c.ChainID()), zap.Error(err))
break NamedLoop

Check warning on line 47 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L36-L47

Added lines #L36 - L47 were not covered by tests
}
}
for _, c := range timerChan {
c <- struct{}{}
}

Check warning on line 52 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}
}
}

// StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors.
func StartRelayer(
ctx context.Context,
Expand All @@ -49,13 +70,20 @@
metrics *processor.PrometheusMetrics,
) chan error {
errorChan := make(chan error, 1)
chans := make(map[string]chan struct{})

for k := range chains {
chans[k] = make(chan struct{})
}

Check warning on line 77 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L73-L77

Added lines #L73 - L77 were not covered by tests

go timerChannel(ctx, log, chans, chains)

Check warning on line 79 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L79

Added line #L79 was not covered by tests

switch processorType {
case ProcessorEvents:
chainProcessors := make([]processor.ChainProcessor, 0, len(chains))

for _, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics))
for name, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics, chans[name]))

Check warning on line 86 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

ePaths := make([]path, len(paths))
Expand Down Expand Up @@ -116,17 +144,17 @@
}

// chainProcessor returns the corresponding ChainProcessor implementation instance for a pathChain.
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics) processor.ChainProcessor {
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics, timerChan chan struct{}) processor.ChainProcessor {

Check warning on line 147 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L147

Added line #L147 was not covered by tests
// Handle new ChainProcessor implementations as cases here
switch p := chain.ChainProvider.(type) {
case *penumbraprocessor.PenumbraProvider:
return penumbraprocessor.NewPenumbraChainProcessor(log, p)
case *cosmos.CosmosProvider:
return cosmos.NewCosmosChainProcessor(log, p, metrics)
case *icon.IconProvider:
return icon.NewIconChainProcessor(log, p, metrics)
return icon.NewIconChainProcessor(log, p, metrics, timerChan)

Check warning on line 155 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L155

Added line #L155 was not covered by tests
case *wasm.WasmProvider:
return wasm.NewWasmChainProcessor(log, p, metrics)
return wasm.NewWasmChainProcessor(log, p, metrics, timerChan)

Check warning on line 157 in relayer/strategies.go

View check run for this annotation

Codecov / codecov/patch

relayer/strategies.go#L157

Added line #L157 was not covered by tests
default:
panic(fmt.Errorf("unsupported chain provider type: %T", chain.ChainProvider))
}
Expand Down
Loading