Skip to content

Commit

Permalink
Showing 8 changed files with 127 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ test: ut
## Use this to populate the vendor directory after checking out the repository.
## To update upstream dependencies, delete the glide.lock file first.
vendor:
glide install -strip-vendor -strip-vcs --cache
glide install -strip-vendor

.PHONY: ut
## Run the UTs locally. This requires a local etcd to be running.
96 changes: 43 additions & 53 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -26,9 +26,4 @@ import:
version: ^0.10.0
- package: github.com/termie/go-shutil
- package: github.com/mitchellh/go-ps
- package: k8s.io/kubernetes
subpackages:
- pkg/api
- pkg/api/v1
- pkg/client/clientset_generated/release_1_4
- pkg/client/unversioned/clientcmd
- package: k8s.io/client-go
8 changes: 4 additions & 4 deletions lib/backend/k8s/conversion.go
Original file line number Diff line number Diff line change
@@ -27,9 +27,9 @@ import (
"github.com/projectcalico/libcalico-go/lib/backend/model"
cnet "github.com/projectcalico/libcalico-go/lib/net"
"github.com/projectcalico/libcalico-go/lib/numorstring"
k8sapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/client-go/pkg/api/unversioned"
k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
)

var (
@@ -123,7 +123,7 @@ func (c converter) isCalicoPod(pod *k8sapi.Pod) bool {
}

func (c converter) isHostNetworked(pod *k8sapi.Pod) bool {
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork == true
return pod.Spec.HostNetwork
}

func (c converter) hasIPAddress(pod *k8sapi.Pod) bool {
8 changes: 4 additions & 4 deletions lib/backend/k8s/conversion_test.go
Original file line number Diff line number Diff line change
@@ -19,10 +19,10 @@ import (
. "github.com/onsi/gomega"

"github.com/projectcalico/libcalico-go/lib/backend/model"
k8sapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/client-go/pkg/api/unversioned"
k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
)

var _ = Describe("Test parsing strings", func() {
31 changes: 23 additions & 8 deletions lib/backend/k8s/k8s.go
Original file line number Diff line number Diff line change
@@ -16,18 +16,20 @@ package k8s

import (
goerrors "errors"
"time"

log "github.com/Sirupsen/logrus"
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/errors"
k8sapi "k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/client-go/kubernetes"
k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/clientcmd"
)

type KubeClient struct {
clientSet *clientset.Clientset
clientSet *kubernetes.Clientset
converter converter
}

@@ -79,7 +81,7 @@ func NewKubeClient(kc *KubeConfig) (*KubeClient, error) {
}

// Create the clientset
cs, err := clientset.NewForConfig(config)
cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
@@ -285,7 +287,12 @@ func (c *KubeClient) listPolicies(l model.PolicyListOptions) ([]*model.KVPair, e
}

// Otherwise, list all NetworkPolicy objects in all Namespaces.
networkPolicies, err := c.clientSet.NetworkPolicies("").List(k8sapi.ListOptions{})
networkPolicies := extensions.NetworkPolicyList{}
err := c.clientSet.Extensions().RESTClient().
Get().
Resource("networkpolicies").
Timeout(10 * time.Second).
Do().Into(&networkPolicies)
if err != nil {
return nil, err
}
@@ -309,11 +316,19 @@ func (c *KubeClient) getPolicy(k model.PolicyKey) (*model.KVPair, error) {
return nil, goerrors.New("Missing policy name")
}
namespace, policyName := c.converter.parsePolicyName(k.Name)
networkPolicy, err := c.clientSet.NetworkPolicies(namespace).Get(policyName)

networkPolicy := extensions.NetworkPolicy{}
err := c.clientSet.Extensions().RESTClient().
Get().
Resource("networkpolicies").
Namespace(namespace).
Name(policyName).
Timeout(10 * time.Second).
Do().Into(&networkPolicy)
if err != nil {
return nil, err
}
return c.converter.networkPolicyToPolicy(networkPolicy)
return c.converter.networkPolicyToPolicy(&networkPolicy)
}

func (c *KubeClient) getReadyStatus(k model.ReadyFlagKey) (*model.KVPair, error) {
42 changes: 32 additions & 10 deletions lib/backend/k8s/k8s_fv_test.go
Original file line number Diff line number Diff line change
@@ -10,9 +10,9 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
k8sapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/client-go/pkg/api/unversioned"
k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
)

// cb implements the callback interface required for the
@@ -167,19 +167,29 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
},
},
}
_, err := c.clientSet.NetworkPolicies("default").Create(&np)
res := c.clientSet.Extensions().RESTClient().
Post().
Resource("networkpolicies").
Namespace("default").
Body(&np).
Do()

