Skip to content

Commit

Permalink
ReadBulkRelationships init
Browse files Browse the repository at this point in the history
Signed-off-by: Kartikay <[email protected]>
  • Loading branch information
kartikaysaxena committed Jan 7, 2025
1 parent d2f7721 commit 4703caf
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 0 deletions.
46 changes: 46 additions & 0 deletions internal/services/v1/hash.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package v1

import (
"sort"
"strconv"
"strings"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -56,6 +58,50 @@ func computeReadRelationshipsRequestHash(req *v1.ReadRelationshipsRequest) (stri
})
}

func computeReadRelationshipsRequestItemHash(req *v1.ReadBulkRelationshipsRequestItem) (string, error) {
osf := req.RelationshipFilter.OptionalSubjectFilter
if osf == nil {
osf = &v1.SubjectFilter{}
}

srf := "(none)"
if osf.OptionalRelation != nil {
srf = osf.OptionalRelation.Relation
}

return computeCallHash("v1.readrelationships", nil, map[string]any{
"filter-resource-type": req.RelationshipFilter.ResourceType,
"filter-relation": req.RelationshipFilter.OptionalRelation,
"filter-resource-id": req.RelationshipFilter.OptionalResourceId,
"subject-type": osf.SubjectType,
"subject-relation": srf,
"subject-resource-id": osf.OptionalSubjectId,
"limit": req.OptionalLimit,
})
}

func computeReadBulkRelationshipsHash(req *v1.ReadBulkRelationshipsRequest) (string, error) {
itemHashes := make([]string, 0, len(req.Items))

for _, item := range req.Items {
if item == nil {
continue
}

itemHash, err := computeReadRelationshipsRequestItemHash(item)
if err != nil {
return "", err
}
itemHashes = append(itemHashes, itemHash)
}

sort.Strings(itemHashes)

return computeCallHash("v1.readbulkrelationships", req.Consistency, map[string]any{
"items": strings.Join(itemHashes, ","),
})
}

