From 65efbdf2997a228b7f543792ceeb8d48f02c9dbd Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Tue, 26 Mar 2024 11:37:51 +0100 Subject: [PATCH 1/4] update ethereum RPCs and Services --- go.mod | 2 +- go.sum | 2 + pkg/crawler/ethereum.go | 5 + pkg/networks/ethereum/beacon_blobs.go | 66 +++++++++ pkg/networks/ethereum/beacon_blocks.go | 66 +++++++++ pkg/networks/ethereum/rpc/methods/blobs.go | 140 ++++++++++++++++++++ pkg/networks/ethereum/rpc/methods/blocks.go | 70 ++++------ 7 files changed, 303 insertions(+), 48 deletions(-) create mode 100644 pkg/networks/ethereum/beacon_blobs.go create mode 100644 pkg/networks/ethereum/beacon_blocks.go create mode 100644 pkg/networks/ethereum/rpc/methods/blobs.go diff --git a/go.mod b/go.mod index d029c05..0ee0258 100644 --- a/go.mod +++ b/go.mod @@ -122,7 +122,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.50.0 // indirect + github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/protolambda/bls12-381-util v0.1.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect diff --git a/go.sum b/go.sum index 0eff705..0978caf 100644 --- a/go.sum +++ b/go.sum @@ -939,6 +939,8 @@ github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16 github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ= github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ= +github.com/prometheus/common v0.51.1 h1:eIjN50Bwglz6a/c3hAgSMcofL3nD+nFQkV6Dd4DsQCw= +github.com/prometheus/common v0.51.1/go.mod h1:lrWtQx+iDfn2mbH5GUzlH9TSHyfZpHkSiG1W7y3sF2Q= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 1c9d790..65904cc 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -237,8 +237,13 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) func (c *EthereumCrawler) Run() { // init all the eth_protocols c.EthNode.ServeBeaconPing(c.Host.Host()) + c.EthNode.ServeBeaconGoodbye(c.Host.Host()) c.EthNode.ServeBeaconStatus(c.Host.Host()) c.EthNode.ServeBeaconMetadata(c.Host.Host()) + c.EthNode.ServeBeaconBlocksByRootV2(c.Host.Host()) + c.EthNode.ServeBeaconBlocksByRangeV2(c.Host.Host()) + c.EthNode.ServeBeaconBlobsByRootV1(c.Host.Host()) + c.EthNode.ServeBeaconBlobsByRangeV1(c.Host.Host()) // initialization secuence for the crawler c.Events.Start(c.ctx) diff --git a/pkg/networks/ethereum/beacon_blobs.go b/pkg/networks/ethereum/beacon_blobs.go new file mode 100644 index 0000000..50f5962 --- /dev/null +++ b/pkg/networks/ethereum/beacon_blobs.go @@ -0,0 +1,66 @@ +package ethereum + +import ( + "context" + + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + log "github.com/sirupsen/logrus" +) + +func (en *LocalEthereumNode) ServeBeaconBlobsByRangeV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobsRange := new(methods.BlobsByRangeReqV1) + err := handler.ReadRequest(blobsRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_range request") + log.Errorf("failed to read blobs_by_range request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_range request", *blobsRange) + } + } + b := methods.BlobsByRangeRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_range") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlobsByRootV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobRoots := new(methods.BlobByRootV1) + err := handler.ReadRequest(blobRoots) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_root request") + log.Errorf("failed to read blobs_by_root request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_root request", *blobRoots) + } + } + b := methods.BlobsByRootRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_root") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_root") + }() +} diff --git a/pkg/networks/ethereum/beacon_blocks.go b/pkg/networks/ethereum/beacon_blocks.go new file mode 100644 index 0000000..76918f0 --- /dev/null +++ b/pkg/networks/ethereum/beacon_blocks.go @@ -0,0 +1,66 @@ +package ethereum + +import ( + "context" + + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + log "github.com/sirupsen/logrus" +) + +func (en *LocalEthereumNode) ServeBeaconBlocksByRangeV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRange := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_range request") + log.Errorf("failed to read block_by_range request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_range request %v", *blockRange) + } + } + m := methods.BlocksByRangeRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_range") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlocksByRootV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRoot := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRoot) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_root request") + log.Error("failed to read block_by_root request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_root request %v", *blockRoot) + } + } + m := methods.BlocksByRootRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_root") + // wait untill the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_root") + }() +} diff --git a/pkg/networks/ethereum/rpc/methods/blobs.go b/pkg/networks/ethereum/rpc/methods/blobs.go new file mode 100644 index 0000000..5d921fa --- /dev/null +++ b/pkg/networks/ethereum/rpc/methods/blobs.go @@ -0,0 +1,140 @@ +package methods + +import ( + "encoding/hex" + "fmt" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + "github.com/protolambda/ztyp/codec" + "github.com/protolambda/ztyp/tree" + "github.com/protolambda/ztyp/view" +) + +// https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 +const ( + MAX_BLOBS_PER_BLOCK int = 6 + MAX_BLOBS_PER_RPC_REQ int = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK +) + +type BlobIdentifier struct { + BlockRoot Root + Index view.Uint64View +} + +func (blobId *BlobIdentifier) Deserialize(dr *codec.DecodingReader) error { + return dr.FixedLenContainer(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) Serialize(w *codec.EncodingWriter) error { + return w.FixedLenContainer(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) ByteLength() uint64 { + return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength() +} + +func (blobId *BlobIdentifier) FixedLength() uint64 { + return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength() +} + +func (blobId *BlobIdentifier) HashTreeRoot(hFn tree.HashFn) Root { + return hFn.HashTreeRoot(&blobId.BlockRoot, &blobId.Index) +} + +func (blobId *BlobIdentifier) String() string { + return fmt.Sprintf("%v", *blobId) +} + +type BlobByRootV1 []BlobIdentifier + +func (b BlobByRootV1) Deserialize(dr *codec.DecodingReader) error { + var idx int = 0 + return dr.List( + func() codec.Deserializable { + i := idx + idx++ + return &b[i] + }, + uint64(len(b)), + uint64(MAX_BLOBS_PER_RPC_REQ)) +} + +func (b BlobByRootV1) Serialize(w *codec.EncodingWriter) error { + return w.List(func(i uint64) codec.Serializable { + return &b[i] + }, + uint64(len(b)), + uint64(MAX_BLOBS_PER_RPC_REQ)) +} + +func (b BlobByRootV1) ByteLength() uint64 { + return uint64(len(b) * (32 + 8)) +} + +func (b BlobByRootV1) FixedLength() uint64 { + return 0 +} + +func (b BlobByRootV1) String() string { + if len(b) == 0 { + return "empty blobs-by-root request" + } + out := make([]byte, 0, len(b)*66) + for i, bId := range b { + hex.Encode(out[i*66:], bId.BlockRoot[:]) + out[(i+1)*66-2] = ',' + out[(i+1)*66-1] = ' ' + } + return "blobs-by-root requested: " + string(out[:len(out)-1]) +} + +type BlobsByRangeReqV1 struct { + StartSlot Slot + Count view.Uint64View +} + +func (b *BlobsByRangeReqV1) Data() map[string]interface{} { + return map[string]interface{}{ + "start_slot": b.StartSlot, + "count": b.Count, + } +} + +func (b *BlobsByRangeReqV1) Deserialize(dr *codec.DecodingReader) error { + return dr.FixedLenContainer(&b.StartSlot, &b.Count) +} + +func (b *BlobsByRangeReqV1) Serialize(w *codec.EncodingWriter) error { + return w.FixedLenContainer(&b.StartSlot, &b.Count) +} + +const blobsByRangeReqBytes uint64 = 8 + 8 + +func (b BlobsByRangeReqV1) ByteLength() uint64 { + return blobsByRangeReqBytes +} + +func (b *BlobsByRangeReqV1) FixedLength() uint64 { + return blobsByRangeReqBytes +} + +func (b *BlobsByRangeReqV1) HashTreeRoot(hFn tree.HashFn) Root { + return hFn.HashTreeRoot(&b.StartSlot, &b.Count) +} + +func (b *BlobsByRangeReqV1) String() string { + return fmt.Sprintf("%v", *b) +} + +var BlobsByRangeRPCv1 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, blobsByRangeReqBytes, blobsByRangeReqBytes), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} + +var BlobsByRootRPCv1 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64((32+8)*MAX_BLOBS_PER_RPC_REQ)), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} diff --git a/pkg/networks/ethereum/rpc/methods/blocks.go b/pkg/networks/ethereum/rpc/methods/blocks.go index 950368b..5fd5eb3 100644 --- a/pkg/networks/ethereum/rpc/methods/blocks.go +++ b/pkg/networks/ethereum/rpc/methods/blocks.go @@ -3,43 +3,21 @@ package methods import ( "encoding/hex" "fmt" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - "github.com/protolambda/zrnt/eth2/beacon" - "github.com/protolambda/zrnt/eth2/beacon/common" "github.com/protolambda/ztyp/codec" "github.com/protolambda/ztyp/tree" "github.com/protolambda/ztyp/view" ) -// instead of parsing the whole body, we can just leave it as bytes. -type BeaconBlockBodyRaw []byte - -func (b *BeaconBlockBodyRaw) Limit() uint64 { - // just cap block body size at 1 MB - return 1 << 20 -} - -type BeaconBlock struct { - Slot Slot - ProposerIndex ValidatorIndex - ParentRoot Root - StateRoot Root - Body BeaconBlockBodyRaw -} - -type SignedBeaconBlock struct { - Message BeaconBlock - Signature BLSSignature -} +const MAX_REQUEST_BLOCKS_DENEB int = 128 -type BlocksByRangeReqV1 struct { +type BlocksByRangeReqV2 struct { StartSlot Slot Count view.Uint64View Step view.Uint64View } -func (r *BlocksByRangeReqV1) Data() map[string]interface{} { +func (r *BlocksByRangeReqV2) Data() map[string]interface{} { return map[string]interface{}{ "start_slot": r.StartSlot, "count": r.Count, @@ -47,41 +25,32 @@ func (r *BlocksByRangeReqV1) Data() map[string]interface{} { } } -func (d *BlocksByRangeReqV1) Deserialize(dr *codec.DecodingReader) error { +func (d *BlocksByRangeReqV2) Deserialize(dr *codec.DecodingReader) error { return dr.FixedLenContainer(&d.StartSlot, &d.Count, &d.Step) } -func (d *BlocksByRangeReqV1) Serialize(w *codec.EncodingWriter) error { +func (d *BlocksByRangeReqV2) Serialize(w *codec.EncodingWriter) error { return w.FixedLenContainer(&d.StartSlot, &d.Count, &d.Step) } const blocksByRangeReqByteLen = 8 + 8 + 8 -func (d BlocksByRangeReqV1) ByteLength() uint64 { +func (d *BlocksByRangeReqV2) ByteLength() uint64 { return blocksByRangeReqByteLen } -func (*BlocksByRangeReqV1) FixedLength() uint64 { +func (*BlocksByRangeReqV2) FixedLength() uint64 { return blocksByRangeReqByteLen } -func (d *BlocksByRangeReqV1) HashTreeRoot(hFn tree.HashFn) Root { +func (d *BlocksByRangeReqV2) HashTreeRoot(hFn tree.HashFn) Root { return hFn.HashTreeRoot(&d.StartSlot, &d.Count, &d.Step) } -func (r *BlocksByRangeReqV1) String() string { +func (r *BlocksByRangeReqV2) String() string { return fmt.Sprintf("%v", *r) } -func BlocksByRangeRPCv1(spec *common.Spec, opcBlock beacon.OpaqueBlock) *reqresp.RPCMethod { - return &reqresp.RPCMethod{ - Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", - RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV1) }, blocksByRangeReqByteLen, blocksByRangeReqByteLen), - ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return spec.Wrap(opcBlock) }, 0, opcBlock.ByteLength(spec)), - DefaultResponseChunkCount: 20, - } -} - const MAX_REQUEST_BLOCKS_BY_ROOT = 1024 type BlocksByRootReq []Root @@ -123,11 +92,18 @@ func (r BlocksByRootReq) String() string { return "blocks-by-root requested: " + string(out[:len(out)-1]) } -func BlocksByRootRPCv1(spec *common.Spec, opcBlock beacon.OpaqueBlock) *reqresp.RPCMethod { - return &reqresp.RPCMethod{ - Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz", - RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, 32*MAX_REQUEST_BLOCKS_BY_ROOT), - ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return spec.Wrap(opcBlock) }, 0, opcBlock.ByteLength(spec)), - DefaultResponseChunkCount: 20, - } +// methods + +var BlocksByRangeRPCv2 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_range/2/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV2) }, blocksByRangeReqByteLen, blocksByRangeReqByteLen), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRangeReqV2) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, +} + +var BlocksByRootRPCv2 = reqresp.RPCMethod{ + Protocol: "/eth2/beacon_chain/req/beacon_blocks_by_root/2/ssz_snappy", + RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, 32*MAX_REQUEST_BLOCKS_BY_ROOT), + ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlocksByRootReq) }, 0, uint64(0)), + DefaultResponseChunkCount: 20, } From 48c61e1ca6c943fbf860b9e86411d054e5251c16 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Thu, 28 Mar 2024 13:33:01 +0100 Subject: [PATCH 2/4] first approach to update gossip messages/handlers --- pkg/events/events.go | 2 +- pkg/networks/ethereum/beacon_blobs.go | 66 ----- pkg/networks/ethereum/beacon_blocks.go | 66 ----- pkg/networks/ethereum/beacon_metadata.go | 101 ------- pkg/networks/ethereum/beacon_rpc.go | 74 ----- pkg/networks/ethereum/beacon_rpcs.go | 351 +++++++++++++++++++++++ pkg/networks/ethereum/beacon_status.go | 92 ------ pkg/networks/ethereum/gossip_handlers.go | 289 ++++++++++++++++--- pkg/networks/ethereum/gossip_messages.go | 146 +++++++--- pkg/networks/ethereum/gossip_topics.go | 93 ++++++ pkg/networks/ethereum/network_info.go | 88 +----- 11 files changed, 814 insertions(+), 554 deletions(-) delete mode 100644 pkg/networks/ethereum/beacon_blobs.go delete mode 100644 pkg/networks/ethereum/beacon_blocks.go delete mode 100644 pkg/networks/ethereum/beacon_metadata.go delete mode 100644 pkg/networks/ethereum/beacon_rpc.go create mode 100644 pkg/networks/ethereum/beacon_rpcs.go delete mode 100644 pkg/networks/ethereum/beacon_status.go create mode 100644 pkg/networks/ethereum/gossip_topics.go diff --git a/pkg/events/events.go b/pkg/events/events.go index 20b48b6..175bfe4 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -147,7 +147,7 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven if err := f.publishTimedEthereumAttestation(&TimedEthereumAttestation{ Attestation: e.Attestation, AttestationExtraData: &AttestationExtraData{ - ArrivedAt: e.TrackedAttestation.ArrivalTime, + ArrivedAt: e.TrackedAttestation.Time, P2PMsgID: e.TrackedAttestation.MsgID, Subnet: e.TrackedAttestation.Subnet, TimeInSlot: e.TrackedAttestation.TimeInSlot, diff --git a/pkg/networks/ethereum/beacon_blobs.go b/pkg/networks/ethereum/beacon_blobs.go deleted file mode 100644 index 50f5962..0000000 --- a/pkg/networks/ethereum/beacon_blobs.go +++ /dev/null @@ -1,66 +0,0 @@ -package ethereum - -import ( - "context" - - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - log "github.com/sirupsen/logrus" -) - -func (en *LocalEthereumNode) ServeBeaconBlobsByRangeV1(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - blobsRange := new(methods.BlobsByRangeReqV1) - err := handler.ReadRequest(blobsRange) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_range request") - log.Errorf("failed to read blobs_by_range request: %v from %s", err, peerId.String()) - } else { - log.Info("dropped blobs_by_range request", *blobsRange) - } - } - b := methods.BlobsByRangeRPCv1 - streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(b.Protocol, streamHandler) - log.Info("Started serving blobs_by_range") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving blobs_by_range") - }() -} - -func (en *LocalEthereumNode) ServeBeaconBlobsByRootV1(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - blobRoots := new(methods.BlobByRootV1) - err := handler.ReadRequest(blobRoots) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_root request") - log.Errorf("failed to read blobs_by_root request: %v from %s", err, peerId.String()) - } else { - log.Info("dropped blobs_by_root request", *blobRoots) - } - } - b := methods.BlobsByRootRPCv1 - streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(b.Protocol, streamHandler) - log.Info("Started serving blobs_by_root") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving blobs_by_root") - }() -} diff --git a/pkg/networks/ethereum/beacon_blocks.go b/pkg/networks/ethereum/beacon_blocks.go deleted file mode 100644 index 76918f0..0000000 --- a/pkg/networks/ethereum/beacon_blocks.go +++ /dev/null @@ -1,66 +0,0 @@ -package ethereum - -import ( - "context" - - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - log "github.com/sirupsen/logrus" -) - -func (en *LocalEthereumNode) ServeBeaconBlocksByRangeV2(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - blockRange := new(methods.BlocksByRootReq) - err := handler.ReadRequest(blockRange) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_range request") - log.Errorf("failed to read block_by_range request: %v from %s", err, peerId.String()) - } else { - log.Infof("dropped block_by_range request %v", *blockRange) - } - } - m := methods.BlocksByRangeRPCv2 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving block_by_range") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving block_by_range") - }() -} - -func (en *LocalEthereumNode) ServeBeaconBlocksByRootV2(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - blockRoot := new(methods.BlocksByRootReq) - err := handler.ReadRequest(blockRoot) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_root request") - log.Error("failed to read block_by_root request: %v from %s", err, peerId.String()) - } else { - log.Infof("dropped block_by_root request %v", *blockRoot) - } - } - m := methods.BlocksByRootRPCv2 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving block_by_root") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving block_by_root") - }() -} diff --git a/pkg/networks/ethereum/beacon_metadata.go b/pkg/networks/ethereum/beacon_metadata.go deleted file mode 100644 index 8304cde..0000000 --- a/pkg/networks/ethereum/beacon_metadata.go +++ /dev/null @@ -1,101 +0,0 @@ -package ethereum - -import ( - "context" - "encoding/hex" - "sync" - - "github.com/pkg/errors" - - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - - log "github.com/sirupsen/logrus" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/protolambda/zrnt/eth2/beacon/common" -) - -// ReqBeaconMetadata opens a new Stream from the given host to send a RPC reqresping the BeaconStatus of the given peer.ID. -// Returns the BeaconStatus of the given peer if succeed, error if failed. -func (en *LocalEthereumNode) ReqBeaconMetadata( - ctx context.Context, - wg *sync.WaitGroup, - h host.Host, - peerID peer.ID, - result *common.MetaData, - finErr *error) { - - defer wg.Done() - - // declare the result output of the RPC call - var metadata common.MetaData - - attnets := new(common.AttnetBits) - bytes, err := hex.DecodeString("ffffffffffffffff") //TODO: remove hardcodding!!!! - if err != nil { - log.Panic("unable to decode ForkDigest", err.Error()) - } - attnets.UnmarshalText(bytes) - - // Generate the Server Error Code - var resCode reqresp.ResponseCode // error by default - // record the error into the error channel - err = methods.MetaDataRPCv2NoSnappy.RunRequest(ctx, h.NewStream, peerID, new(reqresp.SnappyCompression), - reqresp.RequestSSZInput{Obj: nil}, 1, - func() error { - return nil - }, - func(chunk reqresp.ChunkedResponseHandler) error { - resCode = chunk.ResultCode() - switch resCode { - case reqresp.ServerErrCode, reqresp.InvalidReqCode: - msg, err := chunk.ReadErrMsg() - if err != nil { - return errors.Errorf("error reqresping BeaconMetadata RPC: %s", msg) - } - case reqresp.SuccessCode: - if err := chunk.ReadObj(&metadata); err != nil { - return errors.Wrap(err, "from reqresping BeaconMetadata RPC") - } - default: - return errors.New("unexpected result code for BeaconMetadata RPC reqresp") - } - return nil - }) - *finErr = err - *result = metadata -} - -func (en *LocalEthereumNode) ServeBeaconMetadata(h host.Host) { - - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - var reqMetadata common.MetaData - err := handler.ReadRequest(&reqMetadata) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse status request") - log.Tracef("failed to read metadata request: %v from %s", err, peerId.String()) - } else { - if err := handler.WriteResponseChunk(reqresp.SuccessCode, &en.LocalMetadata); err != nil { - log.Tracef("failed to respond to metadata request: %v", err) - } else { - log.Tracef("handled metadata request") - } - } - } - m := methods.MetaDataRPCv2 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving Beacon Metadata") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving Beacon Metadata") - }() -} diff --git a/pkg/networks/ethereum/beacon_rpc.go b/pkg/networks/ethereum/beacon_rpc.go deleted file mode 100644 index 51a0bb4..0000000 --- a/pkg/networks/ethereum/beacon_rpc.go +++ /dev/null @@ -1,74 +0,0 @@ -package ethereum - -import ( - "context" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - "github.com/protolambda/zrnt/eth2/beacon/common" - log "github.com/sirupsen/logrus" -) - -func (en *LocalEthereumNode) ServeBeaconPing(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - var ping common.Ping - err := handler.ReadRequest(&ping) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse ping request") - log.Tracef("failed to read ping request: %v from %s", err, peerId.String()) - } else { - if err := handler.WriteResponseChunk(reqresp.SuccessCode, &ping); err != nil { - log.Tracef("failed to respond to ping request: %v", err) - } else { - log.Tracef("handled ping request", ping) - } - } - } - m := methods.PingRPCv1 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving ping") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving ping") - }() -} - -func (en *LocalEthereumNode) ServeBeaconGoodbye(h host.Host) { - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - var goodbye common.Goodbye - err := handler.ReadRequest(&goodbye) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse goodbye request") - log.Tracef("failed to read goodbye request: %v from %s", err, peerId.String()) - } else { - if err := handler.WriteResponseChunk(reqresp.SuccessCode, &goodbye); err != nil { - log.Tracef("failed to respond to goodbye request: %v", err) - } else { - log.Tracef("handled goodbye request", goodbye) - } - } - } - m := methods.GoodbyeRPCv1 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving goodbye") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving goodbye") - }() -} diff --git a/pkg/networks/ethereum/beacon_rpcs.go b/pkg/networks/ethereum/beacon_rpcs.go new file mode 100644 index 0000000..74e6dec --- /dev/null +++ b/pkg/networks/ethereum/beacon_rpcs.go @@ -0,0 +1,351 @@ +package ethereum + +import ( + "context" + "encoding/hex" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "sync" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" + "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" + "github.com/protolambda/zrnt/eth2/beacon/common" +) + +// BeaconPing +func (en *LocalEthereumNode) ServeBeaconPing(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + var ping common.Ping + err := handler.ReadRequest(&ping) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse ping request") + log.Tracef("failed to read ping request: %v from %s", err, peerId.String()) + } else { + if err := handler.WriteResponseChunk(reqresp.SuccessCode, &ping); err != nil { + log.Tracef("failed to respond to ping request: %v", err) + } else { + log.Tracef("handled ping request", ping) + } + } + } + m := methods.PingRPCv1 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving ping") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving ping") + }() +} + +// BeaconGoodbye +func (en *LocalEthereumNode) ServeBeaconGoodbye(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + var goodbye common.Goodbye + err := handler.ReadRequest(&goodbye) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse goodbye request") + log.Tracef("failed to read goodbye request: %v from %s", err, peerId.String()) + } else { + if err := handler.WriteResponseChunk(reqresp.SuccessCode, &goodbye); err != nil { + log.Tracef("failed to respond to goodbye request: %v", err) + } else { + log.Tracef("handled goodbye request", goodbye) + } + } + } + m := methods.GoodbyeRPCv1 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving goodbye") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving goodbye") + }() +} + +// Metadata +// ReqBeaconMetadata opens a new Stream from the given host to send a RPC reqresping the BeaconStatus of the given peer.ID. +// Returns the BeaconStatus of the given peer if succeed, error if failed. +func (en *LocalEthereumNode) ReqBeaconMetadata( + ctx context.Context, + wg *sync.WaitGroup, + h host.Host, + peerID peer.ID, + result *common.MetaData, + finErr *error) { + + defer wg.Done() + + // declare the result output of the RPC call + var metadata common.MetaData + + attnets := new(common.AttnetBits) + bytes, err := hex.DecodeString("ffffffffffffffff") //TODO: remove hardcodding!!!! + if err != nil { + log.Panic("unable to decode ForkDigest", err.Error()) + } + attnets.UnmarshalText(bytes) + + // Generate the Server Error Code + var resCode reqresp.ResponseCode // error by default + // record the error into the error channel + err = methods.MetaDataRPCv2NoSnappy.RunRequest(ctx, h.NewStream, peerID, new(reqresp.SnappyCompression), + reqresp.RequestSSZInput{Obj: nil}, 1, + func() error { + return nil + }, + func(chunk reqresp.ChunkedResponseHandler) error { + resCode = chunk.ResultCode() + switch resCode { + case reqresp.ServerErrCode, reqresp.InvalidReqCode: + msg, err := chunk.ReadErrMsg() + if err != nil { + return errors.Errorf("error reqresping BeaconMetadata RPC: %s", msg) + } + case reqresp.SuccessCode: + if err := chunk.ReadObj(&metadata); err != nil { + return errors.Wrap(err, "from reqresping BeaconMetadata RPC") + } + default: + return errors.New("unexpected result code for BeaconMetadata RPC reqresp") + } + return nil + }) + *finErr = err + *result = metadata +} + +func (en *LocalEthereumNode) ServeBeaconMetadata(h host.Host) { + + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + var reqMetadata common.MetaData + err := handler.ReadRequest(&reqMetadata) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse status request") + log.Tracef("failed to read metadata request: %v from %s", err, peerId.String()) + } else { + if err := handler.WriteResponseChunk(reqresp.SuccessCode, &en.LocalMetadata); err != nil { + log.Tracef("failed to respond to metadata request: %v", err) + } else { + log.Tracef("handled metadata request") + } + } + } + m := methods.MetaDataRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving Beacon Metadata") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving Beacon Metadata") + }() +} + +// Status +// ReqBeaconStatus opens a new Stream from the given host to send a RPC reqresping the BeaconStatus of the given peer.ID. +// Returns the BeaconStatus of the given peer if succeed, error if failed. +func (en *LocalEthereumNode) ReqBeaconStatus( + ctx context.Context, + wg *sync.WaitGroup, + h host.Host, + peerID peer.ID, + result *common.Status, + finErr *error) { + + defer wg.Done() + // declare the result obj of the RPC call + var remoteStatus common.Status + + var resCode reqresp.ResponseCode // error by default + err := methods.StatusRPCv1NoSnappy.RunRequest(ctx, h.NewStream, peerID, new(reqresp.SnappyCompression), + reqresp.RequestSSZInput{Obj: &en.LocalStatus}, 1, + func() error { + return nil + }, + func(chunk reqresp.ChunkedResponseHandler) error { + resCode = chunk.ResultCode() + switch resCode { + case reqresp.ServerErrCode, reqresp.InvalidReqCode: + msg, err := chunk.ReadErrMsg() + if err != nil { + return errors.Wrap(err, msg) + } + case reqresp.SuccessCode: + if err := chunk.ReadObj(&remoteStatus); err != nil { + return err + } + default: + return errors.New("unexpected result code") + } + return nil + }) + *finErr = err + *result = remoteStatus + // update the local status with the remote one if the local on is older + en.UpdateStatus(remoteStatus) +} + +func (en *LocalEthereumNode) ServeBeaconStatus(h host.Host) { + + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + var reqStatus common.Status + err := handler.ReadRequest(&reqStatus) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse status request") + log.Tracef("failed to read status request: %v from %s", err, peerId.String()) + } else { + if err := handler.WriteResponseChunk(reqresp.SuccessCode, &en.LocalStatus); err != nil { + log.Tracef("failed to respond to status request: %v", err) + } else { + // update if possible out status + en.UpdateStatus(reqStatus) + log.Tracef("handled status request") + } + } + } + m := methods.StatusRPCv1 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving Beacon Status") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving Beacon Status") + }() +} + +// Blocks +func (en *LocalEthereumNode) ServeBeaconBlocksByRangeV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRange := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_range request") + log.Errorf("failed to read block_by_range request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_range request %v", *blockRange) + } + } + m := methods.BlocksByRangeRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_range") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlocksByRootV2(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blockRoot := new(methods.BlocksByRootReq) + err := handler.ReadRequest(blockRoot) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_root request") + log.Error("failed to read block_by_root request: %v from %s", err, peerId.String()) + } else { + log.Infof("dropped block_by_root request %v", *blockRoot) + } + } + m := methods.BlocksByRootRPCv2 + streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(m.Protocol, streamHandler) + log.Info("Started serving block_by_root") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving block_by_root") + }() +} + +// Blobs +func (en *LocalEthereumNode) ServeBeaconBlobsByRangeV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobsRange := new(methods.BlobsByRangeReqV1) + err := handler.ReadRequest(blobsRange) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_range request") + log.Errorf("failed to read blobs_by_range request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_range request", *blobsRange) + } + } + b := methods.BlobsByRangeRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_range") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_range") + }() +} + +func (en *LocalEthereumNode) ServeBeaconBlobsByRootV1(h host.Host) { + go func() { + sCtxFn := func() context.Context { + reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) + return reqCtx + } + comp := new(reqresp.SnappyCompression) + listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { + blobRoots := new(methods.BlobByRootV1) + err := handler.ReadRequest(blobRoots) + if err != nil { + _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_root request") + log.Errorf("failed to read blobs_by_root request: %v from %s", err, peerId.String()) + } else { + log.Info("dropped blobs_by_root request", *blobRoots) + } + } + b := methods.BlobsByRootRPCv1 + streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq) + h.SetStreamHandler(b.Protocol, streamHandler) + log.Info("Started serving blobs_by_root") + // wait until the ctx is down + <-en.ctx.Done() // TODO: do it better + log.Info("Stopped serving blobs_by_root") + }() +} diff --git a/pkg/networks/ethereum/beacon_status.go b/pkg/networks/ethereum/beacon_status.go deleted file mode 100644 index e5e62c3..0000000 --- a/pkg/networks/ethereum/beacon_status.go +++ /dev/null @@ -1,92 +0,0 @@ -package ethereum - -import ( - "context" - "sync" - - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods" - "github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/protolambda/zrnt/eth2/beacon/common" -) - -// ReqBeaconStatus opens a new Stream from the given host to send a RPC reqresping the BeaconStatus of the given peer.ID. -// Returns the BeaconStatus of the given peer if succeed, error if failed. -func (en *LocalEthereumNode) ReqBeaconStatus( - ctx context.Context, - wg *sync.WaitGroup, - h host.Host, - peerID peer.ID, - result *common.Status, - finErr *error) { - - defer wg.Done() - // declare the result obj of the RPC call - var remoteStatus common.Status - - var resCode reqresp.ResponseCode // error by default - err := methods.StatusRPCv1NoSnappy.RunRequest(ctx, h.NewStream, peerID, new(reqresp.SnappyCompression), - reqresp.RequestSSZInput{Obj: &en.LocalStatus}, 1, - func() error { - return nil - }, - func(chunk reqresp.ChunkedResponseHandler) error { - resCode = chunk.ResultCode() - switch resCode { - case reqresp.ServerErrCode, reqresp.InvalidReqCode: - msg, err := chunk.ReadErrMsg() - if err != nil { - return errors.Wrap(err, msg) - } - case reqresp.SuccessCode: - if err := chunk.ReadObj(&remoteStatus); err != nil { - return err - } - default: - return errors.New("unexpected result code") - } - return nil - }) - *finErr = err - *result = remoteStatus - // update the local status with the remote one if the local on is older - en.UpdateStatus(remoteStatus) -} - -func (en *LocalEthereumNode) ServeBeaconStatus(h host.Host) { - - go func() { - sCtxFn := func() context.Context { - reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout) - return reqCtx - } - comp := new(reqresp.SnappyCompression) - listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) { - var reqStatus common.Status - err := handler.ReadRequest(&reqStatus) - if err != nil { - _ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse status request") - log.Tracef("failed to read status request: %v from %s", err, peerId.String()) - } else { - if err := handler.WriteResponseChunk(reqresp.SuccessCode, &en.LocalStatus); err != nil { - log.Tracef("failed to respond to status request: %v", err) - } else { - // update if possible out status - en.UpdateStatus(reqStatus) - log.Tracef("handled status request") - } - } - } - m := methods.StatusRPCv1 - streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq) - h.SetStreamHandler(m.Protocol, streamHandler) - log.Info("Started serving Beacon Status") - // wait untill the ctx is down - <-en.ctx.Done() // TODO: do it better - log.Info("Stopped serving Beacon Status") - }() -} diff --git a/pkg/networks/ethereum/gossip_handlers.go b/pkg/networks/ethereum/gossip_handlers.go index 032d657..00ac976 100644 --- a/pkg/networks/ethereum/gossip_handlers.go +++ b/pkg/networks/ethereum/gossip_handlers.go @@ -3,14 +3,16 @@ package ethereum import ( "bytes" "fmt" + "github.com/protolambda/zrnt/eth2/beacon/altair" + "github.com/protolambda/zrnt/eth2/beacon/deneb" "github.com/protolambda/zrnt/eth2/beacon/phase0" + "sync" "time" // bls "github.com/phoreproject/github.com/bls/g1pubs" "github.com/migalabs/armiarma/pkg/gossipsub" "github.com/protolambda/zrnt/eth2/beacon/common" - "github.com/protolambda/zrnt/eth2/beacon/deneb" "github.com/protolambda/zrnt/eth2/configs" "github.com/protolambda/ztyp/codec" @@ -35,13 +37,15 @@ type EthMessageHandler struct { genesisTime time.Time pubkeys []*common.BLSPubkey // pubkeys of those validators we want to track - attestationCallbacks []func(event *AttestationReceievedEvent) + callbackM sync.RWMutex + messageCallbacks map[EthereumGossipTopic][]func(event interface{}) } func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHandler, error) { subHandler := &EthMessageHandler{ - genesisTime: genesis, - pubkeys: make([]*common.BLSPubkey, 0, len(pubkeysStr)), + genesisTime: genesis, + pubkeys: make([]*common.BLSPubkey, 0, len(pubkeysStr)), + messageCallbacks: make(map[EthereumGossipTopic][]func(event interface{})), } // parse pubkeys for _, pubkeyStr := range pubkeysStr { @@ -58,12 +62,67 @@ func NewEthMessageHandler(genesis time.Time, pubkeysStr []string) (*EthMessageHa return subHandler, nil } -func (s *EthMessageHandler) OnAttestation(fn func(event *AttestationReceievedEvent)) { - s.attestationCallbacks = append(s.attestationCallbacks, fn) +// AddCallBack makes the message sub-handling independent of the message read and agnostic to the topic +func (s *EthMessageHandler) AddCallback(topic EthereumGossipTopic, fn func(event interface{})) { + // check if there are already any existing callback + s.callbackM.Lock() + defer s.callbackM.Unlock() + callbacks, ok := s.messageCallbacks[topic] + if !ok { + callbacks = make([]func(event interface{}), 0) + } + callbacks = append(callbacks, fn) + s.messageCallbacks[topic] = callbacks +} + +func (mh *EthMessageHandler) getCallBacks(topic EthereumGossipTopic) ([]func(event interface{}), bool) { + mh.callbackM.RLock() + defer mh.callbackM.RUnlock() + callbacks, ok := mh.messageCallbacks[topic] + return callbacks, ok +} + +func (mh *EthMessageHandler) BeaconBlockMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + bblock := new(deneb.SignedBeaconBlock) + + err = bblock.Deserialize(configs.Mainnet, codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + if err != nil { + return nil, err + } + + trackedBlock := &TrackedBeaconBlock{ + TrackedMessage: TrackedMessage{ + Msg: bblock, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(bblock.Message.Slot)), + ValIndex: uint64(bblock.Message.ProposerIndex), + Slot: uint64(bblock.Message.Slot), + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconBlockTopic) + if ok { + for _, callback := range callbacks { + callback(&trackedBlock) // TODO: update to submite the + } + } + return trackedBlock, nil } // as reference https://github.com/protolambda/zrnt/blob/4ecaadfe0cb3c0a90d85e6a6dddcd3ebed0411b9/eth2/beacon/phase0/indexed.go#L99 -func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { +func (s *EthMessageHandler) AttestationSubnetMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { t := time.Now() defer log.Trace("total time to handle msg:", time.Since(t)) @@ -100,36 +159,36 @@ func (s *EthMessageHandler) SubnetMessageHandler(msg *pubsub.Message) (gossipsub // verify if the hash of the message, the signature and the pubkeys of the list of validators match - subnet, err := GetSubnetFromTopic(*msg.Topic) + subnet, err := GetSubnetFromTopic("attestation", *msg.Topic) if err != nil { return nil, err } trackedAttestation := &TrackedAttestation{ - MsgID: msg.ID, - ArrivalTime: msg.ArrivalTime, - Subnet: subnet, - Slot: int64(attestation.Data.Slot), - TimeInSlot: GetTimeInSlot(s.genesisTime, msg.ArrivalTime, int64(attestation.Data.Slot)), - Sender: msg.ReceivedFrom, - ValPubkey: "", + TrackedMessage: TrackedMessage{ + Msg: attestation, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + Subnet: subnet, + Slot: uint64(attestation.Data.Slot), + TimeInSlot: GetTimeInSlot(s.genesisTime, msg.ArrivalTime, int64(attestation.Data.Slot)), + ValPubkey: "", } - // Publish the event - for _, fn := range s.attestationCallbacks { - // Warning: blocking call, but the only consumers of these "internal" events should be the "events" forwarder which will throw it - // in to a buffered channel. - fn(&AttestationReceievedEvent{ - Attestation: &attestation, - TrackedAttestation: trackedAttestation, - PeerID: msg.ReceivedFrom, - }) + callbacks, ok := s.getCallBacks(BeaconSubnetAttestationTopic) + if ok { + for _, callback := range callbacks { + // Warning: blocking call, but the only consumers of these "internal" events should be the "events" forwarder which will throw it + // in to a buffered channel. + callback(trackedAttestation) + } } - return trackedAttestation, nil } -func (mh *EthMessageHandler) BeaconBlockMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { +func (mh *EthMessageHandler) BeaconAggregationAndProofMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { t := time.Now() defer log.Trace("total time to handle msg:", time.Since(t)) topic := *msg.Topic @@ -140,21 +199,179 @@ func (mh *EthMessageHandler) BeaconBlockMessageHandler(msg *pubsub.Message) (gos return nil, err } msgBuf := bytes.NewBuffer(msgBytes) - bblock := new(deneb.SignedBeaconBlock) + aggregation := new(altair.SignedContributionAndProof) - err = bblock.Deserialize(configs.Mainnet, codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + err = aggregation.Deserialize(configs.Mainnet, codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) if err != nil { return nil, err } - trackedBlock := &TrackedBeaconBlock{ - MsgID: msg.ID, - Sender: msg.ReceivedFrom, - ArrivalTime: msg.ArrivalTime, - TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(bblock.Message.Slot)), - ValIndex: int64(bblock.Message.ProposerIndex), - Slot: int64(bblock.Message.Slot), + trackedAggragation := &TrackedAggregateAndProof{ + TrackedMessage: TrackedMessage{ + Msg: aggregation, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(aggregation.Message.Contribution.Slot)), + Slot: uint64(aggregation.Message.Contribution.Slot), } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconAggregateAndProofTopic) + if ok { + for _, callback := range callbacks { + callback(trackedAggragation) // TODO: update to submite the event + } + } + return trackedAggragation, nil +} - return trackedBlock, nil +func (mh *EthMessageHandler) VoluntaryExitMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + voluntaryExit := new(phase0.SignedVoluntaryExit) + + err = voluntaryExit.Deserialize(codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + if err != nil { + return nil, err + } + + trackedVoluntaryExit := &TrackedVoluntaryExit{ + TrackedMessage: TrackedMessage{ + Msg: voluntaryExit, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + ValIndex: uint64(voluntaryExit.Message.ValidatorIndex), + Epoch: uint64(voluntaryExit.Message.Epoch), + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconVoluntaryExitTopic) + if ok { + for _, callback := range callbacks { + callback(trackedVoluntaryExit) // TODO: update to submite the event + } + } + return trackedVoluntaryExit, nil +} + +func (mh *EthMessageHandler) ProposerSlashingMessageHandler(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + proposerSlashing := new(phase0.ProposerSlashing) + + err = proposerSlashing.Deserialize(codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + if err != nil { + return nil, err + } + + trackedProposerSlashing := &TrackedProposerSlashing{ + TrackedMessage: TrackedMessage{ + Msg: proposerSlashing, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + ProposerIndex: uint64(proposerSlashing.SignedHeader1.Message.ProposerIndex), + Slot: uint64(proposerSlashing.SignedHeader1.Message.Slot), + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconVoluntaryExitTopic) + if ok { + for _, callback := range callbacks { + callback(trackedProposerSlashing) // TODO: update to submite the event + } + } + return trackedProposerSlashing, nil +} + +func (mh *EthMessageHandler) TrackedSyncAggregate(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + syncContribution := new(altair.ContributionAndProof) + + err = syncContribution.Deserialize(configs.Mainnet, codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + if err != nil { + return nil, err + } + trackedSyncAggregate := &TrackedSyncAggregate{ + TrackedMessage: TrackedMessage{ + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + AggragatorIndex: uint64(syncContribution.AggregatorIndex), + TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(syncContribution.Contribution.Slot)), + Slot: uint64(syncContribution.Contribution.Slot), + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconSyncCommitteeAggregationTopic) + if ok { + for _, callback := range callbacks { + callback(trackedSyncAggregate) // TODO: update to submite the event + } + } + return trackedSyncAggregate, nil +} + +func (mh *EthMessageHandler) TrackedSyncVotes(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + syncContribution := new(altair.SyncCommitteeMessage) + + err = syncContribution.Deserialize(codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + if err != nil { + return nil, err + } + trackedSyncMsg := &TrackedSyncMessage{ + TrackedMessage: TrackedMessage{ + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + ValIndex: uint64(syncContribution.ValidatorIndex), + TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(syncContribution.Slot)), + Slot: uint64(syncContribution.Slot), + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconSubnetSyncCommitteeVoteTopic) + if ok { + for _, callback := range callbacks { + callback(trackedSyncMsg) // TODO: update to submite the event + } + } + return trackedSyncMsg, nil } diff --git a/pkg/networks/ethereum/gossip_messages.go b/pkg/networks/ethereum/gossip_messages.go index b66e400..9cefd23 100644 --- a/pkg/networks/ethereum/gossip_messages.go +++ b/pkg/networks/ethereum/gossip_messages.go @@ -1,8 +1,6 @@ package ethereum import ( - "regexp" - "strconv" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -14,52 +12,134 @@ var ( ErrorNotParsableSubnet = errors.New("not parseable subnet int") ) -type TrackedAttestation struct { +// Tracked Message basis +type GossipSubTrackedMessage interface { + ReceivedFrom() peer.ID + MessageID() string + ArrivalTime() time.Time + Message() any // Not safe, but will work (should be marshaleable anyways) +} + +type TrackedMessage struct { MsgID string Sender peer.ID - Subnet int + Time time.Time + Msg any // Not safe, but will work (same as before) +} + +func (m *TrackedMessage) ReceivedFrom() peer.ID { + return m.Sender +} - ArrivalTime time.Time // time of arrival - TimeInSlot time.Duration // exact time inside the slot (range between 0secs and 12s*32slots) +func (m *TrackedMessage) MessageID() string { + return m.MsgID +} + +func (m *TrackedMessage) ArrivalTime() peer.ID { + return m.Sender +} - ValPubkey string - Slot int64 +func (m *TrackedMessage) Message() any { + return m.Msg +} + +// Ethereum Message-Specifics +// Beacon Block +type TrackedBeaconBlock struct { + TrackedMessage + TimeInSlot time.Duration // exact time inside the slot (range between 0secs and 12s*32slots) + ValIndex uint64 + Slot uint64 +} + +func (a *TrackedBeaconBlock) IsZero() bool { + return a.Slot == 0 +} + +// Attestations +type TrackedAttestation struct { + TrackedMessage + TimeInSlot time.Duration // exact time inside the slot (range between 0secs and 12s*32slots) + Subnet int + ValPubkey string + Slot uint64 } func (a *TrackedAttestation) IsZero() bool { return a.Slot == 0 } -type TrackedBeaconBlock struct { - MsgID string - Sender peer.ID +// Aggregations and Proofs +type TrackedAggregateAndProof struct { + TrackedMessage + TimeInSlot time.Duration // exact time inside the slot (range between 0secs and 12s*32slots) + Slot uint64 +} - ArrivalTime time.Time // time of arrival - TimeInSlot time.Duration // exact time inside the slot (range between 0secs and 12s*32slots) +func (a *TrackedAggregateAndProof) IsZero() bool { + return a.Slot == 0 +} - ValIndex int64 - Slot int64 +// Voluntarý Exits +type TrackedVoluntaryExit struct { + TrackedMessage + Epoch uint64 + ValIndex uint64 } -func (a *TrackedBeaconBlock) IsZero() bool { +func (a *TrackedVoluntaryExit) IsZero() bool { + return a.Epoch == 0 +} + +// Propose Slashing +type TrackedProposerSlashing struct { + TrackedMessage + Slot uint64 + ProposerIndex uint64 +} + +func (a *TrackedProposerSlashing) IsZero() bool { + return a.Slot == 0 +} + +// Attester Slashing +type TrackedAttesterSlashing struct { + TrackedMessage + Epoch uint64 + ValIndex uint64 +} + +func (a *TrackedAttesterSlashing) IsZero() bool { + return a.Epoch == 0 +} + +// SyncAggregations: https://github.com/protolambda/zrnt/blob/6bc42739f502a06171cc6f2378ec7aa556e41182/eth2/beacon/altair/sync_contribution.go#L14 +type TrackedSyncAggregate struct { + TrackedMessage + AggragatorIndex uint64 + TimeInSlot time.Duration + Slot uint64 +} + +func (a *TrackedSyncAggregate) IsZero() bool { return a.Slot == 0 } -func GetSubnetFromTopic(topic string) (int, error) { - re := regexp.MustCompile(`attestation_([0-9]+)`) - match := re.FindAllString(topic, -1) - if len(match) < 1 { - return -1, ErrorNoSubnet - } - - re2 := regexp.MustCompile("([0-9]+)") - match = re2.FindAllString(match[0], -1) - if len(match) < 1 { - return -1, ErrorNotParsableSubnet - } - subnet, err := strconv.Atoi(match[0]) - if err != nil { - return -1, errors.Wrap(err, "unable to conver subnet to int") - } - return subnet, nil +// SyncVote +type TrackedSyncMessage struct { + TrackedMessage + ValIndex uint64 + TimeInSlot time.Duration + Slot uint64 +} + +func (a *TrackedSyncMessage) IsZero() bool { + return a.Slot == 0 +} + +// BLS_Changes (TODO) + +// blobs (TODO: - zrnt doesn't include the blob struct, still looking for the time to implement the entire structure, the SSZ serialization, the view, the tree hashing, etc) +type TrackedBlobSidecards struct { + TrackedMessage } diff --git a/pkg/networks/ethereum/gossip_topics.go b/pkg/networks/ethereum/gossip_topics.go new file mode 100644 index 0000000..1f10a23 --- /dev/null +++ b/pkg/networks/ethereum/gossip_topics.go @@ -0,0 +1,93 @@ +package ethereum + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +type EthereumGossipTopic uint8 + +const ( + // gossip topic types + BeaconBlockTopic EthereumGossipTopic = iota + BeaconAggregateAndProofTopic + BeaconSubnetAttestationTopic + BeaconVoluntaryExitTopic + BeaconProposerSlashingTopic + BeaconAttesterSlashingTopic + BeaconSyncCommitteeAggregationTopic + BeaconSubnetSyncCommitteeVoteTopic + BeaconBLSExectionChangesTopic + BeaconSubnetBlobsTopic + + // gossip topic bases + BeaconBlockTopicBase string = "beacon_block" + BeaconAggregateAndProofTopicBase string = "beacon_aggregate_and_proof" + VoluntaryExitTopicBase string = "voluntary_exit" + ProposerSlashingTopicBase string = "proposer_slashing" + AttesterSlashingTopicBase string = "attester_slashing" + AttestationSubnetsTopicBase string = "beacon_attestation_{__subnet_id__}" + SubnetLimit = 64 + SyncCommitteeAggregationsTopicBase string = "sync_committee_contribution_and_proof" + SyncCommitteeSubnetsTopicBase string = "sync_committee_{__subnet_id__}" + SyncCommitteeLimit = 4 + BLStoExectionChangeTopicBase string = "bls_to_execution_change" + BlobsSubnetsTopicBase string = "blob_sidecar_{__subnet_id__}" + + // encoding-compression + Encoding string = "ssz_snappy" +) + +// EthTopicPretty returns the topic based on its message type in a pretty version of it. +// It would return "beacon_block" out of the given "/eth2/b5303f2a/beacon_block/ssz_snappy" topic +func EthTopicPretty(eth2topic string) string { + return strings.Split(eth2topic, "/")[3] +} + +// GenerateEth2Topic returns the built topic out of the given arguments +func ComposeTopic(forkDigest string, messageTypeName string) string { + forkDigest = strings.Trim(forkDigest, ForkDigestPrefix) + // if we reach here, inputs were okay + return "/" + BlockchainName + + "/" + forkDigest + + "/" + messageTypeName + + "/" + Encoding +} + +// ComposeSubnetTopic generates the GossipSub topic for the given ForkDigest, base, and subnet +func ComposeSubnetTopic(base, forkDigest string, subnet int) string { + if subnet > SubnetLimit || subnet <= 0 { + return "" + } + + // trim "0x" if exists + forkDigest = strings.Trim(forkDigest, "0x") + name := strings.Replace(base, "{__subnet_id__}", fmt.Sprintf("%d", subnet), -1) + return "/" + BlockchainName + + "/" + forkDigest + + "/" + name + + "/" + Encoding +} + +func GetSubnetFromTopic(base, topic string) (int, error) { + re := regexp.MustCompile(base + `_([0-9]+)`) + match := re.FindAllString(topic, -1) + if len(match) < 1 { + return -1, ErrorNoSubnet + } + + re2 := regexp.MustCompile("([0-9]+)") + match = re2.FindAllString(match[0], -1) + if len(match) < 1 { + return -1, ErrorNotParsableSubnet + } + subnet, err := strconv.Atoi(match[0]) + if err != nil { + return -1, errors.Wrap(err, "unable to conver subnet to int") + } + return subnet, nil +} diff --git a/pkg/networks/ethereum/network_info.go b/pkg/networks/ethereum/network_info.go index 5833c73..ed16fd0 100644 --- a/pkg/networks/ethereum/network_info.go +++ b/pkg/networks/ethereum/network_info.go @@ -2,7 +2,6 @@ package ethereum import ( "encoding/hex" - "fmt" "strings" "time" ) @@ -59,24 +58,6 @@ var ( // Deneb DenebCancunKey: "0xee7b3a32", } - - MessageTypes = []string{ - BeaconBlockTopicBase, - BeaconAggregateAndProofTopicBase, - VoluntaryExitTopicBase, - ProposerSlashingTopicBase, - AttesterSlashingTopicBase, - } - - BeaconBlockTopicBase string = "beacon_block" - BeaconAggregateAndProofTopicBase string = "beacon_aggregate_and_proof" - VoluntaryExitTopicBase string = "voluntary_exit" - ProposerSlashingTopicBase string = "proposer_slashing" - AttesterSlashingTopicBase string = "attester_slashing" - AttestationTopicBase string = "beacon_attestation_{__subnet_id__}" - SubnetLimit = 64 - - Encoding string = "ssz_snappy" ) var ( @@ -86,72 +67,7 @@ var ( SecondsPerSlot time.Duration = 12 * time.Second ) -// GenerateEth2Topic returns the built topic out of the given arguments. -// You may check the commented examples above.nstants. -func ComposeTopic(forkDigest string, messageTypeName string) string { - forkDigest = strings.Trim(forkDigest, ForkDigestPrefix) - // if we reach here, inputs were okay - return "/" + BlockchainName + - "/" + forkDigest + - "/" + messageTypeName + - "/" + Encoding -} - -// ComposeAttnetsTopic generates the GossipSub topic for the given ForkDigest and subnet -func ComposeAttnetsTopic(forkDigest string, subnet int) string { - if subnet > SubnetLimit || subnet <= 0 { - return "" - } - - // trim "0x" if exists - forkDigest = strings.Trim(forkDigest, "0x") - name := strings.Replace(AttestationTopicBase, "{__subnet_id__}", fmt.Sprintf("%d", subnet), -1) - return "/" + BlockchainName + - "/" + forkDigest + - "/" + name + - "/" + Encoding -} - -// Eth2TopicPretty: -// This method returns the topic based on it's message type -// in a pretty version of it. -// It would return "beacon_block" out of the given "/eth2/b5303f2a/beacon_block/ssz_snappy" topic -// @param eth2topic:the entire composed eth2 topic with fork digest and compression. -// @return topic pretty. -func Eth2TopicPretty(eth2topic string) string { - return strings.Split(eth2topic, "/")[3] -} - -// ReturnAllTopics: -// This method will iterate over the mesagetype map and return any possible topic for the -// given fork digest. -// @return the array of topics. -func ReturnAllTopics(inputForkDigest string) []string { - result_array := make([]string, 0) - for _, messageValue := range MessageTypes { - result_array = append(result_array, ComposeTopic(inputForkDigest, messageValue)) - } - return result_array -} - -// ReturnTopics: -// Returns topics for the given parameters. -// @param forkDigest: the forkDigest to use in the topic. -// @param messageTypeName: the type of topic. -// @return the list of generated topics with the given parameters (several messageTypes). -func ComposeTopics(forkDigest string, messageTypeName []string) []string { - result_array := make([]string, 0) - - for _, messageTypeTmp := range messageTypeName { - result_array = append(result_array, ComposeTopic(forkDigest, messageTypeTmp)) - } - return result_array -} - -// CheckValidForkDigest: -// This method will check if Fork Digest exists in the corresponding map (ForkDigests). -// @return the fork digest of the given network. -// @return a boolean (true for valid, false for not valid). +// CheckValidForkDigest checks if Fork Digest exists in the corresponding map (ForkDigests). func CheckValidForkDigest(inStr string) (string, bool) { for forkDigestKey, forkDigest := range ForkDigests { if strings.ToLower(forkDigestKey) == inStr { @@ -170,3 +86,5 @@ func CheckValidForkDigest(inStr string) (string, bool) { } return inStr, true } + +// utils From 3a0d7b8324b857481f2388e856840bb1b9356a68 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Thu, 28 Mar 2024 13:33:34 +0100 Subject: [PATCH 3/4] add grafana update --- grafana/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grafana/Dockerfile b/grafana/Dockerfile index 5a4c2f7..d0b9572 100644 --- a/grafana/Dockerfile +++ b/grafana/Dockerfile @@ -1,4 +1,4 @@ -FROM grafana/grafana:8.0.0 +FROM grafana/grafana:10.1.0 ADD ./provisioning /etc/grafana/provisioning ADD ./config.ini /etc/grafana/config.ini -ADD ./dashboards /var/lib/grafana/dashboards \ No newline at end of file +ADD ./dashboards /var/lib/grafana/dashboards From 19b8781d97abbb12ca7782eb223175bb673fd185 Mon Sep 17 00:00:00 2001 From: Mikel Cortes <45786396+cortze@users.noreply.github.com> Date: Thu, 28 Mar 2024 17:26:34 +0000 Subject: [PATCH 4/4] add message types and handlers --- go.mod | 9 +++ go.sum | 27 ++++++++ pkg/crawler/ethereum.go | 87 ++++++++++++++++++------ pkg/networks/ethereum/gossip_handlers.go | 66 +++++++++++++++--- pkg/networks/ethereum/gossip_messages.go | 19 +++++- pkg/networks/ethereum/gossip_topics.go | 21 +++++- 6 files changed, 193 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 0ee0258..c7d0347 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/migalabs/armiarma go 1.21 require ( + github.com/attestantio/go-eth2-client v0.21.1 github.com/ethereum/go-ethereum v1.13.14 github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb github.com/jackc/pgx/v4 v4.18.3 @@ -43,11 +44,14 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.3 // indirect + github.com/fatih/color v1.16.0 // indirect + github.com/ferranbt/fastssz v0.1.3 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/goccy/go-yaml v1.9.2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -98,11 +102,13 @@ require ( github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/libp2p/go-yamux/v4 v4.0.1 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect + github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-pointer v0.0.1 // indirect github.com/miekg/dns v1.1.58 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect @@ -125,6 +131,7 @@ require ( github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/protolambda/bls12-381-util v0.1.0 // indirect + github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/quic-go v0.42.0 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect @@ -151,8 +158,10 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.19.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/go.sum b/go.sum index 0978caf..1b9fdb5 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= +github.com/attestantio/go-eth2-client v0.21.1 h1:yvsMd/azPUbxiJzWZhgqfOJJRNF1zLvAJpcBXTHzyh8= +github.com/attestantio/go-eth2-client v0.21.1/go.mod h1:Tb412NpzhsC0sbtpXS4D51y5se6nDkWAi6amsJrqX9c= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -147,6 +149,11 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ= github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/ferranbt/fastssz v0.1.3 h1:ZI+z3JH05h4kgmFXdHuR1aWYsgrg7o+Fw7/NCzM16Mo= +github.com/ferranbt/fastssz v0.1.3/go.mod h1:0Y9TEd/9XuFlh7mskMPfXiI2Dkw4Ddg9EyXt1W7MRvE= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= @@ -177,11 +184,17 @@ github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/goccy/go-yaml v1.9.2 h1:2Njwzw+0+pjU2gb805ZC1B/uBuAs2VcZ3K+ZgHwDs7w= +github.com/goccy/go-yaml v1.9.2/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= @@ -482,6 +495,7 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -722,12 +736,16 @@ github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= @@ -765,6 +783,8 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -957,6 +977,8 @@ github.com/protolambda/zrnt v0.32.3 h1:b3mkBEjcmxtft115cBIQk+2qz1HEb2ExDdduVQqN4 github.com/protolambda/zrnt v0.32.3/go.mod h1:A0fezkp9Tt3GBLATSPIbuY4ywYESyAuc/FFmPKg8Lqs= github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY= github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU= +github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4tdWQK9ZpYygoV7+vS6QkDvQVySboMVEIxBJmXw= +github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= @@ -972,6 +994,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1313,6 +1336,7 @@ golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1371,6 +1395,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= @@ -1445,6 +1471,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 65904cc..c76c388 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -145,31 +145,12 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) // create a gossipsub routing gs := gossipsub.NewGossipSub(ctx, host.Host(), dbClient) - // generate a new subnets-handler - ethMsgHandler, err := eth.NewEthMessageHandler(ethNode.GetNetworkGenesis(), conf.ValPubkeys) + // subscribe to all the network topics + ethMsgHandler, err := subscribeToGossipSubTopics(gs) if err != nil { cancel() return nil, err } - // subscribe the topics - for _, top := range conf.GossipTopics { - var msgHandler gossipsub.MessageHandler - switch top { - case eth.BeaconBlockTopicBase: - msgHandler = ethMsgHandler.BeaconBlockMessageHandler - default: - log.Error("untraceable gossipsub topic", top) - continue - - } - topic := eth.ComposeTopic(conf.ForkDigest, top) - gs.JoinAndSubscribe(topic, msgHandler, conf.PersistMsgs) - } - // subcribe to attestation subnets - for _, subnet := range conf.Subnets { - subTopics := eth.ComposeAttnetsTopic(conf.ForkDigest, subnet) - gs.JoinAndSubscribe(subTopics, ethMsgHandler.SubnetMessageHandler, conf.PersistMsgs) - } // generate the peering strategy pStrategy, err := peering.NewPruningStrategy( @@ -262,3 +243,67 @@ func (c *EthereumCrawler) Close() { c.Events.Stop() c.cancel() } + + +func subscribeToGossipSubTopics(gs *gossipsub.GossipSub) (*EthMessageHandler, error) { + + // generate a new subnets-handler + ethMsgHandler, err := eth.NewEthMessageHandler(ethNode.GetNetworkGenesis(), conf.ValPubkeys) + if err != nil { + return nil, err + } + + emptyhandler := func(msg *pubsub.Message) (PersistableMsg, error) { + return &DummyMessage, nil + } + + // subscribe to all topics + for _, top := range ethereum.EthereumValidTopics { + var msgHandler gossipsub.MessageHandler + switch top { + case eth.BeaconBlockTopicBase: + msgHandler = ethMsgHandler.BeaconBlockMessageHandler + case eth.BeaconAggregationAndProofMessageHandler: + msgHandler = ethMsgHandler.BeaconAggregationAndProofMessageHandler + case eth.AttestationSubnetsTopicBase: + for _, subnet := range conf.AttestationSubnetLimit { + t := eth.ComposeSubnetTopic(eth.AttestationSubnetsTopicBase, conf.ForkDigest, subnet) + gs.JoinAndSubscribe(t, ethMsgHandler.SubnetMessageHandler, conf.PersistMsgs) + } + case eth.VoluntaryExitTopicBase: + + case eth.ProposerSlashingTopicBase: + + case eth.AttesterSlashingTopicBase: + + case eth.SyncCommitteeAggregationsTopicBase: + + case eth.SyncCommitteeSubnetsTopicBase: + for _, subnet := range conf.SyncCommitteeSubnetLimit { + t := eth.ComposeSubnetTopic(eth.SyncCommitteeSubnetsTopicBase, conf.ForkDigest, subnet) + gs.JoinAndSubscribe(t, ethMsgHandler.SubnetMessageHandler, conf.PersistMsgs) + } + case eth.BLStoExectionChangeTopicBase: + + case eth.BlobsSubnetsTopicBase: + for _, subnet := range conf.BlobSubnetLimit { + t := eth.ComposeSubnetTopic(eth.BlobsSubnetsTopicBase, conf.ForkDigest, subnet) + gs.JoinAndSubscribe(t, ethMsgHandler.SubnetMessageHandler, conf.PersistMsgs) + } + + default: + log.Error("untraceable gossipsub topic", top) + continue + } + topic := eth.ComposeTopic(conf.ForkDigest, top) + gs.JoinAndSubscribe(topic, msgHandler, conf.PersistMsgs) + } + // subcribe to attestation subnets + for _, subnet := range conf.Subnets { + subTopics := eth.ComposeAttnetsTopic(conf.ForkDigest, subnet) + gs.JoinAndSubscribe(subTopics, ethMsgHandler.SubnetMessageHandler, conf.PersistMsgs) + } + + return ethMsgHandler, nil +} + diff --git a/pkg/networks/ethereum/gossip_handlers.go b/pkg/networks/ethereum/gossip_handlers.go index 00ac976..fb5b576 100644 --- a/pkg/networks/ethereum/gossip_handlers.go +++ b/pkg/networks/ethereum/gossip_handlers.go @@ -3,21 +3,23 @@ package ethereum import ( "bytes" "fmt" + "sync" + "time" + "github.com/protolambda/zrnt/eth2/beacon/altair" "github.com/protolambda/zrnt/eth2/beacon/deneb" "github.com/protolambda/zrnt/eth2/beacon/phase0" - "sync" - "time" + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/protolambda/zrnt/eth2/configs" + "github.com/protolambda/ztyp/codec" + attdeneb "github.com/attestantio/go-eth2-client/spec/deneb" // bls "github.com/phoreproject/github.com/bls/g1pubs" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/migalabs/armiarma/pkg/gossipsub" - "github.com/protolambda/zrnt/eth2/beacon/common" - "github.com/protolambda/zrnt/eth2/configs" - "github.com/protolambda/ztyp/codec" "github.com/golang/snappy" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -37,6 +39,7 @@ type EthMessageHandler struct { genesisTime time.Time pubkeys []*common.BLSPubkey // pubkeys of those validators we want to track + // SubHanndlers for each gossipsub topic (Oriented mostly for HTTP SSE) callbackM sync.RWMutex messageCallbacks map[EthereumGossipTopic][]func(event interface{}) } @@ -321,6 +324,7 @@ func (mh *EthMessageHandler) TrackedSyncAggregate(msg *pubsub.Message) (gossipsu } trackedSyncAggregate := &TrackedSyncAggregate{ TrackedMessage: TrackedMessage{ + Msg: syncContribution, MsgID: msg.ID, Time: msg.ArrivalTime, Sender: msg.ReceivedFrom, @@ -350,21 +354,22 @@ func (mh *EthMessageHandler) TrackedSyncVotes(msg *pubsub.Message) (gossipsub.Pe return nil, err } msgBuf := bytes.NewBuffer(msgBytes) - syncContribution := new(altair.SyncCommitteeMessage) + syncVote := new(altair.SyncCommitteeMessage) - err = syncContribution.Deserialize(codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) + err = syncVote.Deserialize(codec.NewDecodingReader(msgBuf, uint64(len(msgBuf.Bytes())))) if err != nil { return nil, err } trackedSyncMsg := &TrackedSyncMessage{ TrackedMessage: TrackedMessage{ + Msg: syncVote, MsgID: msg.ID, Time: msg.ArrivalTime, Sender: msg.ReceivedFrom, }, - ValIndex: uint64(syncContribution.ValidatorIndex), - TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(syncContribution.Slot)), - Slot: uint64(syncContribution.Slot), + ValIndex: uint64(syncVote.ValidatorIndex), + TimeInSlot: GetTimeInSlot(mh.genesisTime, msg.ArrivalTime, int64(syncVote.Slot)), + Slot: uint64(syncVote.Slot), } // check if there is any callback callbacks, ok := mh.getCallBacks(BeaconSubnetSyncCommitteeVoteTopic) @@ -375,3 +380,42 @@ func (mh *EthMessageHandler) TrackedSyncVotes(msg *pubsub.Message) (gossipsub.Pe } return trackedSyncMsg, nil } + + +func (mh *EthMessageHandler) TrackedBlobSidecars(msg *pubsub.Message) (gossipsub.PersistableMsg, error) { + t := time.Now() + defer log.Trace("total time to handle msg:", time.Since(t)) + topic := *msg.Topic + + // extract the data from the raw message + msgBytes, err := EthMessageBaseHandler(topic, msg) + if err != nil { + return nil, err + } + msgBuf := bytes.NewBuffer(msgBytes) + blobSidecar := new(attdeneb.BlobSidecar) + + err = blobSidecar.UnmarshalSSZ(msgBuf.Bytes()) + if err != nil { + return nil, err + } + trackedSyncMsg := &TrackedBlobSidecards{ + TrackedMessage: TrackedMessage{ + Msg: blobSidecar, + MsgID: msg.ID, + Time: msg.ArrivalTime, + Sender: msg.ReceivedFrom, + }, + BlobIndex: uint64(blobSidecar.Index), + BeaconBlockRoot: blobSidecar.SignedBlockHeader.Message.Root.String(), + + } + // check if there is any callback + callbacks, ok := mh.getCallBacks(BeaconSubnetSyncCommitteeVoteTopic) + if ok { + for _, callback := range callbacks { + callback(trackedSyncMsg) // TODO: update to submite the event + } + } + return trackedSyncMsg, nil +} \ No newline at end of file diff --git a/pkg/networks/ethereum/gossip_messages.go b/pkg/networks/ethereum/gossip_messages.go index 9cefd23..fe7401e 100644 --- a/pkg/networks/ethereum/gossip_messages.go +++ b/pkg/networks/ethereum/gossip_messages.go @@ -39,10 +39,20 @@ func (m *TrackedMessage) ArrivalTime() peer.ID { return m.Sender } -func (m *TrackedMessage) Message() any { +func (m *TrackedMessage) Message() any { // any = Marshaleable return m.Msg } + +// Dummy message (For not yet ready topics) +type DummyMessage struct { + TrackedMessage +} + +func (m *DummyMessage) IsZero() bool { + return true +} + // Ethereum Message-Specifics // Beacon Block type TrackedBeaconBlock struct { @@ -140,6 +150,13 @@ func (a *TrackedSyncMessage) IsZero() bool { // BLS_Changes (TODO) // blobs (TODO: - zrnt doesn't include the blob struct, still looking for the time to implement the entire structure, the SSZ serialization, the view, the tree hashing, etc) +// Experimental using the Eth2Clients library from attestant -> https://github.com/attestantio/go-eth2-client/blob/2d68bcd60d23ca11bbf073332f86a15b83b7a265/spec/deneb/blobsidecar.go#L24 type TrackedBlobSidecards struct { TrackedMessage + BlobIndex uint64 + BeaconBlockRoot string } + +func (a *TrackedBlobSidecards) IsZero() bool { + return a.BeaconBlockRoot != "" +} \ No newline at end of file diff --git a/pkg/networks/ethereum/gossip_topics.go b/pkg/networks/ethereum/gossip_topics.go index 1f10a23..319765b 100644 --- a/pkg/networks/ethereum/gossip_topics.go +++ b/pkg/networks/ethereum/gossip_topics.go @@ -31,17 +31,32 @@ const ( ProposerSlashingTopicBase string = "proposer_slashing" AttesterSlashingTopicBase string = "attester_slashing" AttestationSubnetsTopicBase string = "beacon_attestation_{__subnet_id__}" - SubnetLimit = 64 + AttestationSubnetLimit = 64 SyncCommitteeAggregationsTopicBase string = "sync_committee_contribution_and_proof" SyncCommitteeSubnetsTopicBase string = "sync_committee_{__subnet_id__}" - SyncCommitteeLimit = 4 + SyncCommitteeSubnetLimit = 4 BLStoExectionChangeTopicBase string = "bls_to_execution_change" BlobsSubnetsTopicBase string = "blob_sidecar_{__subnet_id__}" + BlobSubnetLimit = 6 // encoding-compression Encoding string = "ssz_snappy" ) +// valid ethereum topics +var EthereumValidTopics = []string{ + BeaconBlockTopicBase, + BeaconAggregateAndProofTopicBase, + AttestationSubnetsTopicBase, + VoluntaryExitTopicBase, + ProposerSlashingTopicBase, + AttesterSlashingTopicBase, + SyncCommitteeAggregationsTopicBase, + SyncCommitteeSubnetsTopicBase, + BLStoExectionChangeTopicBase, + BlobsSubnetsTopicBase, +} + // EthTopicPretty returns the topic based on its message type in a pretty version of it. // It would return "beacon_block" out of the given "/eth2/b5303f2a/beacon_block/ssz_snappy" topic func EthTopicPretty(eth2topic string) string { @@ -60,7 +75,7 @@ func ComposeTopic(forkDigest string, messageTypeName string) string { // ComposeSubnetTopic generates the GossipSub topic for the given ForkDigest, base, and subnet func ComposeSubnetTopic(base, forkDigest string, subnet int) string { - if subnet > SubnetLimit || subnet <= 0 { + if subnet <= 0 { return "" }