diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 14780b0cb..687deca48 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -1175,9 +1175,10 @@ func (ap *CosmosProvider) QueryClientPrevConsensusStateHeight(ctx context.Contex panic("QueryClientPrevConsensusStateHeight not implemented") } -func (ap *CosmosProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { +func (ap *CosmosProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { panic("QuerySendPacketByHeight not implemented") } + func (ap *CosmosProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { panic("QueryPacketHeights not implemented") } diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index 33e5fdf5b..3c481a6d3 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -850,7 +850,15 @@ func (icp *IconProvider) QueryMessageHeights(ctx context.Context, methodName str return packetHeights, nil } -func (ap *IconProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { +func (ap *IconProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { + var eventName = "" + switch eventType { + case chantypes.EventTypeSendPacket: + eventName = EventTypeSendPacket + case chantypes.EventTypeWriteAck: + eventName = EventTypeWriteAcknowledgement + } + block, err := ap.client.GetBlockByHeight(&types.BlockHeightParam{ Height: types.NewHexInt(int64(seqHeight)), }) @@ -870,7 +878,7 @@ func (ap *IconProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, if el.Addr != types.Address(ap.PCfg.IbcHandlerAddress) && // sendPacket will be of index length 2 len(el.Indexed) != 2 && - el.Indexed[0] != EventTypeSendPacket { + el.Indexed[0] != eventName { continue } packetStr := el.Indexed[1] diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index e214af5f1..0ed2ecbb8 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -989,9 +989,10 @@ func (cc *PenumbraProvider) QueryClientPrevConsensusStateHeight(ctx context.Cont panic("QueryClientPrevConsensusStateHeight not implemented") } -func (ap *PenumbraProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { +func (ap *PenumbraProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, height uint64) (provider.PacketInfo, error) { panic("QuerySendPacketByHeight not implemented") } + func (ap *PenumbraProvider) QueryPacketHeights(ctx context.Context, latestHeight int64, channelId, portId string, startSeq, endSeq uint64) (packetHeights provider.MessageHeights, err error) { panic("QueryPacketHeights not implemented") } diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index cbd70b3f4..301923ff1 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -974,7 +974,7 @@ func (ap *WasmProvider) QueryAckHeights(ctx context.Context, latestHeight int64, return ap.QueryMessageHeights(ctx, MethodGetAckHeights, latestHeight, channelId, portId, startSeq, endSeq) } -func (ap *WasmProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { +func (ap *WasmProvider) QueryPacketMessageByEventHeight(ctx context.Context, eventType, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (provider.PacketInfo, error) { h := int64(seqHeight) blockRes, err := ap.RPCClient.BlockResults(ctx, &h) @@ -990,6 +990,10 @@ func (ap *WasmProvider) QuerySendPacketByHeight(ctx context.Context, srcChanID, } messages := ibcMessagesFromEvents(ap.log, tx.Events, ap.ChainId(), seqHeight, ap.PCfg.IbcHandlerAddress, base64Encoded) for _, m := range messages { + // in case eventtype donot match + if m.eventType != eventType { + continue + } switch t := m.info.(type) { case *packetInfo: packet := provider.PacketInfo(*t) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 33d16117b..22d1d470f 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1815,7 +1815,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcksByHeights( seq := seq eg.Go(func() error { - sendPacket, err := src.chainProvider.QuerySendPacketByHeight(ctx, k.ChannelID, k.PortID, seq, seqHeight) + sendPacket, err := src.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeSendPacket, k.ChannelID, k.PortID, seq, seqHeight) if err != nil { return err } @@ -1853,75 +1853,76 @@ func (pp *PathProcessor) queuePendingRecvAndAcksByHeights( ) } - // TODO: for ackedMessage - // var unacked []uint64 - - // SeqLoop: - // for _, seq := range seqs { - // for _, unrecvSeq := range unrecv { - // if seq == unrecvSeq { - // continue SeqLoop - // } - // } - // // does not exist in unrecv, so this is an ack that must be written - // unacked = append(unacked, seq) - // } - - // for i, seq := range unacked { - // dstMu.Lock() - // ck := k.Counterparty() - // if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) && - // dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) { - // continue // already cached - // } - // dstMu.Unlock() - - // if i >= maxPacketsPerFlush { - // skipped = true - // break - // } - - // seq := seq - - // dst.log.Debug("Querying recv packet", - // zap.String("channel", k.CounterpartyChannelID), - // zap.String("port", k.CounterpartyPortID), - // zap.Uint64("sequence", seq), - // ) - - // eg.Go(func() error { - // recvPacket, err := dst.chainProvider.QueryRecvPacket(ctx, k.CounterpartyChannelID, k.CounterpartyPortID, seq) - // if err != nil { - // return err - // } - - // ck := k.Counterparty() - // dstMu.Lock() - // dstCache.Cache(chantypes.EventTypeRecvPacket, ck, seq, recvPacket) - // dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, recvPacket) - // dstMu.Unlock() - - // return nil - // }) - // } - - // if err := eg.Wait(); err != nil { - // return false, err - // } - - // if len(unacked) > 0 { - // dst.log.Debug( - // "Will flush MsgAcknowledgement", - // zap.Object("channel", k), - // zap.Uint64s("sequences", unacked), - // ) - // } else { - // dst.log.Debug( - // "No MsgAcknowledgement to flush", - // zap.String("channel", k.CounterpartyChannelID), - // zap.String("port", k.CounterpartyPortID), - // ) - // } + var unacked []uint64 + + ackHeights, err := dst.chainProvider.QueryAckHeights(ctx, int64(dst.latestBlock.Height), dstChan, dstPort, packetHeights.StartSeq, packetHeights.EndSeq) + if err != nil { + return false, err + } + + counter := 0 + for seq, height := range ackHeights { + + // Is packetHeights is present then Ack not received + // not present means ack already received + _, ok := packetHeights.MessageHeights[seq] + if !ok { + continue + } + + dstMu.Lock() + ck := k.Counterparty() + if dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) { + continue // already cached + } + dstMu.Unlock() + + if counter >= maxPacketsPerFlush { + skipped = true + break + } + + seq := seq + + dst.log.Debug("Querying write Ack", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + zap.Uint64("sequence", seq), + ) + + eg.Go(func() error { + AckPacket, err := dst.chainProvider.QueryPacketMessageByEventHeight(ctx, chantypes.EventTypeWriteAck, k.CounterpartyChannelID, k.CounterpartyPortID, seq, height) + if err != nil { + return err + } + + ck := k.Counterparty() + dstMu.Lock() + dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, AckPacket) + dstMu.Unlock() + + return nil + }) + counter++ + } + + if err := eg.Wait(); err != nil { + return false, err + } + + if len(unacked) > 0 { + dst.log.Debug( + "Will flush MsgAcknowledgement", + zap.Object("channel", k), + zap.Uint64s("sequences", unacked), + ) + } else { + dst.log.Debug( + "No MsgAcknowledgement to flush", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + ) + } return !skipped, nil } diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 5cec51b09..c7c0c4354 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -438,8 +438,7 @@ type QueryProvider interface { // query packet info for sequence QuerySendPacket(ctx context.Context, srcChanID, srcPortID string, sequence uint64) (PacketInfo, error) QueryRecvPacket(ctx context.Context, dstChanID, dstPortID string, sequence uint64) (PacketInfo, error) - QuerySendPacketByHeight(ctx context.Context, srcChanID, srcPortID string, sequence uint64, seqHeight uint64) (PacketInfo, error) - + QueryPacketMessageByEventHeight(ctx context.Context, eventType string, srcChanID, srcPortID string, sequence uint64, height uint64) (PacketInfo, error) // bank QueryBalance(ctx context.Context, keyName string) (sdk.Coins, error) QueryBalanceWithAddress(ctx context.Context, addr string) (sdk.Coins, error)