Skip to content

feat: log height every hour simulateneously #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type IconChainProcessor struct {
metrics *processor.PrometheusMetrics

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Expand All @@ -74,7 +76,7 @@ type Verifier struct {
prevNetworkSectionHash []byte
}

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics) *IconChainProcessor {
func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *IconChainProcessor {
return &IconChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -84,6 +86,7 @@ func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *pro
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,
}
}

Expand Down Expand Up @@ -306,6 +309,9 @@ loop:
case err := <-errCh:
return err

case <-icp.heightSnapshotChan:
icp.SnapshotHeight(icp.getHeightToSave(int64(icp.latestBlock.Height)))

case <-reconnectCh:
cancelMonitorBlock()
ctxMonitorBlock, cancelMonitorBlock = context.WithCancel(ctx)
Expand Down Expand Up @@ -374,10 +380,6 @@ loop:
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
}
ht, takeSnapshot := icp.shouldSnapshot(int(icp.latestBlock.Height))
if takeSnapshot {
icp.SnapshotHeight(ht)
}
}
// remove unprocessed blockResponses
for len(btpBlockRespCh) > 0 {
Expand Down Expand Up @@ -468,18 +470,6 @@ loop:
}
}

func (icp *IconChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := icp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := rlycommon.ONE_HOUR / int(blockInterval)

snapshotHeight := icp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (icp *IconProvider) QueryLatestHeight(ctx context.Context) (int64, error) {
if block != nil {
return block.Height, nil
}
return 0, fmt.Errorf("failed to query Block")
return 0, fmt.Errorf("failed to query latest block")
}

// legacy
Expand Down
27 changes: 8 additions & 19 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ type WasmChainProcessor struct {
parsedGasPrices *sdk.DecCoins

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics) *WasmChainProcessor {
func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -74,6 +76,7 @@ func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *pro
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
heightSnapshotChan: heightSnapshot,
}
}

Expand Down Expand Up @@ -290,6 +293,8 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint
select {
case <-ctx.Done():
return nil
case <-ccp.heightSnapshotChan:
ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight))
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
}
Expand Down Expand Up @@ -364,7 +369,8 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
zap.Error(err),
)

ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))
// TODO: Save height when node status is false?
// ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight))
return nil
}

Expand Down Expand Up @@ -404,11 +410,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

ht, takeSnapshot := ccp.shouldSnapshot(int(persistence.latestHeight))
if takeSnapshot {
ccp.SnapshotHeight(ht)
}

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
Expand Down Expand Up @@ -509,18 +510,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
return nil
}

func (ccp *WasmChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := ccp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := common.ONE_HOUR / int(blockInterval)

snapshotHeight := ccp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
return snapshotHeight, true
}
return 0, false
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
Expand Down
12 changes: 6 additions & 6 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (c *Chain) CreateOpenChannels(

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(0).
Expand Down Expand Up @@ -121,8 +121,8 @@ func (c *Chain) CloseChannel(

flushProcessor := processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down Expand Up @@ -159,8 +159,8 @@ func (c *Chain) CloseChannel(

return processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down
4 changes: 2 additions & 2 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (c *Chain) CreateOpenConnections(

return connectionSrc, connectionDst, processor.NewEventProcessor().
WithChainProcessors(
c.chainProcessor(c.log, nil),
dst.chainProcessor(c.log, nil),
c.chainProcessor(c.log, nil, nil),
dst.chainProcessor(c.log, nil, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(initialBlockHistory).
Expand Down
38 changes: 33 additions & 5 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@ const (
TwoMB = 2 * 1024 * 1024
)

func timerChannel(ctx context.Context, log *zap.Logger, timerChan map[string]chan struct{}, chains map[string]*Chain) {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
NamedLoop:
select {
case <-ticker.C:
for _, c := range chains {
_, err := c.ChainProvider.QueryLatestHeight(ctx)
if err != nil {
log.Warn("Failed getting status of chain", zap.String("chain_id", c.ChainID()), zap.Error(err))
break NamedLoop
}
}
for _, c := range timerChan {
c <- struct{}{}
}
}
}
}

// StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors.
func StartRelayer(
ctx context.Context,
Expand All @@ -49,13 +70,20 @@ func StartRelayer(
metrics *processor.PrometheusMetrics,
) chan error {
errorChan := make(chan error, 1)
chans := make(map[string]chan struct{})

for k := range chains {
chans[k] = make(chan struct{})
}

go timerChannel(ctx, log, chans, chains)

switch processorType {
case ProcessorEvents:
chainProcessors := make([]processor.ChainProcessor, 0, len(chains))

for _, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics))
for name, chain := range chains {
chainProcessors = append(chainProcessors, chain.chainProcessor(log, metrics, chans[name]))
}

ePaths := make([]path, len(paths))
Expand Down Expand Up @@ -116,17 +144,17 @@ type path struct {
}

// chainProcessor returns the corresponding ChainProcessor implementation instance for a pathChain.
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics) processor.ChainProcessor {
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics, timerChan chan struct{}) processor.ChainProcessor {
// Handle new ChainProcessor implementations as cases here
switch p := chain.ChainProvider.(type) {
case *penumbraprocessor.PenumbraProvider:
return penumbraprocessor.NewPenumbraChainProcessor(log, p)
case *cosmos.CosmosProvider:
return cosmos.NewCosmosChainProcessor(log, p, metrics)
case *icon.IconProvider:
return icon.NewIconChainProcessor(log, p, metrics)
return icon.NewIconChainProcessor(log, p, metrics, timerChan)
case *wasm.WasmProvider:
return wasm.NewWasmChainProcessor(log, p, metrics)
return wasm.NewWasmChainProcessor(log, p, metrics, timerChan)
default:
panic(fmt.Errorf("unsupported chain provider type: %T", chain.ChainProvider))
}
Expand Down