Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
internal/pagination: add ListPlugins and ListPluginsRefresh
Browse files Browse the repository at this point in the history
These new methods can be used in places where the caller wants
to automatically paginate over a resource that is returned
together with a slice of plugins, such as host catalogs.
johanbrandhorst committed Nov 9, 2023
1 parent 0d9ad3d commit 6877c05
Showing 2 changed files with 1,996 additions and 0 deletions.
225 changes: 225 additions & 0 deletions internal/pagination/pagination_plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package pagination

import (
"context"

"github.com/hashicorp/boundary/internal/boundary"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/plugin"
"github.com/hashicorp/boundary/internal/refreshtoken"
)

// ListPluginsFilterFunc is a callback used to filter out resources that don't match
// some criteria. The function must return true for items that should be included in the final
// result. Returning an error results in an error being returned from the pagination.
type ListPluginsFilterFunc[T boundary.Resource] func(ctx context.Context, item T, plugins []*plugin.Plugin) (bool, error)

// ListPluginsItemsFunc returns a slice of T that have been updated since prevPageLastItem.
// If prevPageLastItem is empty, it returns a slice of T starting with the least recently updated.
type ListPluginsItemsFunc[T boundary.Resource] func(ctx context.Context, prevPageLastItem T, limit int) ([]T, []*plugin.Plugin, error)

// ListPluginsRefreshItemsFunc returns a slice of T that have been updated since prevPageLastItem.
// If prevPageLastItem is empty, it returns a slice of T that have been updated since the
// item in the refresh token.
type ListPluginsRefreshItemsFunc[T boundary.Resource] func(ctx context.Context, tok *refreshtoken.Token, prevPageLastItem T, limit int) ([]T, []*plugin.Plugin, error)

// ListPlugins returns a ListResponse and a list of plugins associated with the resources.
// The response will contain at most a number of items equal to the pageSize.
// Items are fetched using the listPluginsItemsFn and then items are checked using the filterPluginItemFn
// to determine if they should be included in the response.
// The response includes a new refresh token based on the grants and items.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the provided
// refresh token. The list of plugins is deduplicated.
func ListPlugins[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
pageSize int,
filterPluginItemFn ListPluginsFilterFunc[T],
listPluginsItemsFn ListPluginsItemsFunc[T],
estimatedCountFn EstimatedCountFunc,
) (*ListResponse[T], []*plugin.Plugin, error) {
const op = "pagination.ListPlugins"

if len(grantsHash) == 0 {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
}
if pageSize < 1 {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
}
if filterPluginItemFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter plugin item callback")
}
if listPluginsItemsFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list plugin items callback")
}
if estimatedCountFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
}

items, plgs, completeListing, err := listPlugins(ctx, pageSize, filterPluginItemFn, listPluginsItemsFn)
if err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}

resp := &ListResponse[T]{
Items: items,
CompleteListing: completeListing,
EstimatedItemCount: len(items),
}

if !completeListing {
// If this was not a complete listing, get an estimate
// of the total items from the DB.
var err error
resp.EstimatedItemCount, err = estimatedCountFn(ctx)
if err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}
}

if len(items) > 0 {
resp.RefreshToken = refreshtoken.FromResource(items[len(items)-1], grantsHash)
}

return resp, plgs, nil
}

// ListRefresh returns a ListResponse returns a ListResponse and a list of plugins associated with the resources.
// The response will contain at most a number of items equal to the pageSize.
// Items are fetched using the listPluginsRefreshItemsFn and then items are checked using the filterPluginItemFn
// to determine if they should be included in the response.
// The response includes a new refresh token based on the grants and items.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the provided
// refresh token. The listDeletedIDsFn is used to list the IDs of any
// resources that have been deleted since the refresh token was last used.
// The list of plugins is deduplicated.
func ListPluginsRefresh[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
pageSize int,
filterPluginItemFn ListPluginsFilterFunc[T],
listPluginsRefreshItemsFn ListPluginsRefreshItemsFunc[T],
estimatedCountFn EstimatedCountFunc,
listDeletedIDsFn ListDeletedIDsFunc,
tok *refreshtoken.Token,
) (*ListResponse[T], []*plugin.Plugin, error) {
const op = "pagination.ListPluginsRefresh"

if len(grantsHash) == 0 {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
}
if pageSize < 1 {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
}
if filterPluginItemFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter plugin item callback")
}
if listPluginsRefreshItemsFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list plugin refresh items callback")
}
if estimatedCountFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
}
if listDeletedIDsFn == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list deleted IDs callback")
}
if tok == nil {
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing refresh token")
}

deletedIds, transactionTimestamp, err := listDeletedIDsFn(ctx, tok.UpdatedTime)
if err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}

listItemsFn := func(ctx context.Context, prevPageLast T, limit int) ([]T, []*plugin.Plugin, error) {
return listPluginsRefreshItemsFn(ctx, tok, prevPageLast, limit)
}

items, plgs, completeListing, err := listPlugins(ctx, pageSize, filterPluginItemFn, listItemsFn)
if err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}

resp := &ListResponse[T]{
Items: items,
CompleteListing: completeListing,
DeletedIds: deletedIds,
}

resp.EstimatedItemCount, err = estimatedCountFn(ctx)
if err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}

if len(items) > 0 {
resp.RefreshToken = tok.RefreshLastItem(items[len(items)-1], transactionTimestamp)
} else {
resp.RefreshToken = tok.Refresh(transactionTimestamp)
}

return resp, plgs, nil
}

func listPlugins[T boundary.Resource](
ctx context.Context,
pageSize int,
filterItemFn ListPluginsFilterFunc[T],
listItemsFn ListPluginsItemsFunc[T],
) ([]T, []*plugin.Plugin, bool, error) {
const op = "pagination.listPlugins"

var lastItem T
var plgs []*plugin.Plugin
seenPlugins := map[string]struct{}{}
limit := pageSize + 1
items := make([]T, 0, limit)
dbLoop:
for {
// Request another page from the DB until we fill the final items
page, newPlgs, err := listItemsFn(ctx, lastItem, limit)
if err != nil {
return nil, nil, false, errors.Wrap(ctx, err, op)
}
for _, plg := range newPlgs {
if _, ok := seenPlugins[plg.PublicId]; !ok {
seenPlugins[plg.PublicId] = struct{}{}
plgs = append(plgs, plg)
}
}
for _, item := range page {
// Note: pass the plugins returned together with
// the page (newPlgs), not plgs, since these items
// are associated with those plugins.
ok, err := filterItemFn(ctx, item, newPlgs)
if err != nil {
return nil, nil, false, errors.Wrap(ctx, err, op)
}
if ok {
items = append(items, item)
// If we filled the items after filtering,
// we're done.
if len(items) == cap(items) {
break dbLoop
}
}
}
// If the current page was shorter than the limit, stop iterating
if len(page) < limit {
break dbLoop
}

lastItem = page[len(page)-1]
}
// If we couldn't fill the items, it was a complete listing.
completeListing := len(items) < cap(items)
if !completeListing {
// Items is of size pageSize+1, so
// truncate if it was filled.
items = items[:pageSize]
}

return items, plgs, completeListing, nil
}
1,771 changes: 1,771 additions & 0 deletions internal/pagination/pagination_plugins_test.go

Large diffs are not rendered by default.

0 comments on commit 6877c05

Please sign in to comment.