// Make sure we clean up after ourselves.
defer func() {
err = c.clientSet.NetworkPolicies("default").Delete(np.ObjectMeta.Name, &k8sapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
res := c.clientSet.Extensions().RESTClient().
Delete().
Resource("networkpolicies").
Namespace("default").
Name(np.ObjectMeta.Name).
Do()
Expect(res.Error()).NotTo(HaveOccurred())
}()

// Check to see if the create succeeded.
Expect(err).NotTo(HaveOccurred())
Expect(res.Error()).NotTo(HaveOccurred())

// Perform a List and ensure it shows up in the Calico API.
_, err = c.List(model.PolicyListOptions{})
_, err := c.List(model.PolicyListOptions{})
Expect(err).NotTo(HaveOccurred())

// Perform a Get and ensure no error in the Calico API.
@@ -191,15 +201,27 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
defer func() {
log.Warnf("[TEST] Waiting for policies to tear down")
It("should clean up all policies", func() {
nps, err := c.clientSet.NetworkPolicies("default").List(k8sapi.ListOptions{})
nps := extensions.NetworkPolicyList{}
err := c.clientSet.Extensions().RESTClient().
Get().
Resource("networkpolicies").
Namespace("default").
Timeout(10 * time.Second).
Do().Into(&nps)
Expect(err).NotTo(HaveOccurred())

// Loop until no network policies exist.
for i := 0; i < 10; i++ {
if len(nps.Items) == 0 {
return
}
nps, err = c.clientSet.NetworkPolicies("default").List(k8sapi.ListOptions{})
nps := extensions.NetworkPolicyList{}
err := c.clientSet.Extensions().RESTClient().
Get().
Resource("networkpolicies").
Namespace("default").
Timeout(10 * time.Second).
Do().Into(&nps)
Expect(err).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
}
25 changes: 19 additions & 6 deletions lib/backend/k8s/syncer.go
Original file line number Diff line number Diff line change
@@ -23,9 +23,11 @@ import (
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/compat"
"github.com/projectcalico/libcalico-go/lib/backend/model"
k8sapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/watch"
k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/tools/cache"
)

func newSyncer(kc KubeClient, callbacks api.SyncerCallbacks) *kubeSyncer {
@@ -166,9 +168,15 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
opts = k8sapi.ListOptions{ResourceVersion: latestVersions.networkPolicyVersion}
npWatch, err := syn.kc.clientSet.NetworkPolicies("").Watch(opts)

listWatcher := cache.NewListWatchFromClient(
syn.kc.clientSet.Extensions().RESTClient(),
"networkpolicies",
"",
fields.Everything())
npWatch, err := listWatcher.WatchFunc(opts)
if err != nil {
log.Warn("Failed to connect to API, retrying")
log.Warnf("Failed to connect to API, retrying: %s", err)
time.Sleep(1 * time.Second)
continue
}
@@ -285,7 +293,12 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[model.Key]bool, re

// Get NetworkPolicies (Policies)
log.Info("Syncing NetworkPolicy")
npList, err := syn.kc.clientSet.NetworkPolicies("").List(opts)
npList := extensions.NetworkPolicyList{}
err = syn.kc.clientSet.Extensions().RESTClient().
Get().
Resource("networkpolicies").
Timeout(10 * time.Second).
Do().Into(&npList)
if err != nil {
log.Warnf("Error accessing Kubernetes API, retrying: %s", err)
time.Sleep(1 * time.Second)

0 comments on commit d1901d6

Please sign in to comment.