Skip to content

Commit

Permalink
Query by tags
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Jun 27, 2023
1 parent 4e7789f commit cb20547
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ recreate-emulator:
sudo docker-compose -f ./docker-compose-integration.yml up -d

.PHONY: test-integration
test-integration: recreate-emulator
test-integration:
FIRESTORE_EMULATOR_HOST=localhost:8200 go test -tags=test_integration -v ./...

.PHONY: tidy
Expand Down
2 changes: 2 additions & 0 deletions cmd/notification-service/di/inject_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var firestoreTxAdaptersSet = wire.NewSet(

firestore.NewPublicKeyRepository,
wire.Bind(new(app.PublicKeyRepository), new(*firestore.PublicKeyRepository)),

firestore.NewTagRepository,
)

var adaptersSet = wire.NewSet(
Expand Down
37 changes: 20 additions & 17 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 83 additions & 0 deletions integration_tests/get_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//go:build test_integration

package integration_tests

import (
"context"
"testing"
"time"

"github.com/nbd-wtf/go-nostr"
"github.com/planetary-social/go-notification-service/internal/fixtures"
"github.com/planetary-social/go-notification-service/service/app"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetEvents(t *testing.T) {
ctx := fixtures.Context(t)
_, service := createService(ctx, t)

_, sk1 := fixtures.SomeKeyPair()
pk2, _ := fixtures.SomeKeyPair()

libevent := nostr.Event{
CreatedAt: nostr.Now(),
Kind: domain.EventKindNote.Int(),
Tags: nostr.Tags{
{"p", pk2.Hex()},
},
Content: "some content",
}

err := libevent.Sign(sk1)
require.NoError(t, err)

event, err := domain.NewEvent(libevent)
require.NoError(t, err)

cmd := app.NewProcessReceivedEvent(fixtures.SomeRelayAddress(), event)
err = service.App().Commands.ProcessReceivedEvent.Handle(ctx, cmd)
require.NoError(t, err)

require.EventuallyWithT(t, func(c *assert.CollectT) {
since := nostr.Timestamp(time.Now().Add(-1 * time.Hour).Unix())

filters, err := domain.NewFilters(nostr.Filters{
{
IDs: nil,
Kinds: []int{domain.EventKindNote.Int(), domain.EventKindReaction.Int()},
Authors: nil,
Tags: nostr.TagMap{
"p": []string{pk2.Hex()},
},
Since: &since,
Until: nil,
Limit: 0,
Search: "",
},
})
require.NoError(t, err)

var events []domain.Event

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for v := range service.App().Queries.GetEvents.Handle(ctx, filters) {
if err := v.Err(); err != nil {
c.Errorf("error: %s", err)
}

if v.EOSE() {
break
}

events = append(events, v.Event())
}

assert.Len(t, events, 1)
assert.Equal(t, event.Id(), events[0].Id())
}, durationTimeout, durationTick)
}
4 changes: 2 additions & 2 deletions integration_tests/registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
durationTimeout = 5 * time.Second
durationTimeout = 1 * time.Second
durationTick = 100 * time.Millisecond
)

Expand Down Expand Up @@ -117,7 +117,6 @@ func createService(ctx context.Context, tb testing.TB) (config.Config, di.Servic
tb.Cleanup(cleanup)

terminatedCh := make(chan error)

tb.Cleanup(func() {
if err := <-terminatedCh; err != nil {
if errors.Is(err, net.ErrClosed) {
Expand All @@ -131,6 +130,7 @@ func createService(ctx context.Context, tb testing.TB) (config.Config, di.Servic
tb.Cleanup(cancelRunCtx)
go func() {
terminatedCh <- service.Run(runCtx)
//service.Run(runCtx)
}()

return config, service
Expand Down
137 changes: 128 additions & 9 deletions service/adapters/firestore/repository_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,41 @@ import (
"github.com/planetary-social/go-notification-service/service/app"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/planetary-social/go-notification-service/service/domain/notifications"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
collectionEvents = "events"
collectionEventsNotifications = "notifications"

eventFieldId = "id"
eventFieldPublicKey = "publicKey"
eventFieldCreatedAt = "createdAt"
eventFieldKind = "kind"
eventFieldContent = "content"
eventFieldRaw = "raw"
)

type EventRepository struct {
client *firestore.Client
tx *firestore.Transaction
relayRepository *RelayRepository
tagRepository *TagRepository
}

func NewEventRepository(
client *firestore.Client,
tx *firestore.Transaction,
relayRepository *RelayRepository,
tagRepository *TagRepository,
) *EventRepository {
return &EventRepository{
client: client,
tx: tx,
relayRepository: relayRepository,
tagRepository: tagRepository,
}
}

Expand All @@ -40,6 +51,10 @@ func (e *EventRepository) Save(event domain.Event) error {
return errors.Wrap(err, "error saving under events")
}

if err := e.tagRepository.Save(event); err != nil {
return errors.Wrap(err, "error saving in tag repository")
}

return nil
}

Expand Down Expand Up @@ -75,16 +90,13 @@ func (e *EventRepository) SaveNotificationForEvent(notification notifications.No
}

func (e *EventRepository) saveUnderEvents(event domain.Event) error {
// todo how to handle tags? do we want to save tags in a searchable way?

eventDocPath := e.client.Collection(collectionEvents).Doc(event.Id().Hex())
eventDocData := map[string]any{
"id": event.Id().Hex(),
"publicKey": event.PubKey().Hex(),
"createdAt": event.CreatedAt(),
"kind": event.Kind().Int(),
"content": event.Content(),
"sig": event.Sig().Hex(),
eventFieldId: event.Id().Hex(),
eventFieldPublicKey: event.PubKey().Hex(),
eventFieldCreatedAt: event.CreatedAt(),
eventFieldKind: event.Kind().Int(),
eventFieldRaw: event.Raw(),
}
if err := e.tx.Set(eventDocPath, eventDocData, firestore.MergeAll); err != nil {
return errors.Wrap(err, "error updating the event doc")
Expand All @@ -100,8 +112,115 @@ func (e *EventRepository) GetEvents(ctx context.Context, filters domain.Filters)
}

func (e *EventRepository) getEvents(ctx context.Context, filters domain.Filters, ch chan<- app.EventOrError) {
defer close(ch)

events, err := e.loadEventsForFilters(ctx, filters)
if err != nil {
sendErr(ctx, ch, err)
return
}

for _, event := range events {
select {
case ch <- app.NewEventOrErrorWithEvent(event):
case <-ctx.Done():
}
}
}

func (e *EventRepository) loadEventsForFilters(ctx context.Context, filters domain.Filters) ([]domain.Event, error) {
events := make(map[string]domain.Event)

query := e.client.Collection(collectionEvents).Query

// either the compound OR queries don't work with the simulator or they don't work at all
// given how buggy the simulator has proven to be in the past maybe they work with the real firestore instance
for _, filter := range filters.Filters() {
if len(filter.Ids()) == 0 && len(filter.Kinds()) == 0 && len(filter.Authors()) == 0 && len(filter.Tags()) == 0 {
if err := e.loadEvents(ctx, query, events, filter); err != nil {
return nil, errors.Wrap(err, "error loading events")
}
} else {
for _, v := range filter.Ids() {
if err := e.loadEvents(ctx, query.Where(eventFieldId, "==", v.Hex()), events, filter); err != nil {
return nil, errors.Wrapf(err, "error loading events for id filter '%s'", v.Hex())
}
}

for _, v := range filter.Kinds() {
if err := e.loadEvents(ctx, query.Where(eventFieldKind, "==", v.Int()), events, filter); err != nil {
return nil, errors.Wrapf(err, "error loading events for kind filter '%d'", v.Int())
}
}

for _, v := range filter.Authors() {
if err := e.loadEvents(ctx, query.Where(eventFieldPublicKey, "==", v.Hex()), events, filter); err != nil {
return nil, errors.Wrapf(err, "error loading events for author filter '%s'", v.Hex())
}
}

for tagName, tagValues := range filter.Tags() {
for _, tagValue := range tagValues {
if err := e.tagRepository.GetEvents(ctx, tagName, tagValue, filter.Since(), filter.Until(), filter.Limit(), events); err != nil {
return nil, errors.Wrapf(err, "error loading events for tag '%s'->'%s'", tagName.String(), tagValue)
}
}
}
}
}

// it is in my opinion unclear how to apply the limit field with multiple filters
var result []domain.Event
for _, event := range events {
if filters.Match(event) {
result = append(result, event)
}
}
return result, nil
}

func (e *EventRepository) loadEvents(ctx context.Context, query firestore.Query, events map[string]domain.Event, filter domain.Filter) error {
if since := filter.Since(); since != nil {
query = query.Where(eventFieldCreatedAt, ">", since)
}

if until := filter.Until(); until != nil {
query = query.Where(eventFieldCreatedAt, "<", until)
}

if filter.Limit() > 0 {
query = query.Limit(filter.Limit())
}

docs := query.Documents(ctx)
for {
doc, err := docs.Next()
if err != nil {
if err == iterator.Done {
break
}
return errors.Wrap(err, "error getting next document")
}

data := make(map[string]any)
if err := doc.DataTo(&data); err != nil {
return errors.Wrap(err, "error reading document data")
}

event, err := domain.NewEventFromRaw(data[eventFieldRaw].([]byte))
if err != nil {
return errors.Wrap(err, "error creating the event")
}

events[event.Id().Hex()] = event
}

return nil
}

func sendErr(ctx context.Context, ch chan<- app.EventOrError, err error) {
select {
case ch <- app.NewEventOrErrorWithError(errors.New("not implemented")):
case ch <- app.NewEventOrErrorWithError(err):
case <-ctx.Done():
}
}
Loading

0 comments on commit cb20547

Please sign in to comment.