Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoandredinis committed Oct 2, 2024
1 parent 5e20588 commit 0c450ad
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 1 deletion.
3 changes: 3 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,9 @@ type ReadDiscoveryAccessPoint interface {

// GetProxies returns a list of registered proxies.
GetProxies() ([]types.Server, error)

// GetUserTask gets a single User Task by its name.
GetUserTask(ctx context.Context, name string) (*usertasksv1.UserTask, error)
}

// DiscoveryAccessPoint is an API interface implemented by a certificate authority (CA) to be
Expand Down
55 changes: 55 additions & 0 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -329,6 +331,7 @@ type Server struct {

awsSyncStatus awsSyncStatus
awsEC2ResourcesStatus awsResourcesStatus
awsEC2Tasks awsEC2Tasks

// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
Expand Down Expand Up @@ -460,6 +463,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
s.awsEC2ResourcesStatus.iterationStarted()
s.awsEC2Tasks.iterationStarted()
}),
)
if err != nil {
Expand Down Expand Up @@ -883,6 +887,22 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) {
discoveryConfig: instances.DiscoveryConfig,
integration: instances.Integration,
}, 1)

s.awsEC2Tasks.addFailedEnrollment(
awsEC2FailedEnrollmentGroup{
integration: instances.Integration,
issueType: types.AutoDiscoverEC2IssueEICEFailedToCreateNode,
accountID: instances.AccountID,
region: instances.Region,
},
&usertasksv1.DiscoverEC2Instance{
// TODO(marco): add instance name
DiscoveryConfig: instances.DiscoveryConfig,
DiscoveryGroup: s.DiscoveryGroup,
SyncTime: timestamppb.New(s.clock.Now()),
InstanceId: ec2Instance.InstanceID,
},
)
continue
}

Expand Down Expand Up @@ -925,6 +945,22 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) {
discoveryConfig: instances.DiscoveryConfig,
integration: instances.Integration,
}, 1)

s.awsEC2Tasks.addFailedEnrollment(
awsEC2FailedEnrollmentGroup{
integration: instances.Integration,
issueType: types.AutoDiscoverEC2IssueEICEFailedToUpsertNode,
accountID: instances.AccountID,
region: instances.Region,
},
&usertasksv1.DiscoverEC2Instance{
// TODO(marco): add instance name
DiscoveryConfig: instances.DiscoveryConfig,
DiscoveryGroup: s.DiscoveryGroup,
SyncTime: timestamppb.New(s.clock.Now()),
InstanceId: instanceID,
},
)
}
})
if err != nil {
Expand Down Expand Up @@ -960,6 +996,24 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err
discoveryConfig: instances.DiscoveryConfig,
integration: instances.Integration,
}, len(req.Instances))

for _, instance := range req.Instances {
s.awsEC2Tasks.addFailedEnrollment(
awsEC2FailedEnrollmentGroup{
integration: instances.Integration,
issueType: types.AutoDiscoverEC2IssueInvocationFailure,
accountID: instances.AccountID,
region: instances.Region,
},
&usertasksv1.DiscoverEC2Instance{
// TODO(marco): add instance name
DiscoveryConfig: instances.DiscoveryConfig,
DiscoveryGroup: s.DiscoveryGroup,
SyncTime: timestamppb.New(s.clock.Now()),
InstanceId: instance.InstanceID,
},
)
}
return trace.Wrap(err)
}
return nil
Expand Down Expand Up @@ -1072,6 +1126,7 @@ func (s *Server) handleEC2Discovery() {
}

s.updateDiscoveryConfigStatus(instances.EC2.DiscoveryConfig)
s.upsertTasksForAWSEC2FailedEnrollments()
case <-s.ctx.Done():
s.ec2Watcher.Stop()
return
Expand Down
29 changes: 29 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/gravitational/teleport/api/defaults"
discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1"
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -269,6 +270,16 @@ func TestDiscoveryServer(t *testing.T) {
)
require.NoError(t, err)

