Skip to content

Commit

Permalink
refact(server): rename Downstreamers interface to Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
irenarindos authored and hugoghx committed Jan 7, 2025
1 parent 596d1ad commit 458de27
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 31 deletions.
6 changes: 3 additions & 3 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type workerServiceServer struct {
workerAuthRepoFn common.WorkerAuthRepoStorageFactory
sessionRepoFn session.RepositoryFactory
connectionRepoFn common.ConnectionRepoFactory
downstreams downstream.Downstreamers
downstreams downstream.Graph
updateTimes *sync.Map
kms *kms.Kms
livenessTimeToStale *atomic.Int64
Expand Down Expand Up @@ -67,7 +67,7 @@ var (
)

// singleHopConnectionRoute returns a route consisting of the singlehop worker (the root worker id)
func singleHopConnectionRoute(_ context.Context, w *server.Worker, _ *session.Session, _ *server.Repository, _ downstream.Downstreamers) ([]string, error) {
func singleHopConnectionRoute(_ context.Context, w *server.Worker, _ *session.Session, _ *server.Repository, _ downstream.Graph) ([]string, error) {
return []string{w.GetPublicId()}, nil
}

Expand All @@ -76,7 +76,7 @@ func NewWorkerServiceServer(
workerAuthRepoFn common.WorkerAuthRepoStorageFactory,
sessionRepoFn session.RepositoryFactory,
connectionRepoFn common.ConnectionRepoFactory,
downstreams downstream.Downstreamers,
downstreams downstream.Graph,
updateTimes *sync.Map,
kms *kms.Kms,
livenessTimeToStale *atomic.Int64,
Expand Down
10 changes: 5 additions & 5 deletions internal/daemon/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type downstreamWorkersTicker interface {
var (
downstreamReceiverFactory func(*atomic.Int64) (downstreamReceiver, error)

downstreamersFactory func(context.Context, string, string) (downstream.Downstreamers, error)
downstreamWorkersTickerFactory func(context.Context, string, string, downstream.Downstreamers, downstreamReceiver, *atomic.Int64) (downstreamWorkersTicker, error)
graphFactory func(context.Context, string, string) (downstream.Graph, error)
downstreamWorkersTickerFactory func(context.Context, string, string, downstream.Graph, downstreamReceiver, *atomic.Int64) (downstreamWorkersTicker, error)
commandClientFactory func(context.Context, *Controller) error
extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
)
Expand All @@ -110,7 +110,7 @@ type Controller struct {
workerAuthCache *sync.Map

// downstream workers and routes to those workers
downstreamWorkers downstream.Downstreamers
downstreamWorkers downstream.Graph
downstreamConns downstreamReceiver

apiListeners []*base.ServerListener
Expand Down Expand Up @@ -495,9 +495,9 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
event.WriteSysEvent(ctx, op, "unable to ensure worker auth roots exist, may be due to multiple controllers starting at once, continuing")
}

if downstreamersFactory != nil {
if graphFactory != nil {
boundVer := version.Get().VersionNumber()
c.downstreamWorkers, err = downstreamersFactory(ctx, "root", boundVer)
c.downstreamWorkers, err = graphFactory(ctx, "root", boundVer)
if err != nil {
return nil, fmt.Errorf("unable to initialize downstream workers graph: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/daemon/controller/downstream/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

package downstream

// Downstreamers provides at least a minimum interface that must be met by a
// Graph provides at least a minimum interface that must be met by a
// Controller.downstreamWorkers field which is far better than allowing any (empty
// interface)
type Downstreamers interface {
// RootId returns the root ID of the downstreamers' graph
// This is used to interact with downstream workers DAG
type Graph interface {
// RootId returns the root ID of the graph
RootId() string
}
4 changes: 2 additions & 2 deletions internal/daemon/controller/handlers/targets/target_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Service struct {
staticHostRepoFn common.StaticRepoFactory
vaultCredRepoFn common.VaultCredentialRepoFactory
staticCredRepoFn common.StaticCredentialRepoFactory
downstreams downstream.Downstreamers
downstreams downstream.Graph
kmsCache *kms.Kms
workerStatusGracePeriod *atomic.Int64
maxPageSize uint
Expand All @@ -152,7 +152,7 @@ func NewService(
vaultCredRepoFn common.VaultCredentialRepoFactory,
staticCredRepoFn common.StaticCredentialRepoFactory,
aliasRepoFn common.TargetAliasRepoFactory,
downstreams downstream.Downstreamers,
downstreams downstream.Graph,
workerStatusGracePeriod *atomic.Int64,
maxPageSize uint,
controllerExt intglobals.ControllerExtension,
Expand Down
6 changes: 3 additions & 3 deletions internal/daemon/controller/handlers/workers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func init() {
action.RegisterResource(resource.Worker, IdActions, CollectionActions)
}

func emptyDownstreamWorkers(context.Context, string, downstream.Downstreamers) []string {
func emptyDownstreamWorkers(context.Context, string, downstream.Graph) []string {
return nil
}

Expand All @@ -97,14 +97,14 @@ type Service struct {
repoFn common.ServersRepoFactory
workerAuthFn common.WorkerAuthRepoStorageFactory
iamRepoFn common.IamRepoFactory
downstreams downstream.Downstreamers
downstreams downstream.Graph
}

var _ pbs.WorkerServiceServer = (*Service)(nil)

// NewService returns a worker service which handles worker related requests to boundary.
func NewService(ctx context.Context, repo common.ServersRepoFactory, iamRepoFn common.IamRepoFactory,
workerAuthFn common.WorkerAuthRepoStorageFactory, ds downstream.Downstreamers,
workerAuthFn common.WorkerAuthRepoStorageFactory, ds downstream.Graph,
) (Service, error) {
const op = "workers.NewService"
if repo == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestGet(t *testing.T) {
downstreamWorkers = oldDownstramFn
})
connectedDownstreams := []string{"first", "second", "third"}
downstreamWorkers = func(_ context.Context, id string, _ downstream.Downstreamers) []string {
downstreamWorkers = func(_ context.Context, id string, _ downstream.Graph) []string {
return connectedDownstreams
}

Expand Down Expand Up @@ -333,7 +333,7 @@ func TestList(t *testing.T) {
downstreamWorkers = oldDownstramFn
})
connectedDownstreams := []string{"first", "second", "third"}
downstreamWorkers = func(_ context.Context, id string, _ downstream.Downstreamers) []string {
downstreamWorkers = func(_ context.Context, id string, _ downstream.Graph) []string {
return connectedDownstreams
}

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestUpdate(t *testing.T) {
downstreamWorkers = oldDownstramFn
})
connectedDownstreams := []string{"first", "second", "third"}
downstreamWorkers = func(_ context.Context, id string, _ downstream.Downstreamers) []string {
downstreamWorkers = func(_ context.Context, id string, _ downstream.Graph) []string {
return connectedDownstreams
}

Expand Down
4 changes: 2 additions & 2 deletions internal/daemon/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

var firstStatusCheckPostHooks []func(context.Context, *Worker) error

var downstreamWorkersFactory func(ctx context.Context, workerId string, ver string) (downstreamers, error)
var downstreamWorkersFactory func(ctx context.Context, workerId string, ver string) (graph, error)

var checkHCPBUpstreams func(w *Worker) bool

Expand Down Expand Up @@ -380,7 +380,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
w.conf.ServerSideShutdownCh <- struct{}{}
return
}
w.downstreamWorkers.Store(&downstreamersContainer{downstreamers: downstreamWorkers})
w.downstreamWorkers.Store(&graphContainer{graph: downstreamWorkers})
}
for _, fn := range firstStatusCheckPostHooks {
if err := fn(cancelCtx, w); err != nil {
Expand Down
17 changes: 9 additions & 8 deletions internal/daemon/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ type reverseConnReceiver interface {
StartProcessingPendingConnections(context.Context, func() string) error
}

// downstreamersContainer is a struct that exists purely so we can perform
// graphContainer is a struct that exists purely so we can perform
// atomic swap operations on the interface, to avoid/fix data races in tests
// (and any other potential location).
type downstreamersContainer struct {
downstreamers
// This is used to interact with downstream workers DAG
type graphContainer struct {
graph
}

// downstreamers provides at least a minimum interface that must be met by a
// graph provides at least a minimum interface that must be met by a
// Worker.downstreamWorkers field which is far better than allowing any (empty
// interface)
type downstreamers interface {
// RootId returns the root ID of the downstreamers' graph
type graph interface {
// RootId returns the root ID of the graph
RootId() string
}

Expand Down Expand Up @@ -189,7 +190,7 @@ type Worker struct {
RecordingStorage storage.RecordingStorage

// downstream workers and routes to those workers
downstreamWorkers *atomic.Pointer[downstreamersContainer]
downstreamWorkers *atomic.Pointer[graphContainer]
downstreamReceiver reverseConnReceiver

// Timing variables. These are atomics for SIGHUP support, and are int64
Expand Down Expand Up @@ -243,7 +244,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
statusCallTimeoutDuration: new(atomic.Int64),
getDownstreamWorkersTimeoutDuration: new(atomic.Int64),
upstreamConnectionState: new(atomic.Value),
downstreamWorkers: new(atomic.Pointer[downstreamersContainer]),
downstreamWorkers: new(atomic.Pointer[graphContainer]),
}

w.operationalState.Store(server.UnknownOperationalState)
Expand Down
4 changes: 2 additions & 2 deletions internal/server/repository_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func (r *Repository) SelectSessionWorkers(ctx context.Context,
host string,
ce globals.ControllerExtension,
sbFn StorageBucketFilterCredIdFn,
ds downstream.Downstreamers,
ds downstream.Graph,
) ([]WorkerAddress, string, error) {
const op = "server.(Repository).SelectSessionWorkers"

Expand Down Expand Up @@ -970,7 +970,7 @@ func filterWorkers(
_ string,
_ globals.ControllerExtension,
_ StorageBucketFilterCredIdFn,
_ downstream.Downstreamers,
_ downstream.Graph,
_ ...target.Option,
) (WorkerList, *Worker, error) {
const op = "server.filterWorkers"
Expand Down

0 comments on commit 458de27

Please sign in to comment.