diff --git a/lib/auth/authclient/api.go b/lib/auth/authclient/api.go index f3fadb2e1959a..e41d09ca54e4d 100644 --- a/lib/auth/authclient/api.go +++ b/lib/auth/authclient/api.go @@ -755,6 +755,9 @@ type ReadDiscoveryAccessPoint interface { // GetProxies returns a list of registered proxies. GetProxies() ([]types.Server, error) + + // GetUserTask gets a single User Task by its name. + GetUserTask(ctx context.Context, name string) (*usertasksv1.UserTask, error) } // DiscoveryAccessPoint is an API interface implemented by a certificate authority (CA) to be diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 92e46ad3f3abe..58b4f70b79ff6 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -36,11 +36,13 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/client/proto" + usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/discoveryconfig" @@ -329,6 +331,7 @@ type Server struct { awsSyncStatus awsSyncStatus awsEC2ResourcesStatus awsResourcesStatus + awsEC2Tasks awsEC2Tasks // caRotationCh receives nodes that need to have their CAs rotated. caRotationCh chan []types.Server @@ -460,6 +463,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), server.WithPreFetchHookFn(func() { s.awsEC2ResourcesStatus.iterationStarted() + s.awsEC2Tasks.iterationStarted() }), ) if err != nil { @@ -883,6 +887,22 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { discoveryConfig: instances.DiscoveryConfig, integration: instances.Integration, }, 1) + + s.awsEC2Tasks.addFailedEnrollment( + awsEC2FailedEnrollmentGroup{ + integration: instances.Integration, + issueType: types.AutoDiscoverEC2IssueEICEFailedToCreateNode, + accountID: instances.AccountID, + region: instances.Region, + }, + &usertasksv1.DiscoverEC2Instance{ + // TODO(marco): add instance name + DiscoveryConfig: instances.DiscoveryConfig, + DiscoveryGroup: s.DiscoveryGroup, + SyncTime: timestamppb.New(s.clock.Now()), + InstanceId: ec2Instance.InstanceID, + }, + ) continue } @@ -925,6 +945,22 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { discoveryConfig: instances.DiscoveryConfig, integration: instances.Integration, }, 1) + + s.awsEC2Tasks.addFailedEnrollment( + awsEC2FailedEnrollmentGroup{ + integration: instances.Integration, + issueType: types.AutoDiscoverEC2IssueEICEFailedToUpsertNode, + accountID: instances.AccountID, + region: instances.Region, + }, + &usertasksv1.DiscoverEC2Instance{ + // TODO(marco): add instance name + DiscoveryConfig: instances.DiscoveryConfig, + DiscoveryGroup: s.DiscoveryGroup, + SyncTime: timestamppb.New(s.clock.Now()), + InstanceId: instanceID, + }, + ) } }) if err != nil { @@ -960,6 +996,24 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err discoveryConfig: instances.DiscoveryConfig, integration: instances.Integration, }, len(req.Instances)) + + for _, instance := range req.Instances { + s.awsEC2Tasks.addFailedEnrollment( + awsEC2FailedEnrollmentGroup{ + integration: instances.Integration, + issueType: types.AutoDiscoverEC2IssueInvocationFailure, + accountID: instances.AccountID, + region: instances.Region, + }, + &usertasksv1.DiscoverEC2Instance{ + // TODO(marco): add instance name + DiscoveryConfig: instances.DiscoveryConfig, + DiscoveryGroup: s.DiscoveryGroup, + SyncTime: timestamppb.New(s.clock.Now()), + InstanceId: instance.InstanceID, + }, + ) + } return trace.Wrap(err) } return nil @@ -1072,6 +1126,7 @@ func (s *Server) handleEC2Discovery() { } s.updateDiscoveryConfigStatus(instances.EC2.DiscoveryConfig) + s.upsertTasksForAWSEC2FailedEnrollments() case <-s.ctx.Done(): s.ec2Watcher.Stop() return diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 6f0b655e6e34a..93cde18b0ab08 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -65,6 +65,7 @@ import ( "github.com/gravitational/teleport/api/defaults" discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1" integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" + usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1" "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" @@ -269,6 +270,16 @@ func TestDiscoveryServer(t *testing.T) { ) require.NoError(t, err) + sortUserTasksFn := func(a *usertasksv1.UserTask, b *usertasksv1.UserTask) int { + if a.Metadata.GetName() < b.Metadata.GetName() { + return 1 + } + if a.Metadata.GetName() > b.Metadata.GetName() { + return -1 + } + return 0 + } + tcs := []struct { name string // presentInstances is a list of servers already present in teleport @@ -280,6 +291,7 @@ func TestDiscoveryServer(t *testing.T) { staticMatchers Matchers wantInstalledInstances []string wantDiscoveryConfigStatus *discoveryconfig.Status + wantUserTasks []*usertasksv1.UserTask }{ { name: "no nodes present, 1 found ", @@ -640,6 +652,23 @@ func TestDiscoveryServer(t *testing.T) { return true }, 500*time.Millisecond, 50*time.Millisecond) } + + if tc.wantUserTasks != nil { + var allUserTasks []*usertasksv1.UserTask + var nextToken string + for { + var userTasks []*usertasksv1.UserTask + userTasks, nextToken, err = tlsServer.Auth().UserTasks.ListUserTasks(ctx, 0, "") + require.NoError(t, err) + allUserTasks = append(allUserTasks, userTasks...) + if nextToken == "" { + break + } + } + slices.SortFunc(allUserTasks, sortUserTasksFn) + slices.SortFunc(tc.wantUserTasks, sortUserTasksFn) + require.Equal(t, tc.wantUserTasks, allUserTasks) + } }) } } diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index a7194d87372b5..47b7025a8e4f4 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -25,10 +25,16 @@ import ( "time" "github.com/gravitational/trace" + "google.golang.org/protobuf/types/known/timestamppb" discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1" + usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" + "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/discoveryconfig" + "github.com/gravitational/teleport/api/types/usertasks" + "github.com/gravitational/teleport/api/utils/retryutils" libevents "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/services" aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" "github.com/gravitational/teleport/lib/srv/server" ) @@ -293,5 +299,198 @@ func (s *Server) ReportEC2SSMInstallationResult(ctx context.Context, result *ser s.updateDiscoveryConfigStatus(result.DiscoveryConfig) + s.awsEC2Tasks.addFailedEnrollment( + awsEC2FailedEnrollmentGroup{ + integration: result.IntegrationName, + issueType: result.IssueType, + accountID: result.SSMRunEvent.AccountID, + region: result.SSMRunEvent.Region, + }, + &usertasksv1.DiscoverEC2Instance{ + // TODO(marco): add instance name + InvocationUrl: result.SSMRunEvent.InvocationURL, + DiscoveryConfig: result.DiscoveryConfig, + DiscoveryGroup: s.DiscoveryGroup, + SyncTime: timestamppb.New(result.SSMRunEvent.Time), + InstanceId: result.SSMRunEvent.InstanceID, + }, + ) + + return nil +} + +// awsEC2FailedEnrollments ... +type awsEC2Tasks struct { + mu sync.RWMutex + // instancesIssue maps the DiscoveryConfig name and integration to a summary of discovered/enrolled resources. + instancesIssue map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance + // groupPending is used to register which groups were changed in memory but were not yet sent upstream. + // When upserting User Tasks, if the group is not present in groupPending, + // then the cluster already has the latest version of this particular task/group. + groupPending map[awsEC2FailedEnrollmentGroup]struct{} +} + +// awsEC2FailedEnrollmentGroup identifies a UserTask key. +type awsEC2FailedEnrollmentGroup struct { + integration string + issueType string + accountID string + region string +} + +// iterationStarted clears out any in memory issues that were recorded. +// This is used when starting a new Auto Discover EC2 watcher iteration. +func (d *awsEC2Tasks) iterationStarted() { + d.mu.Lock() + defer d.mu.Unlock() + + d.instancesIssue = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance) + d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{}) +} + +// addFailedEnrollment adds a +func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2FailedEnrollmentGroup, instance *usertasksv1.DiscoverEC2Instance) { + d.mu.Lock() + defer d.mu.Unlock() + if d.instancesIssue == nil { + d.instancesIssue = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance) + } + if _, ok := d.instancesIssue[g]; !ok { + d.instancesIssue[g] = make(map[string]*usertasksv1.DiscoverEC2Instance) + } + d.instancesIssue[g][instance.InstanceId] = instance + + if d.groupPending == nil { + d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{}) + } + d.groupPending[g] = struct{}{} +} + +// mergeUpsertDiscoverEC2Task takes the current DiscoverEC2 User Task issues stored in memory and +// merges them against the ones that exist in the cluster. +// +// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices. +func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGroup, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error { + userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{ + Integration: taskGroup.integration, + IssueType: taskGroup.issueType, + AccountID: taskGroup.accountID, + Region: taskGroup.region, + }) + + // Use the deterministic task name as semaphore name. + semaphoreName := userTaskName + semaphoreExpiration := 5 * time.Second + + // AcquireSemaphoreLock will retry until the semaphore is acquired. + // This prevents multiple discovery services to write AWS resources in parallel. + // lease must be released to cleanup the resource in auth server. + lease, err := services.AcquireSemaphoreLockWithRetry( + s.ctx, + services.SemaphoreLockConfigWithRetry{ + SemaphoreLockConfig: services.SemaphoreLockConfig{ + Service: s.AccessPoint, + Params: types.AcquireSemaphoreRequest{ + SemaphoreKind: types.KindUserTask, + SemaphoreName: semaphoreName, + MaxLeases: 1, + Holder: s.Config.ServerID, + }, + Expiry: semaphoreExpiration, + Clock: s.clock, + }, + Retry: retryutils.LinearConfig{ + Clock: s.clock, + First: time.Second, + Step: semaphoreExpiration / 2, + Max: semaphoreExpiration, + Jitter: retryutils.NewJitter(), + }, + }, + ) + if err != nil { + return trace.Wrap(err) + } + + // once the lease parent context is canceled, the lease will be released. + ctxWithLease, cancel := context.WithCancel(lease) + defer cancel() + + defer func() { + lease.Stop() + if err := lease.Wait(); err != nil { + s.Log.WithError(err).WithField("semaphore", userTaskName).Warn("error cleaning up UserTask semaphore") + } + }() + + // Fetch the current task because it might have instances discovered by another discovery group. + currentUserTask, err := s.AccessPoint.GetUserTask(ctxWithLease, userTaskName) + switch { + case trace.IsNotFound(err): + case err != nil: + return trace.Wrap(err) + default: + for existingIntanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances { + // The current DiscoveryService has the last state for a given DiscoveryGroup. + // Any previous state for the DiscoveryGroup must be ignored. + if existingInstance.DiscoveryGroup == s.DiscoveryGroup { + continue + } + + // For instances whose sync time is too far in the past, just drop them. + // This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list. + // It might also be the case that the DiscoveryConfig was changed and the instance is no longer matcher (because of labels/regions or other matchers). + instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval) + if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) { + continue + } + + failedInstances[existingIntanceID] = existingInstance + } + } + + // If the DiscoveryService is stopped, or the issue does not happen again + // the task should be removed to prevent users from working on issues that are no longer happening. + taskExpiration := s.clock.Now().Add(2 * s.PollInterval) + + task, err := usertasks.NewDiscoverEC2UserTask( + &usertasksv1.UserTaskSpec{ + Integration: taskGroup.integration, + TaskType: usertasks.TaskTypeDiscoverEC2, + IssueType: taskGroup.issueType, + State: usertasks.TaskStateOpen, + DiscoverEc2: &usertasksv1.DiscoverEC2{ + AccountId: taskGroup.accountID, + Region: taskGroup.region, + Instances: failedInstances, + }, + }, + usertasks.WithExpiration(taskExpiration), + ) + if err != nil { + return trace.Wrap(err) + } + + if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil { + return trace.Wrap(err) + } + return nil } + +func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { + s.awsEC2Tasks.mu.Lock() + defer s.awsEC2Tasks.mu.Unlock() + for g, instances := range s.awsEC2Tasks.instancesIssue { + if _, pending := s.awsEC2Tasks.groupPending[g]; !pending { + continue + } + + if err := s.mergeUpsertDiscoverEC2Task(g, instances); err != nil { + s.Log.WithError(err).Warning("failed to create discover ec2 user task") + continue + } + + delete(s.awsEC2Tasks.groupPending, g) + } +} diff --git a/lib/srv/server/ssm_install.go b/lib/srv/server/ssm_install.go index b97fe8eb23dee..01cdc32ae869c 100644 --- a/lib/srv/server/ssm_install.go +++ b/lib/srv/server/ssm_install.go @@ -34,6 +34,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" awslib "github.com/gravitational/teleport/lib/cloud/aws" libevents "github.com/gravitational/teleport/lib/events" @@ -59,6 +60,9 @@ type SSMInstallationResult struct { // DiscoveryConfig is the DiscoveryConfig name which originated this Run Request. // Empty if using static matchers (coming from the `teleport.yaml`). DiscoveryConfig string + // IssueType identifies the type of issue that occurred if the installation failed. + // These are well known identifiers that can be found at types.AutoDiscoverEC2Issue* + IssueType string } // SSMInstaller handles running SSM commands that install Teleport on EC2 instances. @@ -191,7 +195,7 @@ func (si *SSMInstaller) Run(ctx context.Context, req SSMRunRequest) error { return trace.Wrap(g.Wait()) } -func invalidSSMInstanceInstallationResult(req SSMRunRequest, instanceID, status string) *SSMInstallationResult { +func invalidSSMInstanceInstallationResult(req SSMRunRequest, instanceID, status, issueType string) *SSMInstallationResult { return &SSMInstallationResult{ SSMRunEvent: &apievents.SSMRun{ Metadata: apievents.Metadata{ @@ -207,6 +211,7 @@ func invalidSSMInstanceInstallationResult(req SSMRunRequest, instanceID, status }, IntegrationName: req.IntegrationName, DiscoveryConfig: req.DiscoveryConfig, + IssueType: issueType, } } @@ -215,6 +220,7 @@ func (si *SSMInstaller) emitInvalidInstanceEvents(ctx context.Context, req SSMRu for _, instanceID := range instanceIDsState.missing { installationResult := invalidSSMInstanceInstallationResult(req, instanceID, "EC2 Instance is not registered in SSM. Make sure that the instance has AmazonSSMManagedInstanceCore policy assigned.", + types.AutoDiscoverEC2IssueScriptInstanceNotRegistered, ) if err := si.ReportSSMInstallationResultFunc(ctx, installationResult); err != nil { errs = append(errs, trace.Wrap(err)) @@ -224,6 +230,7 @@ func (si *SSMInstaller) emitInvalidInstanceEvents(ctx context.Context, req SSMRu for _, instanceID := range instanceIDsState.connectionLost { installationResult := invalidSSMInstanceInstallationResult(req, instanceID, "SSM Agent in EC2 Instance is not connecting to SSM Service. Restart or reinstall the SSM service. See https://docs.aws.amazon.com/systems-manager/latest/userguide/ami-preinstalled-agent.html#verify-ssm-agent-status for more details.", + types.AutoDiscoverEC2IssueScriptInstanceConnectionLost, ) if err := si.ReportSSMInstallationResultFunc(ctx, installationResult); err != nil { errs = append(errs, trace.Wrap(err)) @@ -233,6 +240,7 @@ func (si *SSMInstaller) emitInvalidInstanceEvents(ctx context.Context, req SSMRu for _, instanceID := range instanceIDsState.unsupportedOS { installationResult := invalidSSMInstanceInstallationResult(req, instanceID, "EC2 instance is running an unsupported Operating System. Only Linux is supported.", + types.AutoDiscoverEC2IssueScriptInstanceUnsupportedOS, ) if err := si.ReportSSMInstallationResultFunc(ctx, installationResult); err != nil { errs = append(errs, trace.Wrap(err)) @@ -350,6 +358,7 @@ func (si *SSMInstaller) checkCommand(ctx context.Context, req SSMRunRequest, com SSMRunEvent: invocationResultEvent, IntegrationName: req.IntegrationName, DiscoveryConfig: req.DiscoveryConfig, + IssueType: types.AutoDiscoverEC2IssueScriptFailure, })) } @@ -363,6 +372,7 @@ func (si *SSMInstaller) checkCommand(ctx context.Context, req SSMRunRequest, com SSMRunEvent: stepResultEvent, IntegrationName: req.IntegrationName, DiscoveryConfig: req.DiscoveryConfig, + IssueType: types.AutoDiscoverEC2IssueScriptFailure, })) } } diff --git a/lib/srv/server/ssm_install_test.go b/lib/srv/server/ssm_install_test.go index 8e5225ee3a88c..5d24398433bcc 100644 --- a/lib/srv/server/ssm_install_test.go +++ b/lib/srv/server/ssm_install_test.go @@ -144,6 +144,7 @@ func TestSSMInstaller(t *testing.T) { Status: ssm.CommandStatusSuccess, InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, { @@ -188,6 +189,7 @@ func TestSSMInstaller(t *testing.T) { Status: ssm.CommandStatusSuccess, InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, { @@ -234,6 +236,7 @@ func TestSSMInstaller(t *testing.T) { StandardError: "timeout error", InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, { @@ -284,6 +287,7 @@ func TestSSMInstaller(t *testing.T) { StandardError: "timeout error", InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, { @@ -351,6 +355,7 @@ func TestSSMInstaller(t *testing.T) { Status: ssm.CommandStatusSuccess, InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }, { SSMRunEvent: &events.SSMRun{ @@ -365,6 +370,7 @@ func TestSSMInstaller(t *testing.T) { ExitCode: -1, Status: "SSM Agent in EC2 Instance is not connecting to SSM Service. Restart or reinstall the SSM service. See https://docs.aws.amazon.com/systems-manager/latest/userguide/ami-preinstalled-agent.html#verify-ssm-agent-status for more details.", }, + IssueType: "ec2-ssm-agent-connection-lost", }, { SSMRunEvent: &events.SSMRun{ @@ -379,6 +385,7 @@ func TestSSMInstaller(t *testing.T) { ExitCode: -1, Status: "EC2 instance is running an unsupported Operating System. Only Linux is supported.", }, + IssueType: "ec2-ssm-unsupported-os", }, { SSMRunEvent: &events.SSMRun{ @@ -393,6 +400,7 @@ func TestSSMInstaller(t *testing.T) { ExitCode: -1, Status: "EC2 Instance is not registered in SSM. Make sure that the instance has AmazonSSMManagedInstanceCore policy assigned.", }, + IssueType: "ec2-ssm-agent-not-registered", }, }, }, @@ -448,6 +456,7 @@ func TestSSMInstaller(t *testing.T) { StandardOutput: "custom output", InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, { @@ -488,6 +497,7 @@ func TestSSMInstaller(t *testing.T) { Status: ssm.CommandStatusSuccess, InvocationURL: "https://eu-central-1.console.aws.amazon.com/systems-manager/run-command/command-id-1/instance-id-1", }, + IssueType: "ec2-ssm-script-failure", }}, }, // todo(amk): test that incomplete commands eventually return