sortUserTasksFn := func(a *usertasksv1.UserTask, b *usertasksv1.UserTask) int {
if a.Metadata.GetName() < b.Metadata.GetName() {
return 1
}
if a.Metadata.GetName() > b.Metadata.GetName() {
return -1
}
return 0
}

tcs := []struct {
name string
// presentInstances is a list of servers already present in teleport
Expand All @@ -280,6 +291,7 @@ func TestDiscoveryServer(t *testing.T) {
staticMatchers Matchers
wantInstalledInstances []string
wantDiscoveryConfigStatus *discoveryconfig.Status
wantUserTasks []*usertasksv1.UserTask
}{
{
name: "no nodes present, 1 found ",
Expand Down Expand Up @@ -640,6 +652,23 @@ func TestDiscoveryServer(t *testing.T) {
return true
}, 500*time.Millisecond, 50*time.Millisecond)
}

if tc.wantUserTasks != nil {
var allUserTasks []*usertasksv1.UserTask
var nextToken string
for {
var userTasks []*usertasksv1.UserTask
userTasks, nextToken, err = tlsServer.Auth().UserTasks.ListUserTasks(ctx, 0, "")
require.NoError(t, err)
allUserTasks = append(allUserTasks, userTasks...)
if nextToken == "" {
break
}
}
slices.SortFunc(allUserTasks, sortUserTasksFn)
slices.SortFunc(tc.wantUserTasks, sortUserTasksFn)
require.Equal(t, tc.wantUserTasks, allUserTasks)
}
})
}
}
Expand Down
199 changes: 199 additions & 0 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ import (
"time"

"github.com/gravitational/trace"
"google.golang.org/protobuf/types/known/timestamppb"

discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
"github.com/gravitational/teleport/api/types/usertasks"
"github.com/gravitational/teleport/api/utils/retryutils"
libevents "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
"github.com/gravitational/teleport/lib/srv/server"
)
Expand Down Expand Up @@ -293,5 +299,198 @@ func (s *Server) ReportEC2SSMInstallationResult(ctx context.Context, result *ser

s.updateDiscoveryConfigStatus(result.DiscoveryConfig)

s.awsEC2Tasks.addFailedEnrollment(
awsEC2FailedEnrollmentGroup{
integration: result.IntegrationName,
issueType: result.IssueType,
accountID: result.SSMRunEvent.AccountID,
region: result.SSMRunEvent.Region,
},
&usertasksv1.DiscoverEC2Instance{
// TODO(marco): add instance name
InvocationUrl: result.SSMRunEvent.InvocationURL,
DiscoveryConfig: result.DiscoveryConfig,
DiscoveryGroup: s.DiscoveryGroup,
SyncTime: timestamppb.New(result.SSMRunEvent.Time),
InstanceId: result.SSMRunEvent.InstanceID,
},
)

return nil
}

// awsEC2FailedEnrollments ...
type awsEC2Tasks struct {
mu sync.RWMutex
// instancesIssue maps the DiscoveryConfig name and integration to a summary of discovered/enrolled resources.
instancesIssue map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance
// groupPending is used to register which groups were changed in memory but were not yet sent upstream.
// When upserting User Tasks, if the group is not present in groupPending,
// then the cluster already has the latest version of this particular task/group.
groupPending map[awsEC2FailedEnrollmentGroup]struct{}
}

// awsEC2FailedEnrollmentGroup identifies a UserTask key.
type awsEC2FailedEnrollmentGroup struct {
integration string
issueType string
accountID string
region string
}

// iterationStarted clears out any in memory issues that were recorded.
// This is used when starting a new Auto Discover EC2 watcher iteration.
func (d *awsEC2Tasks) iterationStarted() {
d.mu.Lock()
defer d.mu.Unlock()

d.instancesIssue = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance)
d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{})
}

// addFailedEnrollment adds a
func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2FailedEnrollmentGroup, instance *usertasksv1.DiscoverEC2Instance) {
d.mu.Lock()
defer d.mu.Unlock()
if d.instancesIssue == nil {
d.instancesIssue = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance)
}
if _, ok := d.instancesIssue[g]; !ok {
d.instancesIssue[g] = make(map[string]*usertasksv1.DiscoverEC2Instance)
}
d.instancesIssue[g][instance.InstanceId] = instance

