added metrics for transactional outbox #193

Merged
merged 3 commits on Oct 13, 2019
Changes from 2 commits
4 changes: 4 additions & 0 deletions docs/METRICS.md
@@ -10,3 +10,7 @@ grabbit exposes and reports the following metrics to Prometheus
| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
| grabbit | outbox | outbox_total_records | reports the total number of records currently in the outbox |
| grabbit | outbox | outbox_pending_delivery | reports the number of records currently in the outbox that are pending delivery |
| grabbit | outbox | outbox_pending_removal | reports the number of records currently in the outbox that were sent and are pending removal |
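
These gauges are registered with the default Prometheus registry through promauto, so a service hosting grabbit only needs to expose that registry for them to be scrapeable. A minimal sketch (not part of this PR), assuming the host service serves its own metrics endpoint; the port and path are arbitrary:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promauto-created collectors (including the outbox gauges documented
	// above) register with the default registry, so the stock handler is
	// enough for a Prometheus scrape to pick them up.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```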

15 changes: 8 additions & 7 deletions gbus/configuration.go
@@ -11,8 +11,16 @@ type BusConfiguration struct {

//OutboxConfiguration configures the transactional outbox
type OutboxConfiguration struct {
/*
Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
Increase this value if you are experiencing deadlocks.
Default is 10
*/
Ackers uint
//PageSize is the number of pending message records the outbox selects from the database every iteration, the default is 500
PageSize uint
//MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds
MetricsInterval time.Duration
//SendInterval is the duration the outbox waits before each iteration, default is 1 second
SendInterval time.Duration
/*
@@ -21,11 +29,4 @@ type OutboxConfiguration struct {
Default is 60 seconds
*/
ScavengeInterval time.Duration
/*
Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
Increase this value if you are experiencing deadlocks.
Default is 10
*/

Ackers uint
}
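
For illustration, a minimal sketch of an OutboxConfiguration that sets the new MetricsInterval alongside the existing knobs. The field names come from the struct above; how the configuration is passed to the bus builder is not shown here and is assumed to follow the existing grabbit setup:

```go
package main

import (
	"fmt"
	"time"

	"github.com/wework/grabbit/gbus"
)

func main() {
	// Zero values fall back to the defaults documented on the struct
	// (10 ackers, page size 500, 15s metrics interval, 1s send interval,
	// 60s scavenge interval).
	cfg := gbus.OutboxConfiguration{
		Ackers:           10,
		PageSize:         500,
		MetricsInterval:  15 * time.Second,
		SendInterval:     time.Second,
		ScavengeInterval: 60 * time.Second,
	}
	fmt.Printf("outbox configuration: %+v\n", cfg)
}
```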
27 changes: 27 additions & 0 deletions gbus/metrics/outbox_metrics.go
@@ -0,0 +1,27 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

//OutboxSize reports the total number of records currently in the outbox
var OutboxSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_total_records",
Subsystem: "outbox",
Help: "reports the total number of records currently in the outbox",
})

//PendingMessages reports the number of records currently in the outbox that are pending delivery
var PendingMessages = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_pending_delivery",
Subsystem: "outbox",
Help: "reports the number of records currently in the outbox that are pending delivery",
})

//SentMessages reports the number of records currently in the outbox that were sent and are pending removal
var SentMessages = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: grabbitPrefix,
Name: "outbox_pending_removal",
Subsystem: "outbox",
Help: "reports the number of records currently in the outbox that were sent and are pending removal",
})
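
Since these are plain gauges, their values can be checked directly in a test with client_golang's testutil helpers. A hypothetical sketch (not part of this PR) that simulates the values reportMetrics would set:

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/wework/grabbit/gbus/metrics"
)

func TestOutboxGaugesReport(t *testing.T) {
	// Simulate the per-status counts that reportMetrics derives
	// from the outbox table.
	metrics.PendingMessages.Set(3)
	metrics.SentMessages.Set(2)
	metrics.OutboxSize.Set(5)

	if got := testutil.ToFloat64(metrics.OutboxSize); got != 5 {
		t.Fatalf("expected outbox_total_records gauge to read 5, got %f", got)
	}
}
```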
83 changes: 67 additions & 16 deletions gbus/tx/mysql/txoutbox.go
@@ -14,20 +14,22 @@ import (
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
"github.com/wework/grabbit/gbus/tx"
)

var (
pending int
//waitingConfirm = 1
//Pending and Sent are the delivery statuses of an outbox record
const (
Pending int = iota + 1
Sent
)

//TODO:get these values from configuration
var (
maxPageSize = 500
maxDeliveryAttempts = 50
sendInterval = time.Second

scavengeInterval = time.Second * 60
ackers = 10
scavengeInterval = time.Second * 60
metricsInterval = time.Second * 15
ackers = 10
)

//TxOutbox is a mysql based transactional outbox
@@ -114,7 +116,7 @@ func (outbox *TxOutbox) Save(tx *sql.Tx, exchange, routingKey string, amqpMessag
return err
}
unknownDeliverTag := -1
_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), pending)
_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), Pending)

return insertErr
}
@@ -149,6 +151,9 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool, cfg
if cfg.ScavengeInterval.String() != "0s" {
scavengeInterval = cfg.ScavengeInterval
}
if cfg.MetricsInterval.String() != "0s" {
metricsInterval = cfg.MetricsInterval
}
if cfg.Ackers > 0 {
ackers = int(cfg.Ackers)
}
@@ -176,29 +181,75 @@ func (outbox *TxOutbox) ackRec() {

func (outbox *TxOutbox) processOutbox() {

send := time.NewTicker(sendInterval).C
// cleanUp := time.NewTicker(cleanupInterval).C
scavenge := time.NewTicker(scavengeInterval).C
send := time.NewTicker(sendInterval)
scavenge := time.NewTicker(scavengeInterval)
metrics := time.NewTicker(metricsInterval)

for {
select {
case <-outbox.exit:
send.Stop()
scavenge.Stop()
metrics.Stop()
return
//TODO:get time duration from configuration
case <-send:
case <-send.C:

err := outbox.sendMessages(outbox.getMessageRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to send messages from outbox")
}

case <-scavenge:
case <-scavenge.C:
err := outbox.sendMessages(outbox.scavengeOrphanedRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to scavenge records")
}
case <-metrics.C:
if err := outbox.reportMetrics(); err != nil {
outbox.log().WithError(err).Error("failed to report outbox metrics")
}
}

}
}

func (outbox *TxOutbox) reportMetrics() error {

tx, txErr := outbox.txProv.New()
if txErr != nil {
return txErr
}

rows, qErr := tx.Query(`SELECT status, count(*) FROM ` + getOutboxName(outbox.svcName) + ` GROUP BY status`)
if qErr != nil {
_ = tx.Rollback()
return qErr
}

var totalOutboxSize int
for rows.Next() {
var count, status int
if scanErr := rows.Scan(&status, &count); scanErr != nil {
outbox.log().WithError(scanErr).Warn("failed scanning status counts for metric data")
continue
}
totalOutboxSize += count
switch status {
case Pending:
metrics.PendingMessages.Set(float64(count))
case Sent:
metrics.SentMessages.Set(float64(count))
}
}
metrics.OutboxSize.Set(float64(totalOutboxSize))

if closeErr := rows.Close(); closeErr != nil {
outbox.log().WithError(closeErr).Warn("failed closing rows after iteration for metric data")
}

if commitErr := tx.Commit(); commitErr != nil {
outbox.log().WithError(commitErr).Warn("failed committing transaction after iteration for metric data")
return commitErr
}
return nil
}

func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
@@ -237,12 +288,12 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
}

func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) {
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = " + strconv.Itoa(Pending) + " AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
return tx.Query(selectSQL)
}

func (outbox *TxOutbox) scavengeOrphanedRecords(tx *sql.Tx) (*sql.Rows, error) {
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 1 ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = " + strconv.Itoa(Sent) + " ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
return tx.Query(selectSQL, strconv.Itoa(maxPageSize))
}

@@ -310,7 +361,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
outbox.log().WithField("messages_sent", len(successfulDeliveries)).Info("outbox relayed messages")
}
for deliveryTag, id := range successfulDeliveries {
_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status=1, delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status="+strconv.Itoa(Sent)+", delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
if updateErr != nil {
outbox.log().WithError(updateErr).
WithFields(log.Fields{"record_id": id, "delivery_tag": deliveryTag, "relay_id": outbox.ID}).