Skip to content

feat(messenger) #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package epaxos

import (
"github.com/go-distributed/epaxos/message"
)

// A codec interface that defines what a codec should implement.
// A codec should be able to marshal/unmarshal through the given
// connection.
type Codec interface {
// Init a codec.
Initial() error

// Marshal a message into bytes.
Marshal(msg message.Message) ([]byte, error)

// Unmarshal a message from bytes.
Unmarshal(data []byte) (message.Message, error)

// Stop a codec.
Stop() error

// Destroy a codec, release the resource.
Destroy() error
}
101 changes: 101 additions & 0 deletions codec/gogoprotobufcodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package codec

import (
"bytes"
"fmt"

"github.com/go-distributed/epaxos/message"
"github.com/golang/glog"
)

// The gogoprotobuf codec.
type GoGoProtobufCodec struct{}

// Create a new gogpprotobuf codec.
func NewGoGoProtobuCodec() (*GoGoProtobufCodec, error) {
return &GoGoProtobufCodec{}, nil
}

// Initial the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Initial() error {
return nil
}

// Stop the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Stop() error {
return nil
}

// Destroy the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Destroy() error {
return nil
}

// Marshal a message into a byte slice.
func (gc *GoGoProtobufCodec) Marshal(msg message.Message) ([]byte, error) {
b, err := msg.MarshalProtobuf()
if err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err)
return nil, err
}

// Use bytes.Buffer to write efficiently.
var buf bytes.Buffer
if err = buf.WriteByte(byte(msg.Type())); err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err)
return nil, err
}

n, err := buf.Write(b)
if err != nil || n != len(b) {
glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err)
return nil, err
}
return buf.Bytes(), nil
}

// Unmarshal a message from a byte slice.
func (c *GoGoProtobufCodec) Unmarshal(data []byte) (message.Message, error) {
var msg message.Message

buf := bytes.NewBuffer(data)
bt, err := buf.ReadByte()
if err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err)
return nil, err
}

mtype := message.MsgType(bt)
switch mtype {
case message.ProposeMsg:
msg = new(message.Propose)
case message.PreAcceptMsg:
msg = new(message.PreAccept)
case message.PreAcceptOkMsg:
msg = new(message.PreAcceptOk)
case message.PreAcceptReplyMsg:
msg = new(message.PreAcceptReply)
case message.AcceptMsg:
msg = new(message.Accept)
case message.AcceptReplyMsg:
msg = new(message.AcceptReply)
case message.CommitMsg:
msg = new(message.Commit)
case message.PrepareMsg:
msg = new(message.Prepare)
case message.PrepareReplyMsg:
msg = new(message.PrepareReply)
default:
err := fmt.Errorf("Unknown message type %s\n", message.TypeToString(mtype))
glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err)
return nil, err
}

// TODO(yifan): Move this from the message package
// to the protobuf package.
if err := msg.UnmarshalProtobuf(buf.Bytes()); err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err)
return nil, err
}
return msg, nil
}
106 changes: 92 additions & 14 deletions message/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package message

import (
"fmt"

"github.com/go-distributed/epaxos/protobuf"
"github.com/golang/glog"
)