func computeLRRequestHash(req *v1.LookupResourcesRequest) (string, error) {
return computeCallHash("v1.lookupresources", req.Consistency, map[string]any{
"resource-type": req.ResourceObjectType,
Expand Down
167 changes: 167 additions & 0 deletions internal/services/v1/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package v1

import (
"context"
"errors"
"fmt"
"sync"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/authzed/spicedb/internal/namespace"
"github.com/authzed/spicedb/internal/relationships"
"github.com/authzed/spicedb/internal/services/shared"
"github.com/authzed/spicedb/internal/taskrunner"
"github.com/authzed/spicedb/pkg/cursor"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
Expand Down Expand Up @@ -272,6 +275,170 @@ func (ps *permissionServer) ReadRelationships(req *v1.ReadRelationshipsRequest,
return nil
}

func (ps *permissionServer) ReadBulkRelationships(ctx context.Context, req *v1.ReadBulkRelationshipsRequest) (*v1.ReadBulkRelationshipsResponse, error) {
atRevision, revisionReadAt, err := consistency.RevisionFromContext(ctx)
if err != nil {
return nil, ps.rewriteError(ctx, err)
}

itemCount, err := genutil.EnsureUInt32(len(req.Items))
if err != nil {
return nil, ps.rewriteError(ctx, err)
}

usagemetrics.SetInContext(ctx, &dispatchv1.ResponseMeta{
DispatchCount: itemCount,
})

rbrRequestHash, err := computeReadBulkRelationshipsHash(req)
if err != nil {
return nil, ps.rewriteError(ctx, err)
}

itemIndexByHash := mapz.NewMultiMapWithCap[string, int](itemCount)
itemHashes := make(map[*v1.ReadBulkRelationshipsRequestItem]string, itemCount)
for index, item := range req.Items {
if item == nil {
continue
}

itemHash, err := computeReadRelationshipsRequestItemHash(item)
if err != nil {
return nil, ps.rewriteError(ctx, err)
}

itemHashes[item] = itemHash
itemIndexByHash.Add(itemHash, index)
}

responsePairs := make([]*v1.ReadBulkRelationshipsPair, len(req.Items))
bulkResponseMutex := sync.Mutex{}

dispatchCursor := &dispatchv1.Cursor{
DispatchVersion: 1,
Sections: []string{""},
}

var startCursor options.Cursor
if req.OptionalCursor != nil {
decodedCursor, _, err := cursor.DecodeToDispatchCursor(req.OptionalCursor, rbrRequestHash)
if err != nil {
return nil, ps.rewriteError(ctx, err)
}

if len(decodedCursor.Sections) != 1 {
return nil, ps.rewriteError(ctx, NewInvalidCursorErr("did not find expected resume relationship"))
}

parsed, err := tuple.Parse(decodedCursor.Sections[0])
if err != nil {
return nil, ps.rewriteError(ctx, NewInvalidCursorErr("could not parse resume relationship"))
}

startCursor = options.ToCursor(parsed)
}

var afterResultCursor *v1.Cursor

// Task runner for concurrent reads
tr := taskrunner.NewPreloadedTaskRunner(ctx, uint16(ps.config.MaxReadRelationshipsLimit), len(req.Items))

for i, item := range req.Items {
_, item := i, item
tr.Add(func(ctx context.Context) error {
ds := datastoremw.MustFromContext(ctx).SnapshotReader(atRevision)
if err := validateRelationshipsFilter(ctx, item.RelationshipFilter, ds); err != nil {
return ps.rewriteError(ctx, err)
}

limit := uint64(0)
pageSize := ps.config.MaxDatastoreReadPageSize
if item.OptionalLimit > 0 {
limit = uint64(item.OptionalLimit)
if limit < pageSize {
pageSize = limit
}
}

dsFilter, err := datastore.RelationshipsFilterFromPublicFilter(item.RelationshipFilter)
if err != nil {
return ps.rewriteError(ctx, fmt.Errorf("error filtering: %w", err))
}

it, err := pagination.NewPaginatedIterator(
ctx,
ds,
dsFilter,
pageSize,
options.ByResource,
startCursor,
)
if err != nil {
return ps.rewriteError(ctx, err)
}

for rel, err := range it {
if err != nil {
return ps.rewriteError(ctx, fmt.Errorf("error when reading tuples: %w", err))
}

dispatchCursor.Sections[0] = tuple.StringWithoutCaveatOrExpiration(rel)
encodedCursor, err := cursor.EncodeFromDispatchCursor(dispatchCursor, rbrRequestHash, atRevision, nil)
if err != nil {
return ps.rewriteError(ctx, err)
}
afterResultCursor = encodedCursor

responseRelation := &v1.Relationship{
Resource: &v1.ObjectReference{},
Subject: &v1.SubjectReference{
Object: &v1.ObjectReference{},
},
}

tuple.CopyToV1Relationship(rel, responseRelation)

responsePairItem := &v1.ReadBulkRelationshipsPair_Item{
Item: &v1.ReadBulkRelationshipsResponseItem{
Relationships: responseRelation,
},
}

bulkResponseMutex.Lock()
defer bulkResponseMutex.Unlock()

itemHash, exists := itemHashes[item]
if !exists {
return ps.rewriteError(ctx, errors.New("missing item hash"))
}

found, ok := itemIndexByHash.Get(itemHash)
if !ok {
return ps.rewriteError(ctx, errors.New("missing expected item hash"))
}

for _, index := range found {
responsePairs[index] = &v1.ReadBulkRelationshipsPair{
Request: item,
Response: responsePairItem,
}
}
}
return nil
})
}

if err := tr.StartAndWait(); err != nil {
return nil, ps.rewriteError(ctx, err)
}

return &v1.ReadBulkRelationshipsResponse{
ReadAt: revisionReadAt,
Pairs: responsePairs,
AfterResultCursor: afterResultCursor,
}, nil
}

func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) {
if err := ps.validateTransactionMetadata(req.OptionalTransactionMetadata); err != nil {
return nil, ps.rewriteError(ctx, err)
Expand Down

0 comments on commit 4703caf

Please sign in to comment.