added metrics for transactional outbox (#193)
* added metrics for transactional outbox

The following metrics were added:
outbox_total_records: reports the total number of records currently in the outbox
outbox_pending_delivery: reports the number of records currently in the outbox that are pending delivery
outbox_pending_removal: reports the number of records currently in the outbox that were sent and are pending removal

* reading status and count fields in the correct order from rows
Guy Baron authored Oct 13, 2019
1 parent b109f06 commit 784d8f4
Showing 4 changed files with 106 additions and 23 deletions.
4 changes: 4 additions & 0 deletions docs/METRICS.md
@@ -10,3 +10,7 @@ grabbit exposes and reports the following metrics to Prometheus
| grabbit | handlers | latency | records the execution time of each handler run, labeled with the handler's name and message type |
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
| grabbit | saga | timedout_sagas | counts the number of timed-out saga instances |
| grabbit | outbox | outbox_total_records | reports the total number of records currently in the outbox |
| grabbit | outbox | outbox_pending_delivery | reports the number of records currently in the outbox that are pending delivery |
| grabbit | outbox | outbox_pending_removal | reports the number of records currently in the outbox that were sent and are pending removal |
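Prometheus joins namespace, subsystem, and metric name with underscores, so on the scrape endpoint these gauges appear as, for example, grabbit_outbox_outbox_total_records. A minimal sketch verifying the fully-qualified names, assuming grabbitPrefix resolves to "grabbit" (the constant itself is not shown in this commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// BuildFQName composes the exposed name the same way GaugeOpts does.
	for _, name := range []string{"outbox_total_records", "outbox_pending_delivery", "outbox_pending_removal"} {
		fmt.Println(prometheus.BuildFQName("grabbit", "outbox", name))
	}
	// Output:
	// grabbit_outbox_outbox_total_records
	// grabbit_outbox_outbox_pending_delivery
	// grabbit_outbox_outbox_pending_removal
}
```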

15 changes: 8 additions & 7 deletions gbus/configuration.go
@@ -11,8 +11,16 @@ type BusConfiguration struct {

//OutboxConfiguration configures the transactional outbox
type OutboxConfiguration struct {
/*
Ackers is the number of goroutines configured to drain incoming ack/nack signals from the broker.
Increase this value if you are experiencing deadlocks.
Default is 10
*/
Ackers uint
//PageSize is the number of pending message records the outbox selects from the database on each iteration, the default is 500
PageSize uint
//MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds
MetricsInterval time.Duration
//SendInterval is the duration the outbox waits before each iteration, default is 1 second
SendInterval time.Duration
/*
@@ -21,11 +29,4 @@ type OutboxConfiguration struct {
Default is 60 seconds
*/
ScavengeInterval time.Duration
/*
Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
Increase this value if you are experiencing deadlocks.
Default is 10
*/

Ackers uint
}
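A minimal sketch of populating the struct above; the values simply restate the documented defaults, and how OutboxConfiguration is handed to the bus builder is assumed rather than shown in this diff:

```go
package main

import (
	"time"

	"github.com/wework/grabbit/gbus"
)

func main() {
	// Zero values fall back to the defaults documented on each field
	// (see the NewOutbox checks in gbus/tx/mysql/txoutbox.go below).
	cfg := gbus.OutboxConfiguration{
		Ackers:           10,
		PageSize:         500,
		MetricsInterval:  time.Second * 15,
		SendInterval:     time.Second,
		ScavengeInterval: time.Second * 60,
	}
	_ = cfg // hand cfg to the bus builder of your choice
}
```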
27 changes: 27 additions & 0 deletions gbus/metrics/outbox_metrics.go
@@ -0,0 +1,27 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

//OutboxSize reports the total number of records currently in the outbox
var OutboxSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_total_records",
Subsystem: "outbox",
Help: "reports the total number of records currently in the outbox",
})

//PendingMessages reports the number of records currently in the outbox that are pending delivery
var PendingMessages = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_pending_delivery",
Subsystem: "outbox",
Help: "reports the number of records currently in the outbox that are pending delivery",
})

//SentMessages reports the number of records currently in the outbox that were sent and are pending removal
var SentMessages = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_pending_removal",
Subsystem: "outbox",
Help: "reports the number of records currently in the outbox that were sent and are pending removal",
})
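promauto.NewGauge registers each gauge with prometheus.DefaultRegisterer at package load, so any process that already serves the default handler exposes the three new metrics with no extra wiring. A generic client_golang sketch, not part of this commit:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Importing the package runs the promauto registrations above.
	_ "github.com/wework/grabbit/gbus/metrics"
)

func main() {
	// The default handler serves everything registered with the
	// default registry, including the outbox gauges.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```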
83 changes: 67 additions & 16 deletions gbus/tx/mysql/txoutbox.go
@@ -14,20 +14,22 @@ import (
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
"github.com/wework/grabbit/gbus/tx"
)

var (
pending int
//waitingConfirm = 1
//Pending and Sent are the statuses an outbox record moves through
const (
Pending int = iota + 1
Sent
)

//TODO:get these values from configuration
var (
maxPageSize = 500
maxDeliveryAttempts = 50
sendInterval = time.Second

scavengeInterval = time.Second * 60
ackers = 10
scavengeInterval = time.Second * 60
metricsInterval = time.Second * 15
ackers = 10
)

//TxOutbox is a mysql based transactional outbox
@@ -114,7 +116,7 @@ func (outbox *TxOutbox) Save(tx *sql.Tx, exchange, routingKey string, amqpMessag
return err
}
unknownDeliverTag := -1
_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), pending)
_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), Pending)

return insertErr
}
@@ -149,6 +151,9 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool, cfg
if cfg.ScavengeInterval.String() != "0s" {
scavengeInterval = cfg.ScavengeInterval
}
if cfg.MetricsInterval.String() != "0s" {
metricsInterval = cfg.MetricsInterval
}
if cfg.Ackers > 0 {
ackers = int(cfg.Ackers)
}
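Aside: the cfg.MetricsInterval.String() != "0s" comparison above is just a zero-value check; since time.Duration is an integer type it can be compared to 0 directly, as in this equivalent sketch:

```go
// Equivalent zero-value check without formatting the duration as a string.
if cfg.MetricsInterval != 0 {
	metricsInterval = cfg.MetricsInterval
}
```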
@@ -176,29 +181,75 @@ func (outbox *TxOutbox) ackRec() {

func (outbox *TxOutbox) processOutbox() {

send := time.NewTicker(sendInterval).C
// cleanUp := time.NewTicker(cleanupInterval).C
scavenge := time.NewTicker(scavengeInterval).C
send := time.NewTicker(sendInterval)
scavenge := time.NewTicker(scavengeInterval)
metrics := time.NewTicker(metricsInterval)

for {
select {
case <-outbox.exit:
send.Stop()
scavenge.Stop()
metrics.Stop()
return
//TODO:get time duration from configuration
case <-send:
case <-send.C:

err := outbox.sendMessages(outbox.getMessageRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to send messages from outbox")
}

case <-scavenge:
case <-scavenge.C:
err := outbox.sendMessages(outbox.scavengeOrphanedRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to scavenge records")
}
case <-metrics.C:
if err := outbox.reportMetrics(); err != nil {
outbox.log().WithError(err).Error("failed to report outbox meetrics")
}
}

}
}

func (outbox *TxOutbox) reportMetrics() error {

tx, txErr := outbox.txProv.New()
if txErr != nil {
return txErr
}

rows, qErr := tx.Query(`SELECT status, count(*) FROM ` + getOutboxName(outbox.svcName) + ` GROUP BY status`)
if qErr != nil {
_ = tx.Rollback()
return qErr
}

var totalOutboxSize int
for rows.Next() {
var count, status int
if scanErr := rows.Scan(&status, &count); scanErr != nil {
outbox.log().WithError(scanErr).Warn("failed scanning metric data from outbox")
continue
}
totalOutboxSize += count
switch status {
case Pending:
metrics.PendingMessages.Set(float64(count))
case Sent:
metrics.SentMessages.Set(float64(count))
}
}
metrics.OutboxSize.Set(float64(totalOutboxSize))

if closeErr := rows.Close(); closeErr != nil {
outbox.log().WithError(closeErr).Warn("failed closing rows after iteration for metric data")
}

if commitErr := tx.Commit(); commitErr != nil {
outbox.log().WithError(commitErr).Warn("failed committing transaction after iteration for metric data")
return commitErr
}
return nil
}
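To make the gauge arithmetic concrete: if the GROUP BY returns the rows (Pending, 40) and (Sent, 7), reportMetrics sets outbox_pending_delivery to 40, outbox_pending_removal to 7, and outbox_total_records to 47. A test-style sketch of that mapping using client_golang's testutil; the sample counts are made up:

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/wework/grabbit/gbus/metrics"
)

func TestOutboxGaugeMapping(t *testing.T) {
	// Simulate the per-status updates reportMetrics performs for
	// rows (Pending, 40) and (Sent, 7).
	metrics.PendingMessages.Set(40)
	metrics.SentMessages.Set(7)
	metrics.OutboxSize.Set(40 + 7)

	if got := testutil.ToFloat64(metrics.OutboxSize); got != 47 {
		t.Fatalf("expected outbox_total_records to be 47, got %f", got)
	}
}
```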

func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
@@ -237,12 +288,12 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
}

func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) {
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = " + strconv.Itoa(Pending) + " AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
return tx.Query(selectSQL)
}

func (outbox *TxOutbox) scavengeOrphanedRecords(tx *sql.Tx) (*sql.Rows, error) {
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 1 ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = " + strconv.Itoa(Sent) + " ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
return tx.Query(selectSQL, strconv.Itoa(maxPageSize))
}

@@ -310,7 +361,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
outbox.log().WithField("messages_sent", len(successfulDeliveries)).Info("outbox relayed messages")
}
for deliveryTag, id := range successfulDeliveries {
_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status=1, delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status="+strconv.Itoa(Sent)+", delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
if updateErr != nil {
outbox.log().WithError(updateErr).
WithFields(log.Fields{"record_id": id, "delivery_tag": deliveryTag, "relay_id": outbox.ID}).