Skip to content

Commit

Permalink
feat: log height every hour simulateneously (#126)
Browse files Browse the repository at this point in the history
* feat: log height every hour simulateneously

* fix: add missing method

* fix: remove should snapshot

---------

Co-authored-by: izyak <[email protected]>
Co-authored-by: izyak <[email protected]>
Co-authored-by: viveksharmapoudel <[email protected]>
  • Loading branch information
4 people authored Aug 9, 2023
1 parent b58f471 commit 2fc35ae
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 50 deletions.
24 changes: 7 additions & 17 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type IconChainProcessor struct {
metrics *processor.PrometheusMetrics

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Expand All @@ -74,7 +76,7 @@ type Verifier struct {
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics) *IconChainProcessor {
func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -84,6 +86,7 @@ func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *pro
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,
}
}

Expand Down Expand Up @@ -306,6 +309,9 @@ loop:
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

case <-reconnectCh:
cancelMonitorBlock()
ctxMonitorBlock, cancelMonitorBlock = context.WithCancel(ctx)
Expand Down Expand Up @@ -374,10 +380,6 @@ loop:
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
}
ht, takeSnapshot := icp.shouldSnapshot(int(icp.latestBlock.Height))
if takeSnapshot {
icp.SnapshotHeight(ht)
}
}
// remove unprocessed blockResponses
for len(btpBlockRespCh) > 0 {
Expand Down Expand Up @@ -468,18 +470,6 @@ loop:
}
}

func (icp *IconChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := icp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := rlycommon.ONE_HOUR / int(blockInterval)

snapshotHeight := icp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (icp *IconProvider) QueryLatestHeight(ctx context.Context) (int64, error) {
if block != nil {
return block.Height, nil
}
return 0, fmt.Errorf("failed to query Block")
return 0, fmt.Errorf("failed to query latest block")
}

// legacy
Expand Down
27 changes: 8 additions & 19 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ type WasmChainProcessor struct {
parsedGasPrices *sdk.DecCoins

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics) *WasmChainProcessor {
func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -74,6 +76,7 @@ func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *pro
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,
}
}

Expand Down Expand Up @@ -290,6 +293,8 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand Down Expand Up @@ -364,7 +369,8 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
zap.Error(err),
)

ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))
// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))
return nil
}

Expand Down Expand Up @@ -404,11 +410,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

ht, takeSnapshot := ccp.shouldSnapshot(int(persistence.latestHeight))
if takeSnapshot {
ccp.SnapshotHeight(ht)
}

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
Expand Down Expand Up @@ -509,18 +510,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
return nil
}

func (ccp *WasmChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := ccp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := common.ONE_HOUR / int(blockInterval)

snapshotHeight := ccp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
Expand Down
12 changes: 6 additions & 6 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (c *Chain) CreateOpenChannels(

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(0).
Expand Down Expand Up @@ -121,8 +121,8 @@ func (c *Chain) CloseChannel(

flushProcessor := processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down Expand Up @@ -159,8 +159,8 @@ func (c *Chain) CloseChannel(

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down
4 changes: 2 additions & 2 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (c *Chain) CreateOpenConnections(

return connectionSrc, connectionDst, processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(initialBlockHistory).
Expand Down
38 changes: 33 additions & 5 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@ const (
TwoMB = 2 * 1024 * 1024
)

func timerChannel(ctx context.Context, log *zap.Logger, timerChan map[string]chan struct{}, chains map[string]*Chain) {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
NamedLoop:
select {
case <-ticker.C:
for _, c := range chains {
_, err := c.ChainProvider.QueryLatestHeight(ctx)
if err != nil {
log.Warn("Failed getting status of chain", zap.String("chain_id", c.ChainID()), zap.Error(err))
break NamedLoop
}
}
for _, c := range timerChan {
c <- struct{}{}
}
}
}
}

// StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors.
func StartRelayer(
ctx context.Context,
Expand All @@ -49,13 +70,20 @@ func StartRelayer(
metrics *processor.PrometheusMetrics,
) chan error {
errorChan := make(chan error, 1)
chans := make(map[string]chan struct{})

for k := range chains {
chans[k] = make(chan struct{})
}

go timerChannel(ctx, log, chans, chains)

switch processorType {
case ProcessorEvents:
chainProcessors := make([]processor.ChainProcessor, 0, len(chains))

for _, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics))
for name, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics, chans[name]))
}

ePaths := make([]path, len(paths))
Expand Down Expand Up @@ -116,17 +144,17 @@ type path struct {
}

// chainProcessor returns the corresponding ChainProcessor implementation instance for a pathChain.
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics) processor.ChainProcessor {
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics, timerChan chan struct{}) processor.ChainProcessor {
// Handle new ChainProcessor implementations as cases here
switch p := chain.ChainProvider.(type) {
case *penumbraprocessor.PenumbraProvider:
return penumbraprocessor.NewPenumbraChainProcessor(log, p)
case *cosmos.CosmosProvider:
return cosmos.NewCosmosChainProcessor(log, p, metrics)
case *icon.IconProvider:
return icon.NewIconChainProcessor(log, p, metrics)
return icon.NewIconChainProcessor(log, p, metrics, timerChan)
case *wasm.WasmProvider:
return wasm.NewWasmChainProcessor(log, p, metrics)
return wasm.NewWasmChainProcessor(log, p, metrics, timerChan)
default:
panic(fmt.Errorf("unsupported chain provider type: %T", chain.ChainProvider))
}
Expand Down

0 comments on commit 2fc35ae

Please sign in to comment.