Skip to content

Commit

Permalink
glob filter (#9148)
Browse files Browse the repository at this point in the history
  • Loading branch information
elee1766 authored Jan 7, 2024
1 parent e958d35 commit 235af8a
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 97 deletions.
2 changes: 1 addition & 1 deletion cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
}

func (g *GossipManager) Start(ctx context.Context) {
subscription, err := g.sentinel.SubscribeGossip(ctx, &sentinel.EmptyMessage{})
subscription, err := g.sentinel.SubscribeGossip(ctx, &sentinel.SubscriptionData{})
if err != nil {
return
}
Expand Down
19 changes: 18 additions & 1 deletion cl/sentinel/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"path"
"strconv"
"strings"
"sync"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

var _ sentinelrpc.SentinelServer = (*SentinelServer)(nil)

type SentinelServer struct {
sentinelrpc.UnimplementedSentinelServer

Expand Down Expand Up @@ -108,7 +111,7 @@ func (s *SentinelServer) PublishGossip(_ context.Context, msg *sentinelrpc.Gossi
return &sentinelrpc.EmptyMessage{}, subscription.Publish(compressedData)
}

func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sentinelrpc.Sentinel_SubscribeGossipServer) error {
func (s *SentinelServer) SubscribeGossip(data *sentinelrpc.SubscriptionData, stream sentinelrpc.Sentinel_SubscribeGossipServer) error {
// first of all subscribe
ch, subId, err := s.gossipNotifier.addSubscriber()
if err != nil {
Expand All @@ -122,6 +125,9 @@ func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sen
case <-stream.Context().Done():
return nil
case packet := <-ch:
if !s.gossipMatchSubscription(packet, data) {
continue
}
if err := stream.Send(&sentinelrpc.GossipData{
Data: packet.data,
Name: packet.t,
Expand All @@ -135,6 +141,17 @@ func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sen
}
}

func (s *SentinelServer) gossipMatchSubscription(obj gossipObject, data *sentinelrpc.SubscriptionData) bool {
if data.Filter != nil {
filter := data.GetFilter()
matched, err := path.Match(obj.t, filter)
if err != nil || !matched {
return false
}
}
return true
}

func (s *SentinelServer) withTimeoutCtx(pctx context.Context, dur time.Duration) (ctx context.Context, cn func()) {
if dur > 0 {
ctx, cn = context.WithTimeout(pctx, 8*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/direct/sentinel_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *SentinelClientDirect) PublishGossip(ctx context.Context, in *sentinel.G

// Subscribe gossip part. the only complex section of this bullshit

func (s *SentinelClientDirect) SubscribeGossip(ctx context.Context, in *sentinel.EmptyMessage, opts ...grpc.CallOption) (sentinel.Sentinel_SubscribeGossipClient, error) {
func (s *SentinelClientDirect) SubscribeGossip(ctx context.Context, in *sentinel.SubscriptionData, opts ...grpc.CallOption) (sentinel.Sentinel_SubscribeGossipClient, error) {
ch := make(chan *gossipReply, 16384)
streamServer := &SentinelSubscribeGossipS{ch: ch, ctx: ctx}
go func() {
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/erigontech/mdbx-go v0.27.21
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240101230756-23fbc6c56a1d
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d
github.com/ledgerwatch/interfaces v0.0.0-20240105165407-ac2eea9d3c55
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240101230756-23fbc6c56a1d h1:rMqDEGLdmVgGdpDmaNp4Do1vc9BtUQ3rjFD9gQBRSx0=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240101230756-23fbc6c56a1d/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d h1:7aB9lKmUGAaWt4TzXnGLzJSZkhyuqREMmaao+Gn5Ky0=
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20240105165407-ac2eea9d3c55 h1:Avbyef7Fyz5O9TkN/AoPy4NRXiRiFoYrG6Zp8CyRcrM=
github.com/ledgerwatch/interfaces v0.0.0-20240105165407-ac2eea9d3c55/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
Loading

0 comments on commit 235af8a

Please sign in to comment.