diff --git a/broadcast/README.md b/broadcast/README.md new file mode 100644 index 00000000..554bb267 --- /dev/null +++ b/broadcast/README.md @@ -0,0 +1,264 @@ +# Broadcast + +### Preliminary + +The main contribution to Gorums in this Master's thesis are the following files and folders. The root directory is gorums (github.com/relab/gorums) and is not specified in the list for brevity: + +- All files in this folder. (github.com/relab/gorums/broadcast) +- broadcast.go +- handler.go +- clientserver.go +- tests/broadcast +- authentication +- logging + +Additionally, we have made contributions to most of the other files. These changes are presented in this draft pull request: + + https://github.com/relab/gorums/pull/176 + +## Documentation + +We will use an example when presenting the broadcast framework. This example is Eager Reliable Broadcast and is inspired by the implementation on page 124 in this [book](https://link.springer.com/book/10.1007/978-3-642-15260-3). + +#### Prerequisites + +There are no additional prerequisites needed to enable the broadcast framework functionality. The functionality is compatible with the current version of Gorums. If you are using Gorums for the first time, we refer you to the README file in the root directory. + +### Proto file + +The broadcast framework provides two proto options: + +- broadcastcall: Used as entrypoint for clients. +- broadcast: Used by servers to communicate to each others. + +```proto3 +import "gorums.proto"; + +service ReliableBroadcast { + rpc Broadcast(Message) returns (Message) { + option (gorums.broadcastcall) = true; + } + rpc Deliver(Message) returns (Empty) { + option (gorums.broadcast) = true; + } +} + +message Message { + string Data = 1; +} + +message Empty {} +``` + +Notice that the return type of the RPC method `Deliver`. The return type is not used because servers only communicate by broadcasting to each other. The method with broadcastcall does, however, have a return type. This is the type that the servers will reply with when the client invokes `Broadcast`. + +### Client + +After generating the proto files we can define the client in a file named `client.go`: + +```go +type Client struct { + mgr *pb.Manager + config *pb.Configuration +} + +func New(addr string, srvAddresses []string, qSize int) *Client { + mgr := pb.NewManager( + gorums.WithGrpcDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) + lis, _ := net.Listen("tcp", addr) + mgr.AddClientServer(lis, lis.Addr()) + config, _ := mgr.NewConfiguration( + NewQSpec(qSize), + gorums.WithNodeList(srvAddresses), + ) + return &Client{ + mgr: mgr, + config: config, + } +} +``` + +The only addition the broadcast framework brings to how the client is created is the two lines: + +```go +lis, _ := net.Listen("tcp", addr) +mgr.AddClientServer(lis, lis.Addr()) +``` + +The first line creates a listener and the second creates a client-side server. This is necessary in order to accept replies from servers not added to the Gorums configuration. + +Next we can create a function on the `Client` that can be used to invoke broadcasts on the configuration: + +```go +func (c *Client) Broadcast(ctx context.Context, value string) (*pb.Message, error) { + req := &pb.Message{ + Data: value, + } + return c.config.Broadcast(ctx, req) +} +``` + +To be able to collect responses it is also necessary to create a quorum function. When generating the proto files, Gorums will create a QuorumSpec interface containing all quorum functions. In our example this QuorumSpec is generated: + +```go +// QuorumSpec is the interface of quorum functions for ReliableBroadcast. +type QuorumSpec interface { + gorums.ConfigOption + + BroadcastQF(in *Message, replies []*Message) (*Message, bool) +} +``` + +We can then proceed to implement the interface by creating a struct named QSpec that contains all the methods in the QuorumSpec: + +```go +type QSpec struct { + quorumSize int +} + +func NewQSpec(qSize int) pb.QuorumSpec { + return &QSpec{ + quorumSize: qSize, + } +} + +func (qs *QSpec) BroadcastQF(in *pb.Message, replies []*pb.Message) (*pb.Message, bool) { + if len(replies) < qs.quorumSize { + return nil, false + } + return replies[0], true +} +``` + +This `QSpec` struct is used when the Gorums configuration is created. This can be seen in the code example above when we created the client. Here we provide the `NewQSpec` as one of the arguments to the `mgr.NewConfiguration()` function. + +### Server + +To create a server that uses the broadcast functionality we can define a file `server.go` containing the server implementation: + +```go +type Server struct { + *pb.Server + mut sync.Mutex + delivered []*pb.Message + mgr *pb.Manager + addr string + srvAddrs []string +} + +func New(addr string, srvAddrs []string) *Server { + lis, _ := net.Listen("tcp", s.addr) + srv := Server{ + Server: pb.NewServer(gorums.WithListenAddr(lis.Addr())), + addr: addr, + srvAddrs: srvAddrs, + delivered: make([]*pb.Message, 0), + } + srv.configureView() + pb.RegisterReliableBroadcastServer(srv.Server, &srv) + go srv.Serve(lis) + return &srv +} +``` + +The first addition by the broadcast framework when creating the server is that we need to provide the option `gorums.WithListenAddr(lis.Addr())`. This is important because the address of the server is used in the messages sent by the server. Furthermore, we also invoke a function named `configureView()`: + +```go +func (srv *Server) configureView() { + srv.mgr = pb.NewManager( + gorums.WithGrpcDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) + view, err := srv.mgr.NewConfiguration(gorums.WithNodeList(srv.srvAddrs)) + if err != nil { + panic(err) + } + srv.SetView(view) +} +``` + +By creating a Gorums configuration and providing it to the generated method `SetView()` we enable server-to-server communication. We use the term `view` when refering to the Gorums configuration on the server side to distinguish it from the configuration created on the client-side. + +When we have created the server, we can define the server handlers: + +```go +func (s *Server) Broadcast(ctx gorums.ServerCtx, request *pb.Message, broadcast *pb.Broadcast) { + broadcast.Deliver(request) +} + +func (s *Server) Deliver(ctx gorums.ServerCtx, request *pb.Message, broadcast *pb.Broadcast) { + s.mut.Lock() + defer s.mut.Unlock() + if !s.isDelivered(request) { + s.delivered = append(s.delivered, request) + broadcast.Deliver(request) + broadcast.SendToClient(request, nil) + } +} + +func (s *Server) isDelivered(message *pb.Message) bool { + for _, msg := range s.delivered { + if msg.Data == message.Data { + return true + } + } + return false +} +``` + +The server handler signatures have changed a little, as evident from the code. The broadcast framework removes the return types and introduces a new argument named `broadcast`. This struct is the main interface for interacting with the broadcast framework. Each RPC method in the protofile with the option `gorums.broadcast = true` will be generated on the `broadcast struct`. + +The server handler `Broadcast` is quite simple and only contains a single invocation `broadcast.Deliver(request)`. This invocation will broadcast the request to all servers added to the view. + +The next server handler `Deliver` first checks whether the request has already been delivered. If not, it broadcasts `Deliver` to the other servers with the request and sends a reply to the client. + +The broadcast framework handles issues related to late client messages, duplicated broadcasts, and routing of messages. Hence, this is a complete code example that is correct according to the [description of the algorithm](https://link.springer.com/book/10.1007/978-3-642-15260-3). + +## Options + +We have implemented a set of options that can be used to configure the broadcasting functionality. These will be presented in short here: + +#### Broadcast Server + +- `WithShardBuffer(shardBuffer) ServerOption`: Enables the user to specify the buffer size of each shard. A shard stores a map of broadcast requests. A higher buffer size may increase throughput but at the cost of higher memory consumption. The default is 200 broadcast requests. +- `WithSendBuffer(sendBuffer) ServerOption`: Enables the user to specify the buffer size of the communication channels to the broadcast processor. A higher buffer size may increase throughput but at the cost of higher memory consumption. The default is 30 messages. +- `WithBroadcastReqTTL(ttl) ServerOption`: Configures the duration a broadcast request should live on a server, setting the lifetime of a broadcast processor. The default is 5 minutes. + +#### Broadcasting + +- `WithSubset(srvAddrs) BroadcastOption`: Allows the user to specify a subset of servers to broadcast to. The server addresses given must be a subset of the addresses in the server view. +- `WithoutSelf() BroadcastOption`: Prevents the server from broadcasting to itself. +- `AllowDuplication() BroadcastOption`: Allows the user to broadcast to the same RPC method more than once for a particular broadcast request. + +#### Identification + +- `WithMachineID(id) ManagerOption`: Enables the user to set a unique ID for the client. This ID will be embedded in broadcast requests sent from the client, making the requests trackable by the whole cluster. A random ID will be generated if not set, which can cause collisions if there are many clients. The ID is bounded between 0 and 4095. +- `WithSrvID(id) ServerOption`: Enables the user to set a unique ID on the broadcast server. This ID is used to generate BroadcastIDs. +- `WithListenAddr(addr) ServerOption`: Sets the IP address of the broadcast server, which will be used in messages sent by the server. The network of the address must be a TCP network name. + +#### Connection + +- `WithSendRetries(maxRetries) ManagerOption`: Allows the user to specify how many times the node will try to send a message. The message will be dropped if it fails to send more than the specified number of times. Providing `maxRetries = -1` will retry indefinitely. +- `WithConnRetries(maxRetries) ManagerOption`: Allows the user to specify how many times the node will try to reconnect to a node. The default is no limit, but it will follow a backoff strategy. +- `WithClientDialTimeout(timeout) ServerOption`: Enables the user to set a dial timeout for servers when sending replies back to the client in a BroadcastCall. The default is 10 seconds. +- `WithServerGrpcDialOptions(opts) ServerOption`: Enables the user to set gRPC dial options that the Broadcast Router uses when connecting to a client. + +#### Logging + +- `WithLogger(logger) ManagerOption`: Enables the user to provide a structured logger for the Manager. This will log events regarding the creation of nodes and the transmission of messages. +- `WithSLogger(logger) ServerOption`: Enables the user to set a structured logger for the Server. This will log internal events regarding broadcast requests. + +#### Authentication + +- `WithAllowList(allowed) ServerOption`: Enables the user to provide a list of (address, publicKey) pairs which will be used to validate messages. Only nodes on the allow list are permitted to send messages to the server, and the server is only allowed to send replies to nodes on the allow list. +- `EnforceAuthentication() ServerOption`: Requires that messages are signed and validated; otherwise, the server will drop them. +- `WithAuthentication() ManagerOption`: Enables digital signatures for messages. + +#### Execution Ordering + +- `WithOrder(method_1, ..., method_n) ServerOption`: Enables the user to specify the order in which methods should be executed. This option does not order messages but caches messages meant for processing at a later stage. For example, in PBFT, it caches all commit messages if the state is not prepared yet. +- `ProgressTo(method_i) BroadcastOption`: Allows the server to accept messages for the given method or for methods prior in the execution order. diff --git a/broadcast/dtos/dtos.go b/broadcast/dtos/dtos.go index bb30fac1..0877c9b5 100644 --- a/broadcast/dtos/dtos.go +++ b/broadcast/dtos/dtos.go @@ -1,3 +1,4 @@ +// Package dtos implements all data transfer objects used from outside the broadcast implementation context. package dtos import ( @@ -6,15 +7,18 @@ import ( "time" ) +// Msg defines the message sent from a server to another server or client. The messages should be sent by the router. type Msg interface { GetBroadcastID() uint64 GetMethod() string String() string } +// BroadcastMsg is a data transfer object of a message received by another server or client. type BroadcastMsg struct { - Ctx context.Context - Options BroadcastOptions + Ctx context.Context + Options BroadcastOptions + // The address of the client or server that originated the broadcast request OriginAddr string Info Info } @@ -31,8 +35,10 @@ func (msg *BroadcastMsg) String() string { return "broadcast" } +// ReplyMsg is similar to BroadcastMsg, but is strictly used for replying to a client. type ReplyMsg struct { - Info Info + Info Info + // The address of the client that originated the broadcast request ClientAddr string Err error } @@ -49,6 +55,8 @@ func (r *ReplyMsg) String() string { return "reply" } +// Info contains data pertaining to the current message such as routing information, contents, and which server handler +// should receive the message. type Info struct { Message protoreflect.ProtoMessage BroadcastID uint64 @@ -60,12 +68,15 @@ type Info struct { OriginPubKey string } +// Client is a data structure used when sending a reply to a client. type Client struct { Addr string SendMsg func(timeout time.Duration, dto *ReplyMsg) error Close func() error } +// BroadcastOptions is used to configure a particular broadcast, e.g. by only broadcasting to a subset of the servers in +// a view. type BroadcastOptions struct { ServerAddresses []string AllowDuplication bool diff --git a/broadcast/errors/errors.go b/broadcast/errors/errors.go index b25967c5..468d61ce 100644 --- a/broadcast/errors/errors.go +++ b/broadcast/errors/errors.go @@ -1,57 +1,49 @@ -package broadcastErrors +package errors -type BroadcastIDErr struct{} +// IDErr should be used when a message with a BroadcastID is sent to a broadcast processor with another BroadcastID. This +// can happen if a user deliberately changes the BroadcastID of a message. +type IDErr struct{} -func (err BroadcastIDErr) Error() string { - return "wrong broadcastID" +func (err IDErr) Error() string { + return "broadcast: wrong ID" } +// MissingClientReqErr signifies that a server tries to reply to a client, but has not yet received the original request +// form the client. This is especially important when the message does not contain routing information, such as in QuorumCalls. type MissingClientReqErr struct{} func (err MissingClientReqErr) Error() string { - return "has not received client req yet" + return "broadcast: has not received client req yet" } +// AlreadyProcessedErr is used when a message is received after the broadcast processor has stopped. This means that the +// server has sent a reply to the client and thus the incoming message needs not be processed. type AlreadyProcessedErr struct{} func (err AlreadyProcessedErr) Error() string { - return "already processed request" -} - -type ReqFinishedErr struct{} - -func (err ReqFinishedErr) Error() string { - return "request has terminated" + return "broadcast: already processed request" } +// ClientReqAlreadyReceivedErr should be used when a duplicate client request is received. type ClientReqAlreadyReceivedErr struct{} func (err ClientReqAlreadyReceivedErr) Error() string { - return "the client req has already been received. The forward req is thus dropped." -} - -type MissingReqErr struct{} - -func (err MissingReqErr) Error() string { - return "a request has not been created yet." + return "broadcast: client request already received (dropped)" } +// OutOfOrderErr should be used when the preserve ordering configuration option is used and a message is received out of +// order. type OutOfOrderErr struct{} func (err OutOfOrderErr) Error() string { - return "the message is out of order" -} - -type ShardDownErr struct{} - -func (err ShardDownErr) Error() string { - return "the shard is down" + return "broadcast: the message is out of order" } +// InvalidAddrErr should be used when an invalid server/client address is provided. type InvalidAddrErr struct { Addr string } func (err InvalidAddrErr) Error() string { - return "provided Addr is invalid. got: " + err.Addr + return "broadcast: provided Addr is invalid. got: " + err.Addr } diff --git a/broadcast/manager.go b/broadcast/manager.go new file mode 100644 index 00000000..eb104970 --- /dev/null +++ b/broadcast/manager.go @@ -0,0 +1,155 @@ +package broadcast + +import ( + "context" + "github.com/relab/gorums/broadcast/dtos" + "github.com/relab/gorums/broadcast/processor" + "github.com/relab/gorums/broadcast/router" + "github.com/relab/gorums/broadcast/shard" + "log/slog" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/protobuf/reflect/protoreflect" +) + +type Manager struct { + router router.Router + logger *slog.Logger + + mut sync.Mutex + shardMut sync.RWMutex // RW because we often read and very seldom write to the state + parentCtx context.Context + parentCtxCancelFunc context.CancelFunc + reqTTL time.Duration + sendBuffer int + shardBuffer int + snowflake *Snowflake + order map[string]int + shards []*shard.Shard + numShards uint16 +} + +type ManagerConfig struct { + ID uint32 + Addr string + MachineID uint64 + Logger *slog.Logger + CreateClient func(addr string, dialOpts []grpc.DialOption) (*dtos.Client, error) + Order map[string]int + DialTimeout time.Duration + ReqTTL time.Duration + ShardBuffer int + SendBuffer int + AllowList map[string]string + DialOpts []grpc.DialOption + NumShards uint16 +} + +func NewBroadcastManager(config *ManagerConfig) *Manager { + routerConfig := &router.Config{ + ID: config.ID, + Addr: config.Addr, + Logger: config.Logger, + CreateClient: config.CreateClient, + DialTimeout: config.DialTimeout, + AllowList: config.AllowList, + DialOpts: config.DialOpts, + } + r := router.NewRouter(routerConfig) + ctx, cancel := context.WithCancel(context.Background()) + mgr := &Manager{ + router: r, + logger: config.Logger, + parentCtx: ctx, + parentCtxCancelFunc: cancel, + reqTTL: config.ReqTTL, + sendBuffer: config.SendBuffer, + shardBuffer: config.ShardBuffer, + order: config.Order, + numShards: config.NumShards, + snowflake: NewSnowflake(config.MachineID), + } + mgr.initShards() + return mgr +} + +func (m *Manager) initShards() { + m.shards = make([]*shard.Shard, m.numShards) + for i := uint16(0); i < m.numShards; i++ { + shardConfig := &shard.Config{ + Id: i, + ParentCtx: m.parentCtx, + ShardBuffer: m.shardBuffer, + SendBuffer: m.sendBuffer, + ReqTTL: m.reqTTL, + Router: m.router, + Order: m.order, + Logger: m.logger, + } + m.shards[i] = shard.New(shardConfig) + } +} + +func (m *Manager) Process(msg *processor.RequestDto) { + _, shardID, _, _ := DecodeBroadcastID(msg.BroadcastID) + shardID = shardID % m.numShards + s := m.shards[shardID] + s.HandleMsg(msg) +} + +func (m *Manager) Broadcast(broadcastID uint64, req protoreflect.ProtoMessage, method string, enqueueBroadcast processor.EnqueueMsg, opts ...dtos.BroadcastOptions) error { + var options dtos.BroadcastOptions + if len(opts) > 0 { + options = opts[0] + } + return enqueueBroadcast( + &dtos.BroadcastMsg{ + Info: dtos.Info{ + BroadcastID: broadcastID, + Message: req, + Method: method, + }, + Options: options, + }, + ) +} + +func (m *Manager) SendToClient(broadcastID uint64, resp protoreflect.ProtoMessage, err error, enqueueMsg processor.EnqueueMsg) error { + return enqueueMsg( + &dtos.ReplyMsg{ + Info: dtos.Info{ + BroadcastID: broadcastID, + Message: resp, + }, + Err: err, + }, + ) +} + +func (m *Manager) NewBroadcastID() uint64 { + return m.snowflake.NewBroadcastID() +} + +func (m *Manager) AddHandler(method string, handler any) { + m.router.AddHandler(method, handler) +} + +func (m *Manager) Close() error { + m.mut.Lock() + defer m.mut.Unlock() + if m.logger != nil { + m.logger.Debug("broadcast: closing state") + } + m.parentCtxCancelFunc() + return m.router.Close() +} + +func (m *Manager) ResetState() { + m.parentCtxCancelFunc() + m.mut.Lock() + m.parentCtx, m.parentCtxCancelFunc = context.WithCancel(context.Background()) + m.initShards() + m.shardMut.Unlock() +} diff --git a/broadcast/snowflake.go b/broadcast/snowflake.go new file mode 100644 index 00000000..9ee7c4ec --- /dev/null +++ b/broadcast/snowflake.go @@ -0,0 +1,79 @@ +package broadcast + +import ( + "math/rand" + "sync" + "time" +) + +type Snowflake struct { + mut sync.Mutex + MachineID uint64 + SequenceNum uint64 + lastT uint64 + lastS uint64 + epoch time.Time +} + +const ( + MaxMachineID = uint16(1 << 12) + maxShard = uint8(1 << 4) + maxSequenceNum = uint32(1 << 18) + bitMaskTimestamp = uint64((1<<30)-1) << 34 + bitMaskShardID = uint64((1<<4)-1) << 30 + bitMaskMachineID = uint64((1<<12)-1) << 18 + bitMaskSequenceNum = uint64((1 << 18) - 1) + epoch = "2024-01-01T00:00:00" +) + +func Epoch() time.Time { + timestamp, _ := time.Parse("2006-01-02T15:04:05", epoch) + return timestamp +} + +func NewSnowflake(id uint64) *Snowflake { + if id >= uint64(MaxMachineID) { + id = uint64(rand.Int31n(int32(MaxMachineID))) + } + return &Snowflake{ + MachineID: id, + SequenceNum: 0, + epoch: Epoch(), + } +} + +func (s *Snowflake) NewBroadcastID() uint64 { + // timestamp: 30 bit -> seconds since 01.01.2024 + // shardID: 4 bit -> 16 different shards + // machineID: 12 bit -> 4096 clients + // sequenceNum: 18 bit -> 262 144 messages +start: + s.mut.Lock() + timestamp := uint64(time.Since(s.epoch).Seconds()) + l := (s.SequenceNum + 1) % uint64(maxSequenceNum) + if timestamp-s.lastT <= 0 && l == s.lastS { + s.mut.Unlock() + time.Sleep(10 * time.Millisecond) + goto start + } + if timestamp > s.lastT { + s.lastT = timestamp + s.lastS = l + } + s.SequenceNum = l + s.mut.Unlock() + + t := (timestamp << 34) & bitMaskTimestamp + shard := (uint64(rand.Int31n(int32(maxShard))) << 30) & bitMaskShardID + m := s.MachineID << 18 & bitMaskMachineID + n := l & bitMaskSequenceNum + return t | shard | m | n +} + +func DecodeBroadcastID(broadcastID uint64) (timestamp uint32, shardID uint16, machineID uint16, sequenceNo uint32) { + t := (broadcastID & bitMaskTimestamp) >> 34 + shard := (broadcastID & bitMaskShardID) >> 30 + m := (broadcastID & bitMaskMachineID) >> 18 + n := broadcastID & bitMaskSequenceNum + return uint32(t), uint16(shard), uint16(m), uint32(n) +} diff --git a/broadcast/snowflake_test.go b/broadcast/snowflake_test.go new file mode 100644 index 00000000..b39dadc4 --- /dev/null +++ b/broadcast/snowflake_test.go @@ -0,0 +1,48 @@ +package broadcast + +import ( + "testing" +) + +func TestBroadcastID(t *testing.T) { + if MaxMachineID != 4096 { + t.Errorf("maxMachineID is hardcoded in test. want: %v, got: %v", 4096, MaxMachineID) + } + if maxSequenceNum != 262144 { + t.Errorf("maxSequenceNum is hardcoded in test. want: %v, got: %v", 262144, maxSequenceNum) + } + if maxShard != 16 { + t.Errorf("maxShard is hardcoded in test. want: %v, got: %v", 16, maxShard) + } + // intentionally provide an illegal machineID. A random machineID should be given instead. + snowflake := NewSnowflake(8000) + machineID := snowflake.MachineID + timestampDistribution := make(map[uint32]int) + maxN := 262144 // = 2^18 + for j := 1; j < 3*maxN; j++ { + i := j % maxN + broadcastID := snowflake.NewBroadcastID() + timestamp, shard, m, n := DecodeBroadcastID(broadcastID) + if i != int(n) { + t.Errorf("wrong sequence number. want: %v, got: %v", i, n) + } + if m >= 4096 { + t.Errorf("machine ID cannot be higher than max. want: %v, got: %v", 4095, m) + } + if m != uint16(machineID) { + t.Errorf("wrong machine ID. want: %v, got: %v", machineID, m) + } + if shard >= 16 { + t.Errorf("cannot have higher shard than max. want: %v, got: %v", 15, shard) + } + if n >= uint32(maxN) { + t.Errorf("sequence number cannot be higher than max. want: %v, got: %v", maxN, n) + } + timestampDistribution[timestamp]++ + } + for k, v := range timestampDistribution { + if v > maxN { + t.Errorf("cannot have more than maxN in a second. want: %v, got: %v", maxN, k) + } + } +}