Skip to content

alpha 3.1 2025-03-19 #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commontypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"strings"

// TODO: is there a way to remove this dependency?
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down Expand Up @@ -101,7 +101,7 @@ require (
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.20.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down Expand Up @@ -317,8 +317,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
93 changes: 93 additions & 0 deletions internal/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package ringbuffer

import "fmt"

// RingBuffer implements a fixed capacity ring buffer for items of type T.
// NOTE: THIS IMPLEMENTATION IS NOT SAFE FOR CONCURRENT USE.
type RingBuffer[T any] struct {
first int // index of the front (=oldest) element
size int // number of elements currently stored in this ring buffer
items []T // fixed size buffer holding the elements
}

func NewRingBuffer[T any](cap int) *RingBuffer[T] {
if cap <= 0 {
panic(fmt.Sprintf("NewRingBuffer: cap must be positive, got %d", cap))
}
return &RingBuffer[T]{
0,
0,
make([]T, cap),
}
}

func (rb *RingBuffer[T]) Size() int {
return rb.size
}

func (rb *RingBuffer[T]) Cap() int {
return len(rb.items)
}

func (rb *RingBuffer[T]) IsEmpty() bool {
return rb.size == 0
}

func (rb *RingBuffer[T]) IsFull() bool {
return rb.size == len(rb.items)
}

// Peek returns the front (=oldest) item without removing it.
// Return false as second argument if there are no items in the ring buffer.
func (rb *RingBuffer[T]) Peek() (result T, ok bool) {
if rb.size > 0 {
ok = true
result = rb.items[rb.first]
}
return result, ok
}

// Pop removes and returns the front (=oldest) item.
// Return false as second argument if there are no items in the ring buffer.
func (rb *RingBuffer[T]) Pop() (result T, ok bool) {
result, ok = rb.Peek()
if ok {
var zero T
rb.items[rb.first] = zero
rb.first = (rb.first + 1) % len(rb.items)
rb.size--
}
return result, ok
}

// Try to push a new item to the back of the ring buffer.
// Returns
// - true if the item was added, or
// - false if the item cannot be added because the buffer is currently full.
func (rb *RingBuffer[T]) TryPush(item T) (ok bool) {
if rb.IsFull() {
return false
}
rb.items[(rb.first+rb.size)%len(rb.items)] = item
rb.size++
return true
}

// Push new item to the back of the ring buffer.
// If the buffer is currently full, the front (=oldest) item is evicted and returned to make space for the new item.
func (rb *RingBuffer[T]) PushEvict(item T) (evicted T, didEvict bool) {
if rb.IsFull() {
// Evict the oldest item to be returned.
evicted = rb.items[rb.first]
didEvict = true

// Push the new item to new empty space and update the first index to the next (oldest) item.
rb.items[rb.first] = item
rb.first = (rb.first + 1) % len(rb.items)
} else {
// Perform a normal push operation (which is known to be successful as the buffer is not full).
rb.items[(rb.first+rb.size)%len(rb.items)] = item
rb.size++
}
return evicted, didEvict
}
28 changes: 28 additions & 0 deletions networking/ocr3_1_peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package networking

import (
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

var _ types.BinaryNetworkEndpoint2Factory = &ocr3_1BinaryNetworkEndpointFactory{}

type ocr3_1BinaryNetworkEndpointFactory struct {
*concretePeerV2
}

func (o *ocr3_1BinaryNetworkEndpointFactory) NewEndpoint(
configDigest types.ConfigDigest,
pids []string,
v2bootstrappers []commontypes.BootstrapperLocator,
defaultPriorityConfig types.BinaryNetworkEndpoint2Config,
lowPriorityConfig types.BinaryNetworkEndpoint2Config,
) (types.BinaryNetworkEndpoint2, error) {
return o.newEndpoint3_1(
configDigest,
pids,
v2bootstrappers,
defaultPriorityConfig,
lowPriorityConfig,
)
}
25 changes: 14 additions & 11 deletions networking/ocr_endpoint_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ocrEndpointV2 struct {
// internal and state management
chSendToSelf chan commontypes.BinaryMessageWithSender
chClose chan struct{}
streams map[commontypes.OracleID]*ragep2p.Stream
streams map[commontypes.OracleID]*ragep2p.Stream2
registration io.Closer
state ocrEndpointState

Expand Down Expand Up @@ -131,7 +131,7 @@ func newOCREndpointV2(
ownOracleID,
chSendToSelf,
make(chan struct{}),
make(map[commontypes.OracleID]*ragep2p.Stream),
make(map[commontypes.OracleID]*ragep2p.Stream2),
registration,
ocrEndpointUnstarted,
sync.RWMutex{},
Expand Down Expand Up @@ -168,9 +168,10 @@ func (o *ocrEndpointV2) Start() error {
continue
}
streamName := streamNameFromConfigDigest(o.configDigest)
stream, err := o.host.NewStream(
stream, err := o.host.NewStream2(
pid,
streamName,
ragep2p.StreamPriorityDefault,
o.config.OutgoingMessageBufferSize,
o.config.IncomingMessageBufferSize,
o.limits.MaxMessageLength,
Expand All @@ -190,7 +191,6 @@ func (o *ocrEndpointV2) Start() error {
}

for oid := range o.streams {
oid := oid
o.subs.Go(func() {
o.runRecv(oid)
})
Expand All @@ -209,16 +209,20 @@ func (o *ocrEndpointV2) Start() error {
// remote goes mad and sends us thousands of messages, we don't drop any
// messages from good remotes
func (o *ocrEndpointV2) runRecv(oid commontypes.OracleID) {
chRecv := o.streams[oid].ReceiveMessages()
chRecv := o.streams[oid].Receive()
for {
select {
case payload := <-chRecv:
msg := commontypes.BinaryMessageWithSender{
Msg: payload,
case msg := <-chRecv:
msgPlain, ok := msg.(ragep2p.InboundBinaryMessagePlain)
if !ok {
o.logger.Warn("dropping message ", nil) // TODO
}
msgWithSender := commontypes.BinaryMessageWithSender{
Msg: msgPlain.Payload,
Sender: oid,
}
select {
case o.recv <- msg:
case o.recv <- msgWithSender:
continue
case <-o.chClose:
return
Expand Down Expand Up @@ -299,7 +303,7 @@ func (o *ocrEndpointV2) SendTo(payload []byte, to commontypes.OracleID) {
return
}

o.streams[to].SendMessage(payload)
o.streams[to].Send(ragep2p.OutboundBinaryMessagePlain{payload})
}

func (o *ocrEndpointV2) sendToSelf(payload []byte) {
Expand All @@ -322,7 +326,6 @@ func (o *ocrEndpointV2) Broadcast(payload []byte) {
var subs subprocesses.Subprocesses
defer subs.Wait()
for oracleID := range o.peerMapping {
oracleID := oracleID
subs.Go(func() {
o.SendTo(payload, oracleID)
})
Expand Down
Loading
Loading