diff --git a/api/gen/proto/go/teleport/machineid/v1/federation.pb.go b/api/gen/proto/go/teleport/machineid/v1/federation.pb.go index fd2ca8b74408..a2290d9a9380 100644 --- a/api/gen/proto/go/teleport/machineid/v1/federation.pb.go +++ b/api/gen/proto/go/teleport/machineid/v1/federation.pb.go @@ -24,7 +24,6 @@ import ( v1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - durationpb "google.golang.org/protobuf/types/known/durationpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" @@ -352,9 +351,17 @@ type SPIFFEFederationStatus struct { CurrentBundle string `protobuf:"bytes,1,opt,name=current_bundle,json=currentBundle,proto3" json:"current_bundle,omitempty"` // The time that the most recently fetched bundle was obtained. CurrentBundleSyncedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=current_bundle_synced_at,json=currentBundleSyncedAt,proto3" json:"current_bundle_synced_at,omitempty"` - // The duration that the current bundle suggests the next bundle should be - // refresh after. - CurrentBundleRefreshHint *durationpb.Duration `protobuf:"bytes,3,opt,name=current_bundle_refresh_hint,json=currentBundleRefreshHint,proto3" json:"current_bundle_refresh_hint,omitempty"` + // The time that this SPIFFE federation should be synced again. This is + // usually determined by the refresh hint provided within the current bundle + // but this can be overridden by the server where the provided refresh hint + // is not appropriate. + // + // A value of zero indicates that an automatic sync is not scheduled (e.g. + // because the bundle source is static). + NextSyncAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=next_sync_at,json=nextSyncAt,proto3" json:"next_sync_at,omitempty"` + // The SPIFFEFederationBundleSource that was used for the currently synced + // bundle. This allows the bundle to be resynced if the source changes. + CurrentBundleSyncedFrom *SPIFFEFederationBundleSource `protobuf:"bytes,5,opt,name=current_bundle_synced_from,json=currentBundleSyncedFrom,proto3" json:"current_bundle_synced_from,omitempty"` } func (x *SPIFFEFederationStatus) Reset() { @@ -403,9 +410,16 @@ func (x *SPIFFEFederationStatus) GetCurrentBundleSyncedAt() *timestamppb.Timesta return nil } -func (x *SPIFFEFederationStatus) GetCurrentBundleRefreshHint() *durationpb.Duration { +func (x *SPIFFEFederationStatus) GetNextSyncAt() *timestamppb.Timestamp { if x != nil { - return x.CurrentBundleRefreshHint + return x.NextSyncAt + } + return nil +} + +func (x *SPIFFEFederationStatus) GetCurrentBundleSyncedFrom() *SPIFFEFederationBundleSource { + if x != nil { + return x.CurrentBundleSyncedFrom } return nil } @@ -417,8 +431,6 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{ 0x6e, 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x1a, - 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x21, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x68, 0x65, 0x61, 0x64, 0x65, @@ -470,7 +482,7 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{ 0x74, 0x2e, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x50, 0x49, 0x46, 0x46, 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x0c, 0x62, 0x75, 0x6e, - 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xee, 0x01, 0x0a, 0x16, 0x53, 0x50, + 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xe7, 0x02, 0x0a, 0x16, 0x53, 0x50, 0x49, 0x46, 0x46, 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x75, @@ -480,18 +492,26 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{ 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, - 0x12, 0x58, 0x0a, 0x1b, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x75, 0x6e, 0x64, - 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x5f, 0x68, 0x69, 0x6e, 0x74, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x52, 0x18, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52, - 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x48, 0x69, 0x6e, 0x74, 0x42, 0x56, 0x5a, 0x54, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, - 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, - 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, - 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x3b, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, - 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x3c, 0x0a, 0x0c, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x61, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x0a, 0x6e, 0x65, 0x78, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x41, 0x74, 0x12, 0x70, + 0x0a, 0x1a, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65, + 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x6d, 0x61, + 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x50, 0x49, 0x46, 0x46, + 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x75, 0x6e, 0x64, 0x6c, + 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x17, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, + 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x46, 0x72, 0x6f, 0x6d, + 0x4a, 0x04, 0x08, 0x03, 0x10, 0x04, 0x52, 0x1b, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, + 0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x5f, 0x68, + 0x69, 0x6e, 0x74, 0x42, 0x56, 0x5a, 0x54, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x2f, + 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, + 0x72, 0x74, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x3b, + 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -516,7 +536,6 @@ var file_teleport_machineid_v1_federation_proto_goTypes = []any{ (*SPIFFEFederationStatus)(nil), // 5: teleport.machineid.v1.SPIFFEFederationStatus (*v1.Metadata)(nil), // 6: teleport.header.v1.Metadata (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp - (*durationpb.Duration)(nil), // 8: google.protobuf.Duration } var file_teleport_machineid_v1_federation_proto_depIdxs = []int32{ 6, // 0: teleport.machineid.v1.SPIFFEFederation.metadata:type_name -> teleport.header.v1.Metadata @@ -526,12 +545,13 @@ var file_teleport_machineid_v1_federation_proto_depIdxs = []int32{ 2, // 4: teleport.machineid.v1.SPIFFEFederationBundleSource.https_web:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSourceHTTPSWeb 3, // 5: teleport.machineid.v1.SPIFFEFederationSpec.bundle_source:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSource 7, // 6: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_synced_at:type_name -> google.protobuf.Timestamp - 8, // 7: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_refresh_hint:type_name -> google.protobuf.Duration - 8, // [8:8] is the sub-list for method output_type - 8, // [8:8] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 7, // 7: teleport.machineid.v1.SPIFFEFederationStatus.next_sync_at:type_name -> google.protobuf.Timestamp + 3, // 8: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_synced_from:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSource + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_teleport_machineid_v1_federation_proto_init() } diff --git a/api/proto/teleport/machineid/v1/federation.proto b/api/proto/teleport/machineid/v1/federation.proto index f80eda451bec..8d23553a0c54 100644 --- a/api/proto/teleport/machineid/v1/federation.proto +++ b/api/proto/teleport/machineid/v1/federation.proto @@ -16,7 +16,6 @@ syntax = "proto3"; package teleport.machineid.v1; -import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "teleport/header/v1/metadata.proto"; @@ -72,11 +71,22 @@ message SPIFFEFederationSpec { // FederationStatus is the status of a trust domain federation. message SPIFFEFederationStatus { + reserved 3; + reserved "current_bundle_refresh_hint"; + // The most recently fetched bundle from the federated trust domain. string current_bundle = 1; // The time that the most recently fetched bundle was obtained. google.protobuf.Timestamp current_bundle_synced_at = 2; - // The duration that the current bundle suggests the next bundle should be - // refresh after. - google.protobuf.Duration current_bundle_refresh_hint = 3; + // The time that this SPIFFE federation should be synced again. This is + // usually determined by the refresh hint provided within the current bundle + // but this can be overridden by the server where the provided refresh hint + // is not appropriate. + // + // A value of zero indicates that an automatic sync is not scheduled (e.g. + // because the bundle source is static). + google.protobuf.Timestamp next_sync_at = 4; + // The SPIFFEFederationBundleSource that was used for the currently synced + // bundle. This allows the bundle to be resynced if the source changes. + SPIFFEFederationBundleSource current_bundle_synced_from = 5; } diff --git a/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go new file mode 100644 index 000000000000..160aea56b4e0 --- /dev/null +++ b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go @@ -0,0 +1,645 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package machineidv1 + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/federation" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "go.opentelemetry.io/otel" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1" + "github.com/gravitational/teleport/api/observability/tracing" + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/api/utils/retryutils" + "github.com/gravitational/teleport/lib/backend" +) + +var tracer = otel.Tracer("github.com/gravitational/teleport/lib/auth/machineid/v1") + +type spiffeFederationStore interface { + ListSPIFFEFederations(ctx context.Context, limit int, token string) ([]*machineidv1.SPIFFEFederation, string, error) + GetSPIFFEFederation(ctx context.Context, name string) (*machineidv1.SPIFFEFederation, error) + UpdateSPIFFEFederation(ctx context.Context, federation *machineidv1.SPIFFEFederation) (*machineidv1.SPIFFEFederation, error) +} + +type eventsWatcher interface { + NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) +} + +func listAllTrustDomains( + ctx context.Context, store spiffeFederationStore, +) ([]*machineidv1.SPIFFEFederation, error) { + var trustDomains []*machineidv1.SPIFFEFederation + var token string + for { + tds, nextToken, err := store.ListSPIFFEFederations(ctx, 100, token) + if err != nil { + return nil, trace.Wrap(err, "failed to list trust domains") + } + trustDomains = append(trustDomains, tds...) + if nextToken == "" { + break + } + token = nextToken + } + return trustDomains, nil +} + +// SPIFFEFederationSyncerConfig is the configuration for the SPIFFE federation +// syncer. +type SPIFFEFederationSyncerConfig struct { + // Backend should be a backend.Backend which can be used for obtaining the + // lock required to run the syncer. + Backend backend.Backend + // Store is where the SPIFFEFederation resources can be fetched and updated. + Store spiffeFederationStore + // EventsWatcher is used to watch for changes to SPIFFEFederations. + EventsWatcher eventsWatcher + // Logger is the logger that the syncer will use. + Logger *slog.Logger + // Clock is the clock that the syncer will use. + Clock clockwork.Clock + + // MinSyncInterval is the minimum interval between syncs. If an upstream + // trust domain specifies a refresh hint that is less than this value, this + // value will be used instead. This allows us to prevent a poorly configured + // upstream trust domain from causing excessive load on the local cluster. + MinSyncInterval time.Duration + // MaxSyncInterval is the maximum interval between syncs. If an upstream + // trust domain specifies a refresh hint that is greater than this value, + // this value will be used instead. This allows us to prevent a poorly + // configured upstream trust domain from causing excessive staleness in the + // local cluster. + MaxSyncInterval time.Duration + // DefaultSyncInterval is the interval between syncs that will be used if an + // upstream trust domain does not specify a refresh hint. + DefaultSyncInterval time.Duration + + // SyncTimeout is the maximum time that a sync operation is allowed to take. + // If a sync operation takes longer than this value, it will be aborted and + // retried. + SyncTimeout time.Duration + + // SPIFFEFetchOptions are the options that will be used when fetching a + // trust bundle from a remote cluster. These options will be passed to the + // spiffebundle.FetchBundle function. This is usually used during testing + // to override the Web PKI CAs. + SPIFFEFetchOptions []federation.FetchOption +} + +// CheckAndSetDefaults checks the configuration and sets defaults where +// necessary. +func (c *SPIFFEFederationSyncerConfig) CheckAndSetDefaults() error { + switch { + case c.Backend == nil: + return trace.BadParameter("backend: must be non-nil") + case c.Store == nil: + return trace.BadParameter("store: must be non-nil") + case c.Logger == nil: + return trace.BadParameter("logger: must be non-nil") + case c.Clock == nil: + return trace.BadParameter("clock: must be non-nil") + } + if c.MinSyncInterval == 0 { + c.MinSyncInterval = minRefreshInterval + } + if c.MaxSyncInterval == 0 { + c.MaxSyncInterval = maxRefreshInterval + } + if c.DefaultSyncInterval == 0 { + c.DefaultSyncInterval = defaultRefreshInterval + } + if c.SyncTimeout == 0 { + c.SyncTimeout = defaultSyncTimeout + } + return nil +} + +// SPIFFEFederationSyncer is a syncer that manages the trust bundles of +// federated clusters. It runs on a single elected auth server. +type SPIFFEFederationSyncer struct { + cfg SPIFFEFederationSyncerConfig +} + +const ( + minRefreshInterval = time.Minute * 1 + maxRefreshInterval = time.Hour * 24 + // SPIFFE Federation (4.1): + // > If not set, a reasonably low default value should apply - five minutes + // > is recommended + defaultRefreshInterval = time.Minute * 5 + defaultSyncTimeout = time.Second * 30 +) + +// NewSPIFFEFederationSyncer creates a new SPIFFEFederationSyncer. +func NewSPIFFEFederationSyncer(cfg SPIFFEFederationSyncerConfig) (*SPIFFEFederationSyncer, error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err, "validating SPIFFE federation syncer config") + } + return &SPIFFEFederationSyncer{ + cfg: cfg, + }, nil +} + +func (s *SPIFFEFederationSyncer) Run(ctx context.Context) error { + // Loop to retry if acquiring lock fails, with some backoff to avoid pinning + // the CPU. + waitWithJitter := retryutils.NewSeventhJitter()(time.Second * 10) + for { + err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ + LockConfiguration: backend.LockConfiguration{ + Backend: s.cfg.Backend, + LockName: "spiffe_federation_syncer", + TTL: time.Minute, + // It doesn't matter too much if the syncer isn't running for + // a short period of time so we can take a relaxed approach to + // retrying to grab the lock. + RetryInterval: time.Second * 30, + }, + }, s.syncTrustDomains) + if err != nil { + s.cfg.Logger.ErrorContext( + ctx, + "SPIFFEFederation syncer encountered a fatal error, it will restart after backoff", + "error", err, + "restart_after", waitWithJitter, + ) + } + select { + case <-ctx.Done(): + return nil + case <-time.After(waitWithJitter): + } + } +} + +type trustDomainSyncState struct { + // putEventsCh is a channel for passing PUT events to a specific + // SPIFFEFederations syncer. + putEventsCh chan types.Event + // stopCh is a channel for signaling a specific SPIFFEFederations + // syncer to stop syncing. This is closed when the watcher detects that the + // resource has been deleted. + stopCh chan struct{} +} + +// syncTrustDomains is the core loop of the syncer that runs on a single auth +// server. +// +// Its goal is to sync the contents of trust bundles from remote clusters to +// the local cluster. It does this by creating a goroutine that manages each +// federated cluster. +func (s *SPIFFEFederationSyncer) syncTrustDomains(ctx context.Context) error { + s.cfg.Logger.InfoContext( + ctx, + "Obtained lock, SPIFFEFederation syncer is starting", + ) + defer func() { + s.cfg.Logger.InfoContext( + ctx, "SPIFFEFederation syncer has stopped", + ) + }() + + // This wg will track all active syncers. We'll wait here until we're done. + wg := &sync.WaitGroup{} + defer func() { + wg.Wait() + }() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Set up state management that will hold a list of all active trust domain + // syncers. + states := map[string]trustDomainSyncState{} + mu := &sync.Mutex{} + startSyncingFederation := func(trustDomain string) { + mu.Lock() + defer mu.Unlock() + + // Don't start if we're already syncing this trust domain. + _, ok := states[trustDomain] + if ok { + return + } + + eventsCh := make(chan types.Event, 1) + stopCh := make(chan struct{}) + states[trustDomain] = trustDomainSyncState{ + putEventsCh: eventsCh, + stopCh: stopCh, + } + + wg.Add(1) + go func() { + defer func() { + mu.Lock() + delete(states, trustDomain) + mu.Unlock() + wg.Done() + }() + s.syncTrustDomainLoop(ctx, trustDomain, eventsCh, stopCh) + }() + } + + // Establish our watcher, we'll use this to react instantly to changes to SPIFFEFederations. + w, err := s.cfg.EventsWatcher.NewWatcher(ctx, types.Watch{ + Kinds: []types.WatchKind{{ + Kind: types.KindSPIFFEFederation, + }}, + }) + if err != nil { + return trace.Wrap(err, "failed to create watcher") + } + defer func() { + err := w.Close() + if err != nil { + s.cfg.Logger.ErrorContext( + ctx, "Failed to close watcher", "error", err, + ) + } + }() + + // Wait for initial "Init" event to indicate we're now receiving events. + select { + case <-w.Done(): + if err := w.Error(); err != nil { + return trace.Wrap(err, "watcher failed") + } + return trace.BadParameter("watcher closed unexpectedly") + case evt := <-w.Events(): + if evt.Type == types.OpInit { + break + } + return trace.BadParameter("expected init event, got %v", evt.Type) + case <-ctx.Done(): + return nil + } + + // Fetch an initial list of all federations and start syncers for them. + trustDomains, err := listAllTrustDomains(ctx, s.cfg.Store) + if err != nil { + return trace.Wrap(err, "initially listing trust domains") + } + for _, td := range trustDomains { + startSyncingFederation(td.GetMetadata().GetName()) + } + + // Now we can start reacting to events, we'll want to start/stop syncers as + // needed. We'll want to start a syncer for any new trust domain, and + // propagate events to existing syncers. + for { + select { + case evt := <-w.Events(): + s.cfg.Logger.DebugContext( + ctx, + "Received event from SPIFFEFederation watcher", + "evt_type", evt.Type, + ) + switch evt.Type { + case types.OpPut: + mu.Lock() + existingState, ok := states[evt.Resource.GetName()] + mu.Unlock() + // If it already exists, we can just pass the event along. If + // there's already a sync queued due to an event, we don't need to + // queue another since it fetches the latest resource anyway. + if ok { + select { + case existingState.putEventsCh <- evt: + default: + s.cfg.Logger.DebugContext( + ctx, + "Sync already queued for trust domain, ignoring event", + ) + } + continue + } + // If it doesn't exist, we should kick off a goroutine to start + // managing it. That routine will sync automatically on first + // run so we don't need to pass the event along. + startSyncingFederation(evt.Resource.GetName()) + case types.OpDelete: + mu.Lock() + existingState, ok := states[evt.Resource.GetName()] + // If it exists, close the stopCh to tell it to exit and remove + // it from the states map. + if ok { + close(existingState.stopCh) + delete(states, evt.Resource.GetName()) + } + mu.Unlock() + default: + } + case <-w.Done(): + if err := w.Error(); err != nil { + return trace.Wrap(err, "watcher failed") + } + return trace.BadParameter("watcher closed unexpectedly") + case <-ctx.Done(): + return nil + } + } +} + +func (s *SPIFFEFederationSyncer) syncTrustDomainLoop( + ctx context.Context, + name string, + putEventsCh <-chan types.Event, + stopCh <-chan struct{}, +) { + log := s.cfg.Logger.With("trust_domain", name) + log.InfoContext(ctx, "Starting sync loop for trust domain") + defer func() { + log.InfoContext(ctx, "Stopped sync loop for trust domain") + }() + + retry, err := retryutils.NewLinear(retryutils.LinearConfig{ + First: time.Second, + Step: time.Second, + Max: time.Second * 10, + Clock: s.cfg.Clock, + Jitter: retryutils.NewSeventhJitter(), + }) + if err != nil { + log.ErrorContext( + ctx, + "Failed to create retry strategy, trust domain sync loop will not run", + "error", err, + ) + return + } + + var synced *machineidv1.SPIFFEFederation + var nextRetry <-chan time.Time + nextSync := s.cfg.Clock.NewTimer(time.Minute) + nextSync.Stop() + defer nextSync.Stop() + firstRun := make(chan struct{}, 1) + firstRun <- struct{}{} + for { + select { + case <-firstRun: + // On the first run, we should sync immediately. + log.DebugContext(ctx, "First run, trying sync immediately") + case <-nextSync.Chan(): + log.DebugContext(ctx, "Next sync time has passed, trying sync") + case <-nextRetry: + log.InfoContext(ctx, "Wait for backoff complete, retrying sync") + case evt := <-putEventsCh: + // If we've just synced, we can effectively expect an "echo" of our + // last update. We can ignore this event safely. + if synced != nil { + if evt.Resource.GetRevision() == synced.GetMetadata().GetRevision() { + continue + } + log.DebugContext( + ctx, + "Resource has been updated, trying to sync trust domain immediately", + ) + } + // Note, we explicitly don't use the resource within the event. + // Instead, we will fetch the latest upon starting the sync. This + // avoids completing multiple syncs if multiple changes are queued. + case <-stopCh: + log.DebugContext(ctx, "Stop signal received, stopping sync loop") + return + case <-ctx.Done(): + return + } + // Stop our sync timer, we'll only set it if we successfully sync. + nextSync.Stop() + + syncCtx, cancel := context.WithTimeout(ctx, s.cfg.SyncTimeout) + synced, err = s.syncTrustDomain(syncCtx, name) + cancel() + + if err != nil { + // If the resource has been deleted, there's no point retrying. + // We should stop syncing. + if trace.IsNotFound(err) { + log.ErrorContext( + ctx, + "Resource has been deleted, stopping sync loop for trust domain", + "error", err, + ) + return + } + retry.Inc() + log.ErrorContext( + ctx, + "Failed to sync trust domain, will retry after backoff", + "error", err, + "backoff", retry.Duration(), + ) + nextRetry = retry.After() + continue + } + retry.Reset() + nextRetry = nil + + // If we've successfully synced, set the timer up for our next sync. + if nextSyncAt := synced.GetStatus().GetNextSyncAt().AsTime(); !nextSyncAt.IsZero() { + timeUntil := nextSyncAt.Sub(s.cfg.Clock.Now()) + // Ensure the timer will tick /after/ the next sync time. + timeUntil = timeUntil + time.Second + nextSync.Reset(timeUntil) + log.InfoContext( + ctx, + "Waiting to sync again", + "next_sync_at", nextSyncAt, + ) + } + } +} + +func (s *SPIFFEFederationSyncer) shouldSyncTrustDomain( + ctx context.Context, + log *slog.Logger, + in *machineidv1.SPIFFEFederation, +) string { + if in.Status == nil { + log.DebugContext(ctx, "No status, will sync") + return "no_status" + } + if in.Status.CurrentBundle == "" { + log.DebugContext(ctx, "No status.current_bundle, will sync") + return "no_current_bundle" + } + if in.Status.CurrentBundleSyncedAt.AsTime().IsZero() { + log.DebugContext(ctx, "No status.current_bundle_synced_at, will sync") + return "no_current_bundle_synced_at" + } + // Check if we've passed the next sync time. + nextSyncAt := in.Status.NextSyncAt.AsTime() + now := s.cfg.Clock.Now() + if !nextSyncAt.IsZero() && now.After(nextSyncAt) { + log.DebugContext( + ctx, + "status.next_sync_at has passed, will sync", + "next_sync_at", nextSyncAt, + "now", now, + ) + return "next_sync_at_passed" + } + // Check to see if the configured bundle source has changed + if in.Status.CurrentBundleSyncedFrom != nil { + if !proto.Equal(in.Spec.BundleSource, in.Status.CurrentBundleSyncedFrom) { + log.DebugContext(ctx, "status.current_bundle_synced_from has changed, will sync") + return "bundle_source_changed" + } + } + + return "" +} + +func (s *SPIFFEFederationSyncer) syncTrustDomain( + ctx context.Context, name string, +) (out *machineidv1.SPIFFEFederation, err error) { + ctx, span := tracer.Start(ctx, "SPIFFEFederationSyncer/syncTrustDomain") + defer func() { + tracing.EndSpan(span, err) + }() + + current, err := s.cfg.Store.GetSPIFFEFederation(ctx, name) + if err != nil { + return nil, trace.Wrap(err, "failed to get SPIFFEFederation") + } + + log := s.cfg.Logger.With( + "current_revision", current.GetMetadata().GetRevision(), + "trust_domain", current.GetMetadata().GetName(), + ) + + td, err := spiffeid.TrustDomainFromString(current.GetMetadata().GetName()) + if err != nil { + return nil, trace.Wrap(err, "parsing metadata.name as trust domain name") + } + + // Determine - should we sync... + syncReason := s.shouldSyncTrustDomain(ctx, log, current) + if syncReason == "" { + log.DebugContext(ctx, "Skipping sync as is not required") + return current, nil + } + log.InfoContext(ctx, "Sync starting", "reason", syncReason) + + // Clone the persisted resource so we can compare to it. + out = proto.Clone(current).(*machineidv1.SPIFFEFederation) + + // Refresh... + if out.Status == nil { + out.Status = &machineidv1.SPIFFEFederationStatus{} + } + + var bundle *spiffebundle.Bundle + var nextSyncIn time.Duration + switch { + case current.GetSpec().GetBundleSource().GetHttpsWeb() != nil: + url := current.Spec.BundleSource.HttpsWeb.BundleEndpointUrl + log.DebugContext( + ctx, + "Fetching bundle using https_web profile", + "url", url, + ) + bundle, err = federation.FetchBundle(ctx, td, url, s.cfg.SPIFFEFetchOptions...) + if err != nil { + return nil, trace.Wrap(err, "fetching bundle using https_web profile") + } + + // Calculate the duration before we should next sync, applying any + // sensible upper and lower bounds. + nextSyncIn = s.cfg.DefaultSyncInterval + if refreshHint, ok := bundle.RefreshHint(); ok { + if refreshHint < s.cfg.MinSyncInterval { + log.DebugContext( + ctx, + "Refresh hint is less than MinSyncInterval, using MinSyncInterval", + "refresh_hint", refreshHint, + "min_sync_interval", s.cfg.MinSyncInterval, + ) + nextSyncIn = s.cfg.MinSyncInterval + } else if refreshHint > s.cfg.MaxSyncInterval { + log.DebugContext( + ctx, + "Refresh hint is greater than MaxSyncInterval, using MaxSyncInterval", + "refresh_hint", refreshHint, + "max_sync_interval", s.cfg.MaxSyncInterval, + ) + nextSyncIn = s.cfg.MaxSyncInterval + } else { + nextSyncIn = refreshHint + } + } + case current.GetSpec().GetBundleSource().GetStatic() != nil: + log.DebugContext( + ctx, "Fetching bundle using spec.bundle_source.static.bundle", + ) + bundle, err = spiffebundle.Parse( + td, []byte(current.Spec.BundleSource.Static.Bundle), + ) + if err != nil { + return nil, trace.Wrap( + err, "parsing bundle from static profile", + ) + } + default: + return nil, trace.BadParameter( + "spec.bundle_source: at least one of https_web or static must be set", + ) + } + + bundleBytes, err := bundle.Marshal() + if err != nil { + return nil, trace.Wrap(err, "marshaling bundle") + } + out.Status.CurrentBundle = string(bundleBytes) + out.Status.CurrentBundleSyncedFrom = current.Spec.BundleSource + + syncedAt := s.cfg.Clock.Now().UTC() + out.Status.CurrentBundleSyncedAt = timestamppb.New(syncedAt) + // For certain sources, we need to set a next sync time. + if nextSyncIn > 0 { + out.Status.NextSyncAt = timestamppb.New(syncedAt.Add(nextSyncIn)) + } + + out, err = s.cfg.Store.UpdateSPIFFEFederation(ctx, out) + if err != nil { + return nil, trace.Wrap( + err, "persisting updated SPIFFEFederation", + ) + } + log.InfoContext( + ctx, + "Sync succeeded, new SPIFFEFederation persisted", + "new_revision", out.Metadata.Revision, + ) + + return out, nil +} diff --git a/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go b/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go new file mode 100644 index 000000000000..1a1ecc5a3af5 --- /dev/null +++ b/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go @@ -0,0 +1,314 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package machineidv1 + +import ( + "context" + "crypto/x509" + "encoding/pem" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/jonboulle/clockwork" + "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/federation" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" + + headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1" + machineidv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1" + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/backend/memory" + "github.com/gravitational/teleport/lib/fixtures" + "github.com/gravitational/teleport/lib/services/local" + "github.com/gravitational/teleport/lib/utils" +) + +func makeTrustDomain(t *testing.T, name string) (spiffeid.TrustDomain, *spiffebundle.Bundle) { + td := spiffeid.RequireTrustDomainFromString(name) + bundle := spiffebundle.New(td) + b, _ := pem.Decode([]byte(fixtures.TLSCACertPEM)) + cert, err := x509.ParseCertificate(b.Bytes) + require.NoError(t, err) + bundle.AddX509Authority(cert) + bundle.SetRefreshHint(time.Minute * 12) + return td, bundle +} + +func TestSPIFFEFederationSyncer(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := utils.NewSlogLoggerForTests() + clock := clockwork.NewRealClock() + backend, err := memory.New(memory.Config{}) + require.NoError(t, err) + + store, err := local.NewSPIFFEFederationService(backend) + require.NoError(t, err) + eventsSvc := local.NewEventsService(backend) + + td1, bundle1 := makeTrustDomain(t, "1.example.com") + marshaledBundle1, err := bundle1.Marshal() + require.NoError(t, err) + td2, bundle2 := makeTrustDomain(t, "2.example.com") + marshaledBundle2, err := bundle1.Marshal() + require.NoError(t, err) + + // Implement a fake SPIFFE Federation bundle endpoint + testSrv1 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h, err := federation.NewHandler(td1, bundle1) + if !assert.NoError(t, err) { + return + } + h.ServeHTTP(w, r) + })) + testSrv2 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h, err := federation.NewHandler(td2, bundle2) + if !assert.NoError(t, err) { + return + } + h.ServeHTTP(w, r) + })) + + caPool := x509.NewCertPool() + caPool.AddCert(testSrv1.Certificate()) + caPool.AddCert(testSrv2.Certificate()) + + // Create one trust domain prior to startng the syncer + created1, err := store.CreateSPIFFEFederation(ctx, &machineidv1pb.SPIFFEFederation{ + Kind: types.KindSPIFFEFederation, + Version: types.V1, + Metadata: &headerv1.Metadata{ + Name: "1.example.com", + }, + Spec: &machineidv1pb.SPIFFEFederationSpec{ + BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{ + HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{ + BundleEndpointUrl: testSrv1.URL, + }, + }, + }, + }) + require.NoError(t, err) + + syncer, err := NewSPIFFEFederationSyncer(SPIFFEFederationSyncerConfig{ + Backend: backend, + Store: store, + EventsWatcher: eventsSvc, + Clock: clock, + Logger: logger, + SPIFFEFetchOptions: []federation.FetchOption{ + federation.WithWebPKIRoots(caPool), + }, + }) + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + err := syncer.Run(ctx) + assert.NoError(t, err) + errCh <- err + }() + + // Wait for the initially created SPIFFEFederation to be synced + require.EventuallyWithT(t, func(collect *assert.CollectT) { + got, err := store.GetSPIFFEFederation(ctx, created1.Metadata.Name) + if !assert.NoError(t, err) { + return + } + // Check that some update as occurred (as indicated by the revision) + if !assert.NotEqual(t, got.Metadata.Revision, created1.Metadata.Revision) { + return + } + // Check that the expected status fields have been set... + if !assert.NotNil(t, got.Status) { + return + } + assert.Equal(t, string(marshaledBundle1), got.Status.CurrentBundle) + }, time.Second*5, time.Millisecond*100) + + // Create a second SPIFFEFederation and wait for it to be synced + created2, err := store.CreateSPIFFEFederation(ctx, &machineidv1pb.SPIFFEFederation{ + Kind: types.KindSPIFFEFederation, + Version: types.V1, + Metadata: &headerv1.Metadata{ + Name: "2.example.com", + }, + Spec: &machineidv1pb.SPIFFEFederationSpec{ + BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{ + HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{ + BundleEndpointUrl: testSrv2.URL, + }, + }, + }, + }) + require.NoError(t, err) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + got, err := store.GetSPIFFEFederation(ctx, created2.Metadata.Name) + if !assert.NoError(t, err) { + return + } + // Check that some update as occurred (as indicated by the revision) + if !assert.NotEqual(t, got.Metadata.Revision, created2.Metadata.Revision) { + return + } + // Check that the expected status fields have been set... + if !assert.NotNil(t, got.Status) { + return + } + assert.Equal(t, string(marshaledBundle2), got.Status.CurrentBundle) + }, time.Second*5, time.Millisecond*100) + + cancel() + select { + case err := <-errCh: + assert.NoError(t, err) + case <-time.After(time.Second * 5): + t.Fatalf("timeout waiting for syncer to stop") + } +} + +func TestSPIFFEFederationSyncer_syncFederation(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := utils.NewSlogLoggerForTests() + clock := clockwork.NewFakeClock() + backend, err := memory.New(memory.Config{}) + require.NoError(t, err) + + store, err := local.NewSPIFFEFederationService(backend) + require.NoError(t, err) + eventsSvc := local.NewEventsService(backend) + + td, bundle := makeTrustDomain(t, "example.com") + marshaledBundle, err := bundle.Marshal() + require.NoError(t, err) + + // Implement a fake SPIFFE Federation bundle endpoint + testSrv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h, err := federation.NewHandler(td, bundle) + if !assert.NoError(t, err) { + return + } + h.ServeHTTP(w, r) + })) + caPool := x509.NewCertPool() + caPool.AddCert(testSrv.Certificate()) + + syncer, err := NewSPIFFEFederationSyncer(SPIFFEFederationSyncerConfig{ + Backend: backend, + Store: store, + EventsWatcher: eventsSvc, + Clock: clock, + Logger: logger, + SPIFFEFetchOptions: []federation.FetchOption{ + federation.WithWebPKIRoots(caPool), + }, + }) + require.NoError(t, err) + + t.Run("https-web", func(t *testing.T) { + in := &machineidv1pb.SPIFFEFederation{ + Kind: types.KindSPIFFEFederation, + Version: types.V1, + Metadata: &headerv1.Metadata{ + Name: "example.com", + }, + Spec: &machineidv1pb.SPIFFEFederationSpec{ + BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{ + HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{ + BundleEndpointUrl: testSrv.URL, + }, + }, + }, + } + created, err := store.CreateSPIFFEFederation(ctx, in) + require.NoError(t, err) + + firstSync, err := syncer.syncTrustDomain(ctx, "example.com") + require.NoError(t, err) + got, err := store.GetSPIFFEFederation(ctx, "example.com") + require.NoError(t, err) + // Require that the persisted resource equals the resource output by syncTrustDomain + require.Empty(t, cmp.Diff(got, firstSync, protocmp.Transform())) + // Check that some update as occurred (as indicated by the revision) + require.NotEqual(t, created.Metadata.Revision, firstSync.Metadata.Revision) + // Check that the expected status fields have been set... + require.NotNil(t, firstSync.Status) + wantStatus := &machineidv1pb.SPIFFEFederationStatus{ + CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()), + CurrentBundleSyncedFrom: proto.Clone(created).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource, + NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)), + CurrentBundle: string(marshaledBundle), + } + require.Empty(t, cmp.Diff(firstSync.Status, wantStatus, protocmp.Transform())) + + // Check that syncing again does nothing. + secondSync, err := syncer.syncTrustDomain(ctx, "example.com") + require.NoError(t, err) + require.Empty(t, cmp.Diff(secondSync, firstSync, protocmp.Transform())) + + // Advance the clock and check that the sync is triggered again. + clock.Advance(time.Minute * 15) + thirdSync, err := syncer.syncTrustDomain(ctx, "example.com") + require.NoError(t, err) + require.NotEqual(t, firstSync.Metadata.Revision, thirdSync.Metadata.Revision) + wantStatus = &machineidv1pb.SPIFFEFederationStatus{ + CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()), + CurrentBundleSyncedFrom: proto.Clone(created).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource, + NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)), + CurrentBundle: string(marshaledBundle), + } + require.Empty(t, cmp.Diff(thirdSync.Status, wantStatus, protocmp.Transform())) + + // Check that syncing again does nothing. + fourthSync, err := syncer.syncTrustDomain(ctx, "example.com") + require.NoError(t, err) + require.Empty(t, cmp.Diff(fourthSync, thirdSync, protocmp.Transform())) + + // Check that modifying the bundle source triggers a sync. + fourthSync.Spec.BundleSource.HttpsWeb.BundleEndpointUrl = fmt.Sprintf("%s/modified", testSrv.URL) + updated, err := store.UpdateSPIFFEFederation(ctx, fourthSync) + require.NoError(t, err) + fifthSync, err := syncer.syncTrustDomain(ctx, "example.com") + require.NoError(t, err) + require.NotEqual(t, updated.Metadata.Revision, fifthSync.Metadata.Revision) + wantStatus = &machineidv1pb.SPIFFEFederationStatus{ + CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()), + CurrentBundleSyncedFrom: proto.Clone(updated).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource, + NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)), + CurrentBundle: string(marshaledBundle), + } + require.Empty(t, cmp.Diff(fifthSync.Status, wantStatus, protocmp.Transform())) + }) +} diff --git a/lib/service/service.go b/lib/service/service.go index 9a2372a28577..86c728de539d 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -91,6 +91,7 @@ import ( "github.com/gravitational/teleport/lib/auth/accesspoint" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/keygen" + "github.com/gravitational/teleport/lib/auth/machineid/machineidv1" "github.com/gravitational/teleport/lib/auth/native" "github.com/gravitational/teleport/lib/auth/state" "github.com/gravitational/teleport/lib/auth/storage" @@ -2505,6 +2506,22 @@ func (process *TeleportProcess) initAuthService() error { } process.RegisterFunc("auth.heartbeat", heartbeat.Run) + spiffeFedSyncer, err := machineidv1.NewSPIFFEFederationSyncer(machineidv1.SPIFFEFederationSyncerConfig{ + Backend: b, + Store: authServer.Services.SPIFFEFederations, + EventsWatcher: authServer.Services.Events, + Clock: process.Clock, + Logger: logger.With( + teleport.ComponentKey, teleport.Component(teleport.ComponentAuth, "spiffe_federation_syncer"), + ), + }) + if err != nil { + return trace.Wrap(err, "initializing SPIFFEFederation Syncer") + } + process.RegisterFunc("auth.spiffe_federation_syncer", func() error { + return trace.Wrap(spiffeFedSyncer.Run(process.GracefulExitContext()), "running SPIFFEFederation Syncer") + }) + process.RegisterFunc("auth.server_info", func() error { return trace.Wrap(authServer.ReconcileServerInfos(process.GracefulExitContext())) }) diff --git a/lib/services/local/spiffe_federations.go b/lib/services/local/spiffe_federations.go index 67910e22ac83..611c3e96ab5f 100644 --- a/lib/services/local/spiffe_federations.go +++ b/lib/services/local/spiffe_federations.go @@ -110,3 +110,14 @@ func (b *SPIFFEFederationService) UpsertSPIFFEFederation( upserted, err := b.service.UpsertResource(ctx, federation) return upserted, trace.Wrap(err) } + +// UpdateSPIFFEFederation updates a specific SPIFFEFederation. +func (b *SPIFFEFederationService) UpdateSPIFFEFederation( + ctx context.Context, federation *machineidv1.SPIFFEFederation, +) (*machineidv1.SPIFFEFederation, error) { + if err := services.ValidateSPIFFEFederation(federation); err != nil { + return nil, trace.Wrap(err) + } + updated, err := b.service.ConditionalUpdateResource(ctx, federation) + return updated, trace.Wrap(err) +} diff --git a/lib/services/local/spiffe_federations_test.go b/lib/services/local/spiffe_federations_test.go index aac270fc631b..edc9a1ac3c1f 100644 --- a/lib/services/local/spiffe_federations_test.go +++ b/lib/services/local/spiffe_federations_test.go @@ -276,3 +276,54 @@ func TestSPIFFEFederationService_DeleteAllSPIFFEFederation(t *testing.T) { require.NoError(t, err) require.Empty(t, page) } + +func TestSPIFFEFederationService_UpdateSPIFFEFederation(t *testing.T) { + ctx, service := setupSPIFFEFederationTest(t) + + t.Run("ok", func(t *testing.T) { + // Create resource for us to Update since we can't update a non-existent resource. + created, err := service.CreateSPIFFEFederation( + ctx, + newSPIFFEFederation("example.com"), + ) + require.NoError(t, err) + want := proto.Clone(created).(*machineidv1.SPIFFEFederation) + want.Spec.BundleSource.HttpsWeb.BundleEndpointUrl = "https://example.com/new-bundle.json" + + updated, err := service.UpdateSPIFFEFederation( + ctx, + // Clone to avoid Marshaling modifying want + proto.Clone(want).(*machineidv1.SPIFFEFederation), + ) + require.NoError(t, err) + require.NotEqual(t, created.Metadata.Revision, updated.Metadata.Revision) + require.Empty(t, cmp.Diff( + want, + updated, + protocmp.Transform(), + protocmp.IgnoreFields(&headerv1.Metadata{}, "revision"), + )) + + got, err := service.GetSPIFFEFederation(ctx, "example.com") + require.NoError(t, err) + require.Empty(t, cmp.Diff( + want, + got, + protocmp.Transform(), + protocmp.IgnoreFields(&headerv1.Metadata{}, "revision"), + )) + require.Equal(t, updated.Metadata.Revision, got.Metadata.Revision) + }) + t.Run("validation occurs", func(t *testing.T) { + out, err := service.UpdateSPIFFEFederation(ctx, newSPIFFEFederation("spiffe://i-will-fail")) + require.ErrorContains(t, err, "metadata.name: must not include the spiffe:// prefix") + require.Nil(t, out) + }) + t.Run("no create", func(t *testing.T) { + _, err := service.UpdateSPIFFEFederation( + ctx, + newSPIFFEFederation("non-existing.com"), + ) + require.Error(t, err) + }) +} diff --git a/lib/services/resource.go b/lib/services/resource.go index bb384b582f9d..4b2f52adb9ec 100644 --- a/lib/services/resource.go +++ b/lib/services/resource.go @@ -241,6 +241,8 @@ func ParseShortcut(in string) (string, error) { return types.KindPlugin, nil case types.KindAccessGraphSettings, "ags": return types.KindAccessGraphSettings, nil + case types.KindSPIFFEFederation, types.KindSPIFFEFederation + "s": + return types.KindSPIFFEFederation, nil } return "", trace.BadParameter("unsupported resource: %q - resources should be expressed as 'type/name', for example 'connector/github'", in) } diff --git a/lib/services/spiffe_federations.go b/lib/services/spiffe_federations.go index 58d24f4662cb..266c7b759fa2 100644 --- a/lib/services/spiffe_federations.go +++ b/lib/services/spiffe_federations.go @@ -48,6 +48,11 @@ type SPIFFEFederations interface { ) (*machineidv1.SPIFFEFederation, error) // DeleteSPIFFEFederation deletes a SPIFFE Federation by name. DeleteSPIFFEFederation(ctx context.Context, name string) error + // UpdateSPIFFEFederation updates a SPIFFE Federation. It will not act if the resource is not found + // or where the revision does not match. + UpdateSPIFFEFederation( + ctx context.Context, spiffeFederation *machineidv1.SPIFFEFederation, + ) (*machineidv1.SPIFFEFederation, error) } // MarshalSPIFFEFederation marshals the SPIFFEFederation object into a JSON byte @@ -124,5 +129,16 @@ func ValidateSPIFFEFederation(s *machineidv1.SPIFFEFederation) error { } } + // Ensure that all key status fields are set if any are set. This is a safeguard against weird inconsistent states + // where some fields are set and others are not. + currentBundleSet := s.Status.GetCurrentBundle() != "" + currentBundledSyncedAtSet := s.Status.GetCurrentBundleSyncedAt() != nil + currentBundleSyncedFromSet := s.Status.GetCurrentBundleSyncedFrom() != nil + anyStatusFieldSet := currentBundleSet || currentBundledSyncedAtSet || currentBundleSyncedFromSet + allStatusFieldsSet := currentBundleSet && currentBundledSyncedAtSet && currentBundleSyncedFromSet + if anyStatusFieldSet && !allStatusFieldsSet { + return trace.BadParameter("status: all of ['current_bundle', 'current_bundle_synced_at', 'current_bundle_synced_from'] must be set if any are set") + } + return nil }