type Accept struct {
Expand All @@ -11,59 +14,134 @@ type Accept struct {
Deps Dependencies
Ballot *Ballot
From uint8
pb protobuf.Accept // for protobuf
}

type AcceptReply struct {
ReplicaId uint8
InstanceId uint64
Ballot *Ballot
From uint8
pb protobuf.AcceptReply // for protobuf
}

func (a *Accept) Sender() uint8 {
return a.From
}

func (a *Accept) Type() uint8 {
func (a *Accept) Type() MsgType {
return AcceptMsg
}

func (a *Accept) Content() interface{} {
return a
}

func (p *Accept) Replica() uint8 {
return p.ReplicaId
func (a *Accept) Replica() uint8 {
return a.ReplicaId
}

func (a *Accept) Instance() uint64 {
return a.InstanceId
}

func (a *Accept) String() string {
return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]",
a.ReplicaId, a.InstanceId, a.Ballot.String())
}

func (p *Accept) Instance() uint64 {
return p.InstanceId
func (a *Accept) MarshalProtobuf() ([]byte, error) {
replicaID := uint32(a.ReplicaId)
instanceID := uint64(a.InstanceId)
from := uint32(a.From)

a.pb.ReplicaID = &replicaID
a.pb.InstanceID = &instanceID
a.pb.Cmds = a.Cmds.ToBytesSlice()
a.pb.Deps = a.Deps
a.pb.Ballot = a.Ballot.ToProtobuf()
a.pb.From = &from

data, err := a.pb.Marshal()
if err != nil {
glog.Warning("Accept: MarshalProtobuf() error: ", err)
return nil, err
}
return data, nil
}

func (p *Accept) String() string {
return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String())
func (a *Accept) UnmarshalProtobuf(data []byte) error {
if err := a.pb.Unmarshal(data); err != nil {
glog.Warning("Accept: UnmarshalProtobuf() error: ", err)
return err
}

a.ReplicaId = uint8(a.pb.GetReplicaID())
a.InstanceId = uint64(a.pb.GetInstanceID())
a.Cmds.FromBytesSlice(a.pb.GetCmds())
a.Deps = a.pb.GetDeps()
if a.Ballot == nil {
a.Ballot = new(Ballot)
}
a.Ballot.FromProtobuf(a.pb.GetBallot())
a.From = uint8(a.pb.GetFrom())
return nil
}

func (a *AcceptReply) Sender() uint8 {
return a.From
}

func (a *AcceptReply) Type() uint8 {
func (a *AcceptReply) Type() MsgType {
return AcceptReplyMsg
}

func (a *AcceptReply) Content() interface{} {
return a
}

func (p *AcceptReply) Replica() uint8 {
return p.ReplicaId
func (a *AcceptReply) Replica() uint8 {
return a.ReplicaId
}

func (p *AcceptReply) Instance() uint64 {
return p.InstanceId
func (a *AcceptReply) Instance() uint64 {
return a.InstanceId
}

func (p *AcceptReply) String() string {
return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String())
func (a *AcceptReply) String() string {
return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", a.ReplicaId, a.InstanceId, a.Ballot.String())
}

func (a *AcceptReply) MarshalProtobuf() ([]byte, error) {
replicaID := uint32(a.ReplicaId)
instanceID := uint64(a.InstanceId)
from := uint32(a.From)

a.pb.ReplicaID = &replicaID
a.pb.InstanceID = &instanceID
a.pb.Ballot = a.Ballot.ToProtobuf()
a.pb.From = &from

data, err := a.pb.Marshal()
if err != nil {
glog.Warning("AcceptReply: MarshalProtobuf() error: ", err)
return nil, err
}
return data, nil
}

func (a *AcceptReply) UnmarshalProtobuf(data []byte) error {
if err := a.pb.Unmarshal(data); err != nil {
glog.Warning("AcceptReply: UnmarshalProtobuf() error: ", err)
return err
}

a.ReplicaId = uint8(a.pb.GetReplicaID())
a.InstanceId = uint64(a.pb.GetInstanceID())
if a.Ballot == nil {
a.Ballot = new(Ballot)
}
a.Ballot.FromProtobuf(a.pb.GetBallot())
a.From = uint8(a.pb.GetFrom())
return nil
}
35 changes: 26 additions & 9 deletions message/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package message

import (
"fmt"

"github.com/go-distributed/epaxos/protobuf"
)

const (
Expand All @@ -25,9 +27,9 @@ type Ballot struct {

func NewBallot(epoch uint32, number uint64, replicId uint8) *Ballot {
return &Ballot{
epoch,
number,
replicId,
Epoch: epoch,
Number: number,
ReplicaId: replicId,
}
}

Expand Down Expand Up @@ -96,9 +98,9 @@ func (b *Ballot) SetReplicaId(rId uint8) {

func (b *Ballot) IncNumClone() *Ballot {
return &Ballot{
b.Epoch,
b.Number + 1,
b.ReplicaId,
Epoch: b.Epoch,
Number: b.Number + 1,
ReplicaId: b.ReplicaId,
}
}

Expand All @@ -111,12 +113,27 @@ func (b *Ballot) Clone() *Ballot {
panic("")
}
return &Ballot{
b.Epoch,
b.Number,
b.ReplicaId,
Epoch: b.Epoch,
Number: b.Number,
ReplicaId: b.ReplicaId,
}
}

func (b *Ballot) String() string {
return fmt.Sprintf("%v.%v.%v", b.Epoch, b.Number, b.ReplicaId)
}

func (b *Ballot) ToProtobuf() *protobuf.Ballot {
epoch, number, replicaID := b.Epoch, b.Number, uint32(b.ReplicaId)
return &protobuf.Ballot{
Epoch: &epoch,
Number: &number,
ReplicaID: &replicaID,
}
}

func (b *Ballot) FromProtobuf(pballot *protobuf.Ballot) {
b.Epoch = uint32(*pballot.Epoch)
b.Number = uint64(*pballot.Number)
b.ReplicaId = uint8(*pballot.ReplicaID)
}
17 changes: 17 additions & 0 deletions message/ballot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,20 @@ func TestBallotSetNumber(t *testing.T) {
b.SetNumber(4)
assert.Equal(t, b.GetNumber(), uint64(4))
}

func TestToProtobuf(t *testing.T) {
b := NewBallot(1, 2, 3)
pb := b.ToProtobuf()
assert.Equal(t, *pb.Epoch, uint32(1))
assert.Equal(t, *pb.Number, uint64(2))
assert.Equal(t, *pb.ReplicaID, uint8(3))
}

func TestFromProtobuf(t *testing.T) {
b := NewBallot(1, 2, 3)
pb := b.ToProtobuf()

var bb Ballot
bb.FromProtobuf(pb)
assert.Equal(t, *b, bb)
}
Loading