Merge pull request #1365 from josephschorr/parallel-cursored-rr

Parallelize the entrypoint computation in ReachableResources

josephschorr authored Jun 1, 2023
2 parents 8dafd04 + 8d5c7cd commit fb656d6
Showing 11 changed files with 491 additions and 8 deletions.
3 changes: 2 additions & 1 deletion internal/dispatch/graph/reachableresources_test.go
@@ -896,8 +896,9 @@ func TestReachableResourcesPaginationWithLimit(t *testing.T) {
defer cancel()

newFound := 0
existingCursor := cursor
for _, result := range stream.Results() {
-require.True(t, foundResources.Add(result.Resource.ResourceId))
+require.True(t, foundResources.Add(result.Resource.ResourceId), "found duplicate %s for iteration %d with cursor %s", result.Resource.ResourceId, i, existingCursor)
newFound++

cursor = result.AfterResponseCursor
34 changes: 34 additions & 0 deletions internal/dispatch/stream.go
@@ -3,6 +3,7 @@ package dispatch
import (
"context"
"sync"
"sync/atomic"

grpc "google.golang.org/grpc"
)
@@ -146,8 +147,41 @@ func (s *HandlingDispatchStream[T]) Context() context.Context {
return s.ctx
}

// CountingDispatchStream is a dispatch stream that counts the number of items published.
// It uses an internal atomic counter to ensure it is thread-safe.
type CountingDispatchStream[T any] struct {
Stream Stream[T]
count *atomic.Uint64
}

func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T] {
return &CountingDispatchStream[T]{
Stream: wrapped,
count: &atomic.Uint64{},
}
}

func (s *CountingDispatchStream[T]) PublishedCount() uint64 {
return s.count.Load()
}

func (s *CountingDispatchStream[T]) Publish(result T) error {
err := s.Stream.Publish(result)
if err != nil {
return err
}

s.count.Add(1)
return nil
}

func (s *CountingDispatchStream[T]) Context() context.Context {
return s.Stream.Context()
}

// Ensure the streams implement the interface.
var (
_ Stream[any] = &CollectingDispatchStream[any]{}
_ Stream[any] = &WrappedDispatchStream[any]{}
_ Stream[any] = &CountingDispatchStream[any]{}
)
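
A minimal usage sketch of the new wrapper (a hypothetical Example function, not part of this diff; it assumes the CollectingDispatchStream asserted above and would live in this package): only successful publishes are counted, which is what makes PublishedCount trustworthy when adjusting limits afterward.

func ExampleCountingDispatchStream() {
	ctx := context.Background()
	collecting := NewCollectingDispatchStream[string](ctx)
	counting := NewCountingDispatchStream[string](collecting)

	// Publish through the wrapper; a failed Publish would not be counted.
	_ = counting.Publish("first")
	_ = counting.Publish("second")

	// collecting.Results() now holds both items.
	fmt.Println(counting.PublishedCount())
	// Output: 2
}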
208 changes: 208 additions & 0 deletions internal/graph/cursors.go
@@ -1,10 +1,14 @@
package graph

import (
"context"
"errors"
"strconv"
"sync"

"golang.org/x/exp/slices"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
@@ -56,6 +60,16 @@ func (ci cursorInformation) responsePartialCursor() *v1.Cursor {
}
}

// withClonedLimits returns a copy of the cursor information with its limits cloned for use
// by a parallel branch, along with a context wired to the cloned limits.
func (ci cursorInformation) withClonedLimits(ctx context.Context) (cursorInformation, context.Context) {
cloned, ctx := ci.limits.clone(ctx)
return cursorInformation{
currentCursor: ci.currentCursor,
outgoingCursorSections: ci.outgoingCursorSections,
limits: cloned,
revision: ci.revision,
}, ctx
}

// hasHeadSection returns true if the given name is the prefix of the current cursor.
func (ci cursorInformation) hasHeadSection(name string) (bool, error) {
if ci.currentCursor == nil || len(ci.currentCursor.Sections) == 0 {
@@ -310,3 +324,197 @@ func combineCursors(cursor *v1.Cursor, toAdd *v1.Cursor) (*v1.Cursor, error) {
Sections: append(slices.Clone(cursor.Sections), toAdd.Sections...),
}, nil
}
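
The slices.Clone in combineCursors above is load-bearing: appending to a shared backing array could let two combined cursors overwrite each other's sections. A standalone toy program (not this package's code) illustrating the aliasing hazard it avoids:

package main

import "fmt"

func main() {
	base := make([]string, 1, 4) // length 1, spare capacity
	base[0] = "head"
	a := append(base, "a") // append reuses base's backing array...
	b := append(base, "b") // ...and overwrites the slot a just used
	fmt.Println(a[1], b[1]) // prints "b b": a was silently clobbered
}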

// withParallelizedStreamingIterableInCursor executes the given handler for each item in the items list, skipping any
// items marked as completed at the head of the cursor and injecting a cursor representing the current
// item.
//
// For example, if items contains 3 items, and the cursor returned was within the handler for item
// index #1, then item index #0 will be skipped on subsequent invocation.
//
// The next index is executed in parallel with the current index, with its results stored in a CollectingStream
// until the next iteration.
func withParallelizedStreamingIterableInCursor[T any, Q any](
ctx context.Context,
ci cursorInformation,
name string,
items []T,
parentStream dispatch.Stream[Q],
concurrencyLimit uint16,
handler func(ctx context.Context, ci cursorInformation, item T, stream dispatch.Stream[Q]) error,
) error {
// Check the cursor for a starting index, before which any items will be skipped.
startingIndex, err := ci.integerSectionValue(name)
if err != nil {
return err
}

itemsToRun := items[startingIndex:]
if len(itemsToRun) == 0 {
return nil
}

// Queue up each iteration's worth of items to be run by the task runner.
tr := newPreloadedTaskRunner(ctx, concurrencyLimit, len(itemsToRun))
stream, err := newParallelLimitedIndexedStream[Q](ctx, ci, parentStream, len(itemsToRun))
if err != nil {
return err
}

// Schedule a task to be invoked for each item to be run.
for taskIndex, item := range itemsToRun {
taskIndex := taskIndex
item := item
tr.add(func(ctx context.Context) error {
if ci.limits.hasExhaustedLimit() {
return nil
}

// Create an updated cursor referencing the current item's index, so that any results returned
// indicate that iteration should resume from this point.
currentCursor, err := ci.withOutgoingSection(name, strconv.Itoa(taskIndex+startingIndex))
if err != nil {
return err
}

// If not the first iteration, we need to clear incoming sections to ensure the iteration starts at the top
// of the cursor.
if taskIndex > 0 {
currentCursor = currentCursor.clearIncoming()
}

// Invoke the handler with the current item's index in the outgoing cursor, indicating that
// subsequent invocations should jump right to this item.
ictx, istream, icursor := stream.forTaskIndex(ctx, taskIndex, currentCursor)

err = handler(ictx, icursor, item, istream)
if err != nil {
return err
}

return stream.completedTaskIndex(taskIndex)
})
}

// NOTE: since branches can be canceled if they have reached limits, the context Canceled error is ignored here.
if err := tr.startAndWait(); !errors.Is(err, context.Canceled) {
return err
}

return nil
}
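
The skip-and-resume mechanics can be distilled into a self-contained sketch (illustrative names, not this package's API): the cursor records the index of the item that produced a result, and a later invocation starts at that index instead of at the beginning.

package main

import (
	"fmt"
	"strconv"
)

// runFromCursor is a toy version of the pattern: resume iteration at the index
// stored in the head of the cursor, and stamp each emitted result with a
// cursor naming the item it came from.
func runFromCursor(items []string, cursor string, emit func(item, resumeCursor string)) error {
	start := 0
	if cursor != "" {
		i, err := strconv.Atoi(cursor)
		if err != nil {
			return err
		}
		start = i
	}
	for offset, item := range items[start:] {
		emit(item, strconv.Itoa(start+offset))
	}
	return nil
}

func main() {
	items := []string{"a", "b", "c"}
	// Resuming with cursor "1" skips item #0, mirroring the doc comment above.
	_ = runFromCursor(items, "1", func(item, c string) {
		fmt.Printf("%s (cursor %s)\n", item, c) // b (cursor 1), then c (cursor 2)
	})
}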

// parallelLimitedIndexedStream is a specialization of a dispatch.Stream that collects results from multiple
// tasks running in parallel, and emits them in the order of the tasks. The first task's results are directly
// emitted to the parent stream, while subsequent tasks' results are emitted in the defined order of the tasks
// to ensure cursors and limits work as expected.
type parallelLimitedIndexedStream[Q any] struct {
lock sync.Mutex

ctx context.Context
ci cursorInformation
parentStream dispatch.Stream[Q]

streamCount int
toPublishTaskIndex int
countingStream *dispatch.CountingDispatchStream[Q]
childStreams map[int]*dispatch.CollectingDispatchStream[Q]
completedTaskIndexes map[int]bool
}

func newParallelLimitedIndexedStream[Q any](
ctx context.Context,
ci cursorInformation,
parentStream dispatch.Stream[Q],
streamCount int,
) (*parallelLimitedIndexedStream[Q], error) {
if streamCount <= 0 {
return nil, spiceerrors.MustBugf("got invalid stream count")
}

return &parallelLimitedIndexedStream[Q]{
ctx: ctx,
ci: ci,
parentStream: parentStream,
countingStream: nil,
childStreams: map[int]*dispatch.CollectingDispatchStream[Q]{},
completedTaskIndexes: map[int]bool{},
toPublishTaskIndex: 0,
streamCount: streamCount,
}, nil
}

// forTaskIndex returns a new context, stream and cursor for invoking the task at the specific index and publishing its results.
func (ls *parallelLimitedIndexedStream[Q]) forTaskIndex(ctx context.Context, index int, currentCursor cursorInformation) (context.Context, dispatch.Stream[Q], cursorInformation) {
// Create a new cursor with cloned limits, because each child task which executes (in parallel) will need its own
// limit tracking. The overall limit on the original cursor is managed in completedTaskIndex.
childCI, cctx := currentCursor.withClonedLimits(ctx)

// If executing for the first index, it can stream directly to the parent stream, but we need to count the number
// of items streamed to adjust the overall limits.
if index == 0 {
countingStream := dispatch.NewCountingDispatchStream[Q](ls.parentStream)
ls.countingStream = countingStream
return cctx, countingStream, childCI
}

// Otherwise, create a child stream with adjusted limits on the cursor. The cursor's limits
// were cloned (in withClonedLimits) to ensure that the child's publishing doesn't affect the first branch.
ls.lock.Lock()
defer ls.lock.Unlock()

childStream := dispatch.NewCollectingDispatchStream[Q](ctx)
ls.childStreams[index] = childStream
return cctx, childStream, childCI
}

// completedTaskIndex indicates that the task at the given index has completed successfully and that its
// collected results should be published to the parent stream, so long as all previous tasks have completed
// and been published as well.
func (ls *parallelLimitedIndexedStream[Q]) completedTaskIndex(index int) error {
ls.lock.Lock()
defer ls.lock.Unlock()

// Mark the task as completed, but not yet published.
ls.completedTaskIndexes[index] = true

// If the overall limit has been reached, nothing more to do.
if ls.ci.limits.hasExhaustedLimit() {
return nil
}

// Otherwise, publish any results from previously completed tasks up to, and including, this task. This loop
// ensures that the collected results for each task are published to the parent stream in the correct order.
for {
if !ls.completedTaskIndexes[ls.toPublishTaskIndex] {
return nil
}

if ls.toPublishTaskIndex == 0 {
// Remove the already emitted data from the overall limits.
done, err := ls.ci.limits.markAlreadyPublished(uint32(ls.countingStream.PublishedCount()))
if err != nil {
return err
}

defer done()
} else {
// Publish, to the parent stream, the results produced by the task and stored in the child stream.
childStream := ls.childStreams[ls.toPublishTaskIndex]
for _, result := range childStream.Results() {
ok, done := ls.ci.limits.prepareForPublishing()
defer done()

if !ok {
return nil
}

err := ls.parentStream.Publish(result)
if err != nil {
return err
}
}
ls.childStreams[ls.toPublishTaskIndex] = nil
}

ls.toPublishTaskIndex++
}
}
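
The ordering discipline implemented above can likewise be reduced to a self-contained sketch (illustrative names, no limit handling): task 0 emits straight to the parent, later tasks buffer their results, and each completion flushes every consecutive completed buffer in task order.

package main

import (
	"fmt"
	"sync"
)

// orderedFanIn is a toy version of the scheme above: results from task 0 are
// emitted immediately, while results from later tasks are buffered and flushed
// only once every earlier task has completed.
type orderedFanIn struct {
	mu        sync.Mutex
	buffers   map[int][]string
	completed map[int]bool
	next      int          // lowest task index not yet flushed
	emit      func(string) // stands in for the parent stream
}

func (o *orderedFanIn) publish(task int, result string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if task == 0 {
		o.emit(result) // the first task streams directly to the parent
		return
	}
	o.buffers[task] = append(o.buffers[task], result)
}

func (o *orderedFanIn) complete(task int) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.completed[task] = true
	// Flush every consecutive completed task, in task order.
	for o.completed[o.next] {
		for _, r := range o.buffers[o.next] {
			o.emit(r)
		}
		delete(o.buffers, o.next)
		o.next++
	}
}

func main() {
	o := &orderedFanIn{
		buffers:   map[int][]string{},
		completed: map[int]bool{},
		emit:      func(s string) { fmt.Println(s) },
	}
	var wg sync.WaitGroup
	for task := 0; task < 3; task++ {
		task := task
		wg.Add(1)
		go func() {
			defer wg.Done()
			o.publish(task, fmt.Sprintf("task-%d-result", task))
			o.complete(task)
		}()
	}
	wg.Wait() // always prints task-0, task-1, task-2 in order
}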