Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf!: use topic selector store in subscriber #944

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger)
s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestBoltTransportLogsBogusLastEventID(t *testing.T) {
Topics: topics,
})

s := NewSubscriber("711131", logger)
s := NewSubscriber("711131", logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -87,7 +87,7 @@ func TestBoltTopicSelectorHistory(t *testing.T) {
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Private: true, Event: Event{ID: "3"}})
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Event: Event{ID: "4"}})

s := NewSubscriber(EarliestLastEventID, transport.logger)
s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/subscribed", "http://example.com/subscribed-public-only"}, []string{"http://example.com/subscribed"})

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -109,7 +109,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := NewSubscriber(EarliestLastEventID, transport.logger)
s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -139,7 +139,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger)
s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestBoltTransportDoNotDispatchUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger)
s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
Expand All @@ -245,7 +245,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger)
s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo", "https://example.com/private"}, []string{"https://example.com/private"})

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger)
s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -293,11 +293,11 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger)
s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s1.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger)
s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s2.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s2))

Expand All @@ -318,10 +318,10 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger)
s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger)
s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.GetSubscribers()
Expand Down
3 changes: 2 additions & 1 deletion local_transport_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func subBenchLocalTransport(b *testing.B, topics, concurrency, matchPct int, tes
}
}
out := make(chan *Update, 50000)
tss := &TopicSelectorStore{}
for i := 0; i < concurrency; i++ {
s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", zap.NewNop(), tss)
if i%100 < matchPct {
s.SetTopics(tsMatch, nil)
} else {
Expand Down
42 changes: 27 additions & 15 deletions local_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
)

func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

u := &Update{Topics: []string{"http://example.com/books/1"}}
err := transport.Dispatch(u)
require.NoError(t, err)

s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics(u.Topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -37,11 +38,12 @@ func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
}

func TestLocalTransportDispatch(t *testing.T) {
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -51,29 +53,35 @@ func TestLocalTransportDispatch(t *testing.T) {
}

func TestLocalTransportClosed(t *testing.T) {
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", zap.NewNop())
tss := &TopicSelectorStore{}

s := NewSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s))
require.NoError(t, transport.Close())
assert.Equal(t, transport.AddSubscriber(NewSubscriber("", zap.NewNop())), ErrClosedTransport)
assert.Equal(t, transport.AddSubscriber(NewSubscriber("", logger, tss)), ErrClosedTransport)
assert.Equal(t, transport.Dispatch(&Update{}), ErrClosedTransport)

_, ok := <-s.out
assert.False(t, ok)
}

func TestLiveCleanDisconnectedSubscribers(t *testing.T) {
tr, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
tr, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
transport := tr.(*LocalTransport)
defer transport.Close()

s1 := NewSubscriber("", zap.NewNop())
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", zap.NewNop())
s2 := NewSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

assert.Equal(t, 2, transport.subscribers.Len())
Expand All @@ -88,11 +96,12 @@ func TestLiveCleanDisconnectedSubscribers(t *testing.T) {
}

func TestLiveReading(t *testing.T) {
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -104,14 +113,17 @@ func TestLiveReading(t *testing.T) {
}

func TestLocalTransportGetSubscribers(t *testing.T) {
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
defer transport.Close()
require.NotNil(t, transport)

s1 := NewSubscriber("", zap.NewNop())
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", zap.NewNop())
s2 := NewSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.(TransportSubscribers).GetSubscribers()
Expand Down
14 changes: 10 additions & 4 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ import (
func TestNumberOfRunningSubscribers(t *testing.T) {
m := NewPrometheusMetrics(nil)

s1 := NewSubscriber("", zap.NewNop())
logger := zap.NewNop()
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1.SetTopics([]string{"topic1", "topic2"}, nil)
m.SubscriberConnected(s1)
assertGaugeValue(t, 1.0, m.subscribers)

s2 := NewSubscriber("", zap.NewNop())
s2 := NewSubscriber("", logger, tss)
s2.SetTopics([]string{"topic2"}, nil)
m.SubscriberConnected(s2)
assertGaugeValue(t, 2.0, m.subscribers)
Expand All @@ -32,12 +35,15 @@ func TestNumberOfRunningSubscribers(t *testing.T) {
func TestTotalNumberOfHandledSubscribers(t *testing.T) {
m := NewPrometheusMetrics(nil)

s1 := NewSubscriber("", zap.NewNop())
logger := zap.NewNop()
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1.SetTopics([]string{"topic1", "topic2"}, nil)
m.SubscriberConnected(s1)
assertCounterValue(t, 1.0, m.subscribersTotal)

s2 := NewSubscriber("", zap.NewNop())
s2 := NewSubscriber("", logger, tss)
s2.SetTopics([]string{"topic2"}, nil)
m.SubscriberConnected(s2)
assertCounterValue(t, 2.0, m.subscribersTotal)
Expand Down
4 changes: 2 additions & 2 deletions publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestPublishOK(t *testing.T) {
hub := createDummy()

topics := []string{"http://example.com/books/1"}
s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", zap.NewNop(), &TopicSelectorStore{})
s.SetTopics(topics, topics)
s.Claims = &claims{Mercure: mercureClaim{Subscribe: topics}}

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestPublishNoData(t *testing.T) {
func TestPublishGenerateUUID(t *testing.T) {
h := createDummy()

s := NewSubscriber("", zap.NewNop())
s := NewSubscriber("", zap.NewNop(), &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/books/1"}, s.SubscribedTopics)

require.NoError(t, h.transport.AddSubscriber(s))
Expand Down
3 changes: 2 additions & 1 deletion subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// registerSubscriber initializes the connection.
func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request) (*Subscriber, *responseController) {
s := NewSubscriber(retrieveLastEventID(r, h.opt, h.logger), h.logger)
s := NewSubscriber(retrieveLastEventID(r, h.opt, h.logger), h.logger, &TopicSelectorStore{})
s.topicSelectorStore = h.topicSelectorStore
s.Debug = h.debug
s.RemoteAddr = r.RemoteAddr
var privateTopics []string
Expand Down
10 changes: 8 additions & 2 deletions subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mercure
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -56,8 +57,13 @@ func (rt *responseTester) Write(buf []byte) (int, error) {
if rt.body == rt.expectedBody {
rt.cancel()
} else if !strings.HasPrefix(rt.expectedBody, rt.body) {
rt.t.Errorf(`Received body "%s" doesn't match expected body "%s"`, rt.body, rt.expectedBody)
rt.cancel()
defer rt.cancel()

mess := fmt.Sprintf(`Received body "%s" doesn't match expected body "%s"`, rt.body, rt.expectedBody)
if rt.t == nil {
panic(mess)
}
rt.t.Error(mess)
}

return len(buf), nil
Expand Down
Loading
Loading