if d.groupPending == nil {
d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{})
}
d.groupPending[g] = struct{}{}
}

// mergeUpsertDiscoverEC2Task takes the current DiscoverEC2 User Task issues stored in memory and
// merges them against the ones that exist in the cluster.
//
// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGroup, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error {
userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{
Integration: taskGroup.integration,
IssueType: taskGroup.issueType,
AccountID: taskGroup.accountID,
Region: taskGroup.region,
})

// Use the deterministic task name as semaphore name.
semaphoreName := userTaskName
semaphoreExpiration := 5 * time.Second

// AcquireSemaphoreLock will retry until the semaphore is acquired.
// This prevents multiple discovery services to write AWS resources in parallel.
// lease must be released to cleanup the resource in auth server.
lease, err := services.AcquireSemaphoreLockWithRetry(
s.ctx,
services.SemaphoreLockConfigWithRetry{
SemaphoreLockConfig: services.SemaphoreLockConfig{
Service: s.AccessPoint,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.KindUserTask,
SemaphoreName: semaphoreName,
MaxLeases: 1,
Holder: s.Config.ServerID,
},
Expiry: semaphoreExpiration,
Clock: s.clock,
},
Retry: retryutils.LinearConfig{
Clock: s.clock,
First: time.Second,
Step: semaphoreExpiration / 2,
Max: semaphoreExpiration,
Jitter: retryutils.NewJitter(),
},
},
)
if err != nil {
return trace.Wrap(err)
}

// once the lease parent context is canceled, the lease will be released.
ctxWithLease, cancel := context.WithCancel(lease)
defer cancel()

defer func() {
lease.Stop()
if err := lease.Wait(); err != nil {
s.Log.WithError(err).WithField("semaphore", userTaskName).Warn("error cleaning up UserTask semaphore")
}
}()

// Fetch the current task because it might have instances discovered by another discovery group.
currentUserTask, err := s.AccessPoint.GetUserTask(ctxWithLease, userTaskName)
switch {
case trace.IsNotFound(err):
case err != nil:
return trace.Wrap(err)
default:
for existingIntanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances {
// The current DiscoveryService has the last state for a given DiscoveryGroup.
// Any previous state for the DiscoveryGroup must be ignored.
if existingInstance.DiscoveryGroup == s.DiscoveryGroup {
continue
}

// For instances whose sync time is too far in the past, just drop them.
// This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list.
// It might also be the case that the DiscoveryConfig was changed and the instance is no longer matcher (because of labels/regions or other matchers).
instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) {
continue
}

failedInstances[existingIntanceID] = existingInstance
}
}

// If the DiscoveryService is stopped, or the issue does not happen again
// the task should be removed to prevent users from working on issues that are no longer happening.
taskExpiration := s.clock.Now().Add(2 * s.PollInterval)

task, err := usertasks.NewDiscoverEC2UserTask(
&usertasksv1.UserTaskSpec{
Integration: taskGroup.integration,
TaskType: usertasks.TaskTypeDiscoverEC2,
IssueType: taskGroup.issueType,
State: usertasks.TaskStateOpen,
DiscoverEc2: &usertasksv1.DiscoverEC2{
AccountId: taskGroup.accountID,
Region: taskGroup.region,
Instances: failedInstances,
},
},
usertasks.WithExpiration(taskExpiration),
)
if err != nil {
return trace.Wrap(err)
}

if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil {
return trace.Wrap(err)
}

return nil
}

func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
s.awsEC2Tasks.mu.Lock()
defer s.awsEC2Tasks.mu.Unlock()
for g, instances := range s.awsEC2Tasks.instancesIssue {
if _, pending := s.awsEC2Tasks.groupPending[g]; !pending {
continue
}

if err := s.mergeUpsertDiscoverEC2Task(g, instances); err != nil {
s.Log.WithError(err).Warning("failed to create discover ec2 user task")
continue
}

delete(s.awsEC2Tasks.groupPending, g)
}
}
Loading

0 comments on commit 0c450ad

Please sign in to comment.