diff --git a/api/gen/proto/go/teleport/machineid/v1/federation.pb.go b/api/gen/proto/go/teleport/machineid/v1/federation.pb.go
index c11aeea70433..eb5679a68115 100644
--- a/api/gen/proto/go/teleport/machineid/v1/federation.pb.go
+++ b/api/gen/proto/go/teleport/machineid/v1/federation.pb.go
@@ -24,7 +24,6 @@ import (
v1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
- durationpb "google.golang.org/protobuf/types/known/durationpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
@@ -352,9 +351,17 @@ type SPIFFEFederationStatus struct {
CurrentBundle string `protobuf:"bytes,1,opt,name=current_bundle,json=currentBundle,proto3" json:"current_bundle,omitempty"`
// The time that the most recently fetched bundle was obtained.
CurrentBundleSyncedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=current_bundle_synced_at,json=currentBundleSyncedAt,proto3" json:"current_bundle_synced_at,omitempty"`
- // The duration that the current bundle suggests the next bundle should be
- // refresh after.
- CurrentBundleRefreshHint *durationpb.Duration `protobuf:"bytes,3,opt,name=current_bundle_refresh_hint,json=currentBundleRefreshHint,proto3" json:"current_bundle_refresh_hint,omitempty"`
+ // The time that this SPIFFE federation should be synced again. This is
+ // usually determined by the refresh hint provided within the current bundle
+ // but this can be overridden by the server where the provided refresh hint
+ // is not appropriate.
+ //
+ // A value of zero indicates that an automatic sync is not scheduled (e.g.
+ // because the bundle source is static).
+ NextSyncAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=next_sync_at,json=nextSyncAt,proto3" json:"next_sync_at,omitempty"`
+ // The SPIFFEFederationBundleSource that was used for the currently synced
+ // bundle. This allows the bundle to be resynced if the source changes.
+ CurrentBundleSyncedFrom *SPIFFEFederationBundleSource `protobuf:"bytes,5,opt,name=current_bundle_synced_from,json=currentBundleSyncedFrom,proto3" json:"current_bundle_synced_from,omitempty"`
}
func (x *SPIFFEFederationStatus) Reset() {
@@ -403,9 +410,16 @@ func (x *SPIFFEFederationStatus) GetCurrentBundleSyncedAt() *timestamppb.Timesta
return nil
}
-func (x *SPIFFEFederationStatus) GetCurrentBundleRefreshHint() *durationpb.Duration {
+func (x *SPIFFEFederationStatus) GetNextSyncAt() *timestamppb.Timestamp {
if x != nil {
- return x.CurrentBundleRefreshHint
+ return x.NextSyncAt
+ }
+ return nil
+}
+
+func (x *SPIFFEFederationStatus) GetCurrentBundleSyncedFrom() *SPIFFEFederationBundleSource {
+ if x != nil {
+ return x.CurrentBundleSyncedFrom
}
return nil
}
@@ -417,8 +431,6 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{
0x6e, 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f,
0x72, 0x74, 0x2e, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x1a,
- 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
- 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a,
0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x21, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x68, 0x65, 0x61, 0x64, 0x65,
@@ -470,7 +482,7 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{
0x74, 0x2e, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x53,
0x50, 0x49, 0x46, 0x46, 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42,
0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x0c, 0x62, 0x75, 0x6e,
- 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xee, 0x01, 0x0a, 0x16, 0x53, 0x50,
+ 0x64, 0x6c, 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xe7, 0x02, 0x0a, 0x16, 0x53, 0x50,
0x49, 0x46, 0x46, 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f,
0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x75,
@@ -480,18 +492,26 @@ var file_teleport_machineid_v1_federation_proto_rawDesc = []byte{
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65,
0x6e, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74,
- 0x12, 0x58, 0x0a, 0x1b, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x75, 0x6e, 0x64,
- 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x5f, 0x68, 0x69, 0x6e, 0x74, 0x18,
- 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
- 0x52, 0x18, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52,
- 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x48, 0x69, 0x6e, 0x74, 0x42, 0x56, 0x5a, 0x54, 0x67, 0x69,
- 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61,
- 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f,
- 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f,
- 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e,
- 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x3b, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64,
- 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x12, 0x3c, 0x0a, 0x0c, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x61, 0x74,
+ 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
+ 0x6d, 0x70, 0x52, 0x0a, 0x6e, 0x65, 0x78, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x41, 0x74, 0x12, 0x70,
+ 0x0a, 0x1a, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65,
+ 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x05, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x6d, 0x61,
+ 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x50, 0x49, 0x46, 0x46,
+ 0x45, 0x46, 0x65, 0x64, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x75, 0x6e, 0x64, 0x6c,
+ 0x65, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x17, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74,
+ 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x46, 0x72, 0x6f, 0x6d,
+ 0x4a, 0x04, 0x08, 0x03, 0x10, 0x04, 0x52, 0x1b, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f,
+ 0x62, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x5f, 0x68,
+ 0x69, 0x6e, 0x74, 0x42, 0x56, 0x5a, 0x54, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
+ 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x2f,
+ 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e,
+ 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f,
+ 0x72, 0x74, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x2f, 0x76, 0x31, 0x3b,
+ 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x69, 0x64, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x33,
}
var (
@@ -516,7 +536,6 @@ var file_teleport_machineid_v1_federation_proto_goTypes = []interface{}{
(*SPIFFEFederationStatus)(nil), // 5: teleport.machineid.v1.SPIFFEFederationStatus
(*v1.Metadata)(nil), // 6: teleport.header.v1.Metadata
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
- (*durationpb.Duration)(nil), // 8: google.protobuf.Duration
}
var file_teleport_machineid_v1_federation_proto_depIdxs = []int32{
6, // 0: teleport.machineid.v1.SPIFFEFederation.metadata:type_name -> teleport.header.v1.Metadata
@@ -526,12 +545,13 @@ var file_teleport_machineid_v1_federation_proto_depIdxs = []int32{
2, // 4: teleport.machineid.v1.SPIFFEFederationBundleSource.https_web:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSourceHTTPSWeb
3, // 5: teleport.machineid.v1.SPIFFEFederationSpec.bundle_source:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSource
7, // 6: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_synced_at:type_name -> google.protobuf.Timestamp
- 8, // 7: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_refresh_hint:type_name -> google.protobuf.Duration
- 8, // [8:8] is the sub-list for method output_type
- 8, // [8:8] is the sub-list for method input_type
- 8, // [8:8] is the sub-list for extension type_name
- 8, // [8:8] is the sub-list for extension extendee
- 0, // [0:8] is the sub-list for field type_name
+ 7, // 7: teleport.machineid.v1.SPIFFEFederationStatus.next_sync_at:type_name -> google.protobuf.Timestamp
+ 3, // 8: teleport.machineid.v1.SPIFFEFederationStatus.current_bundle_synced_from:type_name -> teleport.machineid.v1.SPIFFEFederationBundleSource
+ 9, // [9:9] is the sub-list for method output_type
+ 9, // [9:9] is the sub-list for method input_type
+ 9, // [9:9] is the sub-list for extension type_name
+ 9, // [9:9] is the sub-list for extension extendee
+ 0, // [0:9] is the sub-list for field type_name
}
func init() { file_teleport_machineid_v1_federation_proto_init() }
diff --git a/api/proto/teleport/machineid/v1/federation.proto b/api/proto/teleport/machineid/v1/federation.proto
index f80eda451bec..8d23553a0c54 100644
--- a/api/proto/teleport/machineid/v1/federation.proto
+++ b/api/proto/teleport/machineid/v1/federation.proto
@@ -16,7 +16,6 @@ syntax = "proto3";
package teleport.machineid.v1;
-import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "teleport/header/v1/metadata.proto";
@@ -72,11 +71,22 @@ message SPIFFEFederationSpec {
// FederationStatus is the status of a trust domain federation.
message SPIFFEFederationStatus {
+ reserved 3;
+ reserved "current_bundle_refresh_hint";
+
// The most recently fetched bundle from the federated trust domain.
string current_bundle = 1;
// The time that the most recently fetched bundle was obtained.
google.protobuf.Timestamp current_bundle_synced_at = 2;
- // The duration that the current bundle suggests the next bundle should be
- // refresh after.
- google.protobuf.Duration current_bundle_refresh_hint = 3;
+ // The time that this SPIFFE federation should be synced again. This is
+ // usually determined by the refresh hint provided within the current bundle
+ // but this can be overridden by the server where the provided refresh hint
+ // is not appropriate.
+ //
+ // A value of zero indicates that an automatic sync is not scheduled (e.g.
+ // because the bundle source is static).
+ google.protobuf.Timestamp next_sync_at = 4;
+ // The SPIFFEFederationBundleSource that was used for the currently synced
+ // bundle. This allows the bundle to be resynced if the source changes.
+ SPIFFEFederationBundleSource current_bundle_synced_from = 5;
}
diff --git a/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go
new file mode 100644
index 000000000000..160aea56b4e0
--- /dev/null
+++ b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go
@@ -0,0 +1,645 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package machineidv1
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+ "time"
+
+ "github.com/gravitational/trace"
+ "github.com/jonboulle/clockwork"
+ "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle"
+ "github.com/spiffe/go-spiffe/v2/federation"
+ "github.com/spiffe/go-spiffe/v2/spiffeid"
+ "go.opentelemetry.io/otel"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
+ "github.com/gravitational/teleport/api/observability/tracing"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/api/utils/retryutils"
+ "github.com/gravitational/teleport/lib/backend"
+)
+
+var tracer = otel.Tracer("github.com/gravitational/teleport/lib/auth/machineid/v1")
+
+type spiffeFederationStore interface {
+ ListSPIFFEFederations(ctx context.Context, limit int, token string) ([]*machineidv1.SPIFFEFederation, string, error)
+ GetSPIFFEFederation(ctx context.Context, name string) (*machineidv1.SPIFFEFederation, error)
+ UpdateSPIFFEFederation(ctx context.Context, federation *machineidv1.SPIFFEFederation) (*machineidv1.SPIFFEFederation, error)
+}
+
+type eventsWatcher interface {
+ NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error)
+}
+
+func listAllTrustDomains(
+ ctx context.Context, store spiffeFederationStore,
+) ([]*machineidv1.SPIFFEFederation, error) {
+ var trustDomains []*machineidv1.SPIFFEFederation
+ var token string
+ for {
+ tds, nextToken, err := store.ListSPIFFEFederations(ctx, 100, token)
+ if err != nil {
+ return nil, trace.Wrap(err, "failed to list trust domains")
+ }
+ trustDomains = append(trustDomains, tds...)
+ if nextToken == "" {
+ break
+ }
+ token = nextToken
+ }
+ return trustDomains, nil
+}
+
+// SPIFFEFederationSyncerConfig is the configuration for the SPIFFE federation
+// syncer.
+type SPIFFEFederationSyncerConfig struct {
+ // Backend should be a backend.Backend which can be used for obtaining the
+ // lock required to run the syncer.
+ Backend backend.Backend
+ // Store is where the SPIFFEFederation resources can be fetched and updated.
+ Store spiffeFederationStore
+ // EventsWatcher is used to watch for changes to SPIFFEFederations.
+ EventsWatcher eventsWatcher
+ // Logger is the logger that the syncer will use.
+ Logger *slog.Logger
+ // Clock is the clock that the syncer will use.
+ Clock clockwork.Clock
+
+ // MinSyncInterval is the minimum interval between syncs. If an upstream
+ // trust domain specifies a refresh hint that is less than this value, this
+ // value will be used instead. This allows us to prevent a poorly configured
+ // upstream trust domain from causing excessive load on the local cluster.
+ MinSyncInterval time.Duration
+ // MaxSyncInterval is the maximum interval between syncs. If an upstream
+ // trust domain specifies a refresh hint that is greater than this value,
+ // this value will be used instead. This allows us to prevent a poorly
+ // configured upstream trust domain from causing excessive staleness in the
+ // local cluster.
+ MaxSyncInterval time.Duration
+ // DefaultSyncInterval is the interval between syncs that will be used if an
+ // upstream trust domain does not specify a refresh hint.
+ DefaultSyncInterval time.Duration
+
+ // SyncTimeout is the maximum time that a sync operation is allowed to take.
+ // If a sync operation takes longer than this value, it will be aborted and
+ // retried.
+ SyncTimeout time.Duration
+
+ // SPIFFEFetchOptions are the options that will be used when fetching a
+ // trust bundle from a remote cluster. These options will be passed to the
+ // spiffebundle.FetchBundle function. This is usually used during testing
+ // to override the Web PKI CAs.
+ SPIFFEFetchOptions []federation.FetchOption
+}
+
+// CheckAndSetDefaults checks the configuration and sets defaults where
+// necessary.
+func (c *SPIFFEFederationSyncerConfig) CheckAndSetDefaults() error {
+ switch {
+ case c.Backend == nil:
+ return trace.BadParameter("backend: must be non-nil")
+ case c.Store == nil:
+ return trace.BadParameter("store: must be non-nil")
+ case c.Logger == nil:
+ return trace.BadParameter("logger: must be non-nil")
+ case c.Clock == nil:
+ return trace.BadParameter("clock: must be non-nil")
+ }
+ if c.MinSyncInterval == 0 {
+ c.MinSyncInterval = minRefreshInterval
+ }
+ if c.MaxSyncInterval == 0 {
+ c.MaxSyncInterval = maxRefreshInterval
+ }
+ if c.DefaultSyncInterval == 0 {
+ c.DefaultSyncInterval = defaultRefreshInterval
+ }
+ if c.SyncTimeout == 0 {
+ c.SyncTimeout = defaultSyncTimeout
+ }
+ return nil
+}
+
+// SPIFFEFederationSyncer is a syncer that manages the trust bundles of
+// federated clusters. It runs on a single elected auth server.
+type SPIFFEFederationSyncer struct {
+ cfg SPIFFEFederationSyncerConfig
+}
+
+const (
+ minRefreshInterval = time.Minute * 1
+ maxRefreshInterval = time.Hour * 24
+ // SPIFFE Federation (4.1):
+ // > If not set, a reasonably low default value should apply - five minutes
+ // > is recommended
+ defaultRefreshInterval = time.Minute * 5
+ defaultSyncTimeout = time.Second * 30
+)
+
+// NewSPIFFEFederationSyncer creates a new SPIFFEFederationSyncer.
+func NewSPIFFEFederationSyncer(cfg SPIFFEFederationSyncerConfig) (*SPIFFEFederationSyncer, error) {
+ if err := cfg.CheckAndSetDefaults(); err != nil {
+ return nil, trace.Wrap(err, "validating SPIFFE federation syncer config")
+ }
+ return &SPIFFEFederationSyncer{
+ cfg: cfg,
+ }, nil
+}
+
+func (s *SPIFFEFederationSyncer) Run(ctx context.Context) error {
+ // Loop to retry if acquiring lock fails, with some backoff to avoid pinning
+ // the CPU.
+ waitWithJitter := retryutils.NewSeventhJitter()(time.Second * 10)
+ for {
+ err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{
+ LockConfiguration: backend.LockConfiguration{
+ Backend: s.cfg.Backend,
+ LockName: "spiffe_federation_syncer",
+ TTL: time.Minute,
+ // It doesn't matter too much if the syncer isn't running for
+ // a short period of time so we can take a relaxed approach to
+ // retrying to grab the lock.
+ RetryInterval: time.Second * 30,
+ },
+ }, s.syncTrustDomains)
+ if err != nil {
+ s.cfg.Logger.ErrorContext(
+ ctx,
+ "SPIFFEFederation syncer encountered a fatal error, it will restart after backoff",
+ "error", err,
+ "restart_after", waitWithJitter,
+ )
+ }
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-time.After(waitWithJitter):
+ }
+ }
+}
+
+type trustDomainSyncState struct {
+ // putEventsCh is a channel for passing PUT events to a specific
+ // SPIFFEFederations syncer.
+ putEventsCh chan types.Event
+ // stopCh is a channel for signaling a specific SPIFFEFederations
+ // syncer to stop syncing. This is closed when the watcher detects that the
+ // resource has been deleted.
+ stopCh chan struct{}
+}
+
+// syncTrustDomains is the core loop of the syncer that runs on a single auth
+// server.
+//
+// Its goal is to sync the contents of trust bundles from remote clusters to
+// the local cluster. It does this by creating a goroutine that manages each
+// federated cluster.
+func (s *SPIFFEFederationSyncer) syncTrustDomains(ctx context.Context) error {
+ s.cfg.Logger.InfoContext(
+ ctx,
+ "Obtained lock, SPIFFEFederation syncer is starting",
+ )
+ defer func() {
+ s.cfg.Logger.InfoContext(
+ ctx, "SPIFFEFederation syncer has stopped",
+ )
+ }()
+
+ // This wg will track all active syncers. We'll wait here until we're done.
+ wg := &sync.WaitGroup{}
+ defer func() {
+ wg.Wait()
+ }()
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Set up state management that will hold a list of all active trust domain
+ // syncers.
+ states := map[string]trustDomainSyncState{}
+ mu := &sync.Mutex{}
+ startSyncingFederation := func(trustDomain string) {
+ mu.Lock()
+ defer mu.Unlock()
+
+ // Don't start if we're already syncing this trust domain.
+ _, ok := states[trustDomain]
+ if ok {
+ return
+ }
+
+ eventsCh := make(chan types.Event, 1)
+ stopCh := make(chan struct{})
+ states[trustDomain] = trustDomainSyncState{
+ putEventsCh: eventsCh,
+ stopCh: stopCh,
+ }
+
+ wg.Add(1)
+ go func() {
+ defer func() {
+ mu.Lock()
+ delete(states, trustDomain)
+ mu.Unlock()
+ wg.Done()
+ }()
+ s.syncTrustDomainLoop(ctx, trustDomain, eventsCh, stopCh)
+ }()
+ }
+
+ // Establish our watcher, we'll use this to react instantly to changes to SPIFFEFederations.
+ w, err := s.cfg.EventsWatcher.NewWatcher(ctx, types.Watch{
+ Kinds: []types.WatchKind{{
+ Kind: types.KindSPIFFEFederation,
+ }},
+ })
+ if err != nil {
+ return trace.Wrap(err, "failed to create watcher")
+ }
+ defer func() {
+ err := w.Close()
+ if err != nil {
+ s.cfg.Logger.ErrorContext(
+ ctx, "Failed to close watcher", "error", err,
+ )
+ }
+ }()
+
+ // Wait for initial "Init" event to indicate we're now receiving events.
+ select {
+ case <-w.Done():
+ if err := w.Error(); err != nil {
+ return trace.Wrap(err, "watcher failed")
+ }
+ return trace.BadParameter("watcher closed unexpectedly")
+ case evt := <-w.Events():
+ if evt.Type == types.OpInit {
+ break
+ }
+ return trace.BadParameter("expected init event, got %v", evt.Type)
+ case <-ctx.Done():
+ return nil
+ }
+
+ // Fetch an initial list of all federations and start syncers for them.
+ trustDomains, err := listAllTrustDomains(ctx, s.cfg.Store)
+ if err != nil {
+ return trace.Wrap(err, "initially listing trust domains")
+ }
+ for _, td := range trustDomains {
+ startSyncingFederation(td.GetMetadata().GetName())
+ }
+
+ // Now we can start reacting to events, we'll want to start/stop syncers as
+ // needed. We'll want to start a syncer for any new trust domain, and
+ // propagate events to existing syncers.
+ for {
+ select {
+ case evt := <-w.Events():
+ s.cfg.Logger.DebugContext(
+ ctx,
+ "Received event from SPIFFEFederation watcher",
+ "evt_type", evt.Type,
+ )
+ switch evt.Type {
+ case types.OpPut:
+ mu.Lock()
+ existingState, ok := states[evt.Resource.GetName()]
+ mu.Unlock()
+ // If it already exists, we can just pass the event along. If
+ // there's already a sync queued due to an event, we don't need to
+ // queue another since it fetches the latest resource anyway.
+ if ok {
+ select {
+ case existingState.putEventsCh <- evt:
+ default:
+ s.cfg.Logger.DebugContext(
+ ctx,
+ "Sync already queued for trust domain, ignoring event",
+ )
+ }
+ continue
+ }
+ // If it doesn't exist, we should kick off a goroutine to start
+ // managing it. That routine will sync automatically on first
+ // run so we don't need to pass the event along.
+ startSyncingFederation(evt.Resource.GetName())
+ case types.OpDelete:
+ mu.Lock()
+ existingState, ok := states[evt.Resource.GetName()]
+ // If it exists, close the stopCh to tell it to exit and remove
+ // it from the states map.
+ if ok {
+ close(existingState.stopCh)
+ delete(states, evt.Resource.GetName())
+ }
+ mu.Unlock()
+ default:
+ }
+ case <-w.Done():
+ if err := w.Error(); err != nil {
+ return trace.Wrap(err, "watcher failed")
+ }
+ return trace.BadParameter("watcher closed unexpectedly")
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (s *SPIFFEFederationSyncer) syncTrustDomainLoop(
+ ctx context.Context,
+ name string,
+ putEventsCh <-chan types.Event,
+ stopCh <-chan struct{},
+) {
+ log := s.cfg.Logger.With("trust_domain", name)
+ log.InfoContext(ctx, "Starting sync loop for trust domain")
+ defer func() {
+ log.InfoContext(ctx, "Stopped sync loop for trust domain")
+ }()
+
+ retry, err := retryutils.NewLinear(retryutils.LinearConfig{
+ First: time.Second,
+ Step: time.Second,
+ Max: time.Second * 10,
+ Clock: s.cfg.Clock,
+ Jitter: retryutils.NewSeventhJitter(),
+ })
+ if err != nil {
+ log.ErrorContext(
+ ctx,
+ "Failed to create retry strategy, trust domain sync loop will not run",
+ "error", err,
+ )
+ return
+ }
+
+ var synced *machineidv1.SPIFFEFederation
+ var nextRetry <-chan time.Time
+ nextSync := s.cfg.Clock.NewTimer(time.Minute)
+ nextSync.Stop()
+ defer nextSync.Stop()
+ firstRun := make(chan struct{}, 1)
+ firstRun <- struct{}{}
+ for {
+ select {
+ case <-firstRun:
+ // On the first run, we should sync immediately.
+ log.DebugContext(ctx, "First run, trying sync immediately")
+ case <-nextSync.Chan():
+ log.DebugContext(ctx, "Next sync time has passed, trying sync")
+ case <-nextRetry:
+ log.InfoContext(ctx, "Wait for backoff complete, retrying sync")
+ case evt := <-putEventsCh:
+ // If we've just synced, we can effectively expect an "echo" of our
+ // last update. We can ignore this event safely.
+ if synced != nil {
+ if evt.Resource.GetRevision() == synced.GetMetadata().GetRevision() {
+ continue
+ }
+ log.DebugContext(
+ ctx,
+ "Resource has been updated, trying to sync trust domain immediately",
+ )
+ }
+ // Note, we explicitly don't use the resource within the event.
+ // Instead, we will fetch the latest upon starting the sync. This
+ // avoids completing multiple syncs if multiple changes are queued.
+ case <-stopCh:
+ log.DebugContext(ctx, "Stop signal received, stopping sync loop")
+ return
+ case <-ctx.Done():
+ return
+ }
+ // Stop our sync timer, we'll only set it if we successfully sync.
+ nextSync.Stop()
+
+ syncCtx, cancel := context.WithTimeout(ctx, s.cfg.SyncTimeout)
+ synced, err = s.syncTrustDomain(syncCtx, name)
+ cancel()
+
+ if err != nil {
+ // If the resource has been deleted, there's no point retrying.
+ // We should stop syncing.
+ if trace.IsNotFound(err) {
+ log.ErrorContext(
+ ctx,
+ "Resource has been deleted, stopping sync loop for trust domain",
+ "error", err,
+ )
+ return
+ }
+ retry.Inc()
+ log.ErrorContext(
+ ctx,
+ "Failed to sync trust domain, will retry after backoff",
+ "error", err,
+ "backoff", retry.Duration(),
+ )
+ nextRetry = retry.After()
+ continue
+ }
+ retry.Reset()
+ nextRetry = nil
+
+ // If we've successfully synced, set the timer up for our next sync.
+ if nextSyncAt := synced.GetStatus().GetNextSyncAt().AsTime(); !nextSyncAt.IsZero() {
+ timeUntil := nextSyncAt.Sub(s.cfg.Clock.Now())
+ // Ensure the timer will tick /after/ the next sync time.
+ timeUntil = timeUntil + time.Second
+ nextSync.Reset(timeUntil)
+ log.InfoContext(
+ ctx,
+ "Waiting to sync again",
+ "next_sync_at", nextSyncAt,
+ )
+ }
+ }
+}
+
+func (s *SPIFFEFederationSyncer) shouldSyncTrustDomain(
+ ctx context.Context,
+ log *slog.Logger,
+ in *machineidv1.SPIFFEFederation,
+) string {
+ if in.Status == nil {
+ log.DebugContext(ctx, "No status, will sync")
+ return "no_status"
+ }
+ if in.Status.CurrentBundle == "" {
+ log.DebugContext(ctx, "No status.current_bundle, will sync")
+ return "no_current_bundle"
+ }
+ if in.Status.CurrentBundleSyncedAt.AsTime().IsZero() {
+ log.DebugContext(ctx, "No status.current_bundle_synced_at, will sync")
+ return "no_current_bundle_synced_at"
+ }
+ // Check if we've passed the next sync time.
+ nextSyncAt := in.Status.NextSyncAt.AsTime()
+ now := s.cfg.Clock.Now()
+ if !nextSyncAt.IsZero() && now.After(nextSyncAt) {
+ log.DebugContext(
+ ctx,
+ "status.next_sync_at has passed, will sync",
+ "next_sync_at", nextSyncAt,
+ "now", now,
+ )
+ return "next_sync_at_passed"
+ }
+ // Check to see if the configured bundle source has changed
+ if in.Status.CurrentBundleSyncedFrom != nil {
+ if !proto.Equal(in.Spec.BundleSource, in.Status.CurrentBundleSyncedFrom) {
+ log.DebugContext(ctx, "status.current_bundle_synced_from has changed, will sync")
+ return "bundle_source_changed"
+ }
+ }
+
+ return ""
+}
+
+func (s *SPIFFEFederationSyncer) syncTrustDomain(
+ ctx context.Context, name string,
+) (out *machineidv1.SPIFFEFederation, err error) {
+ ctx, span := tracer.Start(ctx, "SPIFFEFederationSyncer/syncTrustDomain")
+ defer func() {
+ tracing.EndSpan(span, err)
+ }()
+
+ current, err := s.cfg.Store.GetSPIFFEFederation(ctx, name)
+ if err != nil {
+ return nil, trace.Wrap(err, "failed to get SPIFFEFederation")
+ }
+
+ log := s.cfg.Logger.With(
+ "current_revision", current.GetMetadata().GetRevision(),
+ "trust_domain", current.GetMetadata().GetName(),
+ )
+
+ td, err := spiffeid.TrustDomainFromString(current.GetMetadata().GetName())
+ if err != nil {
+ return nil, trace.Wrap(err, "parsing metadata.name as trust domain name")
+ }
+
+ // Determine - should we sync...
+ syncReason := s.shouldSyncTrustDomain(ctx, log, current)
+ if syncReason == "" {
+ log.DebugContext(ctx, "Skipping sync as is not required")
+ return current, nil
+ }
+ log.InfoContext(ctx, "Sync starting", "reason", syncReason)
+
+ // Clone the persisted resource so we can compare to it.
+ out = proto.Clone(current).(*machineidv1.SPIFFEFederation)
+
+ // Refresh...
+ if out.Status == nil {
+ out.Status = &machineidv1.SPIFFEFederationStatus{}
+ }
+
+ var bundle *spiffebundle.Bundle
+ var nextSyncIn time.Duration
+ switch {
+ case current.GetSpec().GetBundleSource().GetHttpsWeb() != nil:
+ url := current.Spec.BundleSource.HttpsWeb.BundleEndpointUrl
+ log.DebugContext(
+ ctx,
+ "Fetching bundle using https_web profile",
+ "url", url,
+ )
+ bundle, err = federation.FetchBundle(ctx, td, url, s.cfg.SPIFFEFetchOptions...)
+ if err != nil {
+ return nil, trace.Wrap(err, "fetching bundle using https_web profile")
+ }
+
+ // Calculate the duration before we should next sync, applying any
+ // sensible upper and lower bounds.
+ nextSyncIn = s.cfg.DefaultSyncInterval
+ if refreshHint, ok := bundle.RefreshHint(); ok {
+ if refreshHint < s.cfg.MinSyncInterval {
+ log.DebugContext(
+ ctx,
+ "Refresh hint is less than MinSyncInterval, using MinSyncInterval",
+ "refresh_hint", refreshHint,
+ "min_sync_interval", s.cfg.MinSyncInterval,
+ )
+ nextSyncIn = s.cfg.MinSyncInterval
+ } else if refreshHint > s.cfg.MaxSyncInterval {
+ log.DebugContext(
+ ctx,
+ "Refresh hint is greater than MaxSyncInterval, using MaxSyncInterval",
+ "refresh_hint", refreshHint,
+ "max_sync_interval", s.cfg.MaxSyncInterval,
+ )
+ nextSyncIn = s.cfg.MaxSyncInterval
+ } else {
+ nextSyncIn = refreshHint
+ }
+ }
+ case current.GetSpec().GetBundleSource().GetStatic() != nil:
+ log.DebugContext(
+ ctx, "Fetching bundle using spec.bundle_source.static.bundle",
+ )
+ bundle, err = spiffebundle.Parse(
+ td, []byte(current.Spec.BundleSource.Static.Bundle),
+ )
+ if err != nil {
+ return nil, trace.Wrap(
+ err, "parsing bundle from static profile",
+ )
+ }
+ default:
+ return nil, trace.BadParameter(
+ "spec.bundle_source: at least one of https_web or static must be set",
+ )
+ }
+
+ bundleBytes, err := bundle.Marshal()
+ if err != nil {
+ return nil, trace.Wrap(err, "marshaling bundle")
+ }
+ out.Status.CurrentBundle = string(bundleBytes)
+ out.Status.CurrentBundleSyncedFrom = current.Spec.BundleSource
+
+ syncedAt := s.cfg.Clock.Now().UTC()
+ out.Status.CurrentBundleSyncedAt = timestamppb.New(syncedAt)
+ // For certain sources, we need to set a next sync time.
+ if nextSyncIn > 0 {
+ out.Status.NextSyncAt = timestamppb.New(syncedAt.Add(nextSyncIn))
+ }
+
+ out, err = s.cfg.Store.UpdateSPIFFEFederation(ctx, out)
+ if err != nil {
+ return nil, trace.Wrap(
+ err, "persisting updated SPIFFEFederation",
+ )
+ }
+ log.InfoContext(
+ ctx,
+ "Sync succeeded, new SPIFFEFederation persisted",
+ "new_revision", out.Metadata.Revision,
+ )
+
+ return out, nil
+}
diff --git a/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go b/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go
new file mode 100644
index 000000000000..568a765eaa23
--- /dev/null
+++ b/lib/auth/machineid/machineidv1/spiffe_federation_syncer_test.go
@@ -0,0 +1,314 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package machineidv1
+
+import (
+ "context"
+ "crypto/x509"
+ "encoding/pem"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/jonboulle/clockwork"
+ "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle"
+ "github.com/spiffe/go-spiffe/v2/federation"
+ "github.com/spiffe/go-spiffe/v2/spiffeid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/testing/protocmp"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
+ machineidv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/lib/backend/memory"
+ "github.com/gravitational/teleport/lib/fixtures"
+ "github.com/gravitational/teleport/lib/services/local"
+ "github.com/gravitational/teleport/lib/utils"
+)
+
+func makeTrustDomain(t *testing.T, name string) (spiffeid.TrustDomain, *spiffebundle.Bundle) {
+ td := spiffeid.RequireTrustDomainFromString(name)
+ bundle := spiffebundle.New(td)
+ b, _ := pem.Decode([]byte(fixtures.TLSCACertPEM))
+ cert, err := x509.ParseCertificate(b.Bytes)
+ require.NoError(t, err)
+ bundle.AddX509Authority(cert)
+ bundle.SetRefreshHint(time.Minute * 12)
+ return td, bundle
+}
+
+func TestSPIFFEFederationSyncer(t *testing.T) {
+ t.Parallel()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+
+ logger := utils.NewSlogLoggerForTests()
+ clock := clockwork.NewRealClock()
+ backend, err := memory.New(memory.Config{})
+ require.NoError(t, err)
+
+ store, err := local.NewSPIFFEFederationService(backend)
+ require.NoError(t, err)
+ eventsSvc := local.NewEventsService(backend)
+
+ td1, bundle1 := makeTrustDomain(t, "1.example.com")
+ marshaledBundle1, err := bundle1.Marshal()
+ require.NoError(t, err)
+ td2, bundle2 := makeTrustDomain(t, "2.example.com")
+ marshaledBundle2, err := bundle1.Marshal()
+ require.NoError(t, err)
+
+ // Implement a fake SPIFFE Federation bundle endpoint
+ testSrv1 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ h, err := federation.NewHandler(td1, bundle1)
+ if !assert.NoError(t, err) {
+ return
+ }
+ h.ServeHTTP(w, r)
+ }))
+ testSrv2 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ h, err := federation.NewHandler(td2, bundle2)
+ if !assert.NoError(t, err) {
+ return
+ }
+ h.ServeHTTP(w, r)
+ }))
+
+ caPool := x509.NewCertPool()
+ caPool.AddCert(testSrv1.Certificate())
+ caPool.AddCert(testSrv2.Certificate())
+
+ // Create one trust domain prior to startng the syncer
+ created1, err := store.CreateSPIFFEFederation(ctx, &machineidv1pb.SPIFFEFederation{
+ Kind: types.KindSPIFFEFederation,
+ Version: types.V1,
+ Metadata: &headerv1.Metadata{
+ Name: "1.example.com",
+ },
+ Spec: &machineidv1pb.SPIFFEFederationSpec{
+ BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{
+ HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{
+ BundleEndpointUrl: testSrv1.URL,
+ },
+ },
+ },
+ })
+ require.NoError(t, err)
+
+ syncer, err := NewSPIFFEFederationSyncer(SPIFFEFederationSyncerConfig{
+ Backend: backend,
+ Store: store,
+ EventsWatcher: eventsSvc,
+ Clock: clock,
+ Logger: logger,
+ SPIFFEFetchOptions: []federation.FetchOption{
+ federation.WithWebPKIRoots(caPool),
+ },
+ })
+ require.NoError(t, err)
+
+ errCh := make(chan error, 1)
+ go func() {
+ err := syncer.Run(ctx)
+ assert.NoError(t, err)
+ errCh <- err
+ }()
+
+ // Wait for the initially created SPIFFEFederation to be synced
+ require.EventuallyWithT(t, func(collect *assert.CollectT) {
+ got, err := store.GetSPIFFEFederation(ctx, created1.Metadata.Name)
+ if !assert.NoError(t, err) {
+ return
+ }
+ // Check that some update as occurred (as indicated by the revision)
+ if !assert.NotEqual(t, got.Metadata.Revision, created1.Metadata.Revision) {
+ return
+ }
+ // Check that the expected status fields have been set...
+ if !assert.NotNil(t, got.Status) {
+ return
+ }
+ assert.Equal(t, string(marshaledBundle1), got.Status.CurrentBundle)
+ }, time.Second*5, time.Millisecond*100)
+
+ // Create a second SPIFFEFederation and wait for it to be synced
+ created2, err := store.CreateSPIFFEFederation(ctx, &machineidv1pb.SPIFFEFederation{
+ Kind: types.KindSPIFFEFederation,
+ Version: types.V1,
+ Metadata: &headerv1.Metadata{
+ Name: "2.example.com",
+ },
+ Spec: &machineidv1pb.SPIFFEFederationSpec{
+ BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{
+ HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{
+ BundleEndpointUrl: testSrv2.URL,
+ },
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.EventuallyWithT(t, func(collect *assert.CollectT) {
+ got, err := store.GetSPIFFEFederation(ctx, created2.Metadata.Name)
+ if !assert.NoError(t, err) {
+ return
+ }
+ // Check that some update as occurred (as indicated by the revision)
+ if !assert.NotEqual(t, got.Metadata.Revision, created2.Metadata.Revision) {
+ return
+ }
+ // Check that the expected status fields have been set...
+ if !assert.NotNil(t, got.Status) {
+ return
+ }
+ assert.Equal(t, string(marshaledBundle2), got.Status.CurrentBundle)
+ }, time.Second*5, time.Millisecond*100)
+
+ cancel()
+ select {
+ case err := <-errCh:
+ assert.NoError(t, err)
+ case <-time.After(time.Second * 5):
+ t.Fatalf("timeout waiting for syncer to stop")
+ }
+}
+
+func TestSPIFFEFederationSyncer_syncFederation(t *testing.T) {
+ t.Parallel()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+
+ logger := utils.NewSlogLoggerForTests()
+ clock := clockwork.NewFakeClock()
+ backend, err := memory.New(memory.Config{})
+ require.NoError(t, err)
+
+ store, err := local.NewSPIFFEFederationService(backend)
+ require.NoError(t, err)
+ eventsSvc := local.NewEventsService(backend)
+
+ td, bundle := makeTrustDomain(t, "example.com")
+ marshaledBundle, err := bundle.Marshal()
+ require.NoError(t, err)
+
+ // Implement a fake SPIFFE Federation bundle endpoint
+ testSrv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ h, err := federation.NewHandler(td, bundle)
+ if !assert.NoError(t, err) {
+ return
+ }
+ h.ServeHTTP(w, r)
+ }))
+ caPool := x509.NewCertPool()
+ caPool.AddCert(testSrv.Certificate())
+
+ syncer, err := NewSPIFFEFederationSyncer(SPIFFEFederationSyncerConfig{
+ Backend: backend,
+ Store: store,
+ EventsWatcher: eventsSvc,
+ Clock: clock,
+ Logger: logger,
+ SPIFFEFetchOptions: []federation.FetchOption{
+ federation.WithWebPKIRoots(caPool),
+ },
+ })
+ require.NoError(t, err)
+
+ t.Run("https-web", func(t *testing.T) {
+ in := &machineidv1pb.SPIFFEFederation{
+ Kind: types.KindSPIFFEFederation,
+ Version: types.V1,
+ Metadata: &headerv1.Metadata{
+ Name: "example.com",
+ },
+ Spec: &machineidv1pb.SPIFFEFederationSpec{
+ BundleSource: &machineidv1pb.SPIFFEFederationBundleSource{
+ HttpsWeb: &machineidv1pb.SPIFFEFederationBundleSourceHTTPSWeb{
+ BundleEndpointUrl: testSrv.URL,
+ },
+ },
+ },
+ }
+ created, err := store.CreateSPIFFEFederation(ctx, in)
+ require.NoError(t, err)
+
+ firstSync, err := syncer.syncTrustDomain(ctx, "example.com")
+ require.NoError(t, err)
+ got, err := store.GetSPIFFEFederation(ctx, "example.com")
+ require.NoError(t, err)
+ // Require that the persisted resource equals the resource output by syncTrustDomain
+ require.Empty(t, cmp.Diff(got, firstSync, protocmp.Transform(), protocmp.IgnoreFields(&headerv1.Metadata{}, "id")))
+ // Check that some update as occurred (as indicated by the revision)
+ require.NotEqual(t, created.Metadata.Revision, firstSync.Metadata.Revision)
+ // Check that the expected status fields have been set...
+ require.NotNil(t, firstSync.Status)
+ wantStatus := &machineidv1pb.SPIFFEFederationStatus{
+ CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()),
+ CurrentBundleSyncedFrom: proto.Clone(created).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource,
+ NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)),
+ CurrentBundle: string(marshaledBundle),
+ }
+ require.Empty(t, cmp.Diff(firstSync.Status, wantStatus, protocmp.Transform()))
+
+ // Check that syncing again does nothing.
+ secondSync, err := syncer.syncTrustDomain(ctx, "example.com")
+ require.NoError(t, err)
+ require.Empty(t, cmp.Diff(secondSync, firstSync, protocmp.Transform(), protocmp.IgnoreFields(&headerv1.Metadata{}, "id")))
+
+ // Advance the clock and check that the sync is triggered again.
+ clock.Advance(time.Minute * 15)
+ thirdSync, err := syncer.syncTrustDomain(ctx, "example.com")
+ require.NoError(t, err)
+ require.NotEqual(t, firstSync.Metadata.Revision, thirdSync.Metadata.Revision)
+ wantStatus = &machineidv1pb.SPIFFEFederationStatus{
+ CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()),
+ CurrentBundleSyncedFrom: proto.Clone(created).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource,
+ NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)),
+ CurrentBundle: string(marshaledBundle),
+ }
+ require.Empty(t, cmp.Diff(thirdSync.Status, wantStatus, protocmp.Transform()))
+
+ // Check that syncing again does nothing.
+ fourthSync, err := syncer.syncTrustDomain(ctx, "example.com")
+ require.NoError(t, err)
+ require.Empty(t, cmp.Diff(fourthSync, thirdSync, protocmp.Transform(), protocmp.IgnoreFields(&headerv1.Metadata{}, "id")))
+
+ // Check that modifying the bundle source triggers a sync.
+ fourthSync.Spec.BundleSource.HttpsWeb.BundleEndpointUrl = fmt.Sprintf("%s/modified", testSrv.URL)
+ updated, err := store.UpdateSPIFFEFederation(ctx, fourthSync)
+ require.NoError(t, err)
+ fifthSync, err := syncer.syncTrustDomain(ctx, "example.com")
+ require.NoError(t, err)
+ require.NotEqual(t, updated.Metadata.Revision, fifthSync.Metadata.Revision)
+ wantStatus = &machineidv1pb.SPIFFEFederationStatus{
+ CurrentBundleSyncedAt: timestamppb.New(clock.Now().UTC()),
+ CurrentBundleSyncedFrom: proto.Clone(updated).(*machineidv1pb.SPIFFEFederation).Spec.BundleSource,
+ NextSyncAt: timestamppb.New(clock.Now().UTC().Add(time.Minute * 12)),
+ CurrentBundle: string(marshaledBundle),
+ }
+ require.Empty(t, cmp.Diff(fifthSync.Status, wantStatus, protocmp.Transform()))
+ })
+}
diff --git a/lib/service/service.go b/lib/service/service.go
index e762bf5a69f2..e2777bf79df0 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -91,6 +91,7 @@ import (
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth/keygen"
"github.com/gravitational/teleport/lib/auth/keystore"
+ "github.com/gravitational/teleport/lib/auth/machineid/machineidv1"
"github.com/gravitational/teleport/lib/auth/native"
"github.com/gravitational/teleport/lib/auth/state"
"github.com/gravitational/teleport/lib/auth/storage"
@@ -2402,6 +2403,22 @@ func (process *TeleportProcess) initAuthService() error {
}
process.RegisterFunc("auth.heartbeat", heartbeat.Run)
+ spiffeFedSyncer, err := machineidv1.NewSPIFFEFederationSyncer(machineidv1.SPIFFEFederationSyncerConfig{
+ Backend: b,
+ Store: authServer.Services.SPIFFEFederations,
+ EventsWatcher: authServer.Services.Events,
+ Clock: process.Clock,
+ Logger: logger.With(
+ teleport.ComponentKey, teleport.Component(teleport.ComponentAuth, "spiffe_federation_syncer"),
+ ),
+ })
+ if err != nil {
+ return trace.Wrap(err, "initializing SPIFFEFederation Syncer")
+ }
+ process.RegisterFunc("auth.spiffe_federation_syncer", func() error {
+ return trace.Wrap(spiffeFedSyncer.Run(process.GracefulExitContext()), "running SPIFFEFederation Syncer")
+ })
+
process.RegisterFunc("auth.server_info", func() error {
return trace.Wrap(authServer.ReconcileServerInfos(process.GracefulExitContext()))
})
diff --git a/lib/services/local/spiffe_federations.go b/lib/services/local/spiffe_federations.go
index 67910e22ac83..611c3e96ab5f 100644
--- a/lib/services/local/spiffe_federations.go
+++ b/lib/services/local/spiffe_federations.go
@@ -110,3 +110,14 @@ func (b *SPIFFEFederationService) UpsertSPIFFEFederation(
upserted, err := b.service.UpsertResource(ctx, federation)
return upserted, trace.Wrap(err)
}
+
+// UpdateSPIFFEFederation updates a specific SPIFFEFederation.
+func (b *SPIFFEFederationService) UpdateSPIFFEFederation(
+ ctx context.Context, federation *machineidv1.SPIFFEFederation,
+) (*machineidv1.SPIFFEFederation, error) {
+ if err := services.ValidateSPIFFEFederation(federation); err != nil {
+ return nil, trace.Wrap(err)
+ }
+ updated, err := b.service.ConditionalUpdateResource(ctx, federation)
+ return updated, trace.Wrap(err)
+}
diff --git a/lib/services/local/spiffe_federations_test.go b/lib/services/local/spiffe_federations_test.go
index aac270fc631b..502274c851d9 100644
--- a/lib/services/local/spiffe_federations_test.go
+++ b/lib/services/local/spiffe_federations_test.go
@@ -276,3 +276,54 @@ func TestSPIFFEFederationService_DeleteAllSPIFFEFederation(t *testing.T) {
require.NoError(t, err)
require.Empty(t, page)
}
+
+func TestSPIFFEFederationService_UpdateSPIFFEFederation(t *testing.T) {
+ ctx, service := setupSPIFFEFederationTest(t)
+
+ t.Run("ok", func(t *testing.T) {
+ // Create resource for us to Update since we can't update a non-existent resource.
+ created, err := service.CreateSPIFFEFederation(
+ ctx,
+ newSPIFFEFederation("example.com"),
+ )
+ require.NoError(t, err)
+ want := proto.Clone(created).(*machineidv1.SPIFFEFederation)
+ want.Spec.BundleSource.HttpsWeb.BundleEndpointUrl = "https://example.com/new-bundle.json"
+
+ updated, err := service.UpdateSPIFFEFederation(
+ ctx,
+ // Clone to avoid Marshaling modifying want
+ proto.Clone(want).(*machineidv1.SPIFFEFederation),
+ )
+ require.NoError(t, err)
+ require.NotEqual(t, created.Metadata.Revision, updated.Metadata.Revision)
+ require.Empty(t, cmp.Diff(
+ want,
+ updated,
+ protocmp.Transform(),
+ protocmp.IgnoreFields(&headerv1.Metadata{}, "revision", "id"),
+ ))
+
+ got, err := service.GetSPIFFEFederation(ctx, "example.com")
+ require.NoError(t, err)
+ require.Empty(t, cmp.Diff(
+ want,
+ got,
+ protocmp.Transform(),
+ protocmp.IgnoreFields(&headerv1.Metadata{}, "revision", "id"),
+ ))
+ require.Equal(t, updated.Metadata.Revision, got.Metadata.Revision)
+ })
+ t.Run("validation occurs", func(t *testing.T) {
+ out, err := service.UpdateSPIFFEFederation(ctx, newSPIFFEFederation("spiffe://i-will-fail"))
+ require.ErrorContains(t, err, "metadata.name: must not include the spiffe:// prefix")
+ require.Nil(t, out)
+ })
+ t.Run("no create", func(t *testing.T) {
+ _, err := service.UpdateSPIFFEFederation(
+ ctx,
+ newSPIFFEFederation("non-existing.com"),
+ )
+ require.Error(t, err)
+ })
+}
diff --git a/lib/services/resource.go b/lib/services/resource.go
index 411264c716c9..744eca794016 100644
--- a/lib/services/resource.go
+++ b/lib/services/resource.go
@@ -248,6 +248,8 @@ func ParseShortcut(in string) (string, error) {
return types.KindPlugin, nil
case types.KindAccessGraphSettings, "ags":
return types.KindAccessGraphSettings, nil
+ case types.KindSPIFFEFederation, types.KindSPIFFEFederation + "s":
+ return types.KindSPIFFEFederation, nil
}
return "", trace.BadParameter("unsupported resource: %q - resources should be expressed as 'type/name', for example 'connector/github'", in)
}
diff --git a/lib/services/spiffe_federations.go b/lib/services/spiffe_federations.go
index 58d24f4662cb..266c7b759fa2 100644
--- a/lib/services/spiffe_federations.go
+++ b/lib/services/spiffe_federations.go
@@ -48,6 +48,11 @@ type SPIFFEFederations interface {
) (*machineidv1.SPIFFEFederation, error)
// DeleteSPIFFEFederation deletes a SPIFFE Federation by name.
DeleteSPIFFEFederation(ctx context.Context, name string) error
+ // UpdateSPIFFEFederation updates a SPIFFE Federation. It will not act if the resource is not found
+ // or where the revision does not match.
+ UpdateSPIFFEFederation(
+ ctx context.Context, spiffeFederation *machineidv1.SPIFFEFederation,
+ ) (*machineidv1.SPIFFEFederation, error)
}
// MarshalSPIFFEFederation marshals the SPIFFEFederation object into a JSON byte
@@ -124,5 +129,16 @@ func ValidateSPIFFEFederation(s *machineidv1.SPIFFEFederation) error {
}
}
+ // Ensure that all key status fields are set if any are set. This is a safeguard against weird inconsistent states
+ // where some fields are set and others are not.
+ currentBundleSet := s.Status.GetCurrentBundle() != ""
+ currentBundledSyncedAtSet := s.Status.GetCurrentBundleSyncedAt() != nil
+ currentBundleSyncedFromSet := s.Status.GetCurrentBundleSyncedFrom() != nil
+ anyStatusFieldSet := currentBundleSet || currentBundledSyncedAtSet || currentBundleSyncedFromSet
+ allStatusFieldsSet := currentBundleSet && currentBundledSyncedAtSet && currentBundleSyncedFromSet
+ if anyStatusFieldSet && !allStatusFieldsSet {
+ return trace.BadParameter("status: all of ['current_bundle', 'current_bundle_synced_at', 'current_bundle_synced_from'] must be set if any are set")
+ }
+
return nil
}