Skip to content

Commit

Permalink
communicate publication delta option in no history case
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 19, 2024
1 parent a465a9d commit 5f51eef
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 18 deletions.
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ClientInfo struct {
// BrokerEventHandler can handle messages received from PUB/SUB system.
type BrokerEventHandler interface {
// HandlePublication to handle received Publications.
HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error
HandlePublication(ch string, pub *Publication, sp StreamPosition, useDelta bool, prevPub *Publication) error
// HandleJoin to handle received Join messages.
HandleJoin(ch string, info *ClientInfo) error
// HandleLeave to handle received Leave messages.
Expand Down
4 changes: 2 additions & 2 deletions broker_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
}
b.saveResultToCache(ch, opts.IdempotencyKey, streamTop, resultExpireSeconds)
}
return streamTop, false, b.eventHandler.HandlePublication(ch, pub, streamTop, prevPub)
return streamTop, false, b.eventHandler.HandlePublication(ch, pub, streamTop, opts.UseDelta, prevPub)
}
streamPosition := StreamPosition{}
if opts.IdempotencyKey != "" {
Expand All @@ -131,7 +131,7 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
}
b.saveResultToCache(ch, opts.IdempotencyKey, streamPosition, resultExpireSeconds)
}
return streamPosition, false, b.eventHandler.HandlePublication(ch, pub, StreamPosition{}, prevPub)
return streamPosition, false, b.eventHandler.HandlePublication(ch, pub, StreamPosition{}, opts.UseDelta, prevPub)
}

func (b *MemoryBroker) getResultFromCache(ch string, key string) (StreamPosition, bool) {
Expand Down
4 changes: 2 additions & 2 deletions broker_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestMemoryBrokerPublishIdempotent(t *testing.T) {
numPubs := 0

e.eventHandler = &testBrokerEventHandler{
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
numPubs++
return nil
},
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestMemoryBrokerPublishIdempotentWithHistory(t *testing.T) {
numPubs := 0

e.eventHandler = &testBrokerEventHandler{
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
numPubs++
return nil
},
Expand Down
16 changes: 14 additions & 2 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,12 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
Info: infoToProto(opts.ClientInfo),
Tags: opts.Tags,
}
if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 {
// In no history case we communicate delta flag over Publication field. This field is then
// cleaned up before passing to the Node layer when handling Redis message.
protoPub.Delta = opts.UseDelta
}

byteMessage, err := protoPub.MarshalVT()
if err != nil {
return StreamPosition{}, false, err
Expand Down Expand Up @@ -1019,15 +1025,21 @@ func (b *RedisBroker) handleRedisClientMessage(eventHandler BrokerEventHandler,
// it to unmarshalled Publication.
pub.Offset = sp.Offset
}
if pub.Delta {
// In at most once scenario we are passing delta in Publication itself. But need to clean it
// before passing further.
delta = true
pub.Delta = false
}
if delta && len(prevPayload) > 0 {
var prevPub protocol.Publication
err = prevPub.UnmarshalVT(prevPayload)
if err != nil {
return err
}
_ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, pubFromProto(&prevPub))
_ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, true, pubFromProto(&prevPub))
} else {
_ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, nil)
_ = eventHandler.HandlePublication(channel, pubFromProto(&pub), sp, delta, nil)
}
} else if pushType == joinPushType {
var info protocol.ClientInfo
Expand Down
7 changes: 4 additions & 3 deletions channel_medium.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ type queuedPub struct {
pub *Publication
sp StreamPosition
prevPub *Publication
delta bool
isInsufficientState bool
}

const defaultChannelLayerQueueMaxSize = 16 * 1024 * 1024

func (c *channelMedium) broadcastPublication(pub *Publication, sp StreamPosition, prevPub *Publication) {
bp := queuedPub{pub: pub, sp: sp, prevPub: prevPub}
func (c *channelMedium) broadcastPublication(pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) {
bp := queuedPub{pub: pub, sp: sp, prevPub: prevPub, delta: delta}
c.mu.Lock()
c.positionCheckTime = channelMediumTimeNow().UnixNano()
c.mu.Unlock()
Expand Down Expand Up @@ -151,7 +152,7 @@ func (c *channelMedium) broadcast(qp queuedPub) {
prevPub := qp.prevPub
var localPrevPub *Publication
useLocalLatestPub := c.options.KeepLatestPublication && !qp.isInsufficientState
if useLocalLatestPub {
if useLocalLatestPub && qp.delta {
localPrevPub = c.latestPublication
}
if c.options.broadcastDelay > 0 && !c.options.KeepLatestPublication {
Expand Down
2 changes: 1 addition & 1 deletion channel_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestChannelMediumHandlePublication(t *testing.T) {
pub := &Publication{Data: []byte("test data")}
sp := StreamPosition{Offset: 1}

cache.broadcastPublication(pub, sp, nil)
cache.broadcastPublication(pub, sp, false, nil)

select {
case <-doneCh:
Expand Down
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,7 @@ func TestClientPublishNotAvailable(t *testing.T) {

type testBrokerEventHandler struct {
// Publication must register callback func to handle Publications received.
HandlePublicationFunc func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error
HandlePublicationFunc func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error
// Join must register callback func to handle Join messages received.
HandleJoinFunc func(ch string, info *ClientInfo) error
// Leave must register callback func to handle Leave messages received.
Expand All @@ -1554,9 +1554,9 @@ type testBrokerEventHandler struct {
HandleControlFunc func([]byte) error
}

func (b *testBrokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
func (b *testBrokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
if b.HandlePublicationFunc != nil {
return b.HandlePublicationFunc(ch, pub, sp, prevPub)
return b.HandlePublicationFunc(ch, pub, sp, delta, prevPub)
}
return nil
}
Expand Down Expand Up @@ -1602,7 +1602,7 @@ func TestClientPublishHandler(t *testing.T) {
connectClientV2(t, client)

node.broker.(*MemoryBroker).eventHandler = &testBrokerEventHandler{
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
var msg testClientMessage
err := json.Unmarshal(pub.Data, &msg)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,7 @@ type brokerEventHandler struct {
}

// HandlePublication coming from Broker.
func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
if pub == nil {
panic("nil Publication received, this must never happen")
}
Expand All @@ -1604,7 +1604,7 @@ func (h *brokerEventHandler) HandlePublication(ch string, pub *Publication, sp S
medium, ok := h.node.mediums[ch]
mu.Unlock()
if ok {
medium.broadcastPublication(pub, sp, prevPub)
medium.broadcastPublication(pub, sp, delta, prevPub)
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func TestBrokerEventHandler_PanicsOnNil(t *testing.T) {
defer func() { _ = node.Shutdown(context.Background()) }()
handler := &brokerEventHandler{node: node}
require.Panics(t, func() {
_ = handler.HandlePublication("test", nil, StreamPosition{}, nil)
_ = handler.HandlePublication("test", nil, StreamPosition{}, false, nil)
})
require.Panics(t, func() {
_ = handler.HandleJoin("test", nil)
Expand Down

0 comments on commit 5f51eef

Please sign in to comment.