Skip to content

Commit

Permalink
[m3aggregator] Enable m3agg consumer writers to be able to use multip…
Browse files Browse the repository at this point in the history
…le filters (#4223)

* Enable m3agg consumer writers to be able to use both a storage policy filter and a consumer service filter
  • Loading branch information
saad-zaman authored Apr 19, 2023
1 parent 181430f commit 69e3675
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 40 deletions.
12 changes: 6 additions & 6 deletions src/msg/producer/producer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions src/msg/producer/ref_counted.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ func NewRefCountedMessage(m Message, fn OnFinalizeFn) *RefCountedMessage {
}

// Accept returns true if the message can be accepted by the filter.
func (rm *RefCountedMessage) Accept(fn FilterFunc) bool {
return fn(rm.Message)
func (rm *RefCountedMessage) Accept(fn []FilterFunc) bool {
if len(fn) == 0 {
return false
}
for _, f := range fn {
if !f(rm.Message) {
return false
}
}
return true
}

// IncRef increments the ref count.
Expand Down
15 changes: 13 additions & 2 deletions src/msg/producer/ref_counted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,26 @@ func TestRefCountedMessageFilter(t *testing.T) {
return m.Shard() == 0
}

sizeFilter := func(m Message) bool {
called++
return m.Size() == 0
}

mm := NewMockMessage(ctrl)
mm.EXPECT().Size().Return(0)
rm := NewRefCountedMessage(mm, nil)

mm.EXPECT().Shard().Return(uint32(0))
require.True(t, rm.Accept(filter))
require.True(t, rm.Accept([]FilterFunc{filter}))

mm.EXPECT().Shard().Return(uint32(1))
require.False(t, rm.Accept(filter))
require.False(t, rm.Accept([]FilterFunc{filter}))

mm.EXPECT().Shard().Return(uint32(0))
mm.EXPECT().Size().Return(0)
require.True(t, rm.Accept([]FilterFunc{filter, sizeFilter}))

require.False(t, rm.Accept([]FilterFunc{}))
}

func TestRefCountedMessageOnDropFn(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions src/msg/producer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ type Producer interface {
// RegisterFilter registers a filter to a consumer service.
RegisterFilter(sid services.ServiceID, fn FilterFunc)

// UnregisterFilter unregisters the filter of a consumer service.
UnregisterFilter(sid services.ServiceID)
// UnregisterFilters unregisters the filter of a consumer service.
UnregisterFilters(sid services.ServiceID)

// NumShards returns the total number of shards of the topic the producer is
// producing to.
Expand Down Expand Up @@ -125,8 +125,8 @@ type Writer interface {
// RegisterFilter registers a filter to a consumer service.
RegisterFilter(sid services.ServiceID, fn FilterFunc)

// UnregisterFilter unregisters the filter of a consumer service.
UnregisterFilter(sid services.ServiceID)
// UnregisterFilters unregisters the filters of a consumer service.
UnregisterFilters(sid services.ServiceID)

// NumShards returns the total number of shards of the topic the writer is
// writing to.
Expand Down
17 changes: 9 additions & 8 deletions src/msg/producer/writer/consumer_service_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type consumerServiceWriter interface {
// RegisterFilter registers a filter for the consumer service.
RegisterFilter(fn producer.FilterFunc)

// UnregisterFilter unregisters the filter for the consumer service.
UnregisterFilter()
// UnregisterFilters unregisters the filters for the consumer service.
UnregisterFilters()
}

type consumerServiceWriterMetrics struct {
Expand Down Expand Up @@ -107,7 +107,7 @@ type consumerServiceWriterImpl struct {
logger *zap.Logger

value watch.Value
dataFilter producer.FilterFunc
dataFilters []producer.FilterFunc
router ackRouter
consumerWriters map[string]consumerWriter
closed bool
Expand Down Expand Up @@ -140,7 +140,7 @@ func newConsumerServiceWriter(
shardWriters: initShardWriters(router, ct, numShards, opts),
opts: opts,
logger: opts.InstrumentOptions().Logger(),
dataFilter: acceptAllFilter,
dataFilters: []producer.FilterFunc{acceptAllFilter},
router: router,
consumerWriters: make(map[string]consumerWriter),
closed: false,
Expand Down Expand Up @@ -179,7 +179,7 @@ func initShardWriters(
}

func (w *consumerServiceWriterImpl) Write(rm *producer.RefCountedMessage) {
if rm.Accept(w.dataFilter) {
if rm.Accept(w.dataFilters) {
w.shardWriters[rm.Shard()].Write(rm)
w.m.filterAccepted.Inc(1)
return
Expand Down Expand Up @@ -328,13 +328,14 @@ func (w *consumerServiceWriterImpl) SetMessageTTLNanos(value int64) {

func (w *consumerServiceWriterImpl) RegisterFilter(filter producer.FilterFunc) {
w.Lock()
w.dataFilter = filter
w.dataFilters = append(w.dataFilters, filter)
w.Unlock()
}

func (w *consumerServiceWriterImpl) UnregisterFilter() {
func (w *consumerServiceWriterImpl) UnregisterFilters() {
w.Lock()
w.dataFilter = acceptAllFilter
w.dataFilters[0] = acceptAllFilter
w.dataFilters = w.dataFilters[:1]
w.Unlock()
}

Expand Down
12 changes: 6 additions & 6 deletions src/msg/producer/writer/consumer_service_writer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion src/msg/producer/writer/consumer_service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,21 +512,39 @@ func TestConsumerServiceWriterFilter(t *testing.T) {
mm1 := producer.NewMockMessage(ctrl)
mm1.EXPECT().Shard().Return(uint32(1)).AnyTimes()
mm1.EXPECT().Size().Return(3).AnyTimes()
mm2 := producer.NewMockMessage(ctrl)
mm2.EXPECT().Shard().Return(uint32(0)).AnyTimes()
mm2.EXPECT().Size().Return(4).AnyTimes()

sw0.EXPECT().Write(gomock.Any())
csw.Write(producer.NewRefCountedMessage(mm0, nil))
sw1.EXPECT().Write(gomock.Any())
csw.Write(producer.NewRefCountedMessage(mm1, nil))

csw.RegisterFilter(func(m producer.Message) bool { return m.Shard() == uint32(0) })
// Write is not expected due to mm1 shard != 0
csw.Write(producer.NewRefCountedMessage(mm1, nil))

sw0.EXPECT().Write(gomock.Any())
// Write is expected due to mm0 shard == 0
csw.Write(producer.NewRefCountedMessage(mm0, nil))

csw.UnregisterFilter()
csw.RegisterFilter(func(m producer.Message) bool { return m.Size() == 3 })
sw0.EXPECT().Write(gomock.Any())
// Write is expected because to mm0 shard == 0 and mm0 size == 3
csw.Write(producer.NewRefCountedMessage(mm0, nil))

// Write is not expected because to mm2 size != 3
csw.Write(producer.NewRefCountedMessage(mm2, nil))

// All messages are expected to write after unregistering filters
csw.UnregisterFilters()
sw0.EXPECT().Write(gomock.Any())
csw.Write(producer.NewRefCountedMessage(mm0, nil))
sw1.EXPECT().Write(gomock.Any())
csw.Write(producer.NewRefCountedMessage(mm1, nil))
sw0.EXPECT().Write(gomock.Any())
csw.Write(producer.NewRefCountedMessage(mm2, nil))
}

func TestConsumerServiceWriterAllowInitValueErrorWithCreateWatchError(t *testing.T) {
Expand Down
21 changes: 14 additions & 7 deletions src/msg/producer/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type writer struct {
initType initType
numShards uint32
consumerServiceWriters map[string]consumerServiceWriter
filterRegistry map[string]producer.FilterFunc
filterRegistry map[string][]producer.FilterFunc
isClosed bool
m writerMetrics

Expand All @@ -87,7 +87,7 @@ func NewWriter(opts Options) producer.Writer {
logger: opts.InstrumentOptions().Logger(),
initType: failOnError,
consumerServiceWriters: make(map[string]consumerServiceWriter),
filterRegistry: make(map[string]producer.FilterFunc),
filterRegistry: make(map[string][]producer.FilterFunc),
isClosed: false,
m: newWriterMetrics(opts.InstrumentOptions().MetricsScope()),
}
Expand Down Expand Up @@ -221,8 +221,10 @@ func (w *writer) process(update interface{}) error {
// Apply the new consumer service writers.
w.Lock()
for key, csw := range newConsumerServiceWriters {
if filter, ok := w.filterRegistry[key]; ok {
csw.RegisterFilter(filter)
if filters, ok := w.filterRegistry[key]; ok {
for _, filter := range filters {
csw.RegisterFilter(filter)
}
}
}
w.consumerServiceWriters = newConsumerServiceWriters
Expand Down Expand Up @@ -264,21 +266,26 @@ func (w *writer) RegisterFilter(sid services.ServiceID, filter producer.FilterFu
defer w.Unlock()

key := sid.String()
w.filterRegistry[key] = filter
if _, ok := w.filterRegistry[key]; ok {
w.filterRegistry[key] = append(w.filterRegistry[key], filter)
} else {
w.filterRegistry[key] = []producer.FilterFunc{filter}
}

csw, ok := w.consumerServiceWriters[key]
if ok {
csw.RegisterFilter(filter)
}
}

func (w *writer) UnregisterFilter(sid services.ServiceID) {
func (w *writer) UnregisterFilters(sid services.ServiceID) {
w.Lock()
defer w.Unlock()

key := sid.String()
delete(w.filterRegistry, key)
csw, ok := w.consumerServiceWriters[key]
if ok {
csw.UnregisterFilter()
csw.UnregisterFilters()
}
}
19 changes: 15 additions & 4 deletions src/msg/producer/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,26 @@ func TestWriterRegisterFilter(t *testing.T) {

sid2 := services.NewServiceID().SetName("s2")
filter := func(producer.Message) bool { return false }
filter2 := func(producer.Message) bool { return true }

w := NewWriter(opts).(*writer)
w.consumerServiceWriters[cs1.ServiceID().String()] = csw1

csw1.EXPECT().UnregisterFilter()
w.UnregisterFilter(sid1)
csw1.EXPECT().UnregisterFilters()
w.UnregisterFilters(sid1)
_, ok := w.filterRegistry[sid1.String()]
require.True(t, !ok)

// Wrong service id triggers nothing.
w.RegisterFilter(sid2, filter)
_, ok = w.filterRegistry[sid2.String()]
require.True(t, ok)

csw1.EXPECT().RegisterFilter(gomock.Any())
w.RegisterFilter(sid1, filter)

csw1.EXPECT().UnregisterFilter()
w.UnregisterFilter(sid1)
csw1.EXPECT().UnregisterFilters()
w.UnregisterFilters(sid1)

csw1.EXPECT().RegisterFilter(gomock.Any())
w.RegisterFilter(sid1, filter)
Expand All @@ -238,6 +243,12 @@ func TestWriterRegisterFilter(t *testing.T) {
SetNumberOfShards(6).
SetConsumerServices([]topic.ConsumerService{cs1})
w.process(testTopic)

csw1.EXPECT().RegisterFilter(gomock.Any())
w.RegisterFilter(sid1, filter2)
require.True(t, len(w.filterRegistry[sid1.String()]) == 2)
csw1.EXPECT().UnregisterFilters()
w.UnregisterFilters(sid1)
}

func TestWriterTopicUpdate(t *testing.T) {
Expand Down

0 comments on commit 69e3675

Please sign in to comment.