Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds packet watcher for Polypus #201

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/ordering/cosipbft/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ import (
const (
// RoundTimeout is the maximum of time the service waits for an event to
// happen.
RoundTimeout = 10 * time.Second
RoundTimeout = 20 * time.Second

// RoundWait is the constant value of the exponential backoff use between
// round failures.
RoundWait = 5 * time.Millisecond
RoundWait = 15 * time.Millisecond

// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute
RoundMaxWait = 15 * time.Minute

rpcName = "cosipbft"
)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.14
require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/golang/protobuf v1.5.2
github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.12.1
Expand All @@ -22,6 +21,7 @@ require (
golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.26.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/yaml.v2 v2.4.0
)
6 changes: 1 addition & 5 deletions internal/testing/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"strings"
"testing"

"github.com/golang/protobuf/descriptor"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

// CoverProtoMessage triggers a test on the message definition to force the
Expand All @@ -18,9 +17,6 @@ func CoverProtoMessage(t *testing.T, message proto.Message) error {
err = proto.Unmarshal(buffer, message)
require.NoError(t, err)
proto.Merge(message, message)
proto.DiscardUnknown(message)
require.NotNil(t, message.String())
message.(descriptor.Message).Descriptor()

// Run the getters
tt := reflect.TypeOf(message)
Expand Down
42 changes: 0 additions & 42 deletions internal/traffic/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"go.dedis.ch/dela"
"go.dedis.ch/dela/core"
"go.dedis.ch/dela/mino"
"go.dedis.ch/dela/mino/router"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -65,43 +64,6 @@ var (
headerURIKey = "apiuri"
)

// GlobalWatcher can be used to watch for sent and received messages.
var GlobalWatcher = Watcher{
outWatcher: core.NewWatcher(),
inWatcher: core.NewWatcher(),
}

// Watcher defines an element to watch for sent and received messages.
type Watcher struct {
outWatcher core.Observable
inWatcher core.Observable
}

// WatchOuts returns a channel populated with sent messages.
func (w *Watcher) WatchOuts(ctx context.Context) <-chan Event {
return watch(ctx, w.outWatcher)
}

// WatchIns returns a channel populated with received messages.
func (w *Watcher) WatchIns(ctx context.Context) <-chan Event {
return watch(ctx, w.inWatcher)
}

// watch is a generic function to watch for events
func watch(ctx context.Context, watcher core.Observable) <-chan Event {
obs := observer{ch: make(chan Event, watcherSize)}

watcher.Add(obs)

go func() {
<-ctx.Done()
watcher.Remove(obs)
close(obs.ch)
}()

return obs.ch
}

// SaveItems saves all the items as a graph
func SaveItems(path string, withSend, withRcv bool) error {
f, err := os.Create(path)
Expand Down Expand Up @@ -161,16 +123,12 @@ func (t *Traffic) Save(path string, withSend, withRcv bool) error {
// sender and the gateway as the receiver, while also recording the packet
// itself.
func (t *Traffic) LogSend(ctx context.Context, gateway mino.Address, pkt router.Packet) {
GlobalWatcher.outWatcher.Notify(Event{Address: gateway, Pkt: pkt})

t.addItem(ctx, "send", gateway, pkt)
}

// LogRecv records a packet received by the node. The sender is the gateway and
// the receiver the node.
func (t *Traffic) LogRecv(ctx context.Context, gateway mino.Address, pkt router.Packet) {
GlobalWatcher.inWatcher.Notify(Event{Address: gateway, Pkt: pkt})

t.addItem(ctx, "received", gateway, pkt)
}

Expand Down
18 changes: 8 additions & 10 deletions internal/traffic/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,13 @@ func TestGenerateEventGraphViz(t *testing.T) {
}

func TestWatcherIns(t *testing.T) {
watcher := GlobalWatcher
events := watcher.WatchIns(context.Background())

traffic := NewTraffic(fake.NewAddress(0), ioutil.Discard)
eh := NewEventHandler()
events := eh.WatchIns(context.Background())

addr := fake.NewAddress(0)
pkt := newFakePacket(fake.NewAddress(1), fake.NewAddress(2))
traffic.LogRecv(context.Background(), addr, pkt)

eh.NotifyIn(addr, pkt)

select {
case event := <-events:
Expand All @@ -180,14 +179,13 @@ func TestWatcherIns(t *testing.T) {
}

func TestWatcherOuts(t *testing.T) {
watcher := GlobalWatcher
events := watcher.WatchOuts(context.Background())

traffic := NewTraffic(fake.NewAddress(0), ioutil.Discard)
eh := NewEventHandler()
events := eh.WatchOuts(context.Background())

addr := fake.NewAddress(0)
pkt := newFakePacket(fake.NewAddress(1), fake.NewAddress(2))
traffic.LogSend(context.Background(), addr, pkt)

eh.NotifyOut(addr, pkt)

select {
case event := <-events:
Expand Down
80 changes: 80 additions & 0 deletions internal/traffic/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package traffic

import (
"context"

"go.dedis.ch/dela/core"
"go.dedis.ch/dela/mino"
"go.dedis.ch/dela/mino/router"
)

// Watcher describes a Watcher of outgoing and incoming events.
type Watcher interface {
WatchOuts(ctx context.Context) <-chan Event
WatchIns(ctx context.Context) <-chan Event
}

// Notifier describes the function to push events to a Watcher.
type Notifier interface {
NotifyIn(mino.Address, router.Packet)
NotifyOut(mino.Address, router.Packet)
}

// EventsHandler describes the functions to handle events
type EventsHandler interface {
Watcher
Notifier
}

// NewEventHandler returns a new initialized event handler.
func NewEventHandler() EventsHandler {
return &DefaultEventHandlers{
outWatcher: core.NewWatcher(),
inWatcher: core.NewWatcher(),
}
}

// DefaultEventHandlers provides a default implementation of an EventHandler.
//
// - implements EventsHandler
type DefaultEventHandlers struct {
outWatcher core.Observable
inWatcher core.Observable
}

// WatchOuts implements Watcher. It returns a channel populated with sent
// messages.
func (d *DefaultEventHandlers) WatchOuts(ctx context.Context) <-chan Event {
return watch(ctx, d.outWatcher)
}

// WatchIns implements Watcher. It returns a channel populated with received
// messages.
func (d *DefaultEventHandlers) WatchIns(ctx context.Context) <-chan Event {
return watch(ctx, d.inWatcher)
}

// NotifyIn implements Notifier.
func (d *DefaultEventHandlers) NotifyIn(a mino.Address, p router.Packet) {
d.inWatcher.Notify(Event{Address: a, Pkt: p})
}

// NotifyOut implements Notifier.
func (d *DefaultEventHandlers) NotifyOut(a mino.Address, p router.Packet) {
d.outWatcher.Notify(Event{Address: a, Pkt: p})
}

// watch is a generic function to watch for events
func watch(ctx context.Context, watcher core.Observable) <-chan Event {
obs := observer{ch: make(chan Event, watcherSize)}

watcher.Add(obs)

go func() {
<-ctx.Done()
watcher.Remove(obs)
close(obs.ch)
}()

return obs.ch
}
8 changes: 7 additions & 1 deletion mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type Minogrpc struct {
endpoints map[string]*Endpoint
started chan struct{}
closing chan error

eh traffic.EventsHandler
}

type minoTemplate struct {
Expand Down Expand Up @@ -217,6 +219,7 @@ func NewMinogrpc(listen net.Addr, public *url.URL, router router.Router, opts ..
endpoints: make(map[string]*Endpoint),
started: make(chan struct{}),
closing: make(chan error, 1),
eh: traffic.NewEventHandler(),
}

// Counter needs to be >=1 for asynchronous call to Add.
Expand All @@ -225,6 +228,7 @@ func NewMinogrpc(listen net.Addr, public *url.URL, router router.Router, opts ..
ptypes.RegisterOverlayServer(server, &overlayServer{
overlay: o,
endpoints: m.endpoints,
notifier: m.eh,
})

dela.Logger.Info().Msgf("listening on: %s", socket.Addr().String())
Expand Down Expand Up @@ -301,6 +305,7 @@ func (m *Minogrpc) WithSegment(segment string) mino.Mino {
overlay: m.overlay,
segments: append(m.segments, segment),
endpoints: m.endpoints,
eh: m.eh,
}

return newM
Expand All @@ -316,6 +321,7 @@ func (m *Minogrpc) CreateRPC(name string, h mino.Handler, f serde.Factory) (mino
uri: strings.Join(uri, "/"),
overlay: m.overlay,
factory: f,
eh: m.eh,
}

for _, segment := range uri {
Expand Down Expand Up @@ -347,7 +353,7 @@ func (m *Minogrpc) String() string {

// GetTrafficWatcher returns the traffic watcher.
func (m *Minogrpc) GetTrafficWatcher() traffic.Watcher {
return traffic.GlobalWatcher
return m.eh
}

// Listen starts the server. It waits for the go routine to start before
Expand Down
2 changes: 1 addition & 1 deletion mino/minogrpc/ptypes/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
// minogrpc.
package ptypes

//go:generate protoc -I ./ --go_out=plugins=grpc:./ ./overlay.proto
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative overlay.proto
Loading