-
Notifications
You must be signed in to change notification settings - Fork 289
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
internal/pagination: add ListPlugin and ListRefreshPlugin
These new methods can be used in places where the caller wants to automatically paginate over a resource that is returned together with a plugin, such as plugin hosts, host sets and storage buckets.
- Loading branch information
1 parent
6bdba65
commit 0d9ad3d
Showing
2 changed files
with
1,910 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
package pagination | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/hashicorp/boundary/internal/boundary" | ||
"github.com/hashicorp/boundary/internal/errors" | ||
"github.com/hashicorp/boundary/internal/plugin" | ||
"github.com/hashicorp/boundary/internal/refreshtoken" | ||
) | ||
|
||
// ListPluginFilterFunc is a callback used to filter out resources that don't match | ||
// some criteria. The function must return true for items that should be included in the final | ||
// result. Returning an error results in an error being returned from the pagination. | ||
type ListPluginFilterFunc[T boundary.Resource] func(ctx context.Context, item T, plugin *plugin.Plugin) (bool, error) | ||
|
||
// ListPluginItemsFunc returns a slice of T that have been updated since prevPageLastItem. | ||
// If prevPageLastItem is empty, it returns a slice of T starting with the least recently updated. | ||
type ListPluginItemsFunc[T boundary.Resource] func(ctx context.Context, prevPageLastItem T, limit int) ([]T, *plugin.Plugin, error) | ||
|
||
// ListPluginRefreshItemsFunc returns a slice of T that have been updated since prevPageLastItem. | ||
// If prevPageLastItem is empty, it returns a slice of T that have been updated since the | ||
// item in the refresh token. | ||
type ListPluginRefreshItemsFunc[T boundary.Resource] func(ctx context.Context, tok *refreshtoken.Token, prevPageLastItem T, limit int) ([]T, *plugin.Plugin, error) | ||
|
||
// ListPlugin returns a ListResponse and a plugin. The response will contain at most a | ||
// number of items equal to the pageSize. Items are fetched using the | ||
// listItemsFn and then items are checked using the filterItemFn | ||
// to determine if they should be included in the response. | ||
// The response includes a new refresh token based on the grants and items. | ||
// The estimatedCountFn is used to provide an estimated total number of | ||
// items that can be returned by making additional requests using the provided | ||
// refresh token. The plugin is expected to be the same across invocations | ||
// of listPluginItemsFn. | ||
func ListPlugin[T boundary.Resource]( | ||
ctx context.Context, | ||
grantsHash []byte, | ||
pageSize int, | ||
filterPluginItemFn ListPluginFilterFunc[T], | ||
listPluginItemsFn ListPluginItemsFunc[T], | ||
estimatedCountFn EstimatedCountFunc, | ||
) (*ListResponse[T], *plugin.Plugin, error) { | ||
const op = "pagination.ListPlugin" | ||
|
||
if len(grantsHash) == 0 { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") | ||
} | ||
if pageSize < 1 { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") | ||
} | ||
if filterPluginItemFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter plugin item callback") | ||
} | ||
if listPluginItemsFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list plugin items callback") | ||
} | ||
if estimatedCountFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback") | ||
} | ||
|
||
items, plg, completeListing, err := listPlugin(ctx, pageSize, filterPluginItemFn, listPluginItemsFn) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(ctx, err, op) | ||
} | ||
|
||
resp := &ListResponse[T]{ | ||
Items: items, | ||
CompleteListing: completeListing, | ||
EstimatedItemCount: len(items), | ||
} | ||
|
||
if !completeListing { | ||
// If this was not a complete listing, get an estimate | ||
// of the total items from the DB. | ||
var err error | ||
resp.EstimatedItemCount, err = estimatedCountFn(ctx) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(ctx, err, op) | ||
} | ||
} | ||
|
||
if len(items) > 0 { | ||
resp.RefreshToken = refreshtoken.FromResource(items[len(items)-1], grantsHash) | ||
} | ||
|
||
return resp, plg, nil | ||
} | ||
|
||
// ListRefresh returns a ListResponse and a plugin. The response will contain at most a | ||
// number of items equal to the pageSize. Items are fetched using the | ||
// listRefreshItemsFn and then items are checked using the filterItemFn | ||
// to determine if they should be included in the response. | ||
// The response includes a new refresh token based on the grants and items. | ||
// The estimatedCountFn is used to provide an estimated total number of | ||
// items that can be returned by making additional requests using the provided | ||
// refresh token. The listDeletedIDsFn is used to list the IDs of any | ||
// resources that have been deleted since the refresh token was last used. | ||
// The plugin is expected to be the same across invocations of listPluginItemsFn. | ||
func ListPluginRefresh[T boundary.Resource]( | ||
ctx context.Context, | ||
grantsHash []byte, | ||
pageSize int, | ||
filterPluginItemFn ListPluginFilterFunc[T], | ||
listPluginRefreshItemsFn ListPluginRefreshItemsFunc[T], | ||
estimatedCountFn EstimatedCountFunc, | ||
listDeletedIDsFn ListDeletedIDsFunc, | ||
tok *refreshtoken.Token, | ||
) (*ListResponse[T], *plugin.Plugin, error) { | ||
const op = "pagination.ListPluginRefresh" | ||
|
||
if len(grantsHash) == 0 { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") | ||
} | ||
if pageSize < 1 { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") | ||
} | ||
if filterPluginItemFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter plugin item callback") | ||
} | ||
if listPluginRefreshItemsFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list plugin refresh items callback") | ||
} | ||
if estimatedCountFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback") | ||
} | ||
if listDeletedIDsFn == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing list deleted IDs callback") | ||
} | ||
if tok == nil { | ||
return nil, nil, errors.New(ctx, errors.InvalidParameter, op, "missing refresh token") | ||
} | ||
|
||
deletedIds, transactionTimestamp, err := listDeletedIDsFn(ctx, tok.UpdatedTime) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(ctx, err, op) | ||
} | ||
|
||
listItemsFn := func(ctx context.Context, prevPageLast T, limit int) ([]T, *plugin.Plugin, error) { | ||
return listPluginRefreshItemsFn(ctx, tok, prevPageLast, limit) | ||
} | ||
|
||
items, plg, completeListing, err := listPlugin(ctx, pageSize, filterPluginItemFn, listItemsFn) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(ctx, err, op) | ||
} | ||
|
||
resp := &ListResponse[T]{ | ||
Items: items, | ||
CompleteListing: completeListing, | ||
DeletedIds: deletedIds, | ||
} | ||
|
||
resp.EstimatedItemCount, err = estimatedCountFn(ctx) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(ctx, err, op) | ||
} | ||
|
||
if len(items) > 0 { | ||
resp.RefreshToken = tok.RefreshLastItem(items[len(items)-1], transactionTimestamp) | ||
} else { | ||
resp.RefreshToken = tok.Refresh(transactionTimestamp) | ||
} | ||
|
||
return resp, plg, nil | ||
} | ||
|
||
func listPlugin[T boundary.Resource]( | ||
ctx context.Context, | ||
pageSize int, | ||
filterItemFn ListPluginFilterFunc[T], | ||
listItemsFn ListPluginItemsFunc[T], | ||
) ([]T, *plugin.Plugin, bool, error) { | ||
const op = "pagination.listPlugin" | ||
|
||
var lastItem T | ||
var plg *plugin.Plugin | ||
limit := pageSize + 1 | ||
items := make([]T, 0, limit) | ||
dbLoop: | ||
for { | ||
// Request another page from the DB until we fill the final items | ||
page, newPlg, err := listItemsFn(ctx, lastItem, limit) | ||
if err != nil { | ||
return nil, nil, false, errors.Wrap(ctx, err, op) | ||
} | ||
if plg == nil { | ||
plg = newPlg | ||
} else if newPlg != nil && plg.PublicId != newPlg.PublicId { | ||
return nil, nil, false, errors.New(ctx, errors.Internal, op, "plugin changed between list invocations") | ||
} | ||
for _, item := range page { | ||
ok, err := filterItemFn(ctx, item, plg) | ||
if err != nil { | ||
return nil, nil, false, errors.Wrap(ctx, err, op) | ||
} | ||
if ok { | ||
items = append(items, item) | ||
// If we filled the items after filtering, | ||
// we're done. | ||
if len(items) == cap(items) { | ||
break dbLoop | ||
} | ||
} | ||
} | ||
// If the current page was shorter than the limit, stop iterating | ||
if len(page) < limit { | ||
break dbLoop | ||
} | ||
|
||
lastItem = page[len(page)-1] | ||
} | ||
// If we couldn't fill the items, it was a complete listing. | ||
completeListing := len(items) < cap(items) | ||
if !completeListing { | ||
// Items is of size pageSize+1, so | ||
// truncate if it was filled. | ||
items = items[:pageSize] | ||
} | ||
|
||
return items, plg, completeListing, nil | ||
} |
Oops, something went wrong.