Skip to content

Commit

Permalink
feat(broadcast): almost done ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed May 9, 2024
1 parent 78fd91e commit e7ab4e1
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 13 deletions.
6 changes: 6 additions & 0 deletions broadcast/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ type AlreadyProcessedErr struct{}
func (err AlreadyProcessedErr) Error() string {
return "already processed request"
}

type OutOfOrderErr struct{}

func (err OutOfOrderErr) Error() string {
return "the message is out of order"
}
63 changes: 56 additions & 7 deletions broadcast/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ type BroadcastRequest struct {
cancellationCtx context.Context
cancellationCtxCancel context.CancelFunc // should only be called by the shard
sentCancellation bool

executionOrder []string
orderIndex int
outOfOrderMsgs map[string][]Content
}

// func (req *BroadcastRequest) handle(router *BroadcastRouter, broadcastID uint64, msg Content, metrics *Metric) {
func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Content) {
sent := false
methods := make([]string, 0, 3)
req.initOrder()
var respErr error
var respMsg protoreflect.ProtoMessage
// connect to client immediately to potentially save some time
Expand Down Expand Up @@ -51,6 +56,7 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte
continue
}*/
methods = append(methods, bMsg.Method)
req.updateOrder(bMsg.Method)
} else {
// BroadcastCall if origin addr is non-empty.
if msg.isBroadcastCall() {
Expand Down Expand Up @@ -104,18 +110,21 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte
// direct connection to the client, e.g. QuorumCall.
if sent && !msg.isBroadcastCall() {
err := msg.send(respMsg, respErr)
if err != nil {
new.ReceiveChan <- shardResponse{
err: err,
}
return
if err == nil {
err = AlreadyProcessedErr{}
}
//new.ReceiveChan <- errors.New("req is done and should be returned immediately to client")
new.ReceiveChan <- shardResponse{
err: AlreadyProcessedErr{},
err: err,
}
return
}
if !req.isInOrder(new.CurrentMethod) {
// save the message and execute it later
req.addToOutOfOrder(new)
new.ReceiveChan <- shardResponse{
err: OutOfOrderErr{},
}
}
new.ReceiveChan <- shardResponse{
err: nil,
reqCtx: req.cancellationCtx,
Expand All @@ -140,3 +149,43 @@ func (c *Content) isBroadcastCall() bool {
func (c *Content) hasReceivedClientRequest() bool {
return c.IsBroadcastClient && c.SendFn != nil
}

func (r *BroadcastRequest) initOrder() {
// the implementer has not specified an execution order
if r.executionOrder == nil || len(r.executionOrder) <= 0 {
return
}
r.outOfOrderMsgs = make(map[string][]Content)
}

func (r *BroadcastRequest) isInOrder(method string) bool {
// the implementer has not specified an execution order
if r.executionOrder == nil || len(r.executionOrder) <= 0 {
return true
}
// the first method should always be allowed to be executed
if r.executionOrder[0] == method {
return true
}

return false
}

func (r *BroadcastRequest) addToOutOfOrder(msg Content) {

}

func (r *BroadcastRequest) updateOrder(method string) {
// the implementer has not specified an execution order
if r.executionOrder == nil || len(r.executionOrder) <= 0 {
return
}
for i, m := range r.executionOrder {
if m == method {
if i > r.orderIndex {
r.orderIndex = i
}
return
}
}
}
3 changes: 2 additions & 1 deletion broadcast/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ type Content struct {
IsCancellation bool
OriginAddr string
OriginMethod string
CurrentMethod string
ReceiveChan chan shardResponse
SendFn func(resp protoreflect.ProtoMessage, err error)
Ctx context.Context
CancelCtx context.CancelFunc
Run func()
Run func(context.Context)
}

func (c Content) send(resp protoreflect.ProtoMessage, err error) error {
Expand Down
14 changes: 9 additions & 5 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,36 @@ func BroadcastHandler[T RequestTypes, V Broadcaster](impl implementationFunc[T,
broadcaster := srv.broadcastSrv.createBroadcaster(broadcastMetadata, srv.broadcastSrv.orchestrator).(V)
// due to ordering we wrap the actual implementation function to be able to
// run it at a later time.
run := func() {
run := func(reqCtx context.Context) {
// we need to pass in the reqCtx because we can only retrieve
// it after we have gotten a response from the shard. The reqCtx
// is used for cancellations.
ctx.Context = reqCtx
impl(ctx, req, broadcaster)
}

msg := broadcast.Content{}
createRequest(&msg, ctx, in, finished, run)

var err error
// we are not interested in the server context as this is tied to the previous hop.
// instead we want to check whether the client has cancelled the broadcast request
// and if so, we return a cancelled context. This enables the implementer to listen
// for cancels and do proper actions.
err, ctx.Context = srv.broadcastSrv.manager.Process(msg)
err, reqCtx := srv.broadcastSrv.manager.Process(msg)
if err != nil {
return
}

run()
run(reqCtx)
}
}

func createRequest(msg *broadcast.Content, ctx ServerCtx, in *Message, finished chan<- *Message, run func()) {
func createRequest(msg *broadcast.Content, ctx ServerCtx, in *Message, finished chan<- *Message, run func(context.Context)) {
msg.BroadcastID = in.Metadata.BroadcastMsg.BroadcastID
msg.IsBroadcastClient = in.Metadata.BroadcastMsg.IsBroadcastClient
msg.OriginAddr = in.Metadata.BroadcastMsg.OriginAddr
msg.OriginMethod = in.Metadata.BroadcastMsg.OriginMethod
msg.CurrentMethod = in.Metadata.Method
msg.Ctx = ctx.Context
msg.Run = run
if msg.OriginAddr == "" && msg.IsBroadcastClient {
Expand Down

0 comments on commit e7ab4e1

Please sign in to comment.