diff --git a/README.markdown b/README.markdown index 64c1e83..5878924 100644 --- a/README.markdown +++ b/README.markdown @@ -12,11 +12,12 @@ Wray is only a client for Faye. You will need to setup a server using Ruby or No ###Subscribing to channels -``` +```go package main import "github.com/pythonandchips/wray" import "fmt" +import "time" func main() { //register the types of transport you want available. Only long-polling is currently supported @@ -26,24 +27,47 @@ func main() { client := wray.NewFayeClient("http://localhost:5000/faye") //subscribe to the channels you want to listen to - client.Subscribe("/foo", false, func(message wray.Message) { + _, err := client.Subscribe("/foo", false, func(message wray.Message) { fmt.Println("-------------------------------------------") fmt.Println(message.Data) }) + if err != nil { + fmt.Println("Subscription to /foo failed", err) + } + //wildcards can be used to subscribe to multipule channels - client.Subscribe("/foo/*", false, func(message wray.Message) { + promise, _ = client.Subscribe("/foo/*", false, func(message wray.Message) { fmt.Println("-------------------------------------------") fmt.Println(message.Data) }) + if !promise.Successful() { + fmt.Println("Subscription to /foo/* failed", promise.Error()) + } + + // try to subscribe forever + for { + _, err = client.Subscribe("/foo/*", false, func(message wray.Message) { + fmt.Println("-------------------------------------------") + fmt.Println(message.Data) + }) + + if err == nil { + break // break out of the loop if there was no error + } + + time.Sleep(1*time.Second) + } + //start listening on all subscribed channels and hold the process open client.Listen() } ``` ###Publishing to channels -``` + +```go package main import "github.com/pythonandchips/wray" diff --git a/go_faye.go b/go_faye.go index ab78eed..f81ae54 100644 --- a/go_faye.go +++ b/go_faye.go @@ -1,6 +1,7 @@ package wray import ( + "errors" "fmt" "path/filepath" "strings" @@ -43,6 +44,15 @@ type Subscription struct { type SubscriptionPromise struct { subscription Subscription + subError error +} + +func (self SubscriptionPromise) Error() error { + return self.subError +} + +func (self SubscriptionPromise) Successful() bool { + return self.subError == nil } func NewFayeClient(url string) *FayeClient { @@ -80,16 +90,32 @@ func (self *FayeClient) handshake() { } } -func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise { +func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) (promise SubscriptionPromise, err error) { if self.state == UNCONNECTED { self.handshake() } subscriptionParams := map[string]interface{}{"channel": "/meta/subscribe", "clientId": self.clientId, "subscription": channel, "id": "1"} subscription := Subscription{channel: channel, callback: callback} - //TODO: deal with subscription failures - self.transport.send(subscriptionParams) + + res, err := self.transport.send(subscriptionParams) + promise = SubscriptionPromise{subscription, nil} + + if err != nil { + promise.subError = err + return + } + + if !res.successful { + // TODO: put more information in the error message about why it failed + err = errors.New("Response was unsuccessful") + promise.subError = err + return + } + + // don't add to the subscriptions until we know it succeeded self.subscriptions = append(self.subscriptions, subscription) - return SubscriptionPromise{subscription} + + return } func (self *FayeClient) handleResponse(response Response) { diff --git a/go_faye_test.go b/go_faye_test.go index fd88f99..fb8c57f 100644 --- a/go_faye_test.go +++ b/go_faye_test.go @@ -43,6 +43,7 @@ func TestSubscribe(t *testing.T) { var fakeHttpTransport *FakeHttpTransport var subscriptionParams map[string]interface{} var response Response + var err error Given(func() { response = Response{id: "1", channel: "/meta/handshake", successful: true, clientId: "client4", supportedConnectionTypes: []string{"long-polling"}} }) @@ -53,8 +54,9 @@ func TestSubscribe(t *testing.T) { subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": response.clientId, "subscription": "/foo/*", "id": "1"} }) Given(func() { callback = func(message Message) {} }) - When(func() { subscriptionPromise = fayeClient.Subscribe("/foo/*", false, callback) }) + When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) }) Convey("connects the faye client", func() { + Then(func() { So(err, ShouldEqual, nil) }) Then(func() { So(fayeClient.state, ShouldEqual, CONNECTED) }) }) Convey("add the subscription to the client", func() { @@ -74,6 +76,41 @@ func TestSubscribe(t *testing.T) { }) } +func TestSubscriptionError(t *testing.T) { + Convey("subscribe to a channel when unconnected", t, func() { + var fayeClient FayeClient + var callback func(Message) + var subscriptionPromise SubscriptionPromise + var fakeHttpTransport *FakeHttpTransport + var subscriptionParams map[string]interface{} + var failedResponse Response + var clientId string = "client1" + var err error + Given(func() { + failedResponse = Response{id: "1", channel: "/meta/subscribe", successful: false, clientId: clientId, supportedConnectionTypes: []string{"long-polling"}} + }) + Given(func() { fakeHttpTransport = &FakeHttpTransport{usable: true, response: failedResponse} }) + Given(func() { registeredTransports = []Transport{fakeHttpTransport} }) + Given(func() { fayeClient = BuildFayeClient().WithTransport(fakeHttpTransport).Client() }) + Given(func() { fayeClient.state = CONNECTED }) + Given(func() { + subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": clientId, "subscription": "/foo/*", "id": "1"} + }) + Given(func() { callback = func(message Message) {} }) + When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) }) + Convey("fails to subscribe", func() { + Then(func() { So(err, ShouldNotEqual, nil) }) + Then(func() { So(subscriptionPromise.Successful(), ShouldEqual, false) }) + }) + Convey("not add the subscription to the client", func() { + Then(func() { So(len(fayeClient.subscriptions), ShouldEqual, 0) }) + }) + Convey("the client send the subscription to the server", func() { + Then(func() { So(fakeHttpTransport.sentParams, ShouldResemble, subscriptionParams) }) + }) + }) +} + func TestPerformHandshake(t *testing.T) { Convey("successful handshake with server", t, func() { var fayeClient FayeClient @@ -163,29 +200,32 @@ func TestHandleResponse(t *testing.T) { var subscriptions []Subscription var firstParams map[string]interface{} var secondParams map[string]interface{} - var firstMessages []Message - var secondMessages []Message + var firstMessages []map[string]interface{} + var secondMessages []map[string]interface{} Given(func() { - subscriptions = []Subscription{Subscription{"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message) }}, - Subscription{"/foo/*", func(message Message) { secondMessages = append(secondMessages, message) }}, + subscriptions = []Subscription{ + {"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message.Data) }}, + {"/foo/*", func(message Message) { secondMessages = append(secondMessages, message.Data) }}, } }) Given(func() { firstParams = map[string]interface{}{"foo": "bar"} }) Given(func() { secondParams = map[string]interface{}{"baz": "qux"} }) Given(func() { fayeClient = BuildFayeClient().WithSubscriptions(subscriptions).Client() }) Given(func() { - messages = []Message{Message{Channel: "/foo/bar", Id: "1", Data: firstParams}, - Message{Channel: "/foo/quz", Id: "1", Data: secondParams}} + messages = []Message{ + {Channel: "/foo/bar", Id: "1", Data: firstParams}, + {Channel: "/foo/quz", Id: "1", Data: secondParams}, + } }) Given(func() { response = Response{messages: messages, channel: "/meta/connect", clientId: "client1"} }) When(func() { fayeClient.handleResponse(response) }) //need a very short sleep in here to allow the go routines to complete //as all they are doing is assigning a variable 10 milliseconds shoule be more than enough - Wait(10 * time.Millisecond) - Then(func() { So(firstMessages[0].Data, ShouldResemble, firstParams) }) + Wait(100 * time.Millisecond) + Then(func() { So(firstMessages, ShouldContain, firstParams) }) Then(func() { So(len(secondMessages), ShouldEqual, 2) }) - Then(func() { So(secondMessages[0].Data, ShouldResemble, firstParams) }) - Then(func() { So(secondMessages[1].Data, ShouldResemble, secondParams) }) + Then(func() { So(secondMessages, ShouldContain, firstParams) }) + Then(func() { So(secondMessages, ShouldContain, secondParams) }) }) } diff --git a/response.go b/response.go index e9815c3..d2fe7ff 100644 --- a/response.go +++ b/response.go @@ -1,61 +1,75 @@ package wray type Response struct { - id string - channel string - successful bool - clientId string + id string + channel string + successful bool + clientId string supportedConnectionTypes []string - messages []Message - error error + messages []Message + error error } type Message struct { Channel string - Id string - Data map[string]interface{} + Id string + Data map[string]interface{} } func newResponse(data []interface{}) Response { headerData := data[0].(map[string]interface{}) messagesData := data[1.:] messages := parseMessages(messagesData) + var id string if headerData["id"] != nil { id = headerData["id"].(string) } + supportedConnectionTypes := []string{} + if headerData["supportedConnectionTypes"] != nil { d := headerData["supportedConnectionTypes"].([]interface{}) - for _, sct := range(d) { + for _, sct := range d { supportedConnectionTypes = append(supportedConnectionTypes, sct.(string)) } } + var clientId string if headerData["clientId"] != nil { clientId = headerData["clientId"].(string) } - return Response{id: id, - clientId: clientId, - channel: headerData["channel"].(string), - successful: headerData["successful"].(bool), - messages: messages, - supportedConnectionTypes: supportedConnectionTypes} + + return Response{ + id: id, + clientId: clientId, + channel: headerData["channel"].(string), + successful: headerData["successful"].(bool), + messages: messages, + supportedConnectionTypes: supportedConnectionTypes, + } } func parseMessages(data []interface{}) []Message { messages := []Message{} - for _, messageData := range(data) { + + for _, messageData := range data { + m := messageData.(map[string]interface{}) var id string + if m["id"] != nil { id = m["id"].(string) } - message := Message{Channel: m["channel"].(string), - Id: id, - Data: m["data"].(map[string]interface{})} + + message := Message{ + Channel: m["channel"].(string), + Id: id, + Data: m["data"].(map[string]interface{}), + } + messages = append(messages, message) } + return messages } -