Skip to content

Commit

Permalink
Adds logging + improves url parsing fault tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
Eagerod committed May 7, 2023
1 parent 764e73b commit 4711f9b
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pkg/rmqhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
)

Expand All @@ -17,7 +19,7 @@ type HttpController struct {
rmq *RMQ
queue *amqp.Queue

managementConnectionString string
managementUrl *url.URL
}

func NewHttpController() *HttpController {
Expand All @@ -44,7 +46,12 @@ func (hc *HttpController) Connect(connectionString, queueName string) error {
}

func (hc *HttpController) SetManagementConnectionString(mcs string) {
hc.managementConnectionString = mcs
u, err := url.Parse(mcs)
if err != nil {
panic(err)
}

hc.managementUrl = u
}

func (hc *HttpController) respondError(w http.ResponseWriter, statusCode int, message string) {
Expand Down Expand Up @@ -77,6 +84,7 @@ func (hc *HttpController) HttpHandler(w http.ResponseWriter, r *http.Request) {

channel, err := hc.rmq.LockChannel()
if err != nil {
log.Error(err)
hc.respondError(w, http.StatusInternalServerError, "Failed to lock channel")
return
}
Expand Down Expand Up @@ -116,6 +124,7 @@ func (hc *HttpController) HealthHandler(w http.ResponseWriter, r *http.Request)

queueInspect, err := channel.QueueInspect(queue)
if err != nil {
log.Error(err)
hc.respondError(w, http.StatusInternalServerError, "Failed to inspect DLQ")
return
}
Expand All @@ -128,13 +137,16 @@ func (hc *HttpController) HealthHandler(w http.ResponseWriter, r *http.Request)
}

func (hc *HttpController) StatsHandler(w http.ResponseWriter, r *http.Request) {
if hc.managementConnectionString == "" {
if hc.managementUrl == nil {
log.Error("Management API not configured")
hc.respondError(w, http.StatusInternalServerError, "Management API not configured")
return
}

// Currently only supports default vhost.
fullManagementUrl := fmt.Sprintf("%sapi/queues/%%2F/%s", hc.managementConnectionString, hc.queue.Name)
u, _ := url.Parse(hc.managementUrl.String())
u.Path = path.Join(u.Path, "api", "queues", "%%2F", hc.queue.Name)
fullManagementUrl := u.String()

client := &http.Client{}
req, err := http.NewRequest("GET", fullManagementUrl, nil)
Expand All @@ -143,8 +155,8 @@ func (hc *HttpController) StatsHandler(w http.ResponseWriter, r *http.Request) {
}

resp, err := client.Do(req)

if err != nil {
log.Error(err)
hc.respondError(w, http.StatusInternalServerError, "Failed to get data")
return
}
Expand All @@ -154,6 +166,7 @@ func (hc *HttpController) StatsHandler(w http.ResponseWriter, r *http.Request) {

stats := rmqStats{}
if err := json.Unmarshal(body, &stats); err != nil {
log.Error(err)
hc.respondError(w, http.StatusInternalServerError, "Failed to parse response")
return
}
Expand Down

0 comments on commit 4711f9b

Please sign in to comment.