diff --git a/glide.lock b/glide.lock index 0253b136b..53726a79b 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 3c05531c0d590c16abd195c43c39e2090ca94de48404032f6bb2651158f8f3b9 -updated: 2016-11-30T11:07:18.630258453-08:00 +updated: 2016-12-04T14:53:57.09779694-08:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -57,7 +57,7 @@ imports: - log - swagger - name: github.com/ghodss/yaml - version: a54de18a07046d8c4b26e9327698a2ebb9285b36 + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-openapi/jsonpointer version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 - name: github.com/go-openapi/jsonreference @@ -87,7 +87,7 @@ imports: - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 - name: github.com/kelseyhightower/envconfig - version: 9aca109c9aec4633fced9717c4a09ecab3d33111 + version: ac879f01990976d36677903ead78ae78dece48f1 - name: github.com/mailru/easyjson version: d5b7844b561a7bc640052f1b935f7b800330d7e0 subpackages: @@ -101,7 +101,7 @@ imports: - name: github.com/olekukonko/tablewriter version: bdcc175572fd7abece6c831e643891b9331bc9e7 - name: github.com/onsi/ginkgo - version: 00054c0bb96fc880d4e0be1b90937fad438c5290 + version: 74c678d97c305753605c338c6c78c49ec104b5e7 subpackages: - config - extensions/table @@ -117,8 +117,6 @@ imports: - internal/writer - reporters - reporters/stenographer - - reporters/stenographer/support/go-colorable - - reporters/stenographer/support/go-isatty - types - name: github.com/pborman/uuid version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 @@ -127,7 +125,7 @@ imports: - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/satori/go.uuid - version: b061729afc07e77a8aa4fad0a2fd840958f1942a + version: 879c5887cd475cd7864858769793b2ceb0d44feb - name: github.com/Sirupsen/logrus version: d26492970760ca5d33129d2d799e34be5c4782eb - name: github.com/spf13/pflag @@ -160,7 +158,7 @@ imports: - jws - jwt - name: golang.org/x/sys - version: d5645953809d8b4752afb2c3224b1f1ad73dfa70 + version: 076b546753157f758b316e59bcb51e6807c04057 subpackages: - unix - name: golang.org/x/text @@ -199,7 +197,7 @@ imports: - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/client-go - version: 5d8c36c93cf544e3bde13caa4c6222b65753ec3c + version: 41a99d711af778a177f07402217b85d456b50da1 subpackages: - discovery - kubernetes @@ -317,7 +315,7 @@ imports: - transport testImports: - name: github.com/onsi/gomega - version: f1f0f388b31eca4e2cbe7a6dd8a3a1dfddda5b1c + version: d59fa0ac68bb5dd932ee8d24eed631cdd519efc3 subpackages: - format - internal/assertion diff --git a/lib/backend/api/api.go b/lib/backend/api/api.go index 399fbc8a7..0c4ae7124 100644 --- a/lib/backend/api/api.go +++ b/lib/backend/api/api.go @@ -101,6 +101,10 @@ type Client interface { // the datastore and then generates subsequent KVPair updates for // changes to the datastore. Syncer(callbacks SyncerCallbacks) Syncer + + // EnsureInitialized ensures that the backend is initialized + // any ready to be used. + EnsureInitialized() error } type Syncer interface { diff --git a/lib/backend/compat/compat.go b/lib/backend/compat/compat.go index 412d0d9e3..06c34f6be 100644 --- a/lib/backend/compat/compat.go +++ b/lib/backend/compat/compat.go @@ -35,6 +35,10 @@ func NewAdaptor(c api.Client) *ModelAdaptor { return &ModelAdaptor{client: c} } +func (c *ModelAdaptor) EnsureInitialized() error { + return c.client.EnsureInitialized() +} + // Create an entry in the datastore. This errors if the entry already exists. func (c *ModelAdaptor) Create(d *model.KVPair) (*model.KVPair, error) { var err error diff --git a/lib/backend/etcd/etcd.go b/lib/backend/etcd/etcd.go index 591b8bbbd..dd1c17860 100644 --- a/lib/backend/etcd/etcd.go +++ b/lib/backend/etcd/etcd.go @@ -102,6 +102,10 @@ func NewEtcdClient(config *EtcdConfig) (*EtcdClient, error) { return &EtcdClient{etcdClient: client, etcdKeysAPI: keys}, nil } +func (c *EtcdClient) EnsureInitialized() error { + return nil +} + func (c *EtcdClient) Syncer(callbacks api.SyncerCallbacks) api.Syncer { return newSyncer(c.etcdKeysAPI, callbacks) } diff --git a/lib/backend/k8s/conversion.go b/lib/backend/k8s/conversion.go index ffdc57aa1..c563dcb92 100644 --- a/lib/backend/k8s/conversion.go +++ b/lib/backend/k8s/conversion.go @@ -24,11 +24,13 @@ import ( "encoding/json" log "github.com/Sirupsen/logrus" + "github.com/projectcalico/libcalico-go/lib/backend/k8s/thirdparty" "github.com/projectcalico/libcalico-go/lib/backend/model" cnet "github.com/projectcalico/libcalico-go/lib/net" "github.com/projectcalico/libcalico-go/lib/numorstring" + kapi "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/unversioned" - k8sapi "k8s.io/client-go/pkg/api/v1" + kapiv1 "k8s.io/client-go/pkg/api/v1" extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) @@ -79,7 +81,7 @@ func (c converter) parseProfileName(profileName string) (string, error) { return splits[1], nil } -func (c converter) namespaceToProfile(ns *k8sapi.Namespace) (*model.KVPair, error) { +func (c converter) namespaceToProfile(ns *kapiv1.Namespace) (*model.KVPair, error) { // Determine the ingress action based off the DefaultDeny annotation. ingressAction := "allow" for k, v := range ns.ObjectMeta.Annotations { @@ -116,21 +118,49 @@ func (c converter) namespaceToProfile(ns *k8sapi.Namespace) (*model.KVPair, erro return &kvp, nil } +func (c converter) tprToGlobalConfig(tpr *thirdparty.GlobalConfig) *model.KVPair { + kvp := &model.KVPair{ + Key: model.GlobalConfigKey{ + Name: tpr.Spec.Name, + }, + Value: tpr.Spec.Value, + Revision: tpr.Metadata.ResourceVersion, + } + return kvp +} + +func (c converter) globalConfigToTPR(kvp *model.KVPair) thirdparty.GlobalConfig { + tpr := thirdparty.GlobalConfig{ + Metadata: kapi.ObjectMeta{ + // Names in Kubernetes must be lower-case. + Name: strings.ToLower(kvp.Key.(model.GlobalConfigKey).Name), + }, + Spec: thirdparty.GlobalConfigSpec{ + Name: kvp.Key.(model.GlobalConfigKey).Name, + Value: kvp.Value.(string), + }, + } + if kvp.Revision != nil { + tpr.Metadata.ResourceVersion = kvp.Revision.(string) + } + return tpr +} + // isCalicoPod returns true if the pod should be shown as a workloadEndpoint // in the Calico API and false otherwise. -func (c converter) isCalicoPod(pod *k8sapi.Pod) bool { +func (c converter) isCalicoPod(pod *kapiv1.Pod) bool { return !c.isHostNetworked(pod) && c.hasIPAddress(pod) } -func (c converter) isHostNetworked(pod *k8sapi.Pod) bool { +func (c converter) isHostNetworked(pod *kapiv1.Pod) bool { return pod.Spec.HostNetwork } -func (c converter) hasIPAddress(pod *k8sapi.Pod) bool { +func (c converter) hasIPAddress(pod *kapiv1.Pod) bool { return pod.Status.PodIP != "" } -func (c converter) podToWorkloadEndpoint(pod *k8sapi.Pod) (*model.KVPair, error) { +func (c converter) podToWorkloadEndpoint(pod *kapiv1.Pod) (*model.KVPair, error) { // Pull out the profile and workload ID based on pod name and Namespace. profile := fmt.Sprintf("k8s_ns.%s", pod.ObjectMeta.Namespace) workload := fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) @@ -297,7 +327,7 @@ func (c converter) buildRule(port *extensions.NetworkPolicyPort, peer *extension } } -func (c converter) k8sProtocolToCalico(protocol *k8sapi.Protocol) *numorstring.Protocol { +func (c converter) k8sProtocolToCalico(protocol *kapiv1.Protocol) *numorstring.Protocol { if protocol != nil { p := numorstring.ProtocolFromString(strings.ToLower(string(*protocol))) return &p diff --git a/lib/backend/k8s/k8s.go b/lib/backend/k8s/k8s.go index 7d785a738..eb79b6db6 100644 --- a/lib/backend/k8s/k8s.go +++ b/lib/backend/k8s/k8s.go @@ -16,20 +16,39 @@ package k8s import ( goerrors "errors" + "fmt" + "strings" "time" log "github.com/Sirupsen/logrus" + "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/backend/k8s/thirdparty" "github.com/projectcalico/libcalico-go/lib/backend/model" "github.com/projectcalico/libcalico-go/lib/errors" + "k8s.io/client-go/kubernetes" - k8sapi "k8s.io/client-go/pkg/api/v1" + clientapi "k8s.io/client-go/pkg/api" + kerrors "k8s.io/client-go/pkg/api/errors" + kapiv1 "k8s.io/client-go/pkg/api/v1" extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/pkg/runtime/schema" + "k8s.io/client-go/pkg/runtime/serializer" + "k8s.io/client-go/pkg/util/wait" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) type KubeClient struct { + // Main Kubernetes clients. clientSet *kubernetes.Clientset + + // Client for interacting with ThirdPartyResources. + tprClient *rest.RESTClient + + // Contains methods for converting Kubernetes resources to + // Calico resources. converter converter } @@ -44,7 +63,7 @@ type KubeConfig struct { func NewKubeClient(kc *KubeConfig) (*KubeClient, error) { // Use the kubernetes client code to load the kubeconfig file and combine it with the overrides. - log.Infof("Building client for config: %+v", kc) + log.Debugf("Building client for config: %+v", kc) configOverrides := &clientcmd.ConfigOverrides{} var overridesMap = []struct { variable *string @@ -70,7 +89,7 @@ func NewKubeClient(kc *KubeConfig) (*KubeClient, error) { *override.variable = override.value } } - log.Infof("Config overrides: %+v", configOverrides) + log.Debugf("Config overrides: %+v", configOverrides) // A kubeconfig file was provided. Use it to load a config, passing through // any overrides. @@ -86,7 +105,148 @@ func NewKubeClient(kc *KubeConfig) (*KubeClient, error) { return nil, k8sErrorToCalico(err, nil) } log.Debugf("Created k8s clientSet: %+v", cs) - return &KubeClient{clientSet: cs}, nil + + tprClient, err := buildTPRClient(config) + if err != nil { + return nil, err + } + kubeClient := &KubeClient{ + clientSet: cs, + tprClient: tprClient, + } + + return kubeClient, nil +} + +func (c *KubeClient) EnsureInitialized() error { + // Ensure the necessary ThirdPartyResources exist in the API. + log.Info("Ensuring ThirdPartyResources exist") + err := c.ensureThirdPartyResources() + if err != nil { + return err + } + log.Info("ThirdPartyResources exist") + + // Ensure ClusterType is set. + log.Info("Ensuring ClusterType is set") + err = c.waitForClusterType() + if err != nil { + return err + } + log.Info("ClusterType is set") + return nil +} + +// ensureThirdPartyResources ensures the necessary thirdparty resources are created +// and will retry every second for 30 seconds or until they exist. +func (c *KubeClient) ensureThirdPartyResources() error { + return wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) { + return c.createThirdPartyResources() + }) +} + +// createThirdPartyResources creates the necessary third party resources if they +// do not already exist. +func (c *KubeClient) createThirdPartyResources() (bool, error) { + // Ensure a resource exists for Calico global configuration. + tpr := extensions.ThirdPartyResource{ + ObjectMeta: kapiv1.ObjectMeta{ + Name: "global-config.projectcalico.org", + Namespace: "kube-system", + }, + Description: "Calico Global Configuration", + Versions: []extensions.APIVersion{{Name: "v1"}}, + } + _, err := c.clientSet.Extensions().ThirdPartyResources().Create(&tpr) + if err != nil { + // Don't care if it already exists. + if !kerrors.IsAlreadyExists(err) { + return false, goerrors.New(fmt.Sprintf("failed to create ThirdPartyResource %s: %s", tpr.ObjectMeta.Name, err)) + } + } + return true, nil +} + +// waitForClusterType polls until GlobalConfig is ready, or until 30 seconds have passed. +func (c *KubeClient) waitForClusterType() error { + return wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) { + return c.ensureClusterType() + }) +} + +// ensureClusterType ensures that the ClusterType is configured. +func (c *KubeClient) ensureClusterType() (bool, error) { + k := model.GlobalConfigKey{ + Name: "ClusterType", + } + value := "kubernetes,k8sdatastoredriver" + + // See if a cluster type has been set. If so, append + // any existing values to it. + ct, err := c.Get(k) + if err != nil { + if _, ok := err.(errors.ErrorResourceDoesNotExist); !ok { + // Resource exists but we got another error. + return false, err + } + // Resource does not exist. + } + if ct != nil { + existingValue := ct.Value.(string) + if !strings.Contains(existingValue, "kubernetes") { + existingValue = fmt.Sprintf("%s,kubernetes", existingValue) + } + + if !strings.Contains(existingValue, "k8sdatastoredriver") { + existingValue = fmt.Sprintf("%s,k8sdatastoredriver", existingValue) + } + value = existingValue + } + _, err = c.Apply(&model.KVPair{ + Key: k, + Value: value, + }) + if err != nil { + // Don't return an error, but indicate that we need + // to retry. + log.Warnf("Failed to apply ClusterType: %s", err) + return false, nil + } + return true, nil +} + +// buildTPRClient builds a RESTClient configured to interact with Calico ThirdPartyResources +func buildTPRClient(baseConfig *rest.Config) (*rest.RESTClient, error) { + // Generate config using the base config. + cfg := baseConfig + cfg.GroupVersion = &schema.GroupVersion{ + Group: "projectcalico.org", + Version: "v1", + } + cfg.APIPath = "/apis" + cfg.ContentType = runtime.ContentTypeJSON + cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: clientapi.Codecs} + + cli, err := rest.RESTClientFor(cfg) + if err != nil { + return nil, err + } + + // We also need to register resources. + schemeBuilder := runtime.NewSchemeBuilder( + func(scheme *runtime.Scheme) error { + scheme.AddKnownTypes( + *cfg.GroupVersion, + &thirdparty.GlobalConfig{}, + &thirdparty.GlobalConfigList{}, + &kapiv1.ListOptions{}, + &kapiv1.DeleteOptions{}, + ) + return nil + }) + schemeBuilder.AddToScheme(clientapi.Scheme) + + return cli, nil } func (c *KubeClient) Syncer(callbacks api.SyncerCallbacks) api.Syncer { @@ -95,20 +255,29 @@ func (c *KubeClient) Syncer(callbacks api.SyncerCallbacks) api.Syncer { // Create an entry in the datastore. This errors if the entry already exists. func (c *KubeClient) Create(d *model.KVPair) (*model.KVPair, error) { - log.Warn("Attempt to 'Create' using kubernetes backend is not supported.") - return nil, errors.ErrorResourceDoesNotExist{ - Err: goerrors.New("Resource does not exist"), - Identifier: d.Key, + switch d.Key.(type) { + case model.GlobalConfigKey: + return c.createGlobalConfig(d) + default: + log.Warn("Attempt to 'Create' using kubernetes backend is not supported.") + return nil, errors.ErrorOperationNotSupported{ + Identifier: d.Key, + Operation: "Create", + } } } // Update an existing entry in the datastore. This errors if the entry does // not exist. func (c *KubeClient) Update(d *model.KVPair) (*model.KVPair, error) { - // This is a noop. Calico components shouldn't be modifying - // k8s resources. - log.Infof("Kubernetes backend received 'Update' for %+v - do nothing.", d.Key) - return d, nil + switch d.Key.(type) { + case model.GlobalConfigKey: + return c.updateGlobalConfig(d) + default: + // If the resource isn't supported, then this is a no-op. + log.Infof("'Update' for %+v is no-op", d.Key) + return d, nil + } } // Set an existing entry in the datastore. This ignores whether an entry already @@ -117,21 +286,28 @@ func (c *KubeClient) Apply(d *model.KVPair) (*model.KVPair, error) { switch d.Key.(type) { case model.WorkloadEndpointKey: return c.applyWorkloadEndpoint(d) + case model.GlobalConfigKey: + return c.applyGlobalConfig(d) default: - log.Infof("Ignoring 'Apply' for %s", d.Key) + log.Infof("'Apply' for %s is no-op", d.Key) return d, nil } } // Delete an entry in the datastore. This is a no-op when using the k8s backend. func (c *KubeClient) Delete(d *model.KVPair) error { - log.Warn("Attempt to 'Delete' using kubernetes backend is not supported.") - return nil + switch d.Key.(type) { + case model.GlobalConfigKey: + return c.deleteGlobalConfig(d) + default: + log.Warn("Attempt to 'Delete' using kubernetes backend is not supported.") + return nil + } } // Get an entry from the datastore. This errors if the entry does not exist. func (c *KubeClient) Get(k model.Key) (*model.KVPair, error) { - log.Debugf("Received 'Get' request for %+v", k) + log.Debugf("Performing 'Get' for %+v", k) switch k.(type) { case model.ProfileKey: return c.getProfile(k.(model.ProfileKey)) @@ -146,9 +322,9 @@ func (c *KubeClient) Get(k model.Key) (*model.KVPair, error) { case model.ReadyFlagKey: return c.getReadyStatus(k.(model.ReadyFlagKey)) default: - return nil, errors.ErrorResourceDoesNotExist{ - Err: goerrors.New("Resource does not exist"), + return nil, errors.ErrorOperationNotSupported{ Identifier: k, + Operation: "Get", } } } @@ -156,7 +332,7 @@ func (c *KubeClient) Get(k model.Key) (*model.KVPair, error) { // List entries in the datastore. This may return an empty list of there are // no entries matching the request in the ListInterface. func (c *KubeClient) List(l model.ListInterface) ([]*model.KVPair, error) { - log.Debugf("Received 'List' request for %+v", l) + log.Debugf("Performing 'List' for %+v", l) switch l.(type) { case model.ProfileListOptions: return c.listProfiles(l.(model.ProfileListOptions)) @@ -185,7 +361,7 @@ func (c *KubeClient) listProfiles(l model.ProfileListOptions) ([]*model.KVPair, } // Otherwise, enumerate all. - namespaces, err := c.clientSet.Namespaces().List(k8sapi.ListOptions{}) + namespaces, err := c.clientSet.Namespaces().List(kapiv1.ListOptions{}) if err != nil { return nil, k8sErrorToCalico(err, l) } @@ -262,7 +438,7 @@ func (c *KubeClient) listWorkloadEndpoints(l model.WorkloadEndpointListOptions) // Otherwise, enumerate all pods in all namespaces. // We don't yet support hostname, orchestratorID, for the k8s backend. - pods, err := c.clientSet.Pods("").List(k8sapi.ListOptions{}) + pods, err := c.clientSet.Pods("").List(kapiv1.ListOptions{}) if err != nil { return nil, k8sErrorToCalico(err, l) } @@ -362,12 +538,113 @@ func (c *KubeClient) getReadyStatus(k model.ReadyFlagKey) (*model.KVPair, error) return &model.KVPair{Key: k, Value: true}, nil } +// applyGlobalConfig updates a global config if it exists, and creates it +// if it doesn't. +func (c *KubeClient) applyGlobalConfig(kvp *model.KVPair) (*model.KVPair, error) { + updated, err := c.updateGlobalConfig(kvp) + if err != nil { + if _, ok := err.(errors.ErrorResourceDoesNotExist); !ok { + // Error other than "not found" - return. + return nil, err + } + + // It doesn't exist - create it. + updated, err = c.createGlobalConfig(kvp) + if err != nil { + return nil, err + } + } + return updated, nil +} + +// updateGlobalConfig updates a global config if it exists, and returns an error +// if it doesn't. +func (c *KubeClient) updateGlobalConfig(kvp *model.KVPair) (*model.KVPair, error) { + gcfg := c.converter.globalConfigToTPR(kvp) + res := thirdparty.GlobalConfig{} + req := c.tprClient.Put(). + Resource("globalconfigs"). + Namespace("kube-system"). + Body(&gcfg). + Name(gcfg.Metadata.Name) + err := req.Do().Into(&res) + if err != nil { + return nil, k8sErrorToCalico(err, kvp.Key) + } + kvp.Revision = gcfg.Metadata.ResourceVersion + return kvp, nil +} + +// createGlobalConfig creates a global config if it doesn't exist, and +// returns an error if it does. +func (c *KubeClient) createGlobalConfig(kvp *model.KVPair) (*model.KVPair, error) { + gcfg := c.converter.globalConfigToTPR(kvp) + res := thirdparty.GlobalConfig{} + req := c.tprClient.Post(). + Resource("globalconfigs"). + Namespace("kube-system"). + Body(&gcfg) + err := req.Do().Into(&res) + if err != nil { + return nil, k8sErrorToCalico(err, kvp.Key) + } + kvp.Revision = gcfg.Metadata.ResourceVersion + return kvp, nil +} + +// getGlobalConfig gets a global config and returns an error if it doesn't exist. func (c *KubeClient) getGlobalConfig(k model.GlobalConfigKey) (*model.KVPair, error) { - return nil, goerrors.New("Get for GlobalConfig not supported in kubernetes backend") + cfg := thirdparty.GlobalConfig{} + err := c.tprClient.Get(). + Resource("globalconfigs"). + Namespace("kube-system"). + Name(strings.ToLower(k.Name)). + Do().Into(&cfg) + if err != nil { + return nil, k8sErrorToCalico(err, k) + } + + return c.converter.tprToGlobalConfig(&cfg), nil } +// listGlobalConfig lists all global configs. func (c *KubeClient) listGlobalConfig(l model.GlobalConfigListOptions) ([]*model.KVPair, error) { - return []*model.KVPair{}, nil + cfgs := []*model.KVPair{} + gcfg := thirdparty.GlobalConfigList{} + + // Build the request. + req := c.tprClient.Get().Resource("globalconfigs").Namespace("kube-system") + if l.Name != "" { + req.Name(strings.ToLower(l.Name)) + } + + // Perform the request. + err := req.Do().Into(&gcfg) + if err != nil { + // Don't return errors for "not found". This just + // means thre are no GlobalConfigs, and we should return + // an empty list. + if !kerrors.IsNotFound(err) { + return nil, k8sErrorToCalico(err, l) + } + } + + // Convert them to KVPairs. + for _, cfg := range gcfg.Items { + cfgs = append(cfgs, c.converter.tprToGlobalConfig(&cfg)) + } + + return cfgs, nil +} + +// deleteGlobalConfig deletes the given global config. +func (c *KubeClient) deleteGlobalConfig(k *model.KVPair) error { + result := c.tprClient.Delete(). + Resource("globalconfigs"). + Namespace("kube-system"). + Name(strings.ToLower(k.Key.(model.GlobalConfigKey).Name)). + Do() + return result.Error() } func (c *KubeClient) getHostConfig(k model.HostConfigKey) (*model.KVPair, error) { diff --git a/lib/backend/k8s/k8s_fv_test.go b/lib/backend/k8s/k8s_fv_test.go index b93edc673..1a61d4817 100644 --- a/lib/backend/k8s/k8s_fv_test.go +++ b/lib/backend/k8s/k8s_fv_test.go @@ -67,6 +67,12 @@ func CreateClientAndStartSyncer() *KubeClient { panic(err) } + // Ensure the backend is initialized. + err = c.EnsureInitialized() + if err != nil { + panic(err) + } + // Start the syncer. callback := cb{ status: api.WaitForDatastore, @@ -314,4 +320,86 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() { Expect(err).NotTo(HaveOccurred()) Expect(len(objs)).To(Equal(0)) }) + + It("should support setting and getting GlobalConfig", func() { + gc := &model.KVPair{ + Key: model.GlobalConfigKey{ + Name: "ClusterGUID", + }, + Value: "someguid", + } + var updGC *model.KVPair + var err error + + By("creating a new object", func() { + updGC, err = c.Create(gc) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal(gc.Value.(string))) + Expect(updGC.Key.(model.GlobalConfigKey).Name).To(Equal("ClusterGUID")) + Expect(updGC.Revision).NotTo(BeNil()) + }) + + By("getting an existing object", func() { + updGC, err = c.Get(gc.Key) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal(gc.Value.(string))) + Expect(updGC.Key.(model.GlobalConfigKey).Name).To(Equal("ClusterGUID")) + Expect(updGC.Revision).NotTo(BeNil()) + }) + + By("updating an existing object", func() { + updGC.Value = "someotherguid" + updGC, err = c.Update(updGC) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal("someotherguid")) + }) + + By("getting the updated object", func() { + updGC, err = c.Get(gc.Key) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal("someotherguid")) + Expect(updGC.Key.(model.GlobalConfigKey).Name).To(Equal("ClusterGUID")) + Expect(updGC.Revision).NotTo(BeNil()) + }) + + By("applying an existing object", func() { + updGC.Value = "somenewguid" + updGC, err = c.Apply(updGC) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal("somenewguid")) + }) + + By("getting the applied object", func() { + updGC, err = c.Get(gc.Key) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal("somenewguid")) + Expect(updGC.Key.(model.GlobalConfigKey).Name).To(Equal("ClusterGUID")) + Expect(updGC.Revision).NotTo(BeNil()) + }) + + By("deleting an existing object", func() { + err = c.Delete(gc) + Expect(err).NotTo(HaveOccurred()) + }) + + By("getting a non-existing object", func() { + updGC, err = c.Get(gc.Key) + Expect(err).To(HaveOccurred()) + Expect(updGC).To(BeNil()) + }) + + By("applying a new object", func() { + updGC, err = c.Apply(gc) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal(gc.Value.(string))) + }) + + By("getting the applied object", func() { + updGC, err = c.Get(gc.Key) + Expect(err).NotTo(HaveOccurred()) + Expect(updGC.Value.(string)).To(Equal(gc.Value.(string))) + Expect(updGC.Key.(model.GlobalConfigKey).Name).To(Equal("ClusterGUID")) + Expect(updGC.Revision).NotTo(BeNil()) + }) + }) }) diff --git a/lib/backend/k8s/syncer.go b/lib/backend/k8s/syncer.go index 0fb964b22..977eedfa9 100644 --- a/lib/backend/k8s/syncer.go +++ b/lib/backend/k8s/syncer.go @@ -22,7 +22,9 @@ import ( log "github.com/Sirupsen/logrus" "github.com/projectcalico/libcalico-go/lib/backend/api" "github.com/projectcalico/libcalico-go/lib/backend/compat" + "github.com/projectcalico/libcalico-go/lib/backend/k8s/thirdparty" "github.com/projectcalico/libcalico-go/lib/backend/model" + k8sapi "k8s.io/client-go/pkg/api/v1" extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/pkg/fields" @@ -53,6 +55,7 @@ type resourceVersions struct { podVersion string namespaceVersion string networkPolicyVersion string + globalConfigVersion string } func (syn *kubeSyncer) Start() { @@ -124,7 +127,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { latestVersions := resourceVersions{} // Other watcher vars. - var nsChan, poChan, npChan <-chan watch.Event + var nsChan, poChan, npChan, gcChan <-chan watch.Event var event watch.Event var kvp *model.KVPair var opts k8sapi.ListOptions @@ -142,6 +145,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { // Get snapshot from datastore. snap, existingKeys, latestVersions := syn.performSnapshot() + log.Debugf("Snapshot: %+v, keys: %+v, versions: %+v", snap, existingKeys, latestVersions) // Go through and delete anything that existed before, but doesn't anymore. syn.performSnapshotDeletes(existingKeys) @@ -156,27 +160,41 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { opts = k8sapi.ListOptions{ResourceVersion: latestVersions.namespaceVersion} nsWatch, err := syn.kc.clientSet.Namespaces().Watch(opts) if err != nil { - log.Warn("Failed to connect to API, retrying") + log.Warn("Failed to watch Namespaces, retrying: %s", err) time.Sleep(1 * time.Second) continue } opts = k8sapi.ListOptions{ResourceVersion: latestVersions.podVersion} poWatch, err := syn.kc.clientSet.Pods("").Watch(opts) if err != nil { - log.Warn("Failed to connect to API, retrying") + log.Warn("Failed to watch Pods, retrying: %s", err) time.Sleep(1 * time.Second) continue } opts = k8sapi.ListOptions{ResourceVersion: latestVersions.networkPolicyVersion} - listWatcher := cache.NewListWatchFromClient( + // Create watcher for NetworkPolicy objects. + netpolListWatcher := cache.NewListWatchFromClient( syn.kc.clientSet.Extensions().RESTClient(), "networkpolicies", "", fields.Everything()) - npWatch, err := listWatcher.WatchFunc(opts) + npWatch, err := netpolListWatcher.WatchFunc(opts) + if err != nil { + log.Warnf("Failed to watch NetworkPolicies, retrying: %s", err) + time.Sleep(1 * time.Second) + continue + } + + // Create watcher for Calico global config resources. + tprListWatcher := cache.NewListWatchFromClient( + syn.kc.tprClient, + "globalconfigs", + "kube-system", + fields.Everything()) + tprWatch, err := tprListWatcher.WatchFunc(opts) if err != nil { - log.Warnf("Failed to connect to API, retrying: %s", err) + log.Warnf("Failed to watch GlobalConfig, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -184,6 +202,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { nsChan = nsWatch.ResultChan() poChan = poWatch.ResultChan() npChan = npWatch.ResultChan() + gcChan = tprWatch.ResultChan() // Success - reset the flag. needsResync = false @@ -236,6 +255,18 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { kvp = syn.parseNetworkPolicyEvent(event) latestVersions.networkPolicyVersion = kvp.Revision.(string) syn.sendUpdates([]model.KVPair{*kvp}) + case event = <-gcChan: + log.Debugf("Incoming GlobalConfig watch event. Type=%s", event.Type) + if needsResync = syn.eventTriggersResync(event); needsResync { + // We need to resync. Break out into the sync loop. + log.Warn("Event triggered resync: %+v", event) + continue + } + + // Event is OK - parse it and send it over the channel. + kvp = syn.parseGlobalConfigEvent(event) + latestVersions.globalConfigVersion = kvp.Revision.(string) + syn.sendUpdates([]model.KVPair{*kvp}) } } } @@ -243,10 +274,12 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { func (syn *kubeSyncer) performSnapshotDeletes(exists map[model.Key]bool) { log.Info("Checking for any deletes for snapshot") deletes := []model.KVPair{} + log.Debugf("Keys in snapshot: %+v", exists) for cachedKey, _ := range syn.tracker { // Check each cached key to see if it exists in the snapshot. If it doesn't, // we need to send a delete for it. if _, stillExists := exists[cachedKey]; !stillExists { + log.Debugf("Cached key not in snapshot: %+v", cachedKey) deletes = append(deletes, model.KVPair{Key: cachedKey, Value: nil}) } } @@ -259,13 +292,14 @@ func (syn *kubeSyncer) performSnapshotDeletes(exists map[model.Key]bool) { // populates the provided resourceVersions with the latest k8s resource version // for each. func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, resourceVersions) { - var snap []model.KVPair - var keys map[model.Key]bool opts := k8sapi.ListOptions{} versions := resourceVersions{} + var snap []model.KVPair + var keys map[model.Key]bool // Loop until we successfully are able to accesss the API. for { + // Initialize the values to return. snap = []model.KVPair{} keys = map[model.Key]bool{} @@ -273,7 +307,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re log.Info("Syncing Namespaces") nsList, err := syn.kc.clientSet.Namespaces().List(opts) if err != nil { - log.Warnf("Error accessing Kubernetes API, retrying: %s", err) + log.Warnf("Error syncing Namespaces, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -305,7 +339,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re Timeout(10 * time.Second). Do().Into(&npList) if err != nil { - log.Warnf("Error accessing Kubernetes API, retrying: %s", err) + log.Warnf("Error syncing NetworkPolicies, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -321,7 +355,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re log.Info("Syncing Pods") poList, err := syn.kc.clientSet.Pods("").List(opts) if err != nil { - log.Warnf("Error accessing Kubernetes API, retrying: %s", err) + log.Warnf("Error syncing Pods, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -338,7 +372,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re // Sync GlobalConfig. confList, err := syn.kc.listGlobalConfig(model.GlobalConfigListOptions{}) if err != nil { - log.Warnf("Error accessing Kubernetes API, retrying: %s", err) + log.Warnf("Error syncing GlobalConfig, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -351,7 +385,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re // Include ready state. ready, err := syn.kc.getReadyStatus(model.ReadyFlagKey{}) if err != nil { - log.Warnf("Error accessing Kubernetes API, retrying: %s", err) + log.Warnf("Error getting ready status, retrying: %s", err) time.Sleep(1 * time.Second) continue } @@ -477,3 +511,21 @@ func (syn *kubeSyncer) parseNetworkPolicyEvent(e watch.Event) *model.KVPair { } return kvp } + +func (syn *kubeSyncer) parseGlobalConfigEvent(e watch.Event) *model.KVPair { + log.Debug("Parsing GlobalConfig watch event") + // First, check the event type. + gc, ok := e.Object.(*thirdparty.GlobalConfig) + if !ok { + log.Panicf("Invalid GlobalConfig event. Type: %s, Object: %+v", e.Type, e.Object) + } + + // Convert the received GlobalConfig into a KVPair. + kvp := syn.kc.converter.tprToGlobalConfig(gc) + + // For deletes, we need to nil out the Value part of the KVPair + if e.Type == watch.Deleted { + kvp.Value = nil + } + return kvp +} diff --git a/lib/backend/k8s/thirdparty/global_config.go b/lib/backend/k8s/thirdparty/global_config.go new file mode 100644 index 000000000..a58f9d379 --- /dev/null +++ b/lib/backend/k8s/thirdparty/global_config.go @@ -0,0 +1,78 @@ +package thirdparty + +import ( + "encoding/json" + + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/meta" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/runtime/schema" +) + +type GlobalConfigSpec struct { + Name string `json:"name"` + Value string `json:"value"` +} + +type GlobalConfig struct { + unversioned.TypeMeta `json:",inline"` + Metadata api.ObjectMeta `json:"metadata"` + + Spec GlobalConfigSpec `json:"spec"` +} + +type GlobalConfigList struct { + unversioned.TypeMeta `json:",inline"` + Metadata unversioned.ListMeta `json:"metadata"` + + Items []GlobalConfig `json:"items"` +} + +// Required to satisfy Object interface +func (e *GlobalConfig) GetObjectKind() schema.ObjectKind { + return &e.TypeMeta +} + +// Required to satisfy ObjectMetaAccessor interface +func (e *GlobalConfig) GetObjectMeta() meta.Object { + return &e.Metadata +} + +// Required to satisfy Object interface +func (el *GlobalConfigList) GetObjectKind() schema.ObjectKind { + return &el.TypeMeta +} + +// Required to satisfy ListMetaAccessor interface +func (el *GlobalConfigList) GetListMeta() unversioned.List { + return &el.Metadata +} + +// The code below is used only to work around a known problem with third-party +// resources and ugorji. If/when these issues are resolved, the code below +// should no longer be required. + +type GlobalConfigListCopy GlobalConfigList +type GlobalConfigCopy GlobalConfig + +func (g *GlobalConfig) UnmarshalJSON(data []byte) error { + tmp := GlobalConfigCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := GlobalConfig(tmp) + *g = tmp2 + return nil +} + +func (l *GlobalConfigList) UnmarshalJSON(data []byte) error { + tmp := GlobalConfigListCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := GlobalConfigList(tmp) + *l = tmp2 + return nil +} diff --git a/lib/backend/model/profile.go b/lib/backend/model/profile.go index 9f75bb1a2..36c540f13 100644 --- a/lib/backend/model/profile.go +++ b/lib/backend/model/profile.go @@ -77,7 +77,7 @@ func (key ProfileRulesKey) valueType() reflect.Type { } func (key ProfileRulesKey) String() string { - return fmt.Sprintf("Profile(name=%s)", key.Name) + return fmt.Sprintf("ProfileRules(name=%s)", key.Name) } // ProfileTagsKey implements the KeyInterface for the profile tags @@ -95,7 +95,7 @@ func (key ProfileTagsKey) valueType() reflect.Type { } func (key ProfileTagsKey) String() string { - return fmt.Sprintf("Profile(name=%s)", key.Name) + return fmt.Sprintf("ProfileTags(name=%s)", key.Name) } // ProfileLabelsKey implements the KeyInterface for the profile labels @@ -113,7 +113,7 @@ func (key ProfileLabelsKey) valueType() reflect.Type { } func (key ProfileLabelsKey) String() string { - return fmt.Sprintf("Profile(name=%s)", key.Name) + return fmt.Sprintf("ProfileLabels(name=%s)", key.Name) } type ProfileListOptions struct { diff --git a/lib/client/profile_e2e_test.go b/lib/client/profile_e2e_test.go index f543fa26f..e54ab538f 100644 --- a/lib/client/profile_e2e_test.go +++ b/lib/client/profile_e2e_test.go @@ -74,7 +74,7 @@ var _ = Describe("Profile tests", func() { _, outError := c.Profiles().Update(&api.Profile{Metadata: meta1, Spec: spec1}) // Should return an error. - Expect(outError.Error()).To(Equal(errors.New("resource does not exist: Profile(name=profile1)").Error())) + Expect(outError.Error()).To(Equal(errors.New("resource does not exist: ProfileTags(name=profile1)").Error())) By("Create, Apply, Get and compare") @@ -155,7 +155,7 @@ var _ = Describe("Profile tests", func() { _, outError = c.Profiles().Get(meta1) // Expect an error since the profile was deleted. - Expect(outError.Error()).To(Equal(errors.New("resource does not exist: Profile(name=profile1)").Error())) + Expect(outError.Error()).To(Equal(errors.New("resource does not exist: ProfileTags(name=profile1)").Error())) // Delete the second profile with meta2. outError1 = c.Profiles().Delete(meta2) diff --git a/lib/errors/errors.go b/lib/errors/errors.go index 35ad88f9d..1ab9e7ebd 100644 --- a/lib/errors/errors.go +++ b/lib/errors/errors.go @@ -39,6 +39,16 @@ func (e ErrorResourceDoesNotExist) Error() string { return fmt.Sprintf("resource does not exist: %s", e.Identifier) } +// Error indicating an operation is not supported. +type ErrorOperationNotSupported struct { + Operation string + Identifier interface{} +} + +func (e ErrorOperationNotSupported) Error() string { + return fmt.Sprintf("operation %s is not supported on %s", e.Operation, e.Identifier) +} + // Error indicating a resource already exists. Used when attempting to create a // resource that already exists. type ErrorResourceAlreadyExists struct { diff --git a/run-uts b/run-uts index b9915e086..b93092730 100755 --- a/run-uts +++ b/run-uts @@ -4,7 +4,7 @@ set -e # Run tests in random order find tests recursively (-r). echo $WHAT -ginkgo -cover -r --skipPackage vendor -regexScansFilePath $WHAT +ginkgo -cover -r --skipPackage vendor $WHAT echo echo '+==============+'