From 2993114a059cf750a330765991cd11e559605a41 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Sat, 31 May 2014 00:49:58 -0700 Subject: [PATCH 1/3] feat(messenger) Added messenger interfaces and a GoGoProtobuf over HTTP messenger. Added GoGoprotobuf Codec. Added HTTP transporter. --- codec.go | 22 ++++ codec/gogoprotobufcodec.go | 59 ++++++++++ message/accept.go | 102 +++++++++++++++--- message/ballot.go | 35 ++++-- message/ballot_test.go | 17 +++ message/command.go | 27 ++++- message/command_test.go | 22 ++++ message/commit.go | 37 +++++++ message/data_test.go | 172 +++++++++++++++++++++++++++++ message/message.go | 28 +++++ message/pre_accept.go | 109 +++++++++++++++++++ message/prepare.go | 87 +++++++++++++++ message/propose.go | 8 ++ message/timeout.go | 8 ++ messenger.go | 147 +++++++++++++++++++++++++ protobuf/message.pb.go | 46 ++++---- protobuf/message.proto | 2 +- transporter.go | 32 +++--- transporter/http_transporter.go | 184 ++++++++++++++++++++++++++++++++ 19 files changed, 1081 insertions(+), 63 deletions(-) create mode 100644 codec.go create mode 100644 codec/gogoprotobufcodec.go create mode 100644 messenger.go create mode 100644 transporter/http_transporter.go diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..d63da3c --- /dev/null +++ b/codec.go @@ -0,0 +1,22 @@ +package epaxos + +import ( + "github.com/go-distributed/epaxos/message" +) + +// A codec interface that defines what a codec should implement. +// A codec should be able to marshal/unmarshal through the given +// connection. +type Codec interface { + // Init a codec. + Initial() error + + // Marshal a message into bytes. + Marshal(msg message.Message) ([]byte, error) + + // Unmarshal a message from bytes. + Unmarshal(mtype uint8, data []byte) (message.Message, error) + + // Destroy a codec, release the resource. + Destroy() error +} diff --git a/codec/gogoprotobufcodec.go b/codec/gogoprotobufcodec.go new file mode 100644 index 0000000..b440e7c --- /dev/null +++ b/codec/gogoprotobufcodec.go @@ -0,0 +1,59 @@ +package codec + +import ( + "github.com/go-distributed/epaxos/message" +) + +type GoGoProtobufCodec struct{} + +func NewGoGoProtobufHTTPTransporter() (*GoGoProtobufCodec, error) { + return new(GoGoProtobufCodec), nil +} + +// Initial the gogoprotobuf (no-op for now). +func (gc *GoGoProtobufCodec) Initial() error { + return nil +} + +// Destroy te gogoprotobuf (no-op for now). +func (gc *GoGoProtobufCodec) Destroy() error { + return nil +} + +// Marshal a message into a byte slice. +func (gc *GoGoProtobufCodec) Marshal(msg message.Message) ([]byte, error) { + return msg.MarshalProtobuf() +} + +// Unmarshal a message from a byte slice. +func (c *GoGoProtobufCodec) Unmarshal(mtype uint8, data []byte) (message.Message, error) { + var msg message.Message + + switch mtype { + case message.ProposeMsg: + msg = new(message.Propose) + case message.PreAcceptMsg: + msg = new(message.PreAccept) + case message.PreAcceptOkMsg: + msg = new(message.PreAccept) + case message.PreAcceptReplyMsg: + msg = new(message.PreAcceptReply) + case message.AcceptMsg: + msg = new(message.Accept) + case message.AcceptReplyMsg: + msg = new(message.AcceptReply) + case message.CommitMsg: + msg = new(message.Commit) + case message.PrepareMsg: + msg = new(message.Prepare) + case message.PrepareReplyMsg: + msg = new(message.PrepareReply) + default: + panic("Unknown message type") + } + + if err := msg.UnmarshalProtobuf(data); err != nil { + return nil, err + } + return msg, nil +} diff --git a/message/accept.go b/message/accept.go index e63c249..e62684b 100644 --- a/message/accept.go +++ b/message/accept.go @@ -2,6 +2,9 @@ package message import ( "fmt" + + "github.com/go-distributed/epaxos/protobuf" + "github.com/golang/glog" ) type Accept struct { @@ -11,6 +14,7 @@ type Accept struct { Deps Dependencies Ballot *Ballot From uint8 + pb protobuf.Accept // for protobuf } type AcceptReply struct { @@ -18,6 +22,7 @@ type AcceptReply struct { InstanceId uint64 Ballot *Ballot From uint8 + pb protobuf.AcceptReply // for protobuf } func (a *Accept) Sender() uint8 { @@ -32,16 +37,55 @@ func (a *Accept) Content() interface{} { return a } -func (p *Accept) Replica() uint8 { - return p.ReplicaId +func (a *Accept) Replica() uint8 { + return a.ReplicaId +} + +func (a *Accept) Instance() uint64 { + return a.InstanceId +} + +func (a *Accept) String() string { + return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]", + a.ReplicaId, a.InstanceId, a.Ballot.String()) } -func (p *Accept) Instance() uint64 { - return p.InstanceId +func (a *Accept) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(a.ReplicaId) + instanceID := uint64(a.InstanceId) + from := uint32(a.From) + + a.pb.ReplicaID = &replicaID + a.pb.InstanceID = &instanceID + a.pb.Cmds = a.Cmds.ToBytesSlice() + a.pb.Deps = a.Deps + a.pb.Ballot = a.Ballot.ToProtobuf() + a.pb.From = &from + + data, err := a.pb.Marshal() + if err != nil { + glog.Warning("Accept: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil } -func (p *Accept) String() string { - return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String()) +func (a *Accept) UnmarshalProtobuf(data []byte) error { + if err := a.pb.Unmarshal(data); err != nil { + glog.Warning("Accept: UnmarshalProtobuf() error: ", err) + return err + } + + a.ReplicaId = uint8(a.pb.GetReplicaID()) + a.InstanceId = uint64(a.pb.GetInstanceID()) + a.Cmds.FromBytesSlice(a.pb.GetCmds()) + a.Deps = a.pb.GetDeps() + if a.Ballot == nil { + a.Ballot = new(Ballot) + } + a.Ballot.FromProtobuf(a.pb.GetBallot()) + a.From = uint8(a.pb.GetFrom()) + return nil } func (a *AcceptReply) Sender() uint8 { @@ -56,14 +100,48 @@ func (a *AcceptReply) Content() interface{} { return a } -func (p *AcceptReply) Replica() uint8 { - return p.ReplicaId +func (a *AcceptReply) Replica() uint8 { + return a.ReplicaId } -func (p *AcceptReply) Instance() uint64 { - return p.InstanceId +func (a *AcceptReply) Instance() uint64 { + return a.InstanceId } -func (p *AcceptReply) String() string { - return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String()) +func (a *AcceptReply) String() string { + return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", a.ReplicaId, a.InstanceId, a.Ballot.String()) +} + +func (a *AcceptReply) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(a.ReplicaId) + instanceID := uint64(a.InstanceId) + from := uint32(a.From) + + a.pb.ReplicaID = &replicaID + a.pb.InstanceID = &instanceID + a.pb.Ballot = a.Ballot.ToProtobuf() + a.pb.From = &from + + data, err := a.pb.Marshal() + if err != nil { + glog.Warning("AcceptReply: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (a *AcceptReply) UnmarshalProtobuf(data []byte) error { + if err := a.pb.Unmarshal(data); err != nil { + glog.Warning("AcceptReply: UnmarshalProtobuf() error: ", err) + return err + } + + a.ReplicaId = uint8(a.pb.GetReplicaID()) + a.InstanceId = uint64(a.pb.GetInstanceID()) + if a.Ballot == nil { + a.Ballot = new(Ballot) + } + a.Ballot.FromProtobuf(a.pb.GetBallot()) + a.From = uint8(a.pb.GetFrom()) + return nil } diff --git a/message/ballot.go b/message/ballot.go index 764db77..afef5ee 100644 --- a/message/ballot.go +++ b/message/ballot.go @@ -2,6 +2,8 @@ package message import ( "fmt" + + "github.com/go-distributed/epaxos/protobuf" ) const ( @@ -25,9 +27,9 @@ type Ballot struct { func NewBallot(epoch uint32, number uint64, replicId uint8) *Ballot { return &Ballot{ - epoch, - number, - replicId, + Epoch: epoch, + Number: number, + ReplicaId: replicId, } } @@ -96,9 +98,9 @@ func (b *Ballot) SetReplicaId(rId uint8) { func (b *Ballot) IncNumClone() *Ballot { return &Ballot{ - b.Epoch, - b.Number + 1, - b.ReplicaId, + Epoch: b.Epoch, + Number: b.Number + 1, + ReplicaId: b.ReplicaId, } } @@ -111,12 +113,27 @@ func (b *Ballot) Clone() *Ballot { panic("") } return &Ballot{ - b.Epoch, - b.Number, - b.ReplicaId, + Epoch: b.Epoch, + Number: b.Number, + ReplicaId: b.ReplicaId, } } func (b *Ballot) String() string { return fmt.Sprintf("%v.%v.%v", b.Epoch, b.Number, b.ReplicaId) } + +func (b *Ballot) ToProtobuf() *protobuf.Ballot { + epoch, number, replicaID := b.Epoch, b.Number, uint32(b.ReplicaId) + return &protobuf.Ballot{ + Epoch: &epoch, + Number: &number, + ReplicaID: &replicaID, + } +} + +func (b *Ballot) FromProtobuf(pballot *protobuf.Ballot) { + b.Epoch = uint32(*pballot.Epoch) + b.Number = uint64(*pballot.Number) + b.ReplicaId = uint8(*pballot.ReplicaID) +} diff --git a/message/ballot_test.go b/message/ballot_test.go index 2cec85f..56b3fd5 100644 --- a/message/ballot_test.go +++ b/message/ballot_test.go @@ -130,3 +130,20 @@ func TestBallotSetNumber(t *testing.T) { b.SetNumber(4) assert.Equal(t, b.GetNumber(), uint64(4)) } + +func TestToProtobuf(t *testing.T) { + b := NewBallot(1, 2, 3) + pb := b.ToProtobuf() + assert.Equal(t, *pb.Epoch, uint32(1)) + assert.Equal(t, *pb.Number, uint64(2)) + assert.Equal(t, *pb.ReplicaID, uint8(3)) +} + +func TestFromProtobuf(t *testing.T) { + b := NewBallot(1, 2, 3) + pb := b.ToProtobuf() + + var bb Ballot + bb.FromProtobuf(pb) + assert.Equal(t, *b, bb) +} diff --git a/message/command.go b/message/command.go index e17a12b..9d4cce9 100644 --- a/message/command.go +++ b/message/command.go @@ -17,15 +17,40 @@ func (c Commands) Clone() Commands { return nil } cmds := make(Commands, len(c)) - for i := range cmds { + for i := range c { cmds[i] = make(Command, len(c[i])) copy(cmds[i], c[i]) } return cmds } +func (c Commands) ToBytesSlice() [][]byte { + b := make([][]byte, len(c)) + for i := range c { + b[i] = c[i].ToBytes() + } + return b +} + +func (c *Commands) FromBytesSlice(b [][]byte) { + *c = make([]Command, len(b)) + for i := range b { + (*c)[i].FromBytes(b[i]) + } +} + func (c Command) Clone() Command { cmd := make(Command, len(c)) copy(cmd, c) return cmd } + +func (c Command) ToBytes() []byte { + b := make([]byte, len(c)) + copy(b, c) + return b +} +func (c *Command) FromBytes(b []byte) { + *c = make([]byte, len(b)) + copy(*c, b) +} diff --git a/message/command_test.go b/message/command_test.go index dcd49a2..3dca056 100644 --- a/message/command_test.go +++ b/message/command_test.go @@ -37,3 +37,25 @@ func TestCommandClone(t *testing.T) { assert.ObjectsAreEqual(manyCmds.Clone(), manyCmds), ) } + +func TestCommandsToBytesSlice(t *testing.T) { + manyCmds := Commands{ + Command("1"), + Command("2"), + } + + b := manyCmds.ToBytesSlice() + assert.Equal(t, b, [][]byte{[]byte("1"), []byte("2")}) +} + +func TestBytesSliceToCommands(t *testing.T) { + b := [][]byte{[]byte("1"), []byte("2")} + + var cmds Commands + cmds.FromBytesSlice(b) + expectedCmds := Commands{ + Command("1"), + Command("2"), + } + assert.Equal(t, cmds, expectedCmds) +} diff --git a/message/commit.go b/message/commit.go index ff53a0c..54ac38d 100644 --- a/message/commit.go +++ b/message/commit.go @@ -2,6 +2,9 @@ package message import ( "fmt" + + "github.com/go-distributed/epaxos/protobuf" + "github.com/golang/glog" ) type Commit struct { @@ -10,6 +13,7 @@ type Commit struct { Cmds Commands Deps Dependencies From uint8 + pb protobuf.Commit // for protobuf } func (c *Commit) Sender() uint8 { @@ -35,3 +39,36 @@ func (c *Commit) Instance() uint64 { func (c *Commit) String() string { return fmt.Sprintf("Commit, Instance[%v][%v]", c.ReplicaId, c.InstanceId) } + +func (c *Commit) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(c.ReplicaId) + instanceID := uint64(c.InstanceId) + from := uint32(c.From) + + c.pb.ReplicaID = &replicaID + c.pb.InstanceID = &instanceID + c.pb.Cmds = c.Cmds.ToBytesSlice() + c.pb.Deps = c.Deps + c.pb.From = &from + + data, err := c.pb.Marshal() + if err != nil { + glog.Warning("Commit: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (c *Commit) UnmarshalProtobuf(data []byte) error { + if err := c.pb.Unmarshal(data); err != nil { + glog.Warning("Commit: UnmarshalProtobuf() error: ", err) + return err + } + + c.ReplicaId = uint8(c.pb.GetReplicaID()) + c.InstanceId = uint64(c.pb.GetInstanceID()) + c.Cmds.FromBytesSlice(c.pb.GetCmds()) + c.Deps = c.pb.GetDeps() + c.From = uint8(c.pb.GetFrom()) + return nil +} diff --git a/message/data_test.go b/message/data_test.go index aea007c..97ef7de 100644 --- a/message/data_test.go +++ b/message/data_test.go @@ -93,3 +93,175 @@ func TestInstance(t *testing.T) { assert.Equal(t, ppr.Instance(), uint64(8)) assert.Equal(t, pps.Instance(), uint64(9)) } + +func TestMarshalUnmarshalProtobuf(t *testing.T) { + // Tests for PreAccept. + p0 := &PreAccept{ + ReplicaId: 1, + InstanceId: 2, + Cmds: Commands{ + Command("Hello"), + Command("World"), + }, + Deps: Dependencies{1, 2, 3}, + Ballot: NewBallot(1, 2, 3), + From: 1, + } + + data, err := p0.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + p1 := new(PreAccept) + assert.NoError(t, p1.UnmarshalProtobuf(data)) + assert.Equal(t, p0, p1) + + // Tests for PreAcceptOk + p2 := &PreAcceptOk{ + ReplicaId: 1, + InstanceId: 2, + From: 1, + } + + data, err = p2.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + p3 := new(PreAcceptOk) + assert.NoError(t, p3.UnmarshalProtobuf(data)) + assert.Equal(t, p2, p3) + + // Tests for PreAcceptReply + p4 := &PreAcceptReply{ + ReplicaId: 1, + InstanceId: 2, + Deps: Dependencies{1, 2, 3}, + Ballot: NewBallot(1, 2, 3), + From: 1, + } + + data, err = p4.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + p5 := new(PreAcceptReply) + assert.NoError(t, p5.UnmarshalProtobuf(data)) + assert.Equal(t, p4, p5) + + // Tests for Accept. + a0 := &Accept{ + ReplicaId: 1, + InstanceId: 2, + Cmds: Commands{ + Command("Hello"), + Command("World"), + }, + Deps: Dependencies{1, 2, 3}, + Ballot: NewBallot(1, 2, 3), + From: 1, + } + + data, err = a0.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + a1 := new(Accept) + assert.NoError(t, a1.UnmarshalProtobuf(data)) + assert.Equal(t, a0, a1) + + // Tests for AcceptReply. + a2 := &AcceptReply{ + ReplicaId: 1, + InstanceId: 2, + Ballot: NewBallot(1, 2, 3), + From: 1, + } + + data, err = a2.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + a3 := new(AcceptReply) + assert.NoError(t, a3.UnmarshalProtobuf(data)) + assert.Equal(t, a2, a3) + + // Tests for Commit. + c0 := &Commit{ + ReplicaId: 1, + InstanceId: 2, + Cmds: Commands{ + Command("Hello"), + Command("World"), + }, + Deps: Dependencies{1, 2, 3}, + From: 1, + } + + data, err = c0.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + c1 := new(Commit) + assert.NoError(t, c1.UnmarshalProtobuf(data)) + assert.Equal(t, c0, c1) + + // Tests for Prepare. + p6 := &Prepare{ + ReplicaId: 1, + InstanceId: 2, + Ballot: NewBallot(1, 2, 3), + From: 1, + } + + data, err = p6.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + p7 := new(Prepare) + assert.NoError(t, p7.UnmarshalProtobuf(data)) + assert.Equal(t, p6, p7) + + // Tests for PrepareReply. + p8 := &PrepareReply{ + ReplicaId: 1, + InstanceId: 2, + Status: 1, // TODO(yifan): Change to exported state. + Cmds: Commands{ + Command("Hello"), + Command("World"), + }, + Deps: Dependencies{1, 2, 3}, + Ballot: NewBallot(1, 2, 3), + OriginalBallot: NewBallot(3, 2, 1), + IsFromLeader: true, + From: 1, + } + + data, err = p8.MarshalProtobuf() + assert.NoError(t, err) + assert.NotNil(t, data) + + p9 := new(PrepareReply) + assert.NoError(t, p9.UnmarshalProtobuf(data)) + assert.Equal(t, p8, p9) + + // Tests for Propose + p10 := new(Propose) + + data, err = p10.MarshalProtobuf() + assert.Nil(t, data) + assert.Error(t, err) + + p11 := new(Propose) + assert.Error(t, p11.UnmarshalProtobuf(data)) + + // Test for Timeout + t0 := new(Timeout) + + data, err = t0.MarshalProtobuf() + assert.Nil(t, data) + assert.Error(t, err) + + t1 := new(Propose) + assert.Error(t, t1.UnmarshalProtobuf(data)) +} diff --git a/message/message.go b/message/message.go index f165b74..b82668a 100644 --- a/message/message.go +++ b/message/message.go @@ -7,8 +7,11 @@ type Message interface { Replica() uint8 Instance() uint64 String() string + MarshalProtobuf() ([]byte, error) + UnmarshalProtobuf([]byte) error } +// TODO(yifan): Remove this. func MessageTypeString(m Message) string { switch m.Type() { case ProposeMsg: @@ -33,3 +36,28 @@ func MessageTypeString(m Message) string { panic("") } } + +func TypeToString(msgType uint8) string { + switch msgType { + case ProposeMsg: + return "Propose" + case PreAcceptMsg: + return "PreAccept" + case PreAcceptOkMsg: + return "PreAcceptOk" + case PreAcceptReplyMsg: + return "PreAcceptReply" + case AcceptMsg: + return "Accept" + case AcceptReplyMsg: + return "AcceptReply" + case CommitMsg: + return "Commit" + case PrepareMsg: + return "Prepare" + case PrepareReplyMsg: + return "PrepareReply" + default: + panic("") + } +} diff --git a/message/pre_accept.go b/message/pre_accept.go index 2ace7f1..18c9e2d 100644 --- a/message/pre_accept.go +++ b/message/pre_accept.go @@ -2,6 +2,9 @@ package message import ( "fmt" + + "github.com/go-distributed/epaxos/protobuf" + "github.com/golang/glog" ) type PreAccept struct { @@ -11,6 +14,7 @@ type PreAccept struct { Deps Dependencies Ballot *Ballot From uint8 + pb protobuf.PreAccept // for protobuf } // we don't need ReplicaId in PreAcceptOk, @@ -19,6 +23,7 @@ type PreAcceptOk struct { ReplicaId uint8 InstanceId uint64 From uint8 + pb protobuf.PreAcceptOK // for protobuf } type PreAcceptReply struct { @@ -27,6 +32,7 @@ type PreAcceptReply struct { Deps Dependencies Ballot *Ballot From uint8 + pb protobuf.PreAcceptReply } // PreAccept @@ -54,6 +60,44 @@ func (p *PreAccept) String() string { return fmt.Sprintf("PreAccept, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String()) } +func (p *PreAccept) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(p.ReplicaId) + instanceID := uint64(p.InstanceId) + from := uint32(p.From) + + p.pb.ReplicaID = &replicaID + p.pb.InstanceID = &instanceID + p.pb.Cmds = p.Cmds.ToBytesSlice() + p.pb.Deps = p.Deps + p.pb.Ballot = p.Ballot.ToProtobuf() + p.pb.From = &from + + data, err := p.pb.Marshal() + if err != nil { + glog.Warning("PreAccept: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (p *PreAccept) UnmarshalProtobuf(data []byte) error { + if err := p.pb.Unmarshal(data); err != nil { + glog.Warning("PreAccept: UnmarshalProtobuf() error: ", err) + return err + } + + p.ReplicaId = uint8(p.pb.GetReplicaID()) + p.InstanceId = uint64(p.pb.GetInstanceID()) + p.Cmds.FromBytesSlice(p.pb.GetCmds()) + p.Deps = p.pb.GetDeps() + if p.Ballot == nil { + p.Ballot = new(Ballot) + } + p.Ballot.FromProtobuf(p.pb.GetBallot()) + p.From = uint8(p.pb.GetFrom()) + return nil +} + // PreAcceptOk func (p *PreAcceptOk) Sender() uint8 { return p.From @@ -79,6 +123,35 @@ func (p *PreAcceptOk) String() string { return fmt.Sprintf("PreAcceptOk, Instance[%v][%v]", p.ReplicaId, p.InstanceId) } +func (p *PreAcceptOk) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(p.ReplicaId) + instanceID := uint64(p.InstanceId) + from := uint32(p.From) + + p.pb.ReplicaID = &replicaID + p.pb.InstanceID = &instanceID + p.pb.From = &from + + data, err := p.pb.Marshal() + if err != nil { + glog.Warning("PreAcceptOk: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (p *PreAcceptOk) UnmarshalProtobuf(data []byte) error { + if err := p.pb.Unmarshal(data); err != nil { + glog.Warning("PreAcceptOk: UnmarshalProtobuf() error: ", err) + return err + } + + p.ReplicaId = uint8(p.pb.GetReplicaID()) + p.InstanceId = uint64(p.pb.GetInstanceID()) + p.From = uint8(p.pb.GetFrom()) + return nil +} + // PreAcceptReply func (p *PreAcceptReply) Sender() uint8 { return p.From @@ -103,3 +176,39 @@ func (p *PreAcceptReply) Instance() uint64 { func (p *PreAcceptReply) String() string { return fmt.Sprintf("PreAcceptReply, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String()) } + +func (p *PreAcceptReply) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(p.ReplicaId) + instanceID := uint64(p.InstanceId) + from := uint32(p.From) + + p.pb.ReplicaID = &replicaID + p.pb.InstanceID = &instanceID + p.pb.Deps = p.Deps + p.pb.Ballot = p.Ballot.ToProtobuf() + p.pb.From = &from + + data, err := p.pb.Marshal() + if err != nil { + glog.Warning("PreAcceptReply: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (p *PreAcceptReply) UnmarshalProtobuf(data []byte) error { + if err := p.pb.Unmarshal(data); err != nil { + glog.Warning("PreAcceptReply: UnmarshalProtobuf() error: ", err) + return err + } + + p.ReplicaId = uint8(p.pb.GetReplicaID()) + p.InstanceId = uint64(p.pb.GetInstanceID()) + p.Deps = p.pb.GetDeps() + if p.Ballot == nil { + p.Ballot = new(Ballot) + } + p.Ballot.FromProtobuf(p.pb.GetBallot()) + p.From = uint8(p.pb.GetFrom()) + return nil +} diff --git a/message/prepare.go b/message/prepare.go index 7f05ae2..d0d79c0 100644 --- a/message/prepare.go +++ b/message/prepare.go @@ -2,6 +2,9 @@ package message import ( "fmt" + + "github.com/go-distributed/epaxos/protobuf" + "github.com/golang/glog" ) type Prepare struct { @@ -9,6 +12,7 @@ type Prepare struct { InstanceId uint64 Ballot *Ballot From uint8 + pb protobuf.Prepare // for protobuf } type PrepareReply struct { @@ -22,6 +26,7 @@ type PrepareReply struct { OriginalBallot *Ballot IsFromLeader bool From uint8 + pb protobuf.PrepareReply // for protobuf } func (p *Prepare) Sender() uint8 { @@ -48,6 +53,40 @@ func (p *Prepare) String() string { return fmt.Sprintf("Prepare, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String()) } +func (p *Prepare) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(p.ReplicaId) + instanceID := uint64(p.InstanceId) + from := uint32(p.From) + + p.pb.ReplicaID = &replicaID + p.pb.InstanceID = &instanceID + p.pb.Ballot = p.Ballot.ToProtobuf() + p.pb.From = &from + + data, err := p.pb.Marshal() + if err != nil { + glog.Warning("Prepare: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (p *Prepare) UnmarshalProtobuf(data []byte) error { + if err := p.pb.Unmarshal(data); err != nil { + glog.Warning("Prepare: UnmarshalProtobuf() error: ", err) + return err + } + + p.ReplicaId = uint8(p.pb.GetReplicaID()) + p.InstanceId = uint64(p.pb.GetInstanceID()) + if p.Ballot == nil { + p.Ballot = new(Ballot) + } + p.Ballot.FromProtobuf(p.pb.GetBallot()) + p.From = uint8(p.pb.GetFrom()) + return nil +} + func (p *PrepareReply) Sender() uint8 { return p.From } @@ -72,3 +111,51 @@ func (p *PrepareReply) String() string { return fmt.Sprintf("PrepareReply, Instance[%v][%v], Ballot[%v], Original Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String(), p.OriginalBallot.String()) } + +func (p *PrepareReply) MarshalProtobuf() ([]byte, error) { + replicaID := uint32(p.ReplicaId) + instanceID := uint64(p.InstanceId) + state := protobuf.State(p.Status) + from := uint32(p.From) + + p.pb.ReplicaID = &replicaID + p.pb.InstanceID = &instanceID + p.pb.State = &state + p.pb.Cmds = p.Cmds.ToBytesSlice() + p.pb.Deps = p.Deps + p.pb.Ballot = p.Ballot.ToProtobuf() + p.pb.OriginalBallot = p.OriginalBallot.ToProtobuf() + p.pb.IsFromLeader = &p.IsFromLeader + p.pb.From = &from + + data, err := p.pb.Marshal() + if err != nil { + glog.Warning("PrepareReply: MarshalProtobuf() error: ", err) + return nil, err + } + return data, nil +} + +func (p *PrepareReply) UnmarshalProtobuf(data []byte) error { + if err := p.pb.Unmarshal(data); err != nil { + glog.Warning("PrepareReply: UnmarshalProtobuf() error: ", err) + return err + } + + p.ReplicaId = uint8(p.pb.GetReplicaID()) + p.InstanceId = uint64(p.pb.GetInstanceID()) + p.Status = uint8(p.pb.GetState()) + p.Cmds.FromBytesSlice(p.pb.GetCmds()) + p.Deps = p.pb.GetDeps() + if p.Ballot == nil { + p.Ballot = new(Ballot) + } + p.Ballot.FromProtobuf(p.pb.GetBallot()) + if p.OriginalBallot == nil { + p.OriginalBallot = new(Ballot) + } + p.OriginalBallot.FromProtobuf(p.pb.GetOriginalBallot()) + p.IsFromLeader = p.pb.GetIsFromLeader() + p.From = uint8(p.pb.GetFrom()) + return nil +} diff --git a/message/propose.go b/message/propose.go index aaf98a7..06673d5 100644 --- a/message/propose.go +++ b/message/propose.go @@ -44,3 +44,11 @@ func (p *Propose) Instance() uint64 { func (p *Propose) String() string { return fmt.Sprintf("Propose, Instance[%v][%v]", p.ReplicaId, p.InstanceId) } + +func (p *Propose) MarshalProtobuf() ([]byte, error) { + return nil, fmt.Errorf("Propose: MarshalProtobuf() not implemented\n") +} + +func (p *Propose) UnmarshalProtobuf([]byte) error { + return fmt.Errorf("Propose: UnmarshalProtobuf() not implemented\n") +} diff --git a/message/timeout.go b/message/timeout.go index 1ee9bef..902cf9a 100644 --- a/message/timeout.go +++ b/message/timeout.go @@ -33,3 +33,11 @@ func (t *Timeout) Instance() uint64 { func (t *Timeout) String() string { return fmt.Sprintf("Timeout, Instance[%v][%v]", t.ReplicaId, t.InstanceId) } + +func (t *Timeout) MarshalProtobuf() ([]byte, error) { + return nil, fmt.Errorf("Timeout: MarshalProtobuf() not implemented\n") +} + +func (t *Timeout) UnmarshalProtobuf([]byte) error { + return fmt.Errorf("Timeout: UnmarshalProtobuf() not implemented\n") +} diff --git a/messenger.go b/messenger.go new file mode 100644 index 0000000..9765749 --- /dev/null +++ b/messenger.go @@ -0,0 +1,147 @@ +package epaxos + +import ( + "math/rand" + + "github.com/go-distributed/epaxos/codec" + "github.com/go-distributed/epaxos/message" + "github.com/go-distributed/epaxos/transporter" +) + +// A messenger can: +// 1, Send a message to the specified peer. +// 2, Multicast a message to a fast quorum of peers. +// 3, Broad a message to all peers. +// 4, Receive a message. +// All operations will block. +type Messenger interface { + // Send a message to a specified peer. + Send(to uint8, msg message.Message) error + + // Multicast a message to the fast quorum. + MulticastFastquorum(msg message.Message) error + + // Broadcast a message to all peers. + Broadcast(msg message.Message) error + + // Tries to receive a message. + Recv() (message.Message, error) + + // Start the messenger. + Start() error + + // Stop the messenger. + Stop() error +} + +type EpaxosMessenger struct { + hostports map[uint8]string + self uint8 + fastQuorum int // Consider to remove this. + all int + tr Transporter + codec Codec +} + +// Send a message to the specified peer. +func (m *EpaxosMessenger) Send(to uint8, msg message.Message) error { + data, err := m.codec.Marshal(msg) + if err != nil { + return err + } + return m.tr.Send(m.hostports[to], msg.Type(), data) +} + +// Multicast a message to the fast quorum. +func (m *EpaxosMessenger) MulticastFastQuorum(msg message.Message) error { + data, err := m.codec.Marshal(msg) + if err != nil { + return err + } + + skip := uint8(rand.Intn(m.all)) + if skip == m.self { + skip = (skip + 1) % uint8(m.all) + } + + for i := uint8(0); i < uint8(m.all); i++ { + if i == m.self || i == skip { + // Skip itself and one more. + continue + } + err = m.tr.Send(m.hostports[i], msg.Type(), data) + if err != nil { + return err + } + } + return nil +} + +// Broadcast a message to all peers. +func (m *EpaxosMessenger) Broadcast(msg message.Message) error { + data, err := m.codec.Marshal(msg) + if err != nil { + return err + } + + for i := uint8(0); i < uint8(m.all); i++ { + if i == m.self { // Skip itself. + continue + } + err = m.tr.Send(m.hostports[i], msg.Type(), data) + if err != nil { + return err + } + } + return nil +} + +// Tries to receive a message. +func (m *EpaxosMessenger) Recv() (message.Message, error) { + mtype, data, err := m.tr.Recv() + if err != nil { + return nil, err + } + return m.codec.Unmarshal(mtype, data) +} + +// Start the messenger. +func (m *EpaxosMessenger) Start() error { + if err := m.codec.Initial(); err != nil { + return err + } + return m.tr.Start() +} + +// Stop the messenger. +func (m *EpaxosMessenger) Stop() error { + if err := m.tr.Stop(); err != nil { + return err + } + return m.codec.Destroy() +} + +// Create a new messenger that uses GoGoprotobuf over HTTP. +func NewGoGoProtobufHTTPMessenger(hostports map[uint8]string, self uint8, size int) (*EpaxosMessenger, error) { + m := &EpaxosMessenger{ + hostports: hostports, + self: self, + fastQuorum: size - 1, + all: size, + } + + tr, err := transporter.NewHTTPTransporter(hostports[self]) + if err != nil { + return nil, err + } + + codec, err := codec.NewGoGoProtobufHTTPTransporter() + if err != nil { + return nil, err + // This can't happen, added here + // for symmetric looking. + } + + m.tr, m.codec = tr, codec + return m, nil +} diff --git a/protobuf/message.pb.go b/protobuf/message.pb.go index ba66a14..3810a3f 100644 --- a/protobuf/message.pb.go +++ b/protobuf/message.pb.go @@ -471,7 +471,7 @@ func (m *PrepareReply) GetFrom() uint32 { type Commit struct { ReplicaID *uint32 `protobuf:"varint,1,req" json:"ReplicaID,omitempty"` - InstancdID *uint64 `protobuf:"varint,2,req" json:"InstancdID,omitempty"` + InstanceID *uint64 `protobuf:"varint,2,req" json:"InstanceID,omitempty"` Cmds [][]byte `protobuf:"bytes,3,rep" json:"Cmds,omitempty"` Deps []uint64 `protobuf:"varint,4,rep" json:"Deps,omitempty"` From *uint32 `protobuf:"varint,5,req" json:"From,omitempty"` @@ -488,9 +488,9 @@ func (m *Commit) GetReplicaID() uint32 { return 0 } -func (m *Commit) GetInstancdID() uint64 { - if m != nil && m.InstancdID != nil { - return *m.InstancdID +func (m *Commit) GetInstanceID() uint64 { + if m != nil && m.InstanceID != nil { + return *m.InstanceID } return 0 } @@ -1676,7 +1676,7 @@ func (m *Commit) Unmarshal(data []byte) error { break } } - m.InstancdID = &v + m.InstanceID = &v case 3: if wireType != 2 { return code_google_com_p_gogoprotobuf_proto.ErrWrongType @@ -1883,7 +1883,7 @@ func (this *Commit) String() string { } s := strings.Join([]string{`&Commit{`, `ReplicaID:` + valueToStringMessage(this.ReplicaID) + `,`, - `InstancdID:` + valueToStringMessage(this.InstancdID) + `,`, + `InstanceID:` + valueToStringMessage(this.InstanceID) + `,`, `Cmds:` + fmt.Sprintf("%v", this.Cmds) + `,`, `Deps:` + fmt.Sprintf("%v", this.Deps) + `,`, `From:` + valueToStringMessage(this.From) + `,`, @@ -2114,8 +2114,8 @@ func (m *Commit) Size() (n int) { if m.ReplicaID != nil { n += 1 + sovMessage(uint64(*m.ReplicaID)) } - if m.InstancdID != nil { - n += 1 + sovMessage(uint64(*m.InstancdID)) + if m.InstanceID != nil { + n += 1 + sovMessage(uint64(*m.InstanceID)) } if len(m.Cmds) > 0 { for _, b := range m.Cmds { @@ -2339,7 +2339,7 @@ func NewPopulatedCommit(r randyMessage, easy bool) *Commit { v37 := r.Uint32() this.ReplicaID = &v37 v38 := uint64(r.Uint32()) - this.InstancdID = &v38 + this.InstanceID = &v38 if r.Intn(10) != 0 { v39 := r.Intn(100) this.Cmds = make([][]byte, v39) @@ -2893,10 +2893,10 @@ func (m *Commit) MarshalTo(data []byte) (n int, err error) { i++ i = encodeVarintMessage(data, i, uint64(*m.ReplicaID)) } - if m.InstancdID != nil { + if m.InstanceID != nil { data[i] = 0x10 i++ - i = encodeVarintMessage(data, i, uint64(*m.InstancdID)) + i = encodeVarintMessage(data, i, uint64(*m.InstanceID)) } if len(m.Cmds) > 0 { for _, b := range m.Cmds { @@ -3016,7 +3016,7 @@ func (this *Commit) GoString() string { if this == nil { return "nil" } - s := strings1.Join([]string{`&protobuf.Commit{` + `ReplicaID:` + valueToGoStringMessage(this.ReplicaID, "uint32"), `InstancdID:` + valueToGoStringMessage(this.InstancdID, "uint64"), `Cmds:` + fmt1.Sprintf("%#v", this.Cmds), `Deps:` + fmt1.Sprintf("%#v", this.Deps), `From:` + valueToGoStringMessage(this.From, "uint32"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ") + s := strings1.Join([]string{`&protobuf.Commit{` + `ReplicaID:` + valueToGoStringMessage(this.ReplicaID, "uint32"), `InstanceID:` + valueToGoStringMessage(this.InstanceID, "uint64"), `Cmds:` + fmt1.Sprintf("%#v", this.Cmds), `Deps:` + fmt1.Sprintf("%#v", this.Deps), `From:` + valueToGoStringMessage(this.From, "uint32"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ") return s } func valueToGoStringMessage(v interface{}, typ string) string { @@ -4095,14 +4095,14 @@ func (this *Commit) VerboseEqual(that interface{}) error { } else if that1.ReplicaID != nil { return fmt2.Errorf("ReplicaID this(%v) Not Equal that(%v)", this.ReplicaID, that1.ReplicaID) } - if this.InstancdID != nil && that1.InstancdID != nil { - if *this.InstancdID != *that1.InstancdID { - return fmt2.Errorf("InstancdID this(%v) Not Equal that(%v)", *this.InstancdID, *that1.InstancdID) + if this.InstanceID != nil && that1.InstanceID != nil { + if *this.InstanceID != *that1.InstanceID { + return fmt2.Errorf("InstanceID this(%v) Not Equal that(%v)", *this.InstanceID, *that1.InstanceID) } - } else if this.InstancdID != nil { - return fmt2.Errorf("this.InstancdID == nil && that.InstancdID != nil") - } else if that1.InstancdID != nil { - return fmt2.Errorf("InstancdID this(%v) Not Equal that(%v)", this.InstancdID, that1.InstancdID) + } else if this.InstanceID != nil { + return fmt2.Errorf("this.InstanceID == nil && that.InstanceID != nil") + } else if that1.InstanceID != nil { + return fmt2.Errorf("InstanceID this(%v) Not Equal that(%v)", this.InstanceID, that1.InstanceID) } if len(this.Cmds) != len(that1.Cmds) { return fmt2.Errorf("Cmds this(%v) Not Equal that(%v)", len(this.Cmds), len(that1.Cmds)) @@ -4163,13 +4163,13 @@ func (this *Commit) Equal(that interface{}) bool { } else if that1.ReplicaID != nil { return false } - if this.InstancdID != nil && that1.InstancdID != nil { - if *this.InstancdID != *that1.InstancdID { + if this.InstanceID != nil && that1.InstanceID != nil { + if *this.InstanceID != *that1.InstanceID { return false } - } else if this.InstancdID != nil { + } else if this.InstanceID != nil { return false - } else if that1.InstancdID != nil { + } else if that1.InstanceID != nil { return false } if len(this.Cmds) != len(that1.Cmds) { diff --git a/protobuf/message.proto b/protobuf/message.proto index 0fb46be..a9d7928 100644 --- a/protobuf/message.proto +++ b/protobuf/message.proto @@ -90,7 +90,7 @@ message PrepareReply { message Commit { required uint32 ReplicaID = 1; - required uint64 InstancdID = 2; + required uint64 InstanceID = 2; repeated bytes Cmds = 3; repeated uint64 Deps = 4; required uint32 From = 5; diff --git a/transporter.go b/transporter.go index ff20cfe..572e7ba 100644 --- a/transporter.go +++ b/transporter.go @@ -1,25 +1,23 @@ package epaxos -import ( - "github.com/go-distributed/epaxos/message" -) - +// A transporter provides only simple primitives, including +// Send and Recv. type Transporter interface { - // non-blocking send - Send(to uint8, msg message.Message) - - // non-blocking multicast - MulticastFastquorum(msg message.Message) - - // non-blocking broadcast - Broadcast(msg message.Message) + // Send an encoded message to the host:port. + // This will block. We need the msgType here because + // we assume the message is not self-explained. + Send(hostport string, msgType uint8, b []byte) error - // register a channel to communicate with replica - RegisterChannel(ch chan message.Message) + // Receive an encoded message from some peer. + // Return the type of the message and the content. + // We need the msgType here because we assume the + // message is not self-explained. + Recv() (msgType uint8, b []byte, err error) - // start the transporter, it's non-blocking + // Start the transporter, this will block if succeeds, + // or return an error if it fails. Start() error - // stop the transporter - Stop() + // Stop the transporter. + Stop() error } diff --git a/transporter/http_transporter.go b/transporter/http_transporter.go new file mode 100644 index 0000000..914c503 --- /dev/null +++ b/transporter/http_transporter.go @@ -0,0 +1,184 @@ +package transporter + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "path" + + "github.com/go-distributed/epaxos/message" + "github.com/golang/glog" +) + +const ( + prefix = "/epaxos" + proposePath = "/propose" + preAcceptPath = "/preAccept" + preAcceptOkPath = "/preAcceptOk" + preAcceptReplyPath = "/preAcceptReply" + acceptPath = "/accept" + acceptReplyPath = "/acceptReply" + commitPath = "/commit" + preparePath = "/prepare" + prepareReplyPath = "/prepareReply" +) + +// The buffer size for the internal channel. +const defaultChanSize = 1024 + +type rawMessage struct { + mtype uint8 + data []byte + err error +} + +type HTTPTransporter struct { + self string + path map[uint8]string + rawMessageChan chan *rawMessage + + mux *http.ServeMux + client *http.Client +} + +// Create a new http transporter. +func NewHTTPTransporter(self string) (*HTTPTransporter, error) { + t := &HTTPTransporter{ + self: self, + path: make(map[uint8]string), + rawMessageChan: make(chan *rawMessage, defaultChanSize), + + mux: http.NewServeMux(), + client: new(http.Client), + } + t.installPaths() + t.installHandlers() + return t, nil +} + +// Install paths, so that we don't need to do join every time. +func (t *HTTPTransporter) installPaths() { + t.path[message.ProposeMsg] = path.Join(prefix, proposePath) + t.path[message.PreAcceptMsg] = path.Join(prefix, preAcceptPath) + t.path[message.PreAcceptOkMsg] = path.Join(prefix, preAcceptOkPath) + t.path[message.PreAcceptReplyMsg] = path.Join(prefix, preAcceptReplyPath) + t.path[message.AcceptMsg] = path.Join(prefix, acceptPath) + t.path[message.AcceptReplyMsg] = path.Join(prefix, acceptReplyPath) + t.path[message.CommitMsg] = path.Join(prefix, commitPath) + t.path[message.PrepareMsg] = path.Join(prefix, preparePath) + t.path[message.PrepareReplyMsg] = path.Join(prefix, prepareReplyPath) +} + +func (t *HTTPTransporter) installHandlers() { + t.mux.HandleFunc(t.path[message.ProposeMsg], t.proposeHandler) + t.mux.HandleFunc(t.path[message.PreAcceptMsg], t.preAcceptHandler) + t.mux.HandleFunc(t.path[message.PreAcceptOkMsg], t.preAcceptOkHandler) + t.mux.HandleFunc(t.path[message.PreAcceptReplyMsg], t.preAcceptReplyHandler) + t.mux.HandleFunc(t.path[message.AcceptMsg], t.acceptHandler) + t.mux.HandleFunc(t.path[message.AcceptReplyMsg], t.acceptReplyHandler) + t.mux.HandleFunc(t.path[message.CommitMsg], t.commitHandler) + t.mux.HandleFunc(t.path[message.PrepareMsg], t.prepareHandler) + t.mux.HandleFunc(t.path[message.PrepareReplyMsg], t.prepareReplyHandler) +} + +// Send an encoded message to the host:port. +// This will block. We need the msgType here because +// we assume the message is not self-explained. +func (t *HTTPTransporter) Send(hostport string, msgType uint8, b []byte) error { + if hostport == t.self { + glog.Warning("Sending message to self!") + return nil + } + + // TODO(yifan): Support https. + targetURL := fmt.Sprintf("http://%s%s", hostport, t.path[msgType]) + + // Send POST. + resp, err := t.client.Post(targetURL, "application/protobuf", bytes.NewBuffer(b)) + if resp == nil || err != nil { + glog.Warning("HTTPTransporter: Post error for: ", + message.TypeToString(msgType), + err) + return err + } + defer resp.Body.Close() + return nil +} + +// Receive an encoded message from some peer. +// Return the type of the message and the content. +// We need the msgType here because we assume the +// message is not self-explained. +func (t *HTTPTransporter) Recv() (msgType uint8, b []byte, err error) { + rawMsg := <-t.rawMessageChan + return rawMsg.mtype, rawMsg.data, rawMsg.err +} + +// Start the transporter, it will block until success or failure. +func (t *HTTPTransporter) Start() error { + err := http.ListenAndServe(t.self, t.mux) + if err != nil { + return err + } + return nil +} + +// Stop the transporter. +func (t *HTTPTransporter) Stop() error { + close(t.rawMessageChan) + return nil +} + +// Message handlers +func (t *HTTPTransporter) proposeHandler(w http.ResponseWriter, r *http.Request) { + panic("Not implemented yet") +} + +func (t *HTTPTransporter) preAcceptHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.PreAcceptMsg, r) +} + +func (t *HTTPTransporter) preAcceptOkHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.PreAcceptOkMsg, r) +} + +func (t *HTTPTransporter) preAcceptReplyHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.PreAcceptReplyMsg, r) +} + +func (t *HTTPTransporter) acceptHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.AcceptMsg, r) +} + +func (t *HTTPTransporter) acceptReplyHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.AcceptReplyMsg, r) +} + +func (t *HTTPTransporter) commitHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.CommitMsg, r) +} + +func (t *HTTPTransporter) prepareHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.PrepareMsg, r) +} + +func (t *HTTPTransporter) prepareReplyHandler(w http.ResponseWriter, r *http.Request) { + t.handleMessage(message.PrepareReplyMsg, r) +} + +// Send the messag type and raw bytes into the internal channel. +func (t *HTTPTransporter) handleMessage(msgType uint8, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + glog.Warning("HTTPTransporter: Read HTTP body error for: ", + message.TypeToString(msgType), + err) + } + + t.rawMessageChan <- &rawMessage{ + mtype: msgType, + data: b, + err: err, + } +} From 46c4064b94664e81d76019674c1d431c1bb56b9f Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 18 Jun 2014 02:29:11 -0700 Subject: [PATCH 2/3] test(messenger) Added a test for the messenger. --- codec.go | 3 + codec/gogoprotobufcodec.go | 9 +- messenger.go | 86 +++++------ messenger/messenger_test.go | 244 ++++++++++++++++++++++++++++++++ transporter.go | 3 + transporter/http_transporter.go | 13 +- 6 files changed, 303 insertions(+), 55 deletions(-) create mode 100644 messenger/messenger_test.go diff --git a/codec.go b/codec.go index d63da3c..4e6f9ce 100644 --- a/codec.go +++ b/codec.go @@ -17,6 +17,9 @@ type Codec interface { // Unmarshal a message from bytes. Unmarshal(mtype uint8, data []byte) (message.Message, error) + // Stop a codec. + Stop() error + // Destroy a codec, release the resource. Destroy() error } diff --git a/codec/gogoprotobufcodec.go b/codec/gogoprotobufcodec.go index b440e7c..870d371 100644 --- a/codec/gogoprotobufcodec.go +++ b/codec/gogoprotobufcodec.go @@ -15,7 +15,12 @@ func (gc *GoGoProtobufCodec) Initial() error { return nil } -// Destroy te gogoprotobuf (no-op for now). +// Stop the gogoprotobuf (no-op for now). +func (gc *GoGoProtobufCodec) Stop() error { + return nil +} + +// Destroy the gogoprotobuf (no-op for now). func (gc *GoGoProtobufCodec) Destroy() error { return nil } @@ -35,7 +40,7 @@ func (c *GoGoProtobufCodec) Unmarshal(mtype uint8, data []byte) (message.Message case message.PreAcceptMsg: msg = new(message.PreAccept) case message.PreAcceptOkMsg: - msg = new(message.PreAccept) + msg = new(message.PreAcceptOk) case message.PreAcceptReplyMsg: msg = new(message.PreAcceptReply) case message.AcceptMsg: diff --git a/messenger.go b/messenger.go index 9765749..5f44c14 100644 --- a/messenger.go +++ b/messenger.go @@ -3,9 +3,7 @@ package epaxos import ( "math/rand" - "github.com/go-distributed/epaxos/codec" "github.com/go-distributed/epaxos/message" - "github.com/go-distributed/epaxos/transporter" ) // A messenger can: @@ -19,7 +17,7 @@ type Messenger interface { Send(to uint8, msg message.Message) error // Multicast a message to the fast quorum. - MulticastFastquorum(msg message.Message) error + MulticastFastQuorum(msg message.Message) error // Broadcast a message to all peers. Broadcast(msg message.Message) error @@ -32,44 +30,47 @@ type Messenger interface { // Stop the messenger. Stop() error + + // Destroy the messenger. + Destroy() error } type EpaxosMessenger struct { - hostports map[uint8]string - self uint8 - fastQuorum int // Consider to remove this. - all int - tr Transporter - codec Codec + Hostports map[uint8]string + Self uint8 + FastQuorum int // Consider to remove this. + All int + Tr Transporter + Codec Codec } // Send a message to the specified peer. func (m *EpaxosMessenger) Send(to uint8, msg message.Message) error { - data, err := m.codec.Marshal(msg) + data, err := m.Codec.Marshal(msg) if err != nil { return err } - return m.tr.Send(m.hostports[to], msg.Type(), data) + return m.Tr.Send(m.Hostports[to], msg.Type(), data) } // Multicast a message to the fast quorum. func (m *EpaxosMessenger) MulticastFastQuorum(msg message.Message) error { - data, err := m.codec.Marshal(msg) + data, err := m.Codec.Marshal(msg) if err != nil { return err } - skip := uint8(rand.Intn(m.all)) - if skip == m.self { - skip = (skip + 1) % uint8(m.all) + skip := uint8(rand.Intn(m.All)) + if skip == m.Self { + skip = (skip + 1) % uint8(m.All) } - for i := uint8(0); i < uint8(m.all); i++ { - if i == m.self || i == skip { - // Skip itself and one more. + for i := uint8(0); i < uint8(m.All); i++ { + if i == m.Self || i == skip { + // Skip itSelf and one more. continue } - err = m.tr.Send(m.hostports[i], msg.Type(), data) + err = m.Tr.Send(m.Hostports[i], msg.Type(), data) if err != nil { return err } @@ -79,16 +80,16 @@ func (m *EpaxosMessenger) MulticastFastQuorum(msg message.Message) error { // Broadcast a message to all peers. func (m *EpaxosMessenger) Broadcast(msg message.Message) error { - data, err := m.codec.Marshal(msg) + data, err := m.Codec.Marshal(msg) if err != nil { return err } - for i := uint8(0); i < uint8(m.all); i++ { - if i == m.self { // Skip itself. + for i := uint8(0); i < uint8(m.All); i++ { + if i == m.Self { // Skip itself. continue } - err = m.tr.Send(m.hostports[i], msg.Type(), data) + err = m.Tr.Send(m.Hostports[i], msg.Type(), data) if err != nil { return err } @@ -98,50 +99,33 @@ func (m *EpaxosMessenger) Broadcast(msg message.Message) error { // Tries to receive a message. func (m *EpaxosMessenger) Recv() (message.Message, error) { - mtype, data, err := m.tr.Recv() + mtype, data, err := m.Tr.Recv() if err != nil { return nil, err } - return m.codec.Unmarshal(mtype, data) + return m.Codec.Unmarshal(mtype, data) } // Start the messenger. func (m *EpaxosMessenger) Start() error { - if err := m.codec.Initial(); err != nil { + if err := m.Codec.Initial(); err != nil { return err } - return m.tr.Start() + return m.Tr.Start() } // Stop the messenger. func (m *EpaxosMessenger) Stop() error { - if err := m.tr.Stop(); err != nil { + if err := m.Tr.Stop(); err != nil { return err } - return m.codec.Destroy() + return m.Codec.Stop() } -// Create a new messenger that uses GoGoprotobuf over HTTP. -func NewGoGoProtobufHTTPMessenger(hostports map[uint8]string, self uint8, size int) (*EpaxosMessenger, error) { - m := &EpaxosMessenger{ - hostports: hostports, - self: self, - fastQuorum: size - 1, - all: size, - } - - tr, err := transporter.NewHTTPTransporter(hostports[self]) - if err != nil { - return nil, err - } - - codec, err := codec.NewGoGoProtobufHTTPTransporter() - if err != nil { - return nil, err - // This can't happen, added here - // for symmetric looking. +// Destroy the messenger. +func (m *EpaxosMessenger) Destroy() error { + if err := m.Tr.Destroy(); err != nil { + return err } - - m.tr, m.codec = tr, codec - return m, nil + return m.Codec.Destroy() } diff --git a/messenger/messenger_test.go b/messenger/messenger_test.go new file mode 100644 index 0000000..7f3f2ff --- /dev/null +++ b/messenger/messenger_test.go @@ -0,0 +1,244 @@ +package messenger + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/go-distributed/epaxos" + "github.com/go-distributed/epaxos/message" + "github.com/go-distributed/testify/assert" +) + +func sendMessages(t *testing.T, mngr epaxos.Messenger, to uint8, msgs []message.Message) { + for i := range msgs { + assert.NoError(t, mngr.Send(to, msgs[i])) + } +} + +func randomReplicaID() uint8 { + return uint8(rand.Int()) +} + +func randomInstanceID() uint64 { + return uint64(rand.Int()) +} + +func randomCommands() message.Commands { + // Add 1 here to avoid assert.Equal == false when len == 0. + cmds := make([]message.Command, 1+rand.Intn(1024)) + for i := range cmds { + randStr := fmt.Sprintf("%10d", rand.Int()) + cmds[i] = message.Command(randStr) + } + return cmds +} + +func randomDependencies() message.Dependencies { + // Add 1 here to avoid assert.Equal == false when len == 0. + deps := make([]uint64, 1+rand.Intn(1024)) + for i := range deps { + deps[i] = uint64(rand.Int()) + } + return deps +} + +func randomBallot() *message.Ballot { + return message.NewBallot( + uint32(rand.Int()), + uint64(rand.Int()), + uint8(rand.Int())) +} + +func randomFrom() uint8 { + return uint8(rand.Int()) +} + +func randomStatus() uint8 { + return uint8(rand.Int()) +} + +func randomBool() bool { + if rand.Int()%2 == 0 { + return false + } + return true +} + +func generateAllMessages(num int) []message.Message { + ms := make([]message.Message, 0) + + for i := 0; i < num; i++ { + pa := &message.PreAccept{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Cmds: randomCommands(), + Deps: randomDependencies(), + Ballot: randomBallot(), + From: randomFrom(), + } + ms = append(ms, pa) + + po := &message.PreAcceptOk{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + From: randomFrom(), + } + ms = append(ms, po) + + pr := &message.PreAcceptReply{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Deps: randomDependencies(), + Ballot: randomBallot(), + From: randomFrom(), + } + ms = append(ms, pr) + + ac := &message.Accept{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Cmds: randomCommands(), + Deps: randomDependencies(), + Ballot: randomBallot(), + From: randomFrom(), + } + ms = append(ms, ac) + + ar := &message.AcceptReply{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Ballot: randomBallot(), + From: randomFrom(), + } + ms = append(ms, ar) + + pp := &message.Prepare{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Ballot: randomBallot(), + From: randomFrom(), + } + ms = append(ms, pp) + + ppr := &message.PrepareReply{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Status: randomStatus(), + Cmds: randomCommands(), + Deps: randomDependencies(), + Ballot: randomBallot(), + OriginalBallot: randomBallot(), + IsFromLeader: randomBool(), + From: randomFrom(), + } + ms = append(ms, ppr) + + cm := &message.Commit{ + ReplicaId: randomReplicaID(), + InstanceId: randomInstanceID(), + Cmds: randomCommands(), + Deps: randomDependencies(), + From: randomFrom(), + } + ms = append(ms, cm) + } + + // Shuffle the messages. + for i := range ms { + index := rand.Intn(i + 1) + ms[i], ms[index] = ms[index], ms[i] + } + + return ms +} + +func verifyMesseges(t *testing.T, sender, receiver int, receivedMsgs []message.Message, msgs []message.Message, done chan bool) { + <-done // Wait for receiving completed. + + if sender == receiver { + assert.Empty(t, receivedMsgs) + return + } + assert.Equal(t, msgs, receivedMsgs) +} + +func receiveMessages(t *testing.T, mngr epaxos.Messenger, msgs *[]message.Message, start, done chan bool) { + go func() { + for { + <-start // Wait for trigger. + <-time.After(time.Second * 10) + done <- true // Tell that we completed receiving. + } + }() + + for { + msg, err := mngr.Recv() + assert.NoError(t, err) + *msgs = append(*msgs, msg) + } +} + +func newGoGoProtobufHTTPMessengerGroup(t *testing.T, hostports map[uint8]string, chanSize int) []epaxos.Messenger { + mngrs := make([]epaxos.Messenger, len(hostports)) + for i := range mngrs { + var err error + mngrs[i], err = NewGoGoProtobufHTTPMessenger(hostports, uint8(i), len(mngrs), chanSize) + assert.NoError(t, err) + } + return mngrs +} + +// Test Send of GoGoProtobufHTTPMessenger +func TestGoGoProtobufHTTPMessenger(t *testing.T) { + numMessages := 1024 + + hostports := make(map[uint8]string) + hostports[0] = "localhost:8080" + hostports[1] = "localhost:8081" + hostports[2] = "localhost:8082" + + mngrs := newGoGoProtobufHTTPMessengerGroup(t, hostports, numMessages*10) + + for i := range mngrs { + go mngrs[i].Start() + time.Sleep(time.Second) + } + + startChans := make([]chan bool, len(mngrs)) + for i := range startChans { + startChans[i] = make(chan bool, 1) + } + + done := make(chan bool, 1) + receivedMsgs := make([]message.Message, 0) + for i := range mngrs { + go receiveMessages(t, mngrs[i], &receivedMsgs, startChans[i], done) + } + + for i := range mngrs { + for j := range mngrs { + fmt.Printf("Generating %d messages\n", numMessages*8) + receivedMsgs = receivedMsgs[:0] // Reset the slice. + msgs := generateAllMessages(numMessages) + + t1 := time.Now().UnixNano() + fmt.Printf("Sending %d messages from [%d] to [%d]\n", numMessages*8, i, j) + sendMessages(t, mngrs[i], uint8(j), msgs) + + t2 := time.Now().UnixNano() + + fmt.Printf("Done! time elasped: %vms\n", (t2-t1)/1000000) + startChans[j] <- true // Trigger receiving. + + fmt.Printf("Verifying %d messages\n", numMessages*8) + verifyMesseges(t, i, j, receivedMsgs, msgs, done) + fmt.Println("Messages look good!") + } + } + + for i := range mngrs { + assert.NoError(t, mngrs[i].Stop()) + } +} diff --git a/transporter.go b/transporter.go index 572e7ba..c79ddf0 100644 --- a/transporter.go +++ b/transporter.go @@ -20,4 +20,7 @@ type Transporter interface { // Stop the transporter. Stop() error + + // Destroy the transporter. + Destroy() error } diff --git a/transporter/http_transporter.go b/transporter/http_transporter.go index 914c503..85adcdd 100644 --- a/transporter/http_transporter.go +++ b/transporter/http_transporter.go @@ -43,11 +43,15 @@ type HTTPTransporter struct { } // Create a new http transporter. -func NewHTTPTransporter(self string) (*HTTPTransporter, error) { +func NewHTTPTransporter(self string, chanSize int) (*HTTPTransporter, error) { + if chanSize <= 0 { + chanSize = defaultChanSize + } + t := &HTTPTransporter{ self: self, path: make(map[uint8]string), - rawMessageChan: make(chan *rawMessage, defaultChanSize), + rawMessageChan: make(chan *rawMessage, chanSize), mux: http.NewServeMux(), client: new(http.Client), @@ -126,6 +130,11 @@ func (t *HTTPTransporter) Start() error { // Stop the transporter. func (t *HTTPTransporter) Stop() error { + return nil +} + +// Destroy the transporter. +func (t *HTTPTransporter) Destroy() error { close(t.rawMessageChan) return nil } From caa0cf25e6d652303e279c6aaeda7ae241b01e20 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 18 Jun 2014 22:24:41 -0700 Subject: [PATCH 3/3] chore(codec, transporter) Changed codec and transporter's interface. Now they are fully separate. --- codec.go | 2 +- codec/gogoprotobufcodec.go | 55 +++++++++++--- message/accept.go | 4 +- message/commit.go | 2 +- message/message.go | 6 +- message/message_type.go | 4 +- message/pre_accept.go | 6 +- message/prepare.go | 4 +- message/propose.go | 2 +- message/timeout.go | 2 +- messenger.go | 10 +-- transporter.go | 12 ++- transporter/http_transporter.go | 125 ++++++++------------------------ 13 files changed, 103 insertions(+), 131 deletions(-) diff --git a/codec.go b/codec.go index 4e6f9ce..31b9466 100644 --- a/codec.go +++ b/codec.go @@ -15,7 +15,7 @@ type Codec interface { Marshal(msg message.Message) ([]byte, error) // Unmarshal a message from bytes. - Unmarshal(mtype uint8, data []byte) (message.Message, error) + Unmarshal(data []byte) (message.Message, error) // Stop a codec. Stop() error diff --git a/codec/gogoprotobufcodec.go b/codec/gogoprotobufcodec.go index 870d371..e68aabf 100644 --- a/codec/gogoprotobufcodec.go +++ b/codec/gogoprotobufcodec.go @@ -1,39 +1,71 @@ package codec import ( + "bytes" + "fmt" + "github.com/go-distributed/epaxos/message" + "github.com/golang/glog" ) +// The gogoprotobuf codec. type GoGoProtobufCodec struct{} -func NewGoGoProtobufHTTPTransporter() (*GoGoProtobufCodec, error) { - return new(GoGoProtobufCodec), nil +// Create a new gogpprotobuf codec. +func NewGoGoProtobuCodec() (*GoGoProtobufCodec, error) { + return &GoGoProtobufCodec{}, nil } -// Initial the gogoprotobuf (no-op for now). +// Initial the gogoprotobuf codec (no-op for now). func (gc *GoGoProtobufCodec) Initial() error { return nil } -// Stop the gogoprotobuf (no-op for now). +// Stop the gogoprotobuf codec (no-op for now). func (gc *GoGoProtobufCodec) Stop() error { return nil } -// Destroy the gogoprotobuf (no-op for now). +// Destroy the gogoprotobuf codec (no-op for now). func (gc *GoGoProtobufCodec) Destroy() error { return nil } // Marshal a message into a byte slice. func (gc *GoGoProtobufCodec) Marshal(msg message.Message) ([]byte, error) { - return msg.MarshalProtobuf() + b, err := msg.MarshalProtobuf() + if err != nil { + glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err) + return nil, err + } + + // Use bytes.Buffer to write efficiently. + var buf bytes.Buffer + if err = buf.WriteByte(byte(msg.Type())); err != nil { + glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err) + return nil, err + } + + n, err := buf.Write(b) + if err != nil || n != len(b) { + glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err) + return nil, err + } + return buf.Bytes(), nil } // Unmarshal a message from a byte slice. -func (c *GoGoProtobufCodec) Unmarshal(mtype uint8, data []byte) (message.Message, error) { +func (c *GoGoProtobufCodec) Unmarshal(data []byte) (message.Message, error) { var msg message.Message + buf := bytes.NewBuffer(data) + bt, err := buf.ReadByte() + if err != nil { + glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err) + return nil, err + } + + mtype := message.MsgType(bt) switch mtype { case message.ProposeMsg: msg = new(message.Propose) @@ -54,10 +86,15 @@ func (c *GoGoProtobufCodec) Unmarshal(mtype uint8, data []byte) (message.Message case message.PrepareReplyMsg: msg = new(message.PrepareReply) default: - panic("Unknown message type") + err := fmt.Errorf("Unknown message type %s\n", message.TypeToString(mtype)) + glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err) + return nil, err } - if err := msg.UnmarshalProtobuf(data); err != nil { + // TODO(yifan): Move this from the message package + // to the protobuf package. + if err := msg.UnmarshalProtobuf(buf.Bytes()); err != nil { + glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err) return nil, err } return msg, nil diff --git a/message/accept.go b/message/accept.go index e62684b..72e8a0c 100644 --- a/message/accept.go +++ b/message/accept.go @@ -29,7 +29,7 @@ func (a *Accept) Sender() uint8 { return a.From } -func (a *Accept) Type() uint8 { +func (a *Accept) Type() MsgType { return AcceptMsg } @@ -92,7 +92,7 @@ func (a *AcceptReply) Sender() uint8 { return a.From } -func (a *AcceptReply) Type() uint8 { +func (a *AcceptReply) Type() MsgType { return AcceptReplyMsg } diff --git a/message/commit.go b/message/commit.go index 54ac38d..860adb5 100644 --- a/message/commit.go +++ b/message/commit.go @@ -20,7 +20,7 @@ func (c *Commit) Sender() uint8 { return c.From } -func (c *Commit) Type() uint8 { +func (c *Commit) Type() MsgType { return CommitMsg } diff --git a/message/message.go b/message/message.go index b82668a..e3ff54d 100644 --- a/message/message.go +++ b/message/message.go @@ -2,7 +2,7 @@ package message type Message interface { Sender() uint8 - Type() uint8 + Type() MsgType Content() interface{} Replica() uint8 Instance() uint64 @@ -37,8 +37,8 @@ func MessageTypeString(m Message) string { } } -func TypeToString(msgType uint8) string { - switch msgType { +func TypeToString(mtype MsgType) string { + switch mtype { case ProposeMsg: return "Propose" case PreAcceptMsg: diff --git a/message/message_type.go b/message/message_type.go index a50b190..3be2961 100644 --- a/message/message_type.go +++ b/message/message_type.go @@ -1,7 +1,9 @@ package message +type MsgType uint8 + const ( - ProposeMsg uint8 = iota + 1 + ProposeMsg MsgType = iota + 1 PreAcceptMsg PreAcceptOkMsg PreAcceptReplyMsg diff --git a/message/pre_accept.go b/message/pre_accept.go index 18c9e2d..828487e 100644 --- a/message/pre_accept.go +++ b/message/pre_accept.go @@ -40,7 +40,7 @@ func (p *PreAccept) Sender() uint8 { return p.From } -func (p *PreAccept) Type() uint8 { +func (p *PreAccept) Type() MsgType { return PreAcceptMsg } @@ -103,7 +103,7 @@ func (p *PreAcceptOk) Sender() uint8 { return p.From } -func (p *PreAcceptOk) Type() uint8 { +func (p *PreAcceptOk) Type() MsgType { return PreAcceptOkMsg } @@ -157,7 +157,7 @@ func (p *PreAcceptReply) Sender() uint8 { return p.From } -func (p *PreAcceptReply) Type() uint8 { +func (p *PreAcceptReply) Type() MsgType { return PreAcceptReplyMsg } diff --git a/message/prepare.go b/message/prepare.go index d0d79c0..bf241ae 100644 --- a/message/prepare.go +++ b/message/prepare.go @@ -33,7 +33,7 @@ func (p *Prepare) Sender() uint8 { return p.From } -func (p *Prepare) Type() uint8 { +func (p *Prepare) Type() MsgType { return PrepareMsg } @@ -91,7 +91,7 @@ func (p *PrepareReply) Sender() uint8 { return p.From } -func (p *PrepareReply) Type() uint8 { +func (p *PrepareReply) Type() MsgType { return PrepareReplyMsg } diff --git a/message/propose.go b/message/propose.go index 06673d5..6ed38e5 100644 --- a/message/propose.go +++ b/message/propose.go @@ -26,7 +26,7 @@ func (p *Propose) Sender() uint8 { return p.From } -func (p *Propose) Type() uint8 { +func (p *Propose) Type() MsgType { return ProposeMsg } func (p *Propose) Content() interface{} { diff --git a/message/timeout.go b/message/timeout.go index 902cf9a..eea1468 100644 --- a/message/timeout.go +++ b/message/timeout.go @@ -14,7 +14,7 @@ func (t *Timeout) Sender() uint8 { return t.From } -func (t *Timeout) Type() uint8 { +func (t *Timeout) Type() MsgType { return TimeoutMsg } diff --git a/messenger.go b/messenger.go index 5f44c14..bbf1e8e 100644 --- a/messenger.go +++ b/messenger.go @@ -50,7 +50,7 @@ func (m *EpaxosMessenger) Send(to uint8, msg message.Message) error { if err != nil { return err } - return m.Tr.Send(m.Hostports[to], msg.Type(), data) + return m.Tr.Send(m.Hostports[to], data) } // Multicast a message to the fast quorum. @@ -70,7 +70,7 @@ func (m *EpaxosMessenger) MulticastFastQuorum(msg message.Message) error { // Skip itSelf and one more. continue } - err = m.Tr.Send(m.Hostports[i], msg.Type(), data) + err = m.Tr.Send(m.Hostports[i], data) if err != nil { return err } @@ -89,7 +89,7 @@ func (m *EpaxosMessenger) Broadcast(msg message.Message) error { if i == m.Self { // Skip itself. continue } - err = m.Tr.Send(m.Hostports[i], msg.Type(), data) + err = m.Tr.Send(m.Hostports[i], data) if err != nil { return err } @@ -99,11 +99,11 @@ func (m *EpaxosMessenger) Broadcast(msg message.Message) error { // Tries to receive a message. func (m *EpaxosMessenger) Recv() (message.Message, error) { - mtype, data, err := m.Tr.Recv() + data, err := m.Tr.Recv() if err != nil { return nil, err } - return m.Codec.Unmarshal(mtype, data) + return m.Codec.Unmarshal(data) } // Start the messenger. diff --git a/transporter.go b/transporter.go index c79ddf0..47c445b 100644 --- a/transporter.go +++ b/transporter.go @@ -4,15 +4,13 @@ package epaxos // Send and Recv. type Transporter interface { // Send an encoded message to the host:port. - // This will block. We need the msgType here because - // we assume the message is not self-explained. - Send(hostport string, msgType uint8, b []byte) error + // This will block. We don't need the msgType here + // because we assume the message is self-explained. + Send(hostport string, b []byte) error // Receive an encoded message from some peer. - // Return the type of the message and the content. - // We need the msgType here because we assume the - // message is not self-explained. - Recv() (msgType uint8, b []byte, err error) + // Return the bytes form of the message. + Recv() (b []byte, err error) // Start the transporter, this will block if succeeds, // or return an error if it fails. diff --git a/transporter/http_transporter.go b/transporter/http_transporter.go index 85adcdd..bfab39f 100644 --- a/transporter/http_transporter.go +++ b/transporter/http_transporter.go @@ -7,30 +7,23 @@ import ( "net/http" "path" - "github.com/go-distributed/epaxos/message" "github.com/golang/glog" ) -const ( - prefix = "/epaxos" - proposePath = "/propose" - preAcceptPath = "/preAccept" - preAcceptOkPath = "/preAcceptOk" - preAcceptReplyPath = "/preAcceptReply" - acceptPath = "/accept" - acceptReplyPath = "/acceptReply" - commitPath = "/commit" - preparePath = "/prepare" - prepareReplyPath = "/prepareReply" +const prefix = "/epaxos" + +const defaultChanSize = 1024 + +var ( + internalMessagePath string + proposeRequestPath string ) // The buffer size for the internal channel. -const defaultChanSize = 1024 type rawMessage struct { - mtype uint8 - data []byte - err error + data []byte + err error } type HTTPTransporter struct { @@ -63,47 +56,30 @@ func NewHTTPTransporter(self string, chanSize int) (*HTTPTransporter, error) { // Install paths, so that we don't need to do join every time. func (t *HTTPTransporter) installPaths() { - t.path[message.ProposeMsg] = path.Join(prefix, proposePath) - t.path[message.PreAcceptMsg] = path.Join(prefix, preAcceptPath) - t.path[message.PreAcceptOkMsg] = path.Join(prefix, preAcceptOkPath) - t.path[message.PreAcceptReplyMsg] = path.Join(prefix, preAcceptReplyPath) - t.path[message.AcceptMsg] = path.Join(prefix, acceptPath) - t.path[message.AcceptReplyMsg] = path.Join(prefix, acceptReplyPath) - t.path[message.CommitMsg] = path.Join(prefix, commitPath) - t.path[message.PrepareMsg] = path.Join(prefix, preparePath) - t.path[message.PrepareReplyMsg] = path.Join(prefix, prepareReplyPath) + internalMessagePath = path.Join(prefix, "/internalMessage") + proposeRequestPath = path.Join(prefix, "/proposeRequest") } func (t *HTTPTransporter) installHandlers() { - t.mux.HandleFunc(t.path[message.ProposeMsg], t.proposeHandler) - t.mux.HandleFunc(t.path[message.PreAcceptMsg], t.preAcceptHandler) - t.mux.HandleFunc(t.path[message.PreAcceptOkMsg], t.preAcceptOkHandler) - t.mux.HandleFunc(t.path[message.PreAcceptReplyMsg], t.preAcceptReplyHandler) - t.mux.HandleFunc(t.path[message.AcceptMsg], t.acceptHandler) - t.mux.HandleFunc(t.path[message.AcceptReplyMsg], t.acceptReplyHandler) - t.mux.HandleFunc(t.path[message.CommitMsg], t.commitHandler) - t.mux.HandleFunc(t.path[message.PrepareMsg], t.prepareHandler) - t.mux.HandleFunc(t.path[message.PrepareReplyMsg], t.prepareReplyHandler) + t.mux.HandleFunc(internalMessagePath, t.internalMessageHandler) + t.mux.HandleFunc(proposeRequestPath, t.proposeHandler) } // Send an encoded message to the host:port. -// This will block. We need the msgType here because -// we assume the message is not self-explained. -func (t *HTTPTransporter) Send(hostport string, msgType uint8, b []byte) error { +// This will block. We don't need the msgType here +// because we assume the message is self-explained. +func (t *HTTPTransporter) Send(hostport string, b []byte) error { if hostport == t.self { glog.Warning("Sending message to self!") return nil } - // TODO(yifan): Support https. - targetURL := fmt.Sprintf("http://%s%s", hostport, t.path[msgType]) + targetURL := fmt.Sprintf("http://%s%s", hostport, internalMessagePath) // Send POST. - resp, err := t.client.Post(targetURL, "application/protobuf", bytes.NewBuffer(b)) + resp, err := t.client.Post(targetURL, "application/epaxos", bytes.NewReader(b)) if resp == nil || err != nil { - glog.Warning("HTTPTransporter: Post error for: ", - message.TypeToString(msgType), - err) + glog.Warning("HTTPTransporter: Failed to POST ", err) return err } defer resp.Body.Close() @@ -111,12 +87,10 @@ func (t *HTTPTransporter) Send(hostport string, msgType uint8, b []byte) error { } // Receive an encoded message from some peer. -// Return the type of the message and the content. -// We need the msgType here because we assume the -// message is not self-explained. -func (t *HTTPTransporter) Recv() (msgType uint8, b []byte, err error) { +// Return the bytes form of the message. +func (t *HTTPTransporter) Recv() (b []byte, err error) { rawMsg := <-t.rawMessageChan - return rawMsg.mtype, rawMsg.data, rawMsg.err + return rawMsg.data, rawMsg.err } // Start the transporter, it will block until success or failure. @@ -139,55 +113,16 @@ func (t *HTTPTransporter) Destroy() error { return nil } -// Message handlers -func (t *HTTPTransporter) proposeHandler(w http.ResponseWriter, r *http.Request) { - panic("Not implemented yet") -} - -func (t *HTTPTransporter) preAcceptHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.PreAcceptMsg, r) -} - -func (t *HTTPTransporter) preAcceptOkHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.PreAcceptOkMsg, r) -} - -func (t *HTTPTransporter) preAcceptReplyHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.PreAcceptReplyMsg, r) -} - -func (t *HTTPTransporter) acceptHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.AcceptMsg, r) -} - -func (t *HTTPTransporter) acceptReplyHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.AcceptReplyMsg, r) -} - -func (t *HTTPTransporter) commitHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.CommitMsg, r) -} - -func (t *HTTPTransporter) prepareHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.PrepareMsg, r) -} - -func (t *HTTPTransporter) prepareReplyHandler(w http.ResponseWriter, r *http.Request) { - t.handleMessage(message.PrepareReplyMsg, r) -} - -// Send the messag type and raw bytes into the internal channel. -func (t *HTTPTransporter) handleMessage(msgType uint8, r *http.Request) { +// Handle incoming messages (except for propose). +func (t *HTTPTransporter) internalMessageHandler(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) if err != nil { - glog.Warning("HTTPTransporter: Read HTTP body error for: ", - message.TypeToString(msgType), - err) + glog.Warning("HTTPTransporter: Read HTTP body error for: ", err) } + t.rawMessageChan <- &rawMessage{b, err} +} - t.rawMessageChan <- &rawMessage{ - mtype: msgType, - data: b, - err: err, - } +// Handle incoming propose requests. +func (t *HTTPTransporter) proposeHandler(w http.ResponseWriter, r *http.Request) { + panic("Not implemented yet") }