Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase speed sync with k8s #4384

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 117 additions & 35 deletions control-plane/catalog/to-consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package catalog
import (
"context"
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -58,7 +59,7 @@ var SyncCatalogGauge = []prometheus.GaugeDefinition{
const (
// ConsulSyncPeriod is how often the syncer will attempt to
// reconcile the expected service states with the remote Consul server.
ConsulSyncPeriod = 30 * time.Second
ConsulSyncPeriod = 2 * time.Second

// ConsulServicePollPeriod is how often a service is checked for
// whether it has instances to reap.
Expand Down Expand Up @@ -171,6 +172,40 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) {
}
s.namespaces[ns][r.Service.ID] = r
s.Log.Debug("[Sync] adding service to namespaces map", "service", r.Service)
// Sync immediately if the registration is new or changed
if s.shouldSync(r) {
s.Log.Info("syncing service", "node-name", r.Node, "service-name", r.Service.Service)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s.syncOne(ctx, r)
}
}

// Deregister any services that are no longer present
for ns, services := range s.namespaces {
for _, svc := range services {
// Make sure the namespace exists before we run checks against it
if _, ok := s.serviceNames[ns]; ok {
// If the service is valid and its info isn't nil, we don't deregister it
if s.serviceNames[ns].Contains(svc.Service.Service) && s.namespaces[ns][svc.Service.ID] != nil {
continue
}
}

// Create deregistration object with optional namespace
dereg := api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.Service.ID,
}
if s.EnableNamespaces {
dereg.Namespace = ns
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s.deregOne(ctx, &dereg)
}
}

// Signal that the initial sync is complete and our maps have been populated.
Expand Down Expand Up @@ -485,34 +520,7 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) {

// Do all deregistrations first.
for _, r := range s.deregs {
s.Log.Info("deregistering service",
"node-name", r.Node,
"service-id", r.ServiceID,
"service-consul-namespace", r.Namespace)

_, err = consulClient.Catalog().Deregister(r, nil)
if err != nil {
// metric count for error deregistering k8s services from Consul
labels := []metrics.Label{
{Name: "error", Value: err.Error()},
}
s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels)

s.Log.Warn("error deregistering service",
"node-name", r.Node,
"service-id", r.ServiceID,
"service-consul-namespace", r.Namespace,
"err", err)
continue
}

// metric count for deregistering k8s services from Consul
labels := []metrics.Label{
{Name: "id", Value: r.ServiceID},
{Name: "node", Value: r.Node},
{Name: "namespace", Value: r.Namespace},
}
s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels)
s.deregOne(ctx, r)
}

// Always clear deregistrations, they'll repopulate if we had errors
Expand All @@ -522,6 +530,7 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) {
// may have been made to the registered services.
for _, services := range s.namespaces {
for _, r := range services {
s.syncOne(ctx, r)
if s.EnableNamespaces {
_, err = namespaces.EnsureExists(consulClient, r.Service.Namespace, s.CrossNamespaceACLPolicy)
if err != nil {
Expand Down Expand Up @@ -553,12 +562,6 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) {
continue
}

s.Log.Debug("registered service instance",
"node-name", r.Node,
"service-name", r.Service.Service,
"consul-namespace-name", r.Service.Namespace,
"service", r.Service)

// metric count and service metadata syncing k8s services to Consul
labels := []metrics.Label{
{Name: "id", Value: r.Service.ID},
Expand All @@ -581,6 +584,85 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) {
}
}

// syncOne registers a single catalog registration with Consul.
//
// A Consul API client is built from the syncer's client config and server
// connection manager on every call. The register request carries ctx through
// its write options. Failures are logged and otherwise swallowed; nothing is
// returned to the caller.
func (s *ConsulSyncer) syncOne(ctx context.Context, r *api.CatalogRegistration) {
	client, clientErr := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
	if clientErr != nil {
		s.Log.Error("failed to create Consul API client", "err", clientErr)
		return
	}

	// Attach the caller's context to the write so the request is tied to it.
	writeOpts := (&api.WriteOptions{}).WithContext(ctx)
	if _, regErr := client.Catalog().Register(r, writeOpts); regErr != nil {
		s.Log.Warn("error registering service",
			"node-name", r.Node,
			"service-name", r.Service.Service,
			"err", regErr)
		return
	}

	s.Log.Debug("registered service instance",
		"node-name", r.Node,
		"service-name", r.Service.Service,
		"consul-namespace-name", r.Service.Namespace,
		"service", r.Service)
}

// deregOne removes a single service instance from the Consul catalog and
// records the outcome in the Prometheus sink.
//
// A Consul API client is built from the syncer's client config and server
// connection manager on every call. The deregister request carries ctx through
// its write options, matching how syncOne issues registrations, so a timeout or
// cancellation set by the caller applies to the request.
//
// On failure the error counter (deregisterErrorName) is incremented, a warning
// is logged, and the function returns without touching the success counter.
// On success the deregisterName counter is incremented with the service ID,
// node, and namespace as labels. Nothing is returned to the caller.
func (s *ConsulSyncer) deregOne(ctx context.Context, r *api.CatalogDeregistration) {
	s.Log.Info("deregistering service",
		"node-name", r.Node,
		"service-id", r.ServiceID,
		"service-consul-namespace", r.Namespace)

	// Create a new consul client.
	consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
	if err != nil {
		s.Log.Error("failed to create Consul API client", "err", err)
		return
	}

	// Tie the request to the caller's context instead of passing nil options.
	wopt := (&api.WriteOptions{}).WithContext(ctx)
	if _, err = consulClient.Catalog().Deregister(r, wopt); err != nil {
		// metric count for error deregistering k8s services from Consul
		errLabels := []metrics.Label{
			{Name: "error", Value: err.Error()},
		}
		s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, errLabels)

		s.Log.Warn("error deregistering service",
			"node-name", r.Node,
			"service-id", r.ServiceID,
			"service-consul-namespace", r.Namespace,
			"err", err)

		// Stop here so a failed deregistration is not also counted as a success.
		return
	}

	// metric count for deregistering k8s services from Consul
	labels := []metrics.Label{
		{Name: "id", Value: r.ServiceID},
		{Name: "node", Value: r.Node},
		{Name: "namespace", Value: r.Namespace},
	}
	s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels)
}

// shouldSync reports whether r differs from what the syncer currently holds
// for the same namespace and service ID.
//
// It returns true when no entry is stored under r's namespace and service ID,
// or when the stored registration is not deeply equal to r.
func (s *ConsulSyncer) shouldSync(r *api.CatalogRegistration) bool {
	// Indexing a missing namespace yields a nil inner map, and a lookup on a
	// nil map reports "not found", so one lookup covers both the unknown
	// namespace and the unknown service ID cases.
	known, found := s.namespaces[r.Service.Namespace][r.Service.ID]
	if !found {
		return true
	}

	unchanged := reflect.DeepEqual(known, r)
	return !unchanged
}

func (s *ConsulSyncer) init() {
s.lock.Lock()
defer s.lock.Unlock()
Expand Down
35 changes: 35 additions & 0 deletions control-plane/catalog/to-consul/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,41 @@ func TestConsulSyncer_register(t *testing.T) {
require.Equal(t, "127.0.0.1", service.Address)
}

// TestConsulSyncer_registerImmediate checks that a registration handed to Sync
// can be read back from the Consul catalog straight after the call, with the
// syncer's SyncPeriod configured to 10 seconds.
func TestConsulSyncer_registerImmediate(t *testing.T) {
	t.Parallel()
	req := require.New(t)

	// Start a test agent and wait until it is ready to serve dc1.
	testAgent := agent.NewTestAgent(t, t.Name(), ``)
	defer testAgent.Shutdown()
	testrpc.WaitForTestAgent(t, testAgent.RPC, "dc1")
	consulClient := testAgent.Client()

	// Configure a sync period of 10 seconds on the syncer under test.
	configure := func(cs *ConsulSyncer) {
		cs.SyncPeriod = 10 * time.Second
	}
	syncer, stop := testConsulSyncerWithConfig(consulClient, configure)
	defer stop()

	// Hand a single registration to the syncer.
	registrations := []*api.CatalogRegistration{
		testRegistration("foo", "bar", "default"),
	}
	syncer.Sync(registrations)

	// Query the catalog for the service right away.
	instances, _, err := consulClient.Catalog().Service("bar", "", nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	if len(instances) == 0 {
		t.Fatal("service not found")
	}
	got := instances[0]

	// The first returned instance must match what was registered.
	req.Equal("foo", got.Node)
	req.Equal("bar", got.ServiceName)
	req.Equal("127.0.0.1", got.Address)
}

// Test that the syncer reaps individual invalid service instances.
func TestConsulSyncer_reapServiceInstance(t *testing.T) {
t.Parallel()
Expand Down