From 821cf95cef7cccd08071b18477a9a1adcadd440a Mon Sep 17 00:00:00 2001 From: v-kamerdinerov Date: Thu, 10 Oct 2024 14:28:55 +0300 Subject: [PATCH 1/4] Increase speed sync with k8s --- control-plane/catalog/to-consul/syncer.go | 140 +++++++++++++----- .../catalog/to-consul/syncer_test.go | 35 +++++ 2 files changed, 141 insertions(+), 34 deletions(-) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 95a7b93dd1..beb67cc0a1 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -5,6 +5,7 @@ package catalog import ( "context" + "reflect" "fmt" "sync" "time" @@ -58,7 +59,7 @@ var SyncCatalogGauge = []prometheus.GaugeDefinition{ const ( // ConsulSyncPeriod is how often the syncer will attempt to // reconcile the expected service states with the remote Consul server. - ConsulSyncPeriod = 30 * time.Second + ConsulSyncPeriod = 2 * time.Second // ConsulServicePollPeriod is how often a service is checked for // whether it has instances to reap. @@ -171,8 +172,44 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) { } s.namespaces[ns][r.Service.ID] = r s.Log.Debug("[Sync] adding service to namespaces map", "service", r.Service) + // Sync immediately if the registration is new or changed + if s.shouldSync(r) { + s.Log.Info("syncing service", "node-name", r.Node, "service-name", r.Service.Service) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s.syncOne(ctx, r) + } + } + + // Deregister any services that are no longer present + for ns, services := range s.namespaces { + for _, svc := range services { + // Make sure the namespace exists before we run checks against it + if _, ok := serviceNames[ns]; ok { + // If the service is valid and its info isn't nil, we don't deregister it + if serviceNames[ns].Contains(svc.Service.Service) && namespaces[ns][svc.Service.ID] != nil { + continue + } + } + + // Create deregistration object with optional namespace + dereg := api.CatalogDeregistration{ + Node: svc.Node, + ServiceID: svc.Service.ID, + } + if s.EnableNamespaces { + dereg.Namespace = ns + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s.deregOne(ctx, &dereg) + } } + s.serviceNames = serviceNames + s.namespaces = namespaces // Signal that the initial sync is complete and our maps have been populated. // We can now safely reap untracked services. s.initialSyncOnce.Do(func() { close(s.initialSync) }) @@ -485,34 +522,7 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { // Do all deregistrations first. for _, r := range s.deregs { - s.Log.Info("deregistering service", - "node-name", r.Node, - "service-id", r.ServiceID, - "service-consul-namespace", r.Namespace) - - _, err = consulClient.Catalog().Deregister(r, nil) - if err != nil { - // metric count for error deregistering k8s services from Consul - labels := []metrics.Label{ - {Name: "error", Value: err.Error()}, - } - s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) - - s.Log.Warn("error deregistering service", - "node-name", r.Node, - "service-id", r.ServiceID, - "service-consul-namespace", r.Namespace, - "err", err) - continue - } - - // metric count for deregistering k8s services from Consul - labels := []metrics.Label{ - {Name: "id", Value: r.ServiceID}, - {Name: "node", Value: r.Node}, - {Name: "namespace", Value: r.Namespace}, - } - s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels) + s.deregOne(ctx, r) } // Always clear deregistrations, they'll repopulate if we had errors @@ -522,6 +532,7 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { // may have been made to the registered services. for _, services := range s.namespaces { for _, r := range services { + s.syncOne(ctx, r) if s.EnableNamespaces { _, err = namespaces.EnsureExists(consulClient, r.Service.Namespace, s.CrossNamespaceACLPolicy) if err != nil { @@ -553,11 +564,6 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { continue } - s.Log.Debug("registered service instance", - "node-name", r.Node, - "service-name", r.Service.Service, - "consul-namespace-name", r.Service.Namespace, - "service", r.Service) // metric count and service metadata syncing k8s services to Consul labels := []metrics.Label{ @@ -581,6 +587,72 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { } } +func (s *ConsulSyncer) syncOne(ctx context.Context, r *api.CatalogRegistration) { + + // Register the service + wopt := (&api.WriteOptions{}).WithContext(ctx) + _, err := s.Client.Catalog().Register(r, wopt) + if err != nil { + s.Log.Warn("error registering service", + "node-name", r.Node, + "service-name", r.Service.Service, + "err", err) + return + } + + s.Log.Debug("registered service instance", + "node-name", r.Node, + "service-name", r.Service.Service, + "consul-namespace-name", r.Service.Namespace, + "service", r.Service) +} + +func (s *ConsulSyncer) deregOne(ctx context.Context, r *api.CatalogDeregistration) { + s.Log.Info("deregistering service", + "node-name", r.Node, + "service-id", r.ServiceID, + "service-consul-namespace", r.Namespace) + + _, err = consulClient.Catalog().Deregister(r, nil) + if err != nil { + // metric count for error deregistering k8s services from Consul + labels := []metrics.Label{ + {Name: "error", Value: err.Error()}, + } + s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) + + s.Log.Warn("error deregistering service", + "node-name", r.Node, + "service-id", r.ServiceID, + "service-consul-namespace", r.Namespace, + "err", err) + continue + } + + // metric count for deregistering k8s services from Consul + labels := []metrics.Label{ + {Name: "id", Value: r.ServiceID}, + {Name: "node", Value: r.Node}, + {Name: "namespace", Value: r.Namespace}, + } + s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels) +} + +func (s *ConsulSyncer) shouldSync(r *api.CatalogRegistration) bool { + // If the namespace doesn't exist, this service will + // definitely need to be registered + _, ok := s.namespaces[r.Service.Namespace] + if !ok { + return true + } + reg, ok := s.namespaces[r.Service.Namespace][r.Service.ID] + if !ok { + return true + } + + return !reflect.DeepEqual(reg, r) +} + func (s *ConsulSyncer) init() { s.lock.Lock() defer s.lock.Unlock() diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index 3f0d19ee50..4b07b09ecb 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -64,6 +64,41 @@ func TestConsulSyncer_register(t *testing.T) { require.Equal(t, "127.0.0.1", service.Address) } +func TestConsulSyncer_registerImmediate(t *testing.T) { + t.Parallel() + require := require.New(t) + + a := agent.NewTestAgent(t, t.Name(), ``) + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + client := a.Client() + + s, closer := testConsulSyncerWithConfig(client, func(s *ConsulSyncer) { + s.SyncPeriod = 10 * time.Second + }) + defer closer() + + // Sync + s.Sync([]*api.CatalogRegistration{ + testRegistration("foo", "bar", "default"), + }) + + // Read the service back out + services, _, err := client.Catalog().Service("bar", "", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(services) == 0 { + t.Fatal("service not found") + } + service := services[0] + + // Verify the settings + require.Equal("foo", service.Node) + require.Equal("bar", service.ServiceName) + require.Equal("127.0.0.1", service.Address) +} + // Test that the syncer reaps individual invalid service instances. func TestConsulSyncer_reapServiceInstance(t *testing.T) { t.Parallel() From deec063de71491dc0cd8c73279c6881f067a0f60 Mon Sep 17 00:00:00 2001 From: v-kamerdinerov Date: Thu, 10 Oct 2024 16:27:35 +0300 Subject: [PATCH 2/4] fmt + proper var names --- control-plane/catalog/to-consul/syncer.go | 57 +++++++++++------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index beb67cc0a1..b9d51eccbc 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -5,8 +5,8 @@ package catalog import ( "context" - "reflect" "fmt" + "reflect" "sync" "time" @@ -186,9 +186,9 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) { for ns, services := range s.namespaces { for _, svc := range services { // Make sure the namespace exists before we run checks against it - if _, ok := serviceNames[ns]; ok { + if _, ok := s.serviceNames[ns]; ok { // If the service is valid and its info isn't nil, we don't deregister it - if serviceNames[ns].Contains(svc.Service.Service) && namespaces[ns][svc.Service.ID] != nil { + if s.serviceNames[ns].Contains(svc.Service.Service) && s.namespaces[ns][svc.Service.ID] != nil { continue } } @@ -564,7 +564,6 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { continue } - // metric count and service metadata syncing k8s services to Consul labels := []metrics.Label{ {Name: "id", Value: r.Service.ID}, @@ -608,34 +607,34 @@ func (s *ConsulSyncer) syncOne(ctx context.Context, r *api.CatalogRegistration) } func (s *ConsulSyncer) deregOne(ctx context.Context, r *api.CatalogDeregistration) { - s.Log.Info("deregistering service", - "node-name", r.Node, - "service-id", r.ServiceID, - "service-consul-namespace", r.Namespace) - - _, err = consulClient.Catalog().Deregister(r, nil) - if err != nil { - // metric count for error deregistering k8s services from Consul - labels := []metrics.Label{ - {Name: "error", Value: err.Error()}, - } - s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) - - s.Log.Warn("error deregistering service", - "node-name", r.Node, - "service-id", r.ServiceID, - "service-consul-namespace", r.Namespace, - "err", err) - continue - } + s.Log.Info("deregistering service", + "node-name", r.Node, + "service-id", r.ServiceID, + "service-consul-namespace", r.Namespace) - // metric count for deregistering k8s services from Consul + _, err = consulClient.Catalog().Deregister(r, nil) + if err != nil { + // metric count for error deregistering k8s services from Consul labels := []metrics.Label{ - {Name: "id", Value: r.ServiceID}, - {Name: "node", Value: r.Node}, - {Name: "namespace", Value: r.Namespace}, + {Name: "error", Value: err.Error()}, } - s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels) + s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) + + s.Log.Warn("error deregistering service", + "node-name", r.Node, + "service-id", r.ServiceID, + "service-consul-namespace", r.Namespace, + "err", err) + continue + } + + // metric count for deregistering k8s services from Consul + labels := []metrics.Label{ + {Name: "id", Value: r.ServiceID}, + {Name: "node", Value: r.Node}, + {Name: "namespace", Value: r.Namespace}, + } + s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels) } func (s *ConsulSyncer) shouldSync(r *api.CatalogRegistration) bool { From 663c00e395a1ef232e87f28c7281265c350f834c Mon Sep 17 00:00:00 2001 From: v-kamerdinerov Date: Mon, 21 Oct 2024 21:25:02 +0300 Subject: [PATCH 3/4] add consulClient --- control-plane/catalog/to-consul/syncer.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index b9d51eccbc..5f97c17141 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -208,8 +208,8 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) { } } - s.serviceNames = serviceNames - s.namespaces = namespaces + //s.serviceNames = serviceNames + //s.namespaces = namespaces // Signal that the initial sync is complete and our maps have been populated. // We can now safely reap untracked services. s.initialSyncOnce.Do(func() { close(s.initialSync) }) @@ -588,9 +588,16 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { func (s *ConsulSyncer) syncOne(ctx context.Context, r *api.CatalogRegistration) { + // Create a new consul client. + consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) + if err != nil { + s.Log.Error("failed to create Consul API client", "err", err) + return + } + // Register the service wopt := (&api.WriteOptions{}).WithContext(ctx) - _, err := s.Client.Catalog().Register(r, wopt) + _, err = consulClient.Catalog().Register(r, wopt) if err != nil { s.Log.Warn("error registering service", "node-name", r.Node, @@ -612,6 +619,13 @@ func (s *ConsulSyncer) deregOne(ctx context.Context, r *api.CatalogDeregistratio "service-id", r.ServiceID, "service-consul-namespace", r.Namespace) + // Create a new consul client. + consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) + if err != nil { + s.Log.Error("failed to create Consul API client", "err", err) + return + } + _, err = consulClient.Catalog().Deregister(r, nil) if err != nil { // metric count for error deregistering k8s services from Consul @@ -625,7 +639,6 @@ func (s *ConsulSyncer) deregOne(ctx context.Context, r *api.CatalogDeregistratio "service-id", r.ServiceID, "service-consul-namespace", r.Namespace, "err", err) - continue } // metric count for deregistering k8s services from Consul From ba0ae30ef126a90997d70489d76703fa82b5f07b Mon Sep 17 00:00:00 2001 From: v-kamerdinerov Date: Tue, 22 Oct 2024 12:31:30 +0300 Subject: [PATCH 4/4] Drop unused comment lines --- control-plane/catalog/to-consul/syncer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 5f97c17141..4744208b44 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -208,8 +208,6 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) { } } - //s.serviceNames = serviceNames - //s.namespaces = namespaces // Signal that the initial sync is complete and our maps have been populated. // We can now safely reap untracked services. s.initialSyncOnce.Do(func() { close(s.initialSync) })