diff --git a/_examples/document_sync/index.html b/_examples/document_sync/index.html
index 3f712033..6acbc665 100644
--- a/_examples/document_sync/index.html
+++ b/_examples/document_sync/index.html
@@ -31,8 +31,8 @@
document += update.increment
return document
},
- compareVersion: (publication, currentVersion) => {
- const newVersion = publication.data.version;
+ compareVersion: (currentVersion, update) => {
+ const newVersion = update.version;
return newVersion > currentVersion ? newVersion : null;
},
onChange: (document) => {
diff --git a/_examples/document_sync/main.go b/_examples/document_sync/main.go
index b0e66aec..ff0d26a3 100644
--- a/_examples/document_sync/main.go
+++ b/_examples/document_sync/main.go
@@ -15,6 +15,7 @@ import (
_ "net/http/pprof"
"github.com/centrifugal/centrifuge"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
)
// Counter is a document we sync here. Returned when loading full state.
@@ -114,6 +115,7 @@ func main() {
http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
http.HandleFunc("/api/counter", getCounterHandler)
+ http.Handle("/metrics", promhttp.Handler())
http.Handle("/", http.FileServer(http.Dir("./")))
counter = Counter{Version: 0, Value: 0}
diff --git a/_examples/document_sync/rtdocument.js b/_examples/document_sync/rtdocument.js
index 190e9bd4..32fbd714 100644
--- a/_examples/document_sync/rtdocument.js
+++ b/_examples/document_sync/rtdocument.js
@@ -48,7 +48,7 @@ class RealTimeDocument {
return;
}
// Process new messages immediately if initial state is already loaded.
- const newVersion = this.#compareVersion(ctx, this.#version);
+ const newVersion = this.#compareVersion(this.#version, ctx.data);
if (newVersion === null) {
this.#debugLog("Skip real-time publication", ctx);
return;
@@ -143,7 +143,7 @@ class RealTimeDocument {
#processBufferedMessages() {
this.#messageBuffer.forEach((msg) => {
- const newVersion = this.#compareVersion(msg, this.#version);
+ const newVersion = this.#compareVersion(this.#version, msg.data);
if (newVersion) {
this.#document = this.#applyUpdate(this.#document, msg.data);
this.#version = newVersion;
diff --git a/_examples/go.mod b/_examples/go.mod
index e1a6519c..28e2612c 100644
--- a/_examples/go.mod
+++ b/_examples/go.mod
@@ -7,7 +7,7 @@ replace github.com/centrifugal/centrifuge => ../
require (
github.com/FZambia/tarantool v0.2.2
github.com/centrifugal/centrifuge v0.8.2
- github.com/centrifugal/protocol v0.13.2
+ github.com/centrifugal/protocol v0.13.3
github.com/cristalhq/jwt/v5 v5.4.0
github.com/dchest/uniuri v1.2.0
github.com/gin-contrib/sessions v0.0.3
diff --git a/_examples/go.sum b/_examples/go.sum
index a34c7418..07b9f1fe 100644
--- a/_examples/go.sum
+++ b/_examples/go.sum
@@ -14,8 +14,8 @@ github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20181103040241-659414
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
-github.com/centrifugal/protocol v0.13.2 h1:LLPp2KZBK++Vr5HDHTG9fMssapucir0qev4YAbjhiqA=
-github.com/centrifugal/protocol v0.13.2/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
+github.com/centrifugal/protocol v0.13.3 h1:Ryt5uIYCz5wpJOHc0+L2dC1ty2OQzwdU4TV3pmPOfnA=
+github.com/centrifugal/protocol v0.13.3/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
diff --git a/_examples/unidirectional_grpc/client/main.go b/_examples/unidirectional_grpc/client/main.go
index d5fcc809..67a380ae 100644
--- a/_examples/unidirectional_grpc/client/main.go
+++ b/_examples/unidirectional_grpc/client/main.go
@@ -17,7 +17,6 @@ var (
)
func handlePush(push *clientproto.Push) {
- log.Printf("push received (type %d, channel %s, data %s", push.Type, push.Channel, fmt.Sprintf("%#v", string(push.Data)))
if push.Connect != nil {
log.Printf("connected to a server with ID: %s", push.Connect.Client)
} else if push.Pub != nil {
@@ -25,7 +24,7 @@ func handlePush(push *clientproto.Push) {
} else if push.Disconnect != nil {
log.Printf("disconnected from a server: %s", push.Disconnect.Reason)
} else {
- log.Println("push type handling not implemented")
+ log.Printf("push type handling not implemented: %v", push)
}
}
diff --git a/broker.go b/broker.go
index 75f6e201..2633d8b1 100644
--- a/broker.go
+++ b/broker.go
@@ -17,6 +17,10 @@ type Publication struct {
// Tags contains a map with custom key-values attached to a Publication. Tags map
// will be delivered to a client.
Tags map[string]string
+ // Time is an optional time of publication as a Unix timestamp in milliseconds.
+ // At this point we only use it for calculating PUB/SUB time lag; it is not
+ // exposed to the client protocol.
+ Time int64
}
// ClientInfo contains information about client connection.
diff --git a/broker_memory.go b/broker_memory.go
index 55a1a445..9de79ad1 100644
--- a/broker_memory.go
+++ b/broker_memory.go
@@ -104,6 +104,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
Data: data,
Info: opts.ClientInfo,
Tags: opts.Tags,
+ Time: time.Now().UnixMilli(),
}
var prevPub *Publication
if opts.HistorySize > 0 && opts.HistoryTTL > 0 {
diff --git a/broker_redis.go b/broker_redis.go
index 69cfecaf..374dbfd0 100644
--- a/broker_redis.go
+++ b/broker_redis.go
@@ -613,6 +613,7 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
Data: data,
Info: infoToProto(opts.ClientInfo),
Tags: opts.Tags,
+ Time: time.Now().UnixMilli(),
}
if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 {
// In no history case we communicate delta flag over Publication field. This field is then
@@ -1363,7 +1364,11 @@ func extractPushData(data []byte) ([]byte, pushType, StreamPosition, bool, []byt
// d1:offset:epoch:prev_payload_length:prev_payload:payload_length:payload
stringContent := convert.BytesToString(content)
parsedDelta, err := parseDeltaPush(stringContent)
- return convert.StringToBytes(parsedDelta.Payload), pubPushType, StreamPosition{Epoch: parsedDelta.Epoch, Offset: parsedDelta.Offset}, true, convert.StringToBytes(parsedDelta.PrevPayload), err == nil
+ if err != nil {
+ // Unexpected error.
+ return nil, pubPushType, StreamPosition{Epoch: epoch, Offset: offset}, false, nil, false
+ }
+ return convert.StringToBytes(parsedDelta.Payload), pubPushType, StreamPosition{Epoch: parsedDelta.Epoch, Offset: parsedDelta.Offset}, true, convert.StringToBytes(parsedDelta.PrevPayload), true
default:
// Unknown content type.
return nil, pubPushType, StreamPosition{Epoch: epoch, Offset: offset}, false, nil, false
@@ -1379,11 +1384,11 @@ type deltaPublicationPush struct {
Payload string
}
-func parseDeltaPush(input string) (*deltaPublicationPush, error) {
+func parseDeltaPush(input string) (deltaPublicationPush, error) {
// d1:offset:epoch:prev_payload_length:prev_payload:payload_length:payload
const prefix = "d1:"
if !strings.HasPrefix(input, prefix) {
- return nil, fmt.Errorf("input does not start with the expected prefix")
+ return deltaPublicationPush{}, fmt.Errorf("input does not start with the expected prefix")
}
input = input[len(prefix):] // Remove prefix
@@ -1391,11 +1396,11 @@ func parseDeltaPush(input string) (*deltaPublicationPush, error) {
idx := strings.IndexByte(input, ':')
if idx == -1 {
- return nil, fmt.Errorf("invalid format, missing offset")
+ return deltaPublicationPush{}, fmt.Errorf("invalid format, missing offset")
}
offset, err := strconv.ParseUint(input[:idx], 10, 64)
if err != nil {
- return nil, fmt.Errorf("error parsing offset: %v", err)
+ return deltaPublicationPush{}, fmt.Errorf("error parsing offset: %v", err)
}
input = input[idx+1:]
@@ -1403,7 +1408,7 @@ func parseDeltaPush(input string) (*deltaPublicationPush, error) {
idx = strings.IndexByte(input, ':')
if idx == -1 {
- return nil, fmt.Errorf("invalid format, missing epoch")
+ return deltaPublicationPush{}, fmt.Errorf("invalid format, missing epoch")
}
epoch := input[:idx]
input = input[idx+1:]
@@ -1412,18 +1417,18 @@ func parseDeltaPush(input string) (*deltaPublicationPush, error) {
idx = strings.IndexByte(input, ':')
if idx == -1 {
- return nil, fmt.Errorf("invalid format, missing prev payload length")
+ return deltaPublicationPush{}, fmt.Errorf("invalid format, missing prev payload length")
}
prevPayloadLength, err := strconv.Atoi(input[:idx])
if err != nil {
- return nil, fmt.Errorf("error parsing prev payload length: %v", err)
+ return deltaPublicationPush{}, fmt.Errorf("error parsing prev payload length: %v", err)
}
input = input[idx+1:]
// Extract prev_payload based on prev_payload_length
if len(input) < prevPayloadLength {
- return nil, fmt.Errorf("input is shorter than expected prev payload length")
+ return deltaPublicationPush{}, fmt.Errorf("input is shorter than expected prev payload length")
}
prevPayload := input[:prevPayloadLength]
input = input[prevPayloadLength+1:]
@@ -1431,21 +1436,21 @@ func parseDeltaPush(input string) (*deltaPublicationPush, error) {
// payload_length:payload
idx = strings.IndexByte(input, ':')
if idx == -1 {
- return nil, fmt.Errorf("invalid format, missing payload")
+ return deltaPublicationPush{}, fmt.Errorf("invalid format, missing payload")
}
payloadLength, err := strconv.Atoi(input[:idx])
if err != nil {
- return nil, fmt.Errorf("error parsing payload_length: %v", err)
+ return deltaPublicationPush{}, fmt.Errorf("error parsing payload_length: %v", err)
}
input = input[idx+1:]
// Extract payload based on payload_length
if len(input) < payloadLength {
- return nil, fmt.Errorf("input is shorter than expected payload length")
+ return deltaPublicationPush{}, fmt.Errorf("input is shorter than expected payload length")
}
payload := input[:payloadLength]
- return &deltaPublicationPush{
+ return deltaPublicationPush{
Offset: offset,
Epoch: epoch,
PrevPayloadLength: prevPayloadLength,
diff --git a/broker_redis_test.go b/broker_redis_test.go
index df30d9a7..c88ab2d6 100644
--- a/broker_redis_test.go
+++ b/broker_redis_test.go
@@ -1965,13 +1965,13 @@ func TestParseDeltaPush(t *testing.T) {
name string
input string
expectError bool
- expectedResult *deltaPublicationPush
+ expectedResult deltaPublicationPush
}{
{
name: "valid data with colon in payload",
input: "d1:1234567890:epoch1:4:test:18:payload:with:colon",
expectError: false,
- expectedResult: &deltaPublicationPush{
+ expectedResult: deltaPublicationPush{
Offset: 1234567890,
Epoch: "epoch1",
PrevPayloadLength: 4,
@@ -1984,7 +1984,7 @@ func TestParseDeltaPush(t *testing.T) {
name: "valid data with empty payload",
input: "d1:1234567890:epoch2:0::0:",
expectError: false,
- expectedResult: &deltaPublicationPush{
+ expectedResult: deltaPublicationPush{
Offset: 1234567890,
Epoch: "epoch2",
PrevPayloadLength: 0,
diff --git a/client.go b/client.go
index fc70b91a..f721520a 100644
--- a/client.go
+++ b/client.go
@@ -732,10 +732,7 @@ func (c *Client) checkPosition(checkDelay time.Duration, ch string, chCtx Channe
}
nowUnix := c.node.nowTimeGetter().Unix()
- isInitialCheck := chCtx.positionCheckTime == 0
- isTimeToCheck := nowUnix-chCtx.positionCheckTime > int64(checkDelay.Seconds())
- needCheckPosition := isInitialCheck || isTimeToCheck
-
+ needCheckPosition := nowUnix-chCtx.positionCheckTime > int64(checkDelay.Seconds())
if !needCheckPosition {
return true
}
@@ -3104,7 +3101,7 @@ func (c *Client) handleAsyncUnsubscribe(ch string, unsub Unsubscribe) {
}
}
-func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition) error {
+func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition, maxLagExceeded bool) error {
c.mu.Lock()
channelContext, ok := c.channels[ch]
if !ok || !channelHasFlag(channelContext.flags, flagSubscribed) {
@@ -3141,18 +3138,52 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
nextExpectedOffset := currentPositionOffset + 1
pubOffset := pub.Offset
pubEpoch := sp.Epoch
- if pubEpoch != channelContext.streamPosition.Epoch || pubOffset != nextExpectedOffset {
+ if maxLagExceeded {
+ // PUB/SUB lag is too big.
+ // We can introduce an option to mark connection with insufficient state flag instead
+ // of disconnecting it immediately. In that case connection will eventually reconnect
+ // due to periodic sync. While connection channel is in the insufficient state we must
+ // skip publications coming to it. This mode may be useful to spread the resubscribe load.
+ if c.node.logger.enabled(LogLevelDebug) {
+ c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (lag)", map[string]any{"channel": ch, "user": c.user, "client": c.uid}))
+ }
+ // Tell client about insufficient state so it can reconnect/resubscribe to recover the state.
+ go func() { c.handleInsufficientState(ch, serverSide) }()
+ c.mu.Unlock()
+ return nil
+ }
+ if pubEpoch != channelContext.streamPosition.Epoch {
+ // Wrong stream epoch is always a signal of insufficient state.
+ // We can introduce an option to mark connection with insufficient state flag instead
+ // of disconnecting it immediately. In that case connection will eventually reconnect
+ // due to periodic sync. While connection channel is in the insufficient state we must
+ // skip publications coming to it. This mode may be useful to spread the resubscribe load.
+ if c.node.logger.enabled(LogLevelDebug) {
+ c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (epoch)", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "epoch": pubEpoch, "expectedEpoch": channelContext.streamPosition.Epoch}))
+ }
+ // Tell client about insufficient state so it can reconnect/resubscribe to recover the state.
+ go func() { c.handleInsufficientState(ch, serverSide) }()
+ c.mu.Unlock()
+ return nil
+ }
+ if pubOffset > nextExpectedOffset {
+ // Missed message detected.
// We can introduce an option to mark connection with insufficient state flag instead
// of disconnecting it immediately. In that case connection will eventually reconnect
// due to periodic sync. While connection channel is in the insufficient state we must
// skip publications coming to it. This mode may be useful to spread the resubscribe load.
if c.node.logger.enabled(LogLevelDebug) {
- c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "epoch": pubEpoch, "expectedEpoch": channelContext.streamPosition.Epoch, "offset": pubOffset, "expectedOffset": nextExpectedOffset}))
+ c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (offset)", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "offset": pubOffset, "expectedOffset": nextExpectedOffset}))
}
- // Oops: sth lost, let client reconnect/resubscribe to recover its state.
+ // Tell client about insufficient state so it can reconnect/resubscribe to recover the state.
go func() { c.handleInsufficientState(ch, serverSide) }()
c.mu.Unlock()
return nil
+ } else if pubOffset < nextExpectedOffset {
+ // Epoch is correct, but due to the lag in PUB/SUB processing we received an outdated
+ // update here. It is safe to just skip it for the subscriber.
+ c.mu.Unlock()
+ return nil
}
channelContext.positionCheckTime = time.Now().Unix()
channelContext.streamPosition.Offset = pub.Offset
@@ -3176,10 +3207,10 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
}
func (c *Client) writePublicationNoDelta(ch string, pub *protocol.Publication, data []byte, sp StreamPosition) error {
- return c.writePublication(ch, pub, preparedData{fullData: data, brokerDeltaData: nil, localDeltaData: nil, deltaSub: false}, sp)
+ return c.writePublication(ch, pub, preparedData{fullData: data, brokerDeltaData: nil, localDeltaData: nil, deltaSub: false}, sp, false)
}
-func (c *Client) writePublication(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition) error {
+func (c *Client) writePublication(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition, maxLagExceeded bool) error {
if c.node.LogEnabled(LogLevelTrace) {
c.traceOutPush(&protocol.Push{Channel: ch, Pub: pub})
}
@@ -3213,7 +3244,7 @@ func (c *Client) writePublication(ch string, pub *protocol.Publication, prep pre
return c.transportEnqueue(prep.fullData, ch, protocol.FrameTypePushPublication)
}
c.pubSubSync.SyncPublication(ch, pub, func() {
- _ = c.writePublicationUpdatePosition(ch, pub, prep, sp)
+ _ = c.writePublicationUpdatePosition(ch, pub, prep, sp, maxLagExceeded)
})
return nil
}
@@ -3354,7 +3385,8 @@ func (c *Client) logWriteInternalErrorFlush(ch string, frameType protocol.FrameT
}
func toClientErr(err error) *Error {
- if clientErr, ok := err.(*Error); ok {
+ var clientErr *Error
+ if errors.As(err, &clientErr) {
return clientErr
}
return ErrorInternal
diff --git a/config.go b/config.go
index 3bf7d2e2..e057cf62 100644
--- a/config.go
+++ b/config.go
@@ -48,6 +48,12 @@ type Config struct {
// will be disconnected with DisconnectInsufficientState.
// Zero value means 40 * time.Second.
ClientChannelPositionCheckDelay time.Duration
+ // ClientChannelPositionMaxTimeLag is the maximum allowed time lag of publications
+ // for subscribers with positioning on. When it is exceeded, we mark connection with
+ // insufficient state. Not used by default (zero value), i.e. Centrifuge does not take
+ // lag into account for positioning. See also pub_sub_time_lag_seconds as a helpful metric.
+ ClientChannelPositionMaxTimeLag time.Duration
+
// ClientQueueMaxSize is a maximum size of client's message queue in
// bytes. After this queue size exceeded Centrifuge closes client's connection.
// Zero value means 1048576 bytes (1MB).
diff --git a/go.mod b/go.mod
index ab1e6f82..b6316320 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.21
require (
github.com/FZambia/eagle v0.1.0
github.com/Yiling-J/theine-go v0.3.2
- github.com/centrifugal/protocol v0.13.2
+ github.com/centrifugal/protocol v0.13.3
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.1
github.com/redis/rueidis v1.0.37
diff --git a/go.sum b/go.sum
index a96502fb..11288eb1 100644
--- a/go.sum
+++ b/go.sum
@@ -4,8 +4,8 @@ github.com/Yiling-J/theine-go v0.3.2 h1:XcSdMPV9DwBD9gqqSxbBfVJnP8CCiqNSqp3C6Ypm
github.com/Yiling-J/theine-go v0.3.2/go.mod h1:ygLXqrWPZT/a+PzK5hQ0+a6gu0lpAY5IudTcgnPleqI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/centrifugal/protocol v0.13.2 h1:LLPp2KZBK++Vr5HDHTG9fMssapucir0qev4YAbjhiqA=
-github.com/centrifugal/protocol v0.13.2/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
+github.com/centrifugal/protocol v0.13.3 h1:Ryt5uIYCz5wpJOHc0+L2dC1ty2OQzwdU4TV3pmPOfnA=
+github.com/centrifugal/protocol v0.13.3/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
diff --git a/hub.go b/hub.go
index d0f7c067..44653753 100644
--- a/hub.go
+++ b/hub.go
@@ -4,6 +4,7 @@ import (
"context"
"io"
"sync"
+ "time"
"github.com/centrifugal/centrifuge/internal/convert"
@@ -23,13 +24,13 @@ type Hub struct {
}
// newHub initializes Hub.
-func newHub(logger *logger) *Hub {
+func newHub(logger *logger, metrics *metrics, maxTimeLagMilli int64) *Hub {
h := &Hub{
sessions: map[string]*Client{},
}
for i := 0; i < numHubShards; i++ {
h.connShards[i] = newConnShard()
- h.subShards[i] = newSubShard(logger)
+ h.subShards[i] = newSubShard(logger, metrics, maxTimeLagMilli)
}
return h
}
@@ -498,14 +499,18 @@ type subInfo struct {
type subShard struct {
mu sync.RWMutex
// registry to hold active subscriptions of clients to channels with some additional info.
- subs map[string]map[string]subInfo
- logger *logger
+ subs map[string]map[string]subInfo
+ maxTimeLagMilli int64
+ logger *logger
+ metrics *metrics
}
-func newSubShard(logger *logger) *subShard {
+func newSubShard(logger *logger, metrics *metrics, maxTimeLagMilli int64) *subShard {
return &subShard{
- subs: make(map[string]map[string]subInfo),
- logger: logger,
+ subs: make(map[string]map[string]subInfo),
+ logger: logger,
+ metrics: metrics,
+ maxTimeLagMilli: maxTimeLagMilli,
}
}
@@ -646,6 +651,7 @@ func getDeltaData(sub subInfo, key preparedKey, channel string, deltaPub *protoc
// broadcastPublication sends message to all clients subscribed on a channel.
func (h *subShard) broadcastPublication(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error {
+ pubTime := pub.Time
fullPub := pubToProto(pub)
preparedDataByKey := make(map[preparedKey]preparedData)
@@ -755,7 +761,18 @@ func (h *subShard) broadcastPublication(channel string, sp StreamPosition, pub,
go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(sub.client)
continue
}
- _ = sub.client.writePublication(channel, fullPub, prepValue, sp)
+
+ // Check lag in PUB/SUB processing. We use it to notify subscribers with positioning enabled
+ // about insufficient state in the stream.
+ var maxLagExceeded bool
+ if pubTime > 0 {
+ timeLagMilli := time.Now().UnixMilli() - pubTime
+ h.metrics.observePubSubTimeLag(timeLagMilli)
+ if h.maxTimeLagMilli > 0 && timeLagMilli > h.maxTimeLagMilli {
+ maxLagExceeded = true
+ }
+ }
+ _ = sub.client.writePublication(channel, fullPub, prepValue, sp, maxLagExceeded)
}
if jsonEncodeErr != nil && h.logger.enabled(LogLevelWarn) {
// Log that we had clients with inappropriate protocol, and point to the first such client.
diff --git a/hub_test.go b/hub_test.go
index d06e9f67..2fe3b878 100644
--- a/hub_test.go
+++ b/hub_test.go
@@ -10,6 +10,8 @@ import (
"testing"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+
"github.com/centrifugal/centrifuge/internal/convert"
"github.com/centrifugal/protocol"
@@ -153,7 +155,10 @@ func (t *testTransport) Close(disconnect Disconnect) error {
}
func TestHub(t *testing.T) {
- h := newHub(nil)
+ m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
+ require.NoError(t, err)
+
+ h := newHub(nil, m, 0)
c, err := newClient(context.Background(), defaultTestNode(), newTestTransport(func() {}))
require.NoError(t, err)
c.user = "test"
@@ -896,10 +901,12 @@ func TestHubBroadcastLeave(t *testing.T) {
}
func TestHubShutdown(t *testing.T) {
- h := newHub(nil)
- err := h.shutdown(context.Background())
+ m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
require.NoError(t, err)
- h = newHub(nil)
+ h := newHub(nil, m, 0)
+ err = h.shutdown(context.Background())
+ require.NoError(t, err)
+ h = newHub(nil, m, 0)
c, err := newClient(context.Background(), defaultTestNode(), newTestTransport(func() {}))
require.NoError(t, err)
_ = h.add(c)
@@ -914,7 +921,9 @@ func TestHubShutdown(t *testing.T) {
}
func TestHubSubscriptions(t *testing.T) {
- h := newHub(nil)
+ m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
+ require.NoError(t, err)
+ h := newHub(nil, m, 0)
c, err := newClient(context.Background(), defaultTestNode(), newTestTransport(func() {}))
require.NoError(t, err)
@@ -955,7 +964,9 @@ func TestHubSubscriptions(t *testing.T) {
}
func TestUserConnections(t *testing.T) {
- h := newHub(nil)
+ m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
+ require.NoError(t, err)
+ h := newHub(nil, m, 0)
c, err := newClient(context.Background(), defaultTestNode(), newTestTransport(func() {}))
require.NoError(t, err)
_ = h.add(c)
diff --git a/metrics.go b/metrics.go
index a337469b..b9303f23 100644
--- a/metrics.go
+++ b/metrics.go
@@ -83,6 +83,8 @@ type metrics struct {
commandDurationRefresh prometheus.Observer
commandDurationSubRefresh prometheus.Observer
commandDurationUnknown prometheus.Observer
+
+ pubSubTimeLagHistogram prometheus.Histogram
}
func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Duration) {
@@ -117,6 +119,13 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du
observer.Observe(d.Seconds())
}
+func (m *metrics) observePubSubTimeLag(lagTimeMilli int64) {
+ if lagTimeMilli < 0 {
+ lagTimeMilli = -lagTimeMilli
+ }
+ m.pubSubTimeLagHistogram.Observe(float64(lagTimeMilli) / 1000)
+}
+
func (m *metrics) setBuildInfo(version string) {
m.buildInfoGauge.WithLabelValues(version).Set(1)
}
@@ -459,6 +468,14 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Help: "Size in bytes of messages received from client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})
+ m.pubSubTimeLagHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: metricsNamespace,
+ Subsystem: "node",
+ Name: "pub_sub_time_lag_seconds",
+ Help: "Pub sub time lag in seconds",
+ Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.200, 0.500, 1.000, 2.000, 5.000, 10.000},
+ })
+
m.messagesReceivedCountPublication = m.messagesReceivedCount.WithLabelValues("publication")
m.messagesReceivedCountJoin = m.messagesReceivedCount.WithLabelValues("join")
m.messagesReceivedCountLeave = m.messagesReceivedCount.WithLabelValues("leave")
@@ -571,5 +588,8 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
if err := registry.Register(m.surveyDurationSummary); err != nil && !errors.As(err, &alreadyRegistered) {
return nil, err
}
+ if err := registry.Register(m.pubSubTimeLagHistogram); err != nil && !errors.As(err, &alreadyRegistered) {
+ return nil, err
+ }
return m, nil
}
diff --git a/node.go b/node.go
index c610539c..a9ee7ffe 100644
--- a/node.go
+++ b/node.go
@@ -153,7 +153,6 @@ func New(c Config) (*Node, error) {
uid: uid,
nodes: newNodeRegistry(uid),
config: c,
- hub: newHub(lg),
startedAt: time.Now().Unix(),
shutdownCh: make(chan struct{}),
logger: lg,
@@ -174,6 +173,8 @@ func New(c Config) (*Node, error) {
n.metrics = m
}
+ n.hub = newHub(lg, n.metrics, c.ClientChannelPositionMaxTimeLag.Milliseconds())
+
b, err := NewMemoryBroker(n, MemoryBrokerConfig{})
if err != nil {
return nil, err
@@ -1253,6 +1254,7 @@ func pubFromProto(pub *protocol.Publication) *Publication {
Data: pub.Data,
Info: infoFromProto(pub.GetInfo()),
Tags: pub.GetTags(),
+ Time: pub.Time,
}
}
diff --git a/node_test.go b/node_test.go
index 08a3508f..ba971bb6 100644
--- a/node_test.go
+++ b/node_test.go
@@ -183,8 +183,9 @@ func nodeWithPresenceManager(presenceManager PresenceManager) *Node {
func defaultNodeNoHandlers() *Node {
n, err := New(Config{
- LogLevel: LogLevelTrace,
- LogHandler: func(entry LogEntry) {},
+ LogLevel: LogLevelTrace,
+ LogHandler: func(entry LogEntry) {},
+ ClientChannelPositionMaxTimeLag: 5 * time.Second,
})
if err != nil {
panic(err)