Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.

Subscription failure handling #3

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
32 changes: 28 additions & 4 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ Wray is only a client for Faye. You will need to setup a server using Ruby or No

###Subscribing to channels

```
```go
package main

import "github.com/pythonandchips/wray"
import "fmt"
import "time"

func main() {
//register the types of transport you want available. Only long-polling is currently supported
Expand All @@ -26,24 +27,47 @@ func main() {
client := wray.NewFayeClient("http://localhost:5000/faye")

//subscribe to the channels you want to listen to
client.Subscribe("/foo", false, func(message wray.Message) {
_, err := client.Subscribe("/foo", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if err != nil {
fmt.Println("Subscription to /foo failed", err)
}

//wildcards can be used to subscribe to multipule channels
client.Subscribe("/foo/*", false, func(message wray.Message) {
promise, _ = client.Subscribe("/foo/*", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if !promise.Successful() {
fmt.Println("Subscription to /foo/* failed", promise.Error())
}

// try to subscribe forever
for {
_, err = client.Subscribe("/foo/*", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if err == nil {
break // break out of the loop if there was no error
}

time.Sleep(1*time.Second)
}

//start listening on all subscribed channels and hold the process open
client.Listen()
}
```

###Publishing to channels
```

```go
package main

import "github.com/pythonandchips/wray"
Expand Down
34 changes: 30 additions & 4 deletions go_faye.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wray

import (
"errors"
"fmt"
"path/filepath"
"strings"
Expand Down Expand Up @@ -43,6 +44,15 @@ type Subscription struct {

type SubscriptionPromise struct {
subscription Subscription
subError error
}

func (self SubscriptionPromise) Error() error {
return self.subError
}

func (self SubscriptionPromise) Successful() bool {
return self.subError == nil
}

func NewFayeClient(url string) *FayeClient {
Expand Down Expand Up @@ -80,16 +90,32 @@ func (self *FayeClient) handshake() {
}
}

func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise {
func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) (promise SubscriptionPromise, err error) {
if self.state == UNCONNECTED {
self.handshake()
}
subscriptionParams := map[string]interface{}{"channel": "/meta/subscribe", "clientId": self.clientId, "subscription": channel, "id": "1"}
subscription := Subscription{channel: channel, callback: callback}
//TODO: deal with subscription failures
self.transport.send(subscriptionParams)

res, err := self.transport.send(subscriptionParams)
promise = SubscriptionPromise{subscription, nil}

if err != nil {
promise.subError = err
return
}

if !res.successful {
// TODO: put more information in the error message about why it failed
err = errors.New("Response was unsuccessful")
promise.subError = err
return
}

// don't add to the subscriptions until we know it succeeded
self.subscriptions = append(self.subscriptions, subscription)
return SubscriptionPromise{subscription}

return
}

func (self *FayeClient) handleResponse(response Response) {
Expand Down
62 changes: 51 additions & 11 deletions go_faye_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestSubscribe(t *testing.T) {
var fakeHttpTransport *FakeHttpTransport
var subscriptionParams map[string]interface{}
var response Response
var err error
Given(func() {
response = Response{id: "1", channel: "/meta/handshake", successful: true, clientId: "client4", supportedConnectionTypes: []string{"long-polling"}}
})
Expand All @@ -53,8 +54,9 @@ func TestSubscribe(t *testing.T) {
subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": response.clientId, "subscription": "/foo/*", "id": "1"}
})
Given(func() { callback = func(message Message) {} })
When(func() { subscriptionPromise = fayeClient.Subscribe("/foo/*", false, callback) })
When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) })
Convey("connects the faye client", func() {
Then(func() { So(err, ShouldEqual, nil) })
Then(func() { So(fayeClient.state, ShouldEqual, CONNECTED) })
})
Convey("add the subscription to the client", func() {
Expand All @@ -74,6 +76,41 @@ func TestSubscribe(t *testing.T) {
})
}

func TestSubscriptionError(t *testing.T) {
Convey("subscribe to a channel when unconnected", t, func() {
var fayeClient FayeClient
var callback func(Message)
var subscriptionPromise SubscriptionPromise
var fakeHttpTransport *FakeHttpTransport
var subscriptionParams map[string]interface{}
var failedResponse Response
var clientId string = "client1"
var err error
Given(func() {
failedResponse = Response{id: "1", channel: "/meta/subscribe", successful: false, clientId: clientId, supportedConnectionTypes: []string{"long-polling"}}
})
Given(func() { fakeHttpTransport = &FakeHttpTransport{usable: true, response: failedResponse} })
Given(func() { registeredTransports = []Transport{fakeHttpTransport} })
Given(func() { fayeClient = BuildFayeClient().WithTransport(fakeHttpTransport).Client() })
Given(func() { fayeClient.state = CONNECTED })
Given(func() {
subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": clientId, "subscription": "/foo/*", "id": "1"}
})
Given(func() { callback = func(message Message) {} })
When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) })
Convey("fails to subscribe", func() {
Then(func() { So(err, ShouldNotEqual, nil) })
Then(func() { So(subscriptionPromise.Successful(), ShouldEqual, false) })
})
Convey("not add the subscription to the client", func() {
Then(func() { So(len(fayeClient.subscriptions), ShouldEqual, 0) })
})
Convey("the client send the subscription to the server", func() {
Then(func() { So(fakeHttpTransport.sentParams, ShouldResemble, subscriptionParams) })
})
})
}

func TestPerformHandshake(t *testing.T) {
Convey("successful handshake with server", t, func() {
var fayeClient FayeClient
Expand Down Expand Up @@ -163,29 +200,32 @@ func TestHandleResponse(t *testing.T) {
var subscriptions []Subscription
var firstParams map[string]interface{}
var secondParams map[string]interface{}
var firstMessages []Message
var secondMessages []Message
var firstMessages []map[string]interface{}
var secondMessages []map[string]interface{}
Given(func() {
subscriptions = []Subscription{Subscription{"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message) }},
Subscription{"/foo/*", func(message Message) { secondMessages = append(secondMessages, message) }},
subscriptions = []Subscription{
{"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message.Data) }},
{"/foo/*", func(message Message) { secondMessages = append(secondMessages, message.Data) }},
}
})
Given(func() { firstParams = map[string]interface{}{"foo": "bar"} })
Given(func() { secondParams = map[string]interface{}{"baz": "qux"} })
Given(func() { fayeClient = BuildFayeClient().WithSubscriptions(subscriptions).Client() })
Given(func() {
messages = []Message{Message{Channel: "/foo/bar", Id: "1", Data: firstParams},
Message{Channel: "/foo/quz", Id: "1", Data: secondParams}}
messages = []Message{
{Channel: "/foo/bar", Id: "1", Data: firstParams},
{Channel: "/foo/quz", Id: "1", Data: secondParams},
}
})
Given(func() { response = Response{messages: messages, channel: "/meta/connect", clientId: "client1"} })
When(func() { fayeClient.handleResponse(response) })
//need a very short sleep in here to allow the go routines to complete
//as all they are doing is assigning a variable 10 milliseconds shoule be more than enough
Wait(10 * time.Millisecond)
Then(func() { So(firstMessages[0].Data, ShouldResemble, firstParams) })
Wait(100 * time.Millisecond)
Then(func() { So(firstMessages, ShouldContain, firstParams) })
Then(func() { So(len(secondMessages), ShouldEqual, 2) })
Then(func() { So(secondMessages[0].Data, ShouldResemble, firstParams) })
Then(func() { So(secondMessages[1].Data, ShouldResemble, secondParams) })
Then(func() { So(secondMessages, ShouldContain, firstParams) })
Then(func() { So(secondMessages, ShouldContain, secondParams) })
})
}

Expand Down
54 changes: 34 additions & 20 deletions response.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,75 @@
package wray

type Response struct {
id string
channel string
successful bool
clientId string
id string
channel string
successful bool
clientId string
supportedConnectionTypes []string
messages []Message
error error
messages []Message
error error
}

type Message struct {
Channel string
Id string
Data map[string]interface{}
Id string
Data map[string]interface{}
}

func newResponse(data []interface{}) Response {
headerData := data[0].(map[string]interface{})
messagesData := data[1.:]
messages := parseMessages(messagesData)

var id string
if headerData["id"] != nil {
id = headerData["id"].(string)
}

supportedConnectionTypes := []string{}

if headerData["supportedConnectionTypes"] != nil {
d := headerData["supportedConnectionTypes"].([]interface{})
for _, sct := range(d) {
for _, sct := range d {
supportedConnectionTypes = append(supportedConnectionTypes, sct.(string))
}
}

var clientId string
if headerData["clientId"] != nil {
clientId = headerData["clientId"].(string)
}
return Response{id: id,
clientId: clientId,
channel: headerData["channel"].(string),
successful: headerData["successful"].(bool),
messages: messages,
supportedConnectionTypes: supportedConnectionTypes}

return Response{
id: id,
clientId: clientId,
channel: headerData["channel"].(string),
successful: headerData["successful"].(bool),
messages: messages,
supportedConnectionTypes: supportedConnectionTypes,
}
}

func parseMessages(data []interface{}) []Message {
messages := []Message{}
for _, messageData := range(data) {

for _, messageData := range data {

m := messageData.(map[string]interface{})
var id string

if m["id"] != nil {
id = m["id"].(string)
}
message := Message{Channel: m["channel"].(string),
Id: id,
Data: m["data"].(map[string]interface{})}

message := Message{
Channel: m["channel"].(string),
Id: id,
Data: m["data"].(map[string]interface{}),
}

messages = append(messages, message)
}

return messages
}