Skip to content

Commit

Permalink
internal/server: add pagination support to workers
Browse files Browse the repository at this point in the history
  • Loading branch information
johanbrandhorst committed Oct 31, 2023
1 parent 6b77acf commit 7496011
Show file tree
Hide file tree
Showing 22 changed files with 1,723 additions and 417 deletions.
4 changes: 2 additions & 2 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques

authorizedDownstreams := &pbs.AuthorizedDownstreamWorkerList{}
if len(req.GetConnectedWorkerPublicIds()) > 0 {
knownConnectedWorkers, err := serverRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithWorkerPool(req.GetConnectedWorkerPublicIds()), server.WithLiveness(-1))
knownConnectedWorkers, err := serverRepo.ListWorkersUnpaginated(ctx, []string{scope.Global.String()}, server.WithWorkerPool(req.GetConnectedWorkerPublicIds()), server.WithLiveness(-1))
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting known connected worker ids"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting known connected worker ids: %v", err)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (ws *workerServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.Lis
if err != nil {
return nil, status.Errorf(codes.Internal, "Error getting servers repo: %v", err)
}
workers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()},
workers, err := serversRepo.ListWorkersUnpaginated(ctx, []string{scope.Global.String()},
// We use the livenessTimeToStale here instead of WorkerStatusGracePeriod
// since WorkerStatusGracePeriod is more for deciding which workers
// should be used for session proxying, but here we care about providing
Expand Down
10 changes: 8 additions & 2 deletions internal/daemon/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,14 @@ func (c *Controller) registerGrpcServices(s *grpc.Server) error {
services.RegisterCredentialLibraryServiceServer(s, cl)
}
if _, ok := currentServices[services.WorkerService_ServiceDesc.ServiceName]; !ok {
ws, err := workers.NewService(c.baseContext, c.ServersRepoFn, c.IamRepoFn, c.WorkerAuthRepoStorageFn,
c.downstreamWorkers)
ws, err := workers.NewService(
c.baseContext,
c.ServersRepoFn,
c.IamRepoFn,
c.WorkerAuthRepoStorageFn,
c.downstreamWorkers,
1000,
)
if err != nil {
return fmt.Errorf("failed to create worker handler service: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession
}

// Get workers and filter down to ones that can service this request
selectedWorkers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithLiveness(time.Duration(s.workerStatusGracePeriod.Load())))
selectedWorkers, err := serversRepo.ListWorkersUnpaginated(ctx, []string{scope.Global.String()}, server.WithLiveness(time.Duration(s.workerStatusGracePeriod.Load())))
if err != nil {
return nil, err
}
Expand Down
165 changes: 129 additions & 36 deletions internal/daemon/controller/handlers/workers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,34 @@ import (
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
"github.com/hashicorp/boundary/internal/errors"
pbs "github.com/hashicorp/boundary/internal/gen/controller/api/services"
"github.com/hashicorp/boundary/internal/pagination"
"github.com/hashicorp/boundary/internal/perms"
"github.com/hashicorp/boundary/internal/refreshtoken"
"github.com/hashicorp/boundary/internal/requests"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/internal/server/store"
"github.com/hashicorp/boundary/internal/types/action"
"github.com/hashicorp/boundary/internal/types/resource"
"github.com/hashicorp/boundary/internal/types/scope"
"github.com/hashicorp/boundary/internal/util"
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/scopes"
pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/workers"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/nodeenrollment/types"
"github.com/mr-tron/base58"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
PkiWorkerType = "pki"
KmsWorkerType = "kms"
// The default max page size is used when one is not
// provided to NewService.
defaultMaxPageSize = 1000
)

var (
Expand Down Expand Up @@ -94,13 +101,19 @@ type Service struct {
workerAuthFn common.WorkerAuthRepoStorageFactory
iamRepoFn common.IamRepoFactory
downstreams common.Downstreamers
maxPageSize uint
}

var _ pbs.WorkerServiceServer = (*Service)(nil)

// NewService returns a worker service which handles worker related requests to boundary.
func NewService(ctx context.Context, repo common.ServersRepoFactory, iamRepoFn common.IamRepoFactory,
workerAuthFn common.WorkerAuthRepoStorageFactory, ds common.Downstreamers,
func NewService(
ctx context.Context,
repo common.ServersRepoFactory,
iamRepoFn common.IamRepoFactory,
workerAuthFn common.WorkerAuthRepoStorageFactory,
ds common.Downstreamers,
maxPageSize uint,
) (Service, error) {
const op = "workers.NewService"
if repo == nil {
Expand All @@ -112,11 +125,15 @@ func NewService(ctx context.Context, repo common.ServersRepoFactory, iamRepoFn c
if workerAuthFn == nil {
return Service{}, errors.New(ctx, errors.InvalidParameter, op, "missing worker auth repository")
}
return Service{repoFn: repo, iamRepoFn: iamRepoFn, workerAuthFn: workerAuthFn, downstreams: ds}, nil
if maxPageSize == 0 {
maxPageSize = uint(defaultMaxPageSize)
}
return Service{repoFn: repo, iamRepoFn: iamRepoFn, workerAuthFn: workerAuthFn, downstreams: ds, maxPageSize: maxPageSize}, nil
}

// ListWorkers implements the interface pbs.WorkerServiceServer.
func (s Service) ListWorkers(ctx context.Context, req *pbs.ListWorkersRequest) (*pbs.ListWorkersResponse, error) {
const op = "workers.(Service).ListWorkers"
if err := validateListRequest(ctx, req); err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,50 +161,115 @@ func (s Service) ListWorkers(ctx context.Context, req *pbs.ListWorkersRequest) (
return &pbs.ListWorkersResponse{}, nil
}

ul, err := s.listFromRepo(ctx, scopeIds)
pageSize := int(s.maxPageSize)
// Use the requested page size only if it is smaller than
// the configured max.
if req.GetPageSize() != 0 && uint(req.GetPageSize()) < s.maxPageSize {
pageSize = int(req.GetPageSize())
}
filter, err := handlers.NewFilter(ctx, req.GetFilter())
if err != nil {
return nil, err
}
if len(ul) == 0 {
return &pbs.ListWorkersResponse{}, nil
}

filter, err := handlers.NewFilter(ctx, req.GetFilter())
repo, err := s.repoFn()
if err != nil {
return nil, err
}
finalItems := make([]*pb.Worker, 0, len(ul))
res := perms.Resource{
Type: resource.Worker,
}
for _, item := range ul {
res.Id = item.GetPublicId()
res.ScopeId = item.GetScopeId()
authorizedActions := authResults.FetchActionSetForId(ctx, item.GetPublicId(), IdActions, auth.WithResource(&res)).Strings()
if len(authorizedActions) == 0 {
continue
filterItemFn := func(ctx context.Context, item *server.Worker) (bool, error) {
outputOpts, ok := newOutputOpts(ctx, item, authResults, scopeInfoMap)
if !ok {
return false, nil
}
pbItem, err := s.toProto(ctx, item, outputOpts...)
if err != nil {
return false, err
}
return filter.Match(pbItem), nil
}
grantsHash, err := authResults.GrantsHash(ctx)
if err != nil {
return nil, err
}

outputFields := authResults.FetchOutputFields(res, action.List).SelfOrDefaults(authResults.UserId)
outputOpts := make([]handlers.Option, 0, 3)
outputOpts = append(outputOpts, handlers.WithOutputFields(outputFields))
if outputFields.Has(globals.ScopeField) {
outputOpts = append(outputOpts, handlers.WithScope(scopeInfoMap[item.GetScopeId()]))
var listResp *pagination.ListResponse[*server.Worker]
if req.GetRefreshToken() == "" {
listResp, err = server.ListWorkers(ctx, grantsHash, pageSize, filterItemFn, repo, scopeIds)
if err != nil {
return nil, err
}
if outputFields.Has(globals.AuthorizedActionsField) {
outputOpts = append(outputOpts, handlers.WithAuthorizedActions(authorizedActions))
} else {
rt, err := handlers.ParseRefreshToken(ctx, req.GetRefreshToken())
if err != nil {
return nil, err
}
// We're doing the conversion from the protobuf types to the
// domain types here rather than in the domain so that the domain
// doesn't need to know about the protobuf types.
domainRefreshToken, err := refreshtoken.New(
ctx,
rt.CreatedTime.AsTime(),
rt.UpdatedTime.AsTime(),
handlers.RefreshTokenResourceToResource(rt.ResourceType),
rt.GrantsHash,
rt.LastItemId,
rt.LastItemUpdatedTime.AsTime(),
)
if err != nil {
return nil, err
}
if err := domainRefreshToken.Validate(ctx, resource.Worker, grantsHash); err != nil {
return nil, err
}
listResp, err = server.ListWorkersRefresh(ctx, grantsHash, pageSize, filterItemFn, domainRefreshToken, repo, scopeIds)
if err != nil {
return nil, err
}
}

finalItems := make([]*pb.Worker, 0, len(listResp.Items))
for _, item := range listResp.Items {
outputOpts, ok := newOutputOpts(ctx, item, authResults, scopeInfoMap)
if !ok {
continue
}
item, err := s.toProto(ctx, item, outputOpts...)
if err != nil {
return nil, err
}
finalItems = append(finalItems, item)
}
respType := "delta"
if listResp.CompleteListing {
respType = "complete"
}
resp := &pbs.ListWorkersResponse{
Items: finalItems,
EstItemCount: uint32(listResp.EstimatedItemCount),
RemovedIds: listResp.DeletedIds,
ResponseType: respType,
SortBy: "updated_time",
SortDir: "asc",
}

if filter.Match(item) {
finalItems = append(finalItems, item)
if listResp.RefreshToken != nil {
if listResp.RefreshToken.ResourceType != resource.Worker {
return nil, errors.New(ctx, errors.Internal, op, "refresh token resource type does not match service resource type")
}
resp.RefreshToken, err = handlers.MarshalRefreshToken(ctx, &pbs.ListRefreshToken{
CreatedTime: timestamppb.New(listResp.RefreshToken.CreatedTime),
UpdatedTime: timestamppb.New(listResp.RefreshToken.UpdatedTime),
ResourceType: pbs.ResourceType_RESOURCE_TYPE_WORKER,
GrantsHash: listResp.RefreshToken.GrantsHash,
LastItemId: listResp.RefreshToken.LastItemId,
LastItemUpdatedTime: timestamppb.New(listResp.RefreshToken.LastItemUpdatedTime),
})
if err != nil {
return nil, err
}
}
return &pbs.ListWorkersResponse{Items: finalItems}, nil

return resp, nil
}

// GetWorker implements the interface pbs.WorkerServiceServer.
Expand Down Expand Up @@ -572,16 +654,27 @@ func (s Service) ReinitializeCertificateAuthority(ctx context.Context, req *pbs.
return &pbs.ReinitializeCertificateAuthorityResponse{Item: ca}, nil
}

func (s Service) listFromRepo(ctx context.Context, scopeIds []string) ([]*server.Worker, error) {
repo, err := s.repoFn()
if err != nil {
return nil, err
func newOutputOpts(ctx context.Context, item *server.Worker, authResults auth.VerifyResults, scopeInfoMap map[string]*scopes.ScopeInfo) ([]handlers.Option, bool) {
res := perms.Resource{
Type: resource.Worker,
Id: item.GetPublicId(),
ScopeId: item.GetScopeId(),
}
wl, err := repo.ListWorkers(ctx, scopeIds, server.WithLiveness(-1), server.WithLimit(-1))
if err != nil {
return nil, err
authorizedActions := authResults.FetchActionSetForId(ctx, item.GetPublicId(), IdActions, auth.WithResource(&res)).Strings()
if len(authorizedActions) == 0 {
return nil, false
}

outputFields := authResults.FetchOutputFields(res, action.List).SelfOrDefaults(authResults.UserId)
outputOpts := make([]handlers.Option, 0, 3)
outputOpts = append(outputOpts, handlers.WithOutputFields(outputFields))
if outputFields.Has(globals.ScopeField) {
outputOpts = append(outputOpts, handlers.WithScope(scopeInfoMap[item.GetScopeId()]))
}
if outputFields.Has(globals.AuthorizedActionsField) {
outputOpts = append(outputOpts, handlers.WithAuthorizedActions(authorizedActions))
}
return wl, nil
return outputOpts, true
}

func (s Service) getFromRepo(ctx context.Context, id string) (*server.Worker, error) {
Expand Down
Loading

0 comments on commit 7496011

Please sign in to comment.