Option to mark positioned clients with insufficient state if publication lag exceeds threshold (#385)
FZambia authored May 26, 2024
1 parent 48fe37f commit 5ad3b3d
Showing 19 changed files with 157 additions and 57 deletions.
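
The new behavior is driven by a single Config field added in this commit (see the config.go diff below). A minimal usage sketch, assuming an otherwise default node setup; the 5 second threshold is an arbitrary illustration, not a recommended value:

package main

import (
	"log"
	"time"

	"github.com/centrifugal/centrifuge"
)

func main() {
	node, err := centrifuge.New(centrifuge.Config{
		// Existing option: how often client stream positions are checked.
		ClientChannelPositionCheckDelay: 40 * time.Second,
		// New in this commit: if the PUB/SUB time lag of a publication exceeds
		// this value, positioned subscribers are marked with insufficient state
		// and resubscribe to recover, instead of silently falling behind.
		ClientChannelPositionMaxTimeLag: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = node
}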
4 changes: 2 additions & 2 deletions _examples/document_sync/index.html
@@ -31,8 +31,8 @@
document += update.increment
return document
},
compareVersion: (publication, currentVersion) => {
const newVersion = publication.data.version;
compareVersion: (currentVersion, update) => {
const newVersion = update.version;
return newVersion > currentVersion ? newVersion : null;
},
onChange: (document) => {
2 changes: 2 additions & 0 deletions _examples/document_sync/main.go
@@ -15,6 +15,7 @@ import (
_ "net/http/pprof"

"github.com/centrifugal/centrifuge"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Counter is a document we sync here. Returned when loading full state.
@@ -114,6 +115,7 @@ func main() {

http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
http.HandleFunc("/api/counter", getCounterHandler)
http.Handle("/metrics", promhttp.Handler())
http.Handle("/", http.FileServer(http.Dir("./")))

counter = Counter{Version: 0, Value: 0}
4 changes: 2 additions & 2 deletions _examples/document_sync/rtdocument.js
@@ -48,7 +48,7 @@ class RealTimeDocument {
return;
}
// Process new messages immediately if initial state is already loaded.
const newVersion = this.#compareVersion(ctx, this.#version);
const newVersion = this.#compareVersion(this.#version, ctx.data);
if (newVersion === null) {
this.#debugLog("Skip real-time publication", ctx);
return;
@@ -143,7 +143,7 @@ class RealTimeDocument {

#processBufferedMessages() {
this.#messageBuffer.forEach((msg) => {
const newVersion = this.#compareVersion(msg, this.#version);
const newVersion = this.#compareVersion(this.#version, msg.data);
if (newVersion) {
this.#document = this.#applyUpdate(this.#document, msg.data);
this.#version = newVersion;
2 changes: 1 addition & 1 deletion _examples/go.mod
@@ -7,7 +7,7 @@ replace github.com/centrifugal/centrifuge => ../
require (
github.com/FZambia/tarantool v0.2.2
github.com/centrifugal/centrifuge v0.8.2
github.com/centrifugal/protocol v0.13.2
github.com/centrifugal/protocol v0.13.3
github.com/cristalhq/jwt/v5 v5.4.0
github.com/dchest/uniuri v1.2.0
github.com/gin-contrib/sessions v0.0.3
4 changes: 2 additions & 2 deletions _examples/go.sum
@@ -14,8 +14,8 @@ github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20181103040241-659414
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/centrifugal/protocol v0.13.2 h1:LLPp2KZBK++Vr5HDHTG9fMssapucir0qev4YAbjhiqA=
github.com/centrifugal/protocol v0.13.2/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/centrifugal/protocol v0.13.3 h1:Ryt5uIYCz5wpJOHc0+L2dC1ty2OQzwdU4TV3pmPOfnA=
github.com/centrifugal/protocol v0.13.3/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
3 changes: 1 addition & 2 deletions _examples/unidirectional_grpc/client/main.go
@@ -17,15 +17,14 @@ var (
)

func handlePush(push *clientproto.Push) {
log.Printf("push received (type %d, channel %s, data %s", push.Type, push.Channel, fmt.Sprintf("%#v", string(push.Data)))
if push.Connect != nil {
log.Printf("connected to a server with ID: %s", push.Connect.Client)
} else if push.Pub != nil {
log.Printf("new publication from channel %s: %s", push.Channel, fmt.Sprintf("%#v", string(push.Pub.Data)))
} else if push.Disconnect != nil {
log.Printf("disconnected from a server: %s", push.Disconnect.Reason)
} else {
log.Println("push type handling not implemented")
log.Printf("push type handling not implemented: %v", push)
}
}

4 changes: 4 additions & 0 deletions broker.go
@@ -17,6 +17,10 @@ type Publication struct {
// Tags contains a map with custom key-values attached to a Publication. Tags map
// will be delivered to a client.
Tags map[string]string
// Optional time of publication as Unix timestamp milliseconds. At this point
// we use it for calculating PUB/SUB time lag, it's not exposed to the client
// protocol.
Time int64
}

// ClientInfo contains information about client connection.
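
With Publication.Time carried as Unix milliseconds, the subscriber side can compare it against the configured maximum lag. A simplified sketch of that comparison (an illustrative helper, not the library's actual internal code):

package main

import (
	"fmt"
	"time"
)

// publicationLagExceeded reports whether a publication is older than maxLag.
// pubTimeMilli corresponds to Publication.Time; a zero value means the broker
// did not set it, and a non-positive maxLag means the option is disabled.
func publicationLagExceeded(pubTimeMilli int64, maxLag time.Duration) bool {
	if maxLag <= 0 || pubTimeMilli == 0 {
		return false
	}
	lag := time.Duration(time.Now().UnixMilli()-pubTimeMilli) * time.Millisecond
	return lag > maxLag
}

func main() {
	// Example: a publication stamped 10 seconds ago against a 5 second limit.
	past := time.Now().Add(-10 * time.Second).UnixMilli()
	fmt.Println(publicationLagExceeded(past, 5*time.Second)) // true
}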
1 change: 1 addition & 0 deletions broker_memory.go
@@ -104,6 +104,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
Data: data,
Info: opts.ClientInfo,
Tags: opts.Tags,
Time: time.Now().UnixMilli(),
}
var prevPub *Publication
if opts.HistorySize > 0 && opts.HistoryTTL > 0 {
31 changes: 18 additions & 13 deletions broker_redis.go
@@ -613,6 +613,7 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
Data: data,
Info: infoToProto(opts.ClientInfo),
Tags: opts.Tags,
Time: time.Now().UnixMilli(),
}
if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 {
// In no history case we communicate delta flag over Publication field. This field is then
@@ -1363,7 +1364,11 @@ func extractPushData(data []byte) ([]byte, pushType, StreamPosition, bool, []byt
// d1:offset:epoch:prev_payload_length:prev_payload:payload_length:payload
stringContent := convert.BytesToString(content)
parsedDelta, err := parseDeltaPush(stringContent)
return convert.StringToBytes(parsedDelta.Payload), pubPushType, StreamPosition{Epoch: parsedDelta.Epoch, Offset: parsedDelta.Offset}, true, convert.StringToBytes(parsedDelta.PrevPayload), err == nil
if err != nil {
// Unexpected error.
return nil, pubPushType, StreamPosition{Epoch: epoch, Offset: offset}, false, nil, false
}
return convert.StringToBytes(parsedDelta.Payload), pubPushType, StreamPosition{Epoch: parsedDelta.Epoch, Offset: parsedDelta.Offset}, true, convert.StringToBytes(parsedDelta.PrevPayload), true
default:
// Unknown content type.
return nil, pubPushType, StreamPosition{Epoch: epoch, Offset: offset}, false, nil, false
@@ -1379,31 +1384,31 @@ type deltaPublicationPush struct {
Payload string
}

func parseDeltaPush(input string) (*deltaPublicationPush, error) {
func parseDeltaPush(input string) (deltaPublicationPush, error) {
// d1:offset:epoch:prev_payload_length:prev_payload:payload_length:payload
const prefix = "d1:"
if !strings.HasPrefix(input, prefix) {
return nil, fmt.Errorf("input does not start with the expected prefix")
return deltaPublicationPush{}, fmt.Errorf("input does not start with the expected prefix")
}
input = input[len(prefix):] // Remove prefix

// offset:epoch:prev_payload_length:prev_payload:payload_length:payload

idx := strings.IndexByte(input, ':')
if idx == -1 {
return nil, fmt.Errorf("invalid format, missing offset")
return deltaPublicationPush{}, fmt.Errorf("invalid format, missing offset")
}
offset, err := strconv.ParseUint(input[:idx], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
return deltaPublicationPush{}, fmt.Errorf("error parsing offset: %v", err)
}
input = input[idx+1:]

// epoch:prev_payload_length:prev_payload:payload_length:payload

idx = strings.IndexByte(input, ':')
if idx == -1 {
return nil, fmt.Errorf("invalid format, missing epoch")
return deltaPublicationPush{}, fmt.Errorf("invalid format, missing epoch")
}
epoch := input[:idx]
input = input[idx+1:]
@@ -1412,40 +1417,40 @@ func parseDeltaPush(input string) (*deltaPublicationPush, error) {

idx = strings.IndexByte(input, ':')
if idx == -1 {
return nil, fmt.Errorf("invalid format, missing prev payload length")
return deltaPublicationPush{}, fmt.Errorf("invalid format, missing prev payload length")
}
prevPayloadLength, err := strconv.Atoi(input[:idx])
if err != nil {
return nil, fmt.Errorf("error parsing prev payload length: %v", err)
return deltaPublicationPush{}, fmt.Errorf("error parsing prev payload length: %v", err)
}

input = input[idx+1:]

// Extract prev_payload based on prev_payload_length
if len(input) < prevPayloadLength {
return nil, fmt.Errorf("input is shorter than expected prev payload length")
return deltaPublicationPush{}, fmt.Errorf("input is shorter than expected prev payload length")
}
prevPayload := input[:prevPayloadLength]
input = input[prevPayloadLength+1:]

// payload_length:payload
idx = strings.IndexByte(input, ':')
if idx == -1 {
return nil, fmt.Errorf("invalid format, missing payload")
return deltaPublicationPush{}, fmt.Errorf("invalid format, missing payload")
}
payloadLength, err := strconv.Atoi(input[:idx])
if err != nil {
return nil, fmt.Errorf("error parsing payload_length: %v", err)
return deltaPublicationPush{}, fmt.Errorf("error parsing payload_length: %v", err)
}
input = input[idx+1:]

// Extract payload based on payload_length
if len(input) < payloadLength {
return nil, fmt.Errorf("input is shorter than expected payload length")
return deltaPublicationPush{}, fmt.Errorf("input is shorter than expected payload length")
}
payload := input[:payloadLength]

return &deltaPublicationPush{
return deltaPublicationPush{
Offset: offset,
Epoch: epoch,
PrevPayloadLength: prevPayloadLength,
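
The d1 format parsed above is length-prefixed, so colons inside payloads stay unambiguous. A hypothetical encoder (not part of the library) makes the layout concrete; its output for these arguments matches the first case in broker_redis_test.go below:

package main

import "fmt"

// encodeDeltaPush builds the
// d1:offset:epoch:prev_payload_length:prev_payload:payload_length:payload
// string that parseDeltaPush expects.
func encodeDeltaPush(offset uint64, epoch, prevPayload, payload string) string {
	return fmt.Sprintf("d1:%d:%s:%d:%s:%d:%s",
		offset, epoch, len(prevPayload), prevPayload, len(payload), payload)
}

func main() {
	// Prints: d1:1234567890:epoch1:4:test:18:payload:with:colon
	fmt.Println(encodeDeltaPush(1234567890, "epoch1", "test", "payload:with:colon"))
}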
6 changes: 3 additions & 3 deletions broker_redis_test.go
@@ -1965,13 +1965,13 @@ func TestParseDeltaPush(t *testing.T) {
name string
input string
expectError bool
expectedResult *deltaPublicationPush
expectedResult deltaPublicationPush
}{
{
name: "valid data with colon in payload",
input: "d1:1234567890:epoch1:4:test:18:payload:with:colon",
expectError: false,
expectedResult: &deltaPublicationPush{
expectedResult: deltaPublicationPush{
Offset: 1234567890,
Epoch: "epoch1",
PrevPayloadLength: 4,
@@ -1984,7 +1984,7 @@
name: "valid data with empty payload",
input: "d1:1234567890:epoch2:0::0:",
expectError: false,
expectedResult: &deltaPublicationPush{
expectedResult: deltaPublicationPush{
Offset: 1234567890,
Epoch: "epoch2",
PrevPayloadLength: 0,
56 changes: 44 additions & 12 deletions client.go
@@ -732,10 +732,7 @@ func (c *Client) checkPosition(checkDelay time.Duration, ch string, chCtx Channe
}
nowUnix := c.node.nowTimeGetter().Unix()

isInitialCheck := chCtx.positionCheckTime == 0
isTimeToCheck := nowUnix-chCtx.positionCheckTime > int64(checkDelay.Seconds())
needCheckPosition := isInitialCheck || isTimeToCheck

needCheckPosition := nowUnix-chCtx.positionCheckTime > int64(checkDelay.Seconds())
if !needCheckPosition {
return true
}
@@ -3104,7 +3101,7 @@ func (c *Client) handleAsyncUnsubscribe(ch string, unsub Unsubscribe) {
}
}

func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition) error {
func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition, maxLagExceeded bool) error {
c.mu.Lock()
channelContext, ok := c.channels[ch]
if !ok || !channelHasFlag(channelContext.flags, flagSubscribed) {
@@ -3141,18 +3138,52 @@
nextExpectedOffset := currentPositionOffset + 1
pubOffset := pub.Offset
pubEpoch := sp.Epoch
if pubEpoch != channelContext.streamPosition.Epoch || pubOffset != nextExpectedOffset {
if maxLagExceeded {
// PUB/SUB lag is too big.
// We can introduce an option to mark connection with insufficient state flag instead
// of disconnecting it immediately. In that case connection will eventually reconnect
// due to periodic sync. While connection channel is in the insufficient state we must
// skip publications coming to it. This mode may be useful to spread the resubscribe load.
if c.node.logger.enabled(LogLevelDebug) {
c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (lag)", map[string]any{"channel": ch, "user": c.user, "client": c.uid}))
}
// Tell client about insufficient state, can reconnect/resubscribe to recover the state.
go func() { c.handleInsufficientState(ch, serverSide) }()
c.mu.Unlock()
return nil
}
if pubEpoch != channelContext.streamPosition.Epoch {
// Wrong stream epoch is always a signal of insufficient state.
// We can introduce an option to mark connection with insufficient state flag instead
// of disconnecting it immediately. In that case connection will eventually reconnect
// due to periodic sync. While connection channel is in the insufficient state we must
// skip publications coming to it. This mode may be useful to spread the resubscribe load.
if c.node.logger.enabled(LogLevelDebug) {
c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (epoch)", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "epoch": pubEpoch, "expectedEpoch": channelContext.streamPosition.Epoch}))
}
// Tell client about insufficient state, can reconnect/resubscribe to recover the state.
go func() { c.handleInsufficientState(ch, serverSide) }()
c.mu.Unlock()
return nil
}
if pubOffset > nextExpectedOffset {
// Missed message detected.
// We can introduce an option to mark connection with insufficient state flag instead
// of disconnecting it immediately. In that case connection will eventually reconnect
// due to periodic sync. While connection channel is in the insufficient state we must
// skip publications coming to it. This mode may be useful to spread the resubscribe load.
if c.node.logger.enabled(LogLevelDebug) {
c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "epoch": pubEpoch, "expectedEpoch": channelContext.streamPosition.Epoch, "offset": pubOffset, "expectedOffset": nextExpectedOffset}))
c.node.logger.log(newLogEntry(LogLevelDebug, "client insufficient state (offset)", map[string]any{"channel": ch, "user": c.user, "client": c.uid, "offset": pubOffset, "expectedOffset": nextExpectedOffset}))
}
// Oops: sth lost, let client reconnect/resubscribe to recover its state.
// Tell client about insufficient state, can reconnect/resubscribe to recover the state.
go func() { c.handleInsufficientState(ch, serverSide) }()
c.mu.Unlock()
return nil
} else if pubOffset < nextExpectedOffset {
// Epoch is correct, but due to the lag in PUB/SUB processing we received non-actual update
// here. Safe to just skip for the subscriber.
c.mu.Unlock()
return nil
}
channelContext.positionCheckTime = time.Now().Unix()
channelContext.streamPosition.Offset = pub.Offset
@@ -3176,10 +3207,10 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
}

func (c *Client) writePublicationNoDelta(ch string, pub *protocol.Publication, data []byte, sp StreamPosition) error {
return c.writePublication(ch, pub, preparedData{fullData: data, brokerDeltaData: nil, localDeltaData: nil, deltaSub: false}, sp)
return c.writePublication(ch, pub, preparedData{fullData: data, brokerDeltaData: nil, localDeltaData: nil, deltaSub: false}, sp, false)
}

func (c *Client) writePublication(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition) error {
func (c *Client) writePublication(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition, maxLagExceeded bool) error {
if c.node.LogEnabled(LogLevelTrace) {
c.traceOutPush(&protocol.Push{Channel: ch, Pub: pub})
}
@@ -3213,7 +3244,7 @@ func (c *Client) writePublication(ch string, pub *protocol.Publication, prep pre
return c.transportEnqueue(prep.fullData, ch, protocol.FrameTypePushPublication)
}
c.pubSubSync.SyncPublication(ch, pub, func() {
_ = c.writePublicationUpdatePosition(ch, pub, prep, sp)
_ = c.writePublicationUpdatePosition(ch, pub, prep, sp, maxLagExceeded)
})
return nil
}
@@ -3354,7 +3385,8 @@ func (c *Client) logWriteInternalErrorFlush(ch string, frameType protocol.FrameT
}

func toClientErr(err error) *Error {
if clientErr, ok := err.(*Error); ok {
var clientErr *Error
if errors.As(err, &clientErr) {
return clientErr
}
return ErrorInternal
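
The branches added to writePublicationUpdatePosition follow a fixed order: lag check first, then epoch, then offset. A condensed sketch of that decision order (simplified for illustration, not the actual method):

package main

import "fmt"

type positionAction int

const (
	actionDeliver positionAction = iota
	actionInsufficientState
	actionSkip
)

// decide mirrors the ordering in the diff above: lag over threshold, a wrong
// epoch, or a missed offset forces a resync via insufficient state, while an
// offset behind the expected one is simply skipped.
func decide(maxLagExceeded bool, pubEpoch, curEpoch string, pubOffset, nextExpected uint64) positionAction {
	switch {
	case maxLagExceeded:
		return actionInsufficientState
	case pubEpoch != curEpoch:
		return actionInsufficientState
	case pubOffset > nextExpected:
		return actionInsufficientState
	case pubOffset < nextExpected:
		return actionSkip
	default:
		return actionDeliver
	}
}

func main() {
	// A publication behind the expected offset is skipped, not a resync.
	fmt.Println(decide(false, "epoch1", "epoch1", 11, 12) == actionSkip) // true
}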
6 changes: 6 additions & 0 deletions config.go
@@ -48,6 +48,12 @@ type Config struct {
// will be disconnected with DisconnectInsufficientState.
// Zero value means 40 * time.Second.
ClientChannelPositionCheckDelay time.Duration
// Maximum allowed time lag for publications for subscribers with positioning on.
// When exceeded we mark connection with insufficient state. By default, not used - i.e.
// Centrifuge does not take lag into account for positioning.
// See also pub_sub_time_lag_seconds as a helpful metric.
ClientChannelPositionMaxTimeLag time.Duration

// ClientQueueMaxSize is a maximum size of client's message queue in
// bytes. After this queue size exceeded Centrifuge closes client's connection.
// Zero value means 1048576 bytes (1MB).
2 changes: 1 addition & 1 deletion go.mod
@@ -5,7 +5,7 @@ go 1.21
require (
github.com/FZambia/eagle v0.1.0
github.com/Yiling-J/theine-go v0.3.2
github.com/centrifugal/protocol v0.13.2
github.com/centrifugal/protocol v0.13.3
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.1
github.com/redis/rueidis v1.0.37
4 changes: 2 additions & 2 deletions go.sum
@@ -4,8 +4,8 @@ github.com/Yiling-J/theine-go v0.3.2 h1:XcSdMPV9DwBD9gqqSxbBfVJnP8CCiqNSqp3C6Ypm
github.com/Yiling-J/theine-go v0.3.2/go.mod h1:ygLXqrWPZT/a+PzK5hQ0+a6gu0lpAY5IudTcgnPleqI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/centrifugal/protocol v0.13.2 h1:LLPp2KZBK++Vr5HDHTG9fMssapucir0qev4YAbjhiqA=
github.com/centrifugal/protocol v0.13.2/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/centrifugal/protocol v0.13.3 h1:Ryt5uIYCz5wpJOHc0+L2dC1ty2OQzwdU4TV3pmPOfnA=
github.com/centrifugal/protocol v0.13.3/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=