Skip to content

Commit

Permalink
Merge pull request #137 from chzyer-dev/use_builtin_json_pkg
Browse files Browse the repository at this point in the history
remove go-simplejson dependency
  • Loading branch information
mreiferson committed Apr 30, 2015
2 parents 46348ca + 32c4c8d commit 91f8996
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
38 changes: 25 additions & 13 deletions api_request.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package nsq

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"time"

"github.com/bitly/go-simplejson"
)

type deadlinedConn struct {
Expand Down Expand Up @@ -39,40 +38,53 @@ func newDeadlineTransport(timeout time.Duration) *http.Transport {
return transport
}

func apiRequestNegotiateV1(method string, endpoint string, body io.Reader) (*simplejson.Json, error) {
type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}

// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest(method, endpoint, body)
if err != nil {
return nil, err
return err
}

req.Header.Add("Accept", "application/vnd.nsq; version=1.0")

resp, err := httpclient.Do(req)
if err != nil {
return nil, err
return err
}

respBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
return err
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("got response %s %q", resp.Status, respBody)
return fmt.Errorf("got response %s %q", resp.Status, respBody)
}

if len(respBody) == 0 {
respBody = []byte("{}")
}

data, err := simplejson.NewJson(respBody)
if err != nil {
return nil, err
if resp.Header.Get("X-NSQ-Content-Type") == "nsq; version=1.0" {
return json.Unmarshal(respBody, ret)
}

if resp.Header.Get("X-NSQ-Content-Type") == "nsq; version=1.0" {
return data, nil
wResp := &wrappedResp{
Data: ret,
}
return data.Get("data"), nil

if err = json.Unmarshal(respBody, wResp); err != nil {
return err
}

// wResp.StatusCode here is equal to resp.StatusCode, so ignore it
return nil
}
36 changes: 20 additions & 16 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,21 @@ func (r *Consumer) nextLookupdEndpoint() string {
return u.String()
}

type lookupResp struct {
Channels []string `json:"channels"`
Producers []*peerInfo `json:"producers"`
Timestamp int64 `json:"timestamp"`
}

type peerInfo struct {
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
Expand All @@ -440,28 +455,17 @@ func (r *Consumer) queryLookupd() {

r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

data, err := apiRequestNegotiateV1("GET", endpoint, nil)
var data lookupResp
err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
return
}

// {
// "channels": [],
// "producers": [
// {
// "broadcast_address": "jehiah-air.local",
// "http_port": 4151,
// "tcp_port": 4150
// }
// ],
// "timestamp": 1340152173
// }
nsqdAddrs := make([]string, 0)
for i := range data.Get("producers").MustArray() {
producer := data.Get("producers").GetIndex(i)
broadcastAddress := producer.Get("broadcast_address").MustString()
port := producer.Get("tcp_port").MustInt()
for _, producer := range data.Producers {
broadcastAddress := producer.BroadcastAddress
port := producer.TCPPort
joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
nsqdAddrs = append(nsqdAddrs, joined)
}
Expand Down
11 changes: 7 additions & 4 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsq
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand All @@ -14,8 +15,6 @@ import (
"strings"
"testing"
"time"

"github.com/bitly/go-simplejson"
)

type MyTestHandler struct {
Expand All @@ -39,12 +38,16 @@ func (h *MyTestHandler) HandleMessage(message *Message) error {
return errors.New("fail this message")
}

data, err := simplejson.NewJson(message.Body)
data := struct {
Msg string
}{}

err := json.Unmarshal(message.Body, &data)
if err != nil {
return err
}

msg, _ := data.Get("msg").String()
msg := data.Msg
if msg != "single" && msg != "double" {
h.t.Error("message 'action' was not correct: ", msg, data)
}
Expand Down

0 comments on commit 91f8996

Please sign in to comment.