Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
johanbrandhorst committed Nov 22, 2024
1 parent ffd0d2a commit 466b58c
Show file tree
Hide file tree
Showing 23 changed files with 2,407 additions and 1,014 deletions.
569 changes: 417 additions & 152 deletions internal/daemon/cluster/handlers/worker_service.go

Large diffs are not rendered by default.

34 changes: 1 addition & 33 deletions internal/daemon/cluster/handlers/worker_service_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -228,7 +227,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Expand Down Expand Up @@ -336,7 +334,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Expand Down Expand Up @@ -414,7 +411,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -468,7 +464,6 @@ func TestStatus(t *testing.T) {
pbs.SessionJobInfo{},
pbs.MonitorSessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
Expand Down Expand Up @@ -628,7 +623,6 @@ func TestStatusSessionClosed(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -662,7 +656,6 @@ func TestStatusSessionClosed(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
Expand Down Expand Up @@ -797,7 +790,6 @@ func TestStatusDeadConnection(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
}

Expand All @@ -816,7 +808,6 @@ func TestStatusDeadConnection(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
Expand Down Expand Up @@ -952,7 +943,6 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -992,7 +982,6 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -1023,7 +1012,6 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
Expand Down Expand Up @@ -1094,7 +1082,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
Expand All @@ -1104,7 +1091,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1126,7 +1112,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1139,7 +1124,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
Expand All @@ -1148,10 +1132,7 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
WorkerId: worker1.PublicId,
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1164,7 +1145,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
ConnectedUnmappedWorkerKeyIdentifiers: []string{w2KeyId, "unknown"},
ConnectedWorkerPublicIds: []string{w1.GetPublicId(), "unknown"},
},
Expand All @@ -1176,9 +1156,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{
UnmappedWorkerKeyIdentifiers: []string{w2KeyId},
WorkerPublicIds: []string{w1.GetPublicId()},
Expand All @@ -1194,7 +1171,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
ConnectedUnmappedWorkerKeyIdentifiers: []string{"unknown"},
ConnectedWorkerPublicIds: []string{w1.GetPublicId(), w2.GetPublicId(), "unknown"},
},
Expand All @@ -1206,9 +1182,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{
WorkerPublicIds: []string{w1.GetPublicId(), w2.GetPublicId()},
},
Expand All @@ -1227,10 +1200,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
assert.Equal(tc.wantErrMsg, err.Error())
return
}
gotAuthorizedWorkers := got.GetAuthorizedWorkers()
sort.Strings(gotAuthorizedWorkers.GetWorkerKeyIdentifiers())
wantAuthorizedWorkers := tc.want.GetAuthorizedWorkers()
sort.Strings(wantAuthorizedWorkers.GetWorkerKeyIdentifiers())
sort.Strings(got.GetAuthorizedDownstreamWorkers().GetWorkerPublicIds())
sort.Strings(tc.want.GetAuthorizedDownstreamWorkers().GetWorkerPublicIds())
sort.Strings(got.GetAuthorizedDownstreamWorkers().GetUnmappedWorkerKeyIdentifiers())
Expand All @@ -1248,7 +1217,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
Expand Down
24 changes: 24 additions & 0 deletions internal/daemon/worker/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,28 @@ const (
// DefaultStatusTimeout is the timeout duration on status calls to the controller from
// the worker
DefaultStatusTimeout = server.DefaultLiveness / 3

// JobInfoInterval is the base duration used in the calculation of the random backoff
// during the worker job info report
JobInfoInterval = 2 * time.Second

// DefaultJobInfoTimeout is the timeout duration on JobInfo calls to the controller from
// the worker
DefaultJobInfoTimeout = server.DefaultLiveness / 3

// RoutingInfoInterval is the base duration used in the calculation of the random backoff
// during the worker routing info report
RoutingInfoInterval = 10 * time.Second

// DefaultRoutingInfoTimeout is the timeout duration on RoutingInfo calls to the controller from
// the worker
DefaultRoutingInfoTimeout = server.DefaultLiveness / 2

// StatisticsInterval is the base duration used in the calculation of the random backoff
// during the worker statistics report
StatisticsInterval = 15 * time.Second

// DefaultStatisticsTimeout is the timeout duration on Statistics calls to the controller from
// the worker
DefaultStatisticsTimeout = server.DefaultLiveness
)
Loading

0 comments on commit 466b58c

Please sign in to comment.