diff --git a/broker.go b/broker.go index 348d6ebe..75f6e201 100644 --- a/broker.go +++ b/broker.go @@ -35,7 +35,7 @@ type ClientInfo struct { // BrokerEventHandler can handle messages received from PUB/SUB system. type BrokerEventHandler interface { // HandlePublication to handle received Publications. - HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error + HandlePublication(ch string, pub *Publication, sp StreamPosition, useDelta bool, prevPub *Publication) error // HandleJoin to handle received Join messages. HandleJoin(ch string, info *ClientInfo) error // HandleLeave to handle received Leave messages. diff --git a/broker_memory.go b/broker_memory.go index f2097ea5..55a1a445 100644 --- a/broker_memory.go +++ b/broker_memory.go @@ -121,7 +121,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str } b.saveResultToCache(ch, opts.IdempotencyKey, streamTop, resultExpireSeconds) } - return streamTop, false, b.eventHandler.HandlePublication(ch, pub, streamTop, prevPub) + return streamTop, false, b.eventHandler.HandlePublication(ch, pub, streamTop, opts.UseDelta, prevPub) } streamPosition := StreamPosition{} if opts.IdempotencyKey != "" { @@ -131,7 +131,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str } b.saveResultToCache(ch, opts.IdempotencyKey, streamPosition, resultExpireSeconds) } - return streamPosition, false, b.eventHandler.HandlePublication(ch, pub, StreamPosition{}, prevPub) + return streamPosition, false, b.eventHandler.HandlePublication(ch, pub, StreamPosition{}, opts.UseDelta, prevPub) } func (b *MemoryBroker) getResultFromCache(ch string, key string) (StreamPosition, bool) { diff --git a/broker_memory_test.go b/broker_memory_test.go index 5f4725f7..ba4361eb 100644 --- a/broker_memory_test.go +++ b/broker_memory_test.go @@ -141,7 +141,7 @@ func TestMemoryBrokerPublishIdempotent(t *testing.T) { numPubs := 0 e.eventHandler = &testBrokerEventHandler{ - HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error { + HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { numPubs++ return nil }, @@ -169,7 +169,7 @@ func TestMemoryBrokerPublishIdempotentWithHistory(t *testing.T) { numPubs := 0 e.eventHandler = &testBrokerEventHandler{ - HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error { + HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { numPubs++ return nil }, diff --git a/broker_redis.go b/broker_redis.go index 6e3adb5b..69cfecaf 100644 --- a/broker_redis.go +++ b/broker_redis.go @@ -614,6 +614,12 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ Info: infoToProto(opts.ClientInfo), Tags: opts.Tags, } + if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 { + // In no history case we communicate delta flag over Publication field. This field is then + // cleaned up before passing to the Node layer when handling Redis message. + protoPub.Delta = opts.UseDelta + } + byteMessage, err := protoPub.MarshalVT() if err != nil { return StreamPosition{}, false, err @@ -1019,15 +1025,21 @@ func (b *RedisBroker) handleRedisClientMessage(eventHandler BrokerEventHandler, // it to unmarshalled Publication. pub.Offset = sp.Offset } + if pub.Delta { + // In at most once scenario we are passing delta in Publication itself. But need to clean it + // before passing further. + delta = true + pub.Delta = false + } if delta && len(prevPayload) > 0 { var prevPub protocol.Publication err = prevPub.UnmarshalVT(prevPayload) if err != nil { return err } - _ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, pubFromProto(&prevPub)) + _ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, true, pubFromProto(&prevPub)) } else { - _ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, nil) + _ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, delta, nil) } } else if pushType == joinPushType { var info protocol.ClientInfo diff --git a/channel_medium.go b/channel_medium.go index 5ceeee9a..0dfff13d 100644 --- a/channel_medium.go +++ b/channel_medium.go @@ -97,13 +97,14 @@ type queuedPub struct { pub *Publication sp StreamPosition prevPub *Publication + delta bool isInsufficientState bool } const defaultChannelLayerQueueMaxSize = 16 * 1024 * 1024 -func (c *channelMedium) broadcastPublication(pub *Publication, sp StreamPosition, prevPub *Publication) { - bp := queuedPub{pub: pub, sp: sp, prevPub: prevPub} +func (c *channelMedium) broadcastPublication(pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) { + bp := queuedPub{pub: pub, sp: sp, prevPub: prevPub, delta: delta} c.mu.Lock() c.positionCheckTime = channelMediumTimeNow().UnixNano() c.mu.Unlock() @@ -151,7 +152,7 @@ func (c *channelMedium) broadcast(qp queuedPub) { prevPub := qp.prevPub var localPrevPub *Publication useLocalLatestPub := c.options.KeepLatestPublication && !qp.isInsufficientState - if useLocalLatestPub { + if useLocalLatestPub && qp.delta { localPrevPub = c.latestPublication } if c.options.broadcastDelay > 0 && !c.options.KeepLatestPublication { diff --git a/channel_medium_test.go b/channel_medium_test.go index b1e9be86..4edb753f 100644 --- a/channel_medium_test.go +++ b/channel_medium_test.go @@ -77,7 +77,7 @@ func TestChannelMediumHandlePublication(t *testing.T) { pub := &Publication{Data: []byte("test data")} sp := StreamPosition{Offset: 1} - cache.broadcastPublication(pub, sp, nil) + cache.broadcastPublication(pub, sp, false, nil) select { case <-doneCh: diff --git a/client_test.go b/client_test.go index 63642052..e6465fd6 100644 --- a/client_test.go +++ b/client_test.go @@ -1545,7 +1545,7 @@ func TestClientPublishNotAvailable(t *testing.T) { type testBrokerEventHandler struct { // Publication must register callback func to handle Publications received. - HandlePublicationFunc func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error + HandlePublicationFunc func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error // Join must register callback func to handle Join messages received. HandleJoinFunc func(ch string, info *ClientInfo) error // Leave must register callback func to handle Leave messages received. @@ -1554,9 +1554,9 @@ type testBrokerEventHandler struct { HandleControlFunc func([]byte) error } -func (b *testBrokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error { +func (b *testBrokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { if b.HandlePublicationFunc != nil { - return b.HandlePublicationFunc(ch, pub, sp, prevPub) + return b.HandlePublicationFunc(ch, pub, sp, delta, prevPub) } return nil } @@ -1602,7 +1602,7 @@ func TestClientPublishHandler(t *testing.T) { connectClientV2(t, client) node.broker.(*MemoryBroker).eventHandler = &testBrokerEventHandler{ - HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error { + HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { var msg testClientMessage err := json.Unmarshal(pub.Data, &msg) require.NoError(t, err) diff --git a/node.go b/node.go index 9df9dfb0..8161f36f 100644 --- a/node.go +++ b/node.go @@ -1594,7 +1594,7 @@ type brokerEventHandler struct { } // HandlePublication coming from Broker. -func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error { +func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { if pub == nil { panic("nil Publication received, this must never happen") } @@ -1604,7 +1604,7 @@ func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp S medium, ok := h.node.mediums[ch] mu.Unlock() if ok { - medium.broadcastPublication(pub, sp, prevPub) + medium.broadcastPublication(pub, sp, delta, prevPub) return nil } } diff --git a/node_test.go b/node_test.go index a31b7edc..df8b0dd4 100644 --- a/node_test.go +++ b/node_test.go @@ -1175,7 +1175,7 @@ func TestBrokerEventHandler_PanicsOnNil(t *testing.T) { defer func() { _ = node.Shutdown(context.Background()) }() handler := &brokerEventHandler{node: node} require.Panics(t, func() { - _ = handler.HandlePublication("test", nil, StreamPosition{}, nil) + _ = handler.HandlePublication("test", nil, StreamPosition{}, false, nil) }) require.Panics(t, func() { _ = handler.HandleJoin("test", nil)