diff --git a/main.go b/main.go index aed59c2a19..baaeb96c13 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,8 @@ import ( "strings" "time" + corev1 "k8s.io/api/core/v1" + "github.com/cloudflare/cfssl/log" v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" @@ -246,17 +248,20 @@ func main() { Port: 9443, LeaderElection: enableLeaderElection, LeaderElectionID: "operator-lock", - // We should test this again in the future to see if the problem with LicenseKey updates - // being missed is resolved. Prior to controller-runtime 0.7 we observed Test failures - // where LicenseKey updates would be missed and the client cache did not have the LicenseKey. - // The controller-runtime was updated and we made use of this ClientDisableCacheFor feature - // for the LicenseKey. We should test again in the future to see if the cache issue is fixed - // and we can remove this. Here is a link to the upstream issue - // https://github.com/kubernetes-sigs/controller-runtime/issues/1316 Client: client.Options{ Cache: &client.CacheOptions{ DisableFor: []client.Object{ + // We should test this again in the future to see if the problem with LicenseKey updates + // being missed is resolved. Prior to controller-runtime 0.7 we observed Test failures + // where LicenseKey updates would be missed and the client cache did not have the LicenseKey. + // The controller-runtime was updated and we made use of this ClientDisableCacheFor feature + // for the LicenseKey. We should test again in the future to see if the cache issue is fixed + // and we can remove this. Here is a link to the upstream issue + // https://github.com/kubernetes-sigs/controller-runtime/issues/1316 &v3.LicenseKey{}, + + // Avoid caching Pods, as this can cause excessive memory usage in large clusters. + &corev1.Pod{}, }, }, }, diff --git a/pkg/controller/clusterconnection/clusterconnection_controller.go b/pkg/controller/clusterconnection/clusterconnection_controller.go index 283cbe75f5..553324e6d3 100644 --- a/pkg/controller/clusterconnection/clusterconnection_controller.go +++ b/pkg/controller/clusterconnection/clusterconnection_controller.go @@ -19,6 +19,12 @@ import ( "fmt" "net" + "golang.org/x/net/http/httpproxy" + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" + + "github.com/tigera/operator/pkg/url" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -166,6 +172,10 @@ func add(mgr manager.Manager, c ctrlruntime.Controller) error { return fmt.Errorf("%s failed to watch ImageSet: %w", controllerName, err) } + if err := utils.AddDeploymentWatch(c, render.GuardianDeploymentName, render.GuardianNamespace); err != nil { + return fmt.Errorf("%s failed to watch Guardian deployment: %w", controllerName, err) + } + // Watch for changes to TigeraStatus. if err = utils.AddTigeraStatusWatch(c, ResourceName); err != nil { return fmt.Errorf("clusterconnection-controller failed to watch management-cluster-connection Tigerastatus: %w", err) @@ -185,6 +195,7 @@ type ReconcileConnection struct { status status.StatusManager clusterDomain string tierWatchReady *utils.ReadyFlag + resolvedProxy *httpproxy.Config } // Reconcile reads that state of the cluster for a ManagementClusterConnection object and makes changes based on the @@ -345,7 +356,7 @@ func (r *ReconcileConnection) Reconcile(ctx context.Context, request reconcile.R // The Tier has been created, which means that this controller's reconciliation should no longer be a dependency // of the License being deployed. 
If NetworkPolicy requires license features, it should now be safe to validate // License presence and sufficiency. - if networkPolicyRequiresEgressAccessControl(managementClusterConnection, log) { + if networkPolicyRequiresEgressAccessControl(managementClusterConnection.Spec.ManagementClusterAddr, r.resolvedProxy, log) { license, err := utils.FetchLicenseKey(ctx, r.Client) if err != nil { if k8serrors.IsNotFound(err) { @@ -363,9 +374,70 @@ func (r *ReconcileConnection) Reconcile(ctx context.Context, request reconcile.R } } + // We need to resolve whether Guardian has been configured for a proxy by a mutating admission webhook. + if r.resolvedProxy == nil { + // Check if the guardian deployment is available. + var guardianDeploymentAvailable bool + guardianDeployment := v1.Deployment{} + err = r.Client.Get(ctx, client.ObjectKey{Name: render.GuardianDeploymentName, Namespace: render.GuardianNamespace}, &guardianDeployment) + + // If we error for something other than not found, treat the proxy as unresolved and error. + if err != nil && !k8serrors.IsNotFound(err) { + r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to read the deployment status of Guardian", err, reqLogger) + return reconcile.Result{}, nil + } + + if err == nil { + for _, condition := range guardianDeployment.Status.Conditions { + if condition.Type == v1.DeploymentAvailable && condition.Status == corev1.ConditionTrue { + guardianDeploymentAvailable = true + break + } + } + } + + if guardianDeploymentAvailable { + // Query guardian pods. + labelSelector := labels.SelectorFromSet(map[string]string{ + "app.kubernetes.io/name": render.GuardianDeploymentName, + }) + pods := corev1.PodList{} + err := r.Client.List(ctx, &pods, &client.ListOptions{ + LabelSelector: labelSelector, + Namespace: render.GuardianNamespace, + }) + if err != nil { + r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to list the pods of the Guardian deployment", err, reqLogger) + return reconcile.Result{}, nil + } + + // Parse the pod spec to resolve the proxy config. + var proxyConfig *httpproxy.Config + for _, pod := range pods.Items { + for _, container := range pod.Spec.Containers { + if container.Name == render.GuardianDeploymentName { + proxyConfig = &httpproxy.Config{} + for _, env := range container.Env { + switch env.Name { + case "https_proxy", "HTTPS_PROXY": + proxyConfig.HTTPSProxy = env.Value + case "no_proxy", "NO_PROXY": + proxyConfig.NoProxy = env.Value + } + } + break + } + } + } + + r.resolvedProxy = proxyConfig + } + } + ch := utils.NewComponentHandler(log, r.Client, r.Scheme, managementClusterConnection) guardianCfg := &render.GuardianConfiguration{ URL: managementClusterConnection.Spec.ManagementClusterAddr, + HTTPProxyConfig: r.resolvedProxy, TunnelCAType: managementClusterConnection.Spec.TLS.CA, PullSecrets: pullSecrets, OpenShift: r.Provider.IsOpenShift(), @@ -417,23 +489,43 @@ func fillDefaults(mcc *operatorv1.ManagementClusterConnection) { } } -func networkPolicyRequiresEgressAccessControl(connection *operatorv1.ManagementClusterConnection, log logr.Logger) bool { - if clusterAddrHasDomain, err := managementClusterAddrHasDomain(connection); err == nil && clusterAddrHasDomain { - return true - } else { +func networkPolicyRequiresEgressAccessControl(target string, httpProxyConfig *httpproxy.Config, log logr.Logger) bool { + var destinationHostPort string + if httpProxyConfig != nil && httpProxyConfig.HTTPSProxy != "" { + // HTTPS proxy is specified as a URL. 
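+ // When an HTTPS proxy is configured, Guardian's egress is destined for the proxy, so evaluate the proxy's host:port rather than the management cluster address.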
+ proxyHostPort, err := url.ParseHostPortFromHTTPProxyString(httpProxyConfig.HTTPSProxy) if err != nil { log.Error(err, fmt.Sprintf( - "Failed to parse ManagementClusterAddr. Assuming %s does not require license feature %s", + "Failed to parse HTTP Proxy URL (%s). Assuming %s does not require license feature %s", + httpProxyConfig.HTTPSProxy, render.GuardianPolicyName, common.EgressAccessControlFeature, )) + return false } + + destinationHostPort = proxyHostPort + } else { + // Target is already specified as host:port. + destinationHostPort = target + } + + // Determine if the host in the host:port is a domain name. + hostPortHasDomain, err := hostPortUsesDomainName(destinationHostPort) + if err != nil { + log.Error(err, fmt.Sprintf( + "Failed to parse resolved host:port (%s) for remote tunnel endpoint. Assuming %s does not require license feature %s", + destinationHostPort, + render.GuardianPolicyName, + common.EgressAccessControlFeature, + )) return false } + return hostPortHasDomain } -func managementClusterAddrHasDomain(connection *operatorv1.ManagementClusterConnection) (bool, error) { - host, _, err := net.SplitHostPort(connection.Spec.ManagementClusterAddr) +func hostPortUsesDomainName(hostPort string) (bool, error) { + host, _, err := net.SplitHostPort(hostPort) if err != nil { return false, err } diff --git a/pkg/controller/clusterconnection/clusterconnection_controller_test.go b/pkg/controller/clusterconnection/clusterconnection_controller_test.go index f7974d92a1..0eb389f34c 100644 --- a/pkg/controller/clusterconnection/clusterconnection_controller_test.go +++ b/pkg/controller/clusterconnection/clusterconnection_controller_test.go @@ -17,11 +17,18 @@ package clusterconnection_test import ( "context" "fmt" + "net" + "net/url" + "strconv" + "strings" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/stretchr/testify/mock" + "github.com/tigera/operator/pkg/ptr" + "github.com/tigera/operator/pkg/render/common/networkpolicy" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" appsv1 "k8s.io/api/apps/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -52,9 +59,10 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { var ctx context.Context var cfg *operatorv1.ManagementClusterConnection var r reconcile.Reconciler - var scheme *runtime.Scheme + var clientScheme *runtime.Scheme var dpl *appsv1.Deployment var mockStatus *status.MockStatus + var objTrackerWithCalls utils.ObjectTrackerWithCalls notReady := &utils.ReadyFlag{} ready := &utils.ReadyFlag{} @@ -62,13 +70,14 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { BeforeEach(func() { // Create a Kubernetes client. 
- scheme = runtime.NewScheme() - Expect(apis.AddToScheme(scheme)).ShouldNot(HaveOccurred()) - Expect(appsv1.SchemeBuilder.AddToScheme(scheme)).ShouldNot(HaveOccurred()) - Expect(rbacv1.SchemeBuilder.AddToScheme(scheme)).ShouldNot(HaveOccurred()) - err := operatorv1.SchemeBuilder.AddToScheme(scheme) + clientScheme = runtime.NewScheme() + Expect(apis.AddToScheme(clientScheme)).ShouldNot(HaveOccurred()) + Expect(appsv1.SchemeBuilder.AddToScheme(clientScheme)).ShouldNot(HaveOccurred()) + Expect(rbacv1.SchemeBuilder.AddToScheme(clientScheme)).ShouldNot(HaveOccurred()) + err := operatorv1.SchemeBuilder.AddToScheme(clientScheme) Expect(err).NotTo(HaveOccurred()) - c = ctrlrfake.DefaultFakeClientBuilder(scheme).Build() + objTrackerWithCalls = utils.NewObjectTrackerWithCalls(clientScheme) + c = ctrlrfake.DefaultFakeClientBuilder(clientScheme).WithObjectTracker(&objTrackerWithCalls).Build() ctx = context.Background() mockStatus = &status.MockStatus{} mockStatus.On("Run").Return() @@ -84,7 +93,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { Expect(c.Create(ctx, &operatorv1.Monitor{ ObjectMeta: metav1.ObjectMeta{Name: "tigera-secure"}, })) - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, ready) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, ready) dpl = &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}, ObjectMeta: metav1.ObjectMeta{ @@ -162,7 +171,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { Context("image reconciliation", func() { It("should use builtin images", func() { - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, ready) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, ready) _, err := r.Reconcile(ctx, reconcile.Request{}) Expect(err).ShouldNot(HaveOccurred()) @@ -193,7 +202,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { }, })).ToNot(HaveOccurred()) - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, ready) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, ready) _, err := r.Reconcile(ctx, reconcile.Request{}) Expect(err).ShouldNot(HaveOccurred()) @@ -229,7 +238,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { } Expect(c.Create(ctx, licenseKey)).NotTo(HaveOccurred()) Expect(c.Create(ctx, &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}})).NotTo(HaveOccurred()) - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, ready) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, ready) }) Context("IP-based management cluster address", func() { @@ -261,7 +270,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { mockStatus.On("OnCRFound").Return() mockStatus.On("SetMetaData", mock.Anything).Return() - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, notReady) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, notReady) test.ExpectWaitForTierWatch(ctx, r, mockStatus) policies := v3.NetworkPolicyList{} @@ -298,7 +307,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { mockStatus.On("SetDegraded", 
operatorv1.ResourceReadError, "Feature is not active - License does not support feature: egress-access-control", mock.Anything, mock.Anything).Return() mockStatus.On("SetMetaData", mock.Anything).Return() - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, ready) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, ready) _, err := r.Reconcile(ctx, reconcile.Request{}) Expect(err).ShouldNot(HaveOccurred()) mockStatus.AssertExpectations(GinkgoT()) @@ -310,7 +319,7 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { mockStatus.On("OnCRFound").Return() mockStatus.On("SetMetaData", mock.Anything).Return() - r = clusterconnection.NewReconcilerWithShims(c, scheme, mockStatus, operatorv1.ProviderNone, notReady) + r = clusterconnection.NewReconcilerWithShims(c, clientScheme, mockStatus, operatorv1.ProviderNone, notReady) test.ExpectWaitForTierWatch(ctx, r, mockStatus) policies := v3.NetworkPolicyList{} @@ -338,7 +347,120 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { Expect(policies.Items).To(HaveLen(0)) }) }) + + Context("Proxy detection", func() { + // Generate test cases based on the combinations of proxy address forms and proxy settings. + // Here we specify the base targets along with the base proxy IP, domain, and port that will be used for generation. + testCases := generateCases("voltron.io:9000", "192.168.1.2:9000", "proxy.io", "10.1.2.3", "8080") + + for _, testCase := range testCases { + Describe(fmt.Sprintf("Proxy detection when %+v", testCase), func() { + // Set up the test based on the test case. + BeforeEach(func() { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tigera-guardian", + Namespace: "tigera-guardian", + Labels: map[string]string{ + "k8s-app": "tigera-guardian", + "app.kubernetes.io/name": "tigera-guardian", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "tigera-guardian", + Env: []v1.EnvVar{}, + }}, + }, + } + + // Set the env vars. + httpsProxyVarName := "HTTPS_PROXY" + noProxyVarName := "NO_PROXY" + if testCase.lowercase { + httpsProxyVarName = strings.ToLower(httpsProxyVarName) + noProxyVarName = strings.ToLower(noProxyVarName) + } + if testCase.httpsProxy != nil { + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{ + Name: httpsProxyVarName, + Value: *testCase.httpsProxy, + }) + } + if testCase.noProxy != nil { + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{ + Name: noProxyVarName, + Value: *testCase.noProxy, + }) + } + + err := c.Create(ctx, &pod) + Expect(err).NotTo(HaveOccurred()) + + // Set the target + cfg.Spec.ManagementClusterAddr = testCase.target + err = c.Update(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + }) + + It(fmt.Sprintf("detects proxy correctly when %+v", testCase), func() { + // First reconcile creates the guardian deployment. + _, err := r.Reconcile(ctx, reconcile.Request{}) + Expect(err).ShouldNot(HaveOccurred()) + + // Validate that we made no calls to get Pods at this stage. + podGVR := schema.GroupVersionResource{ + Version: "v1", + Resource: "pods", + } + Expect(objTrackerWithCalls.CallCount(podGVR, utils.ObjectTrackerCallList)).To(BeZero()) + + // Now we set the deployment to be available. 
+ gd := appsv1.Deployment{} + err = c.Get(ctx, client.ObjectKey{Name: "tigera-guardian", Namespace: "tigera-guardian"}, &gd) + Expect(err).NotTo(HaveOccurred()) + gd.Status.Conditions = append(gd.Status.Conditions, appsv1.DeploymentCondition{ + Type: appsv1.DeploymentAvailable, + Status: v1.ConditionTrue, + }) + err = c.Update(ctx, &gd) + Expect(err).NotTo(HaveOccurred()) + + // Reconcile again. The proxy detection logic should kick in since the guardian deployment is ready. + _, err = r.Reconcile(ctx, reconcile.Request{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(objTrackerWithCalls.CallCount(podGVR, utils.ObjectTrackerCallList)).To(Equal(1)) + + // Resolve the rendered rule that governs egress from guardian to voltron. + policies := v3.NetworkPolicyList{} + Expect(c.List(ctx, &policies)).ToNot(HaveOccurred()) + Expect(policies.Items).To(HaveLen(2)) + Expect(policies.Items[1].Name).To(Equal("allow-tigera.guardian-access")) + policy := policies.Items[1] + Expect(policy.Spec.Egress).To(HaveLen(7)) + managementClusterEgressRule := policy.Spec.Egress[5] + + // Generate the expectation based on the test case, and compare the rendered rule to our expectation. + expected := getExpectedProxyPolicyFromCase(testCase) + if expected.hostIsIP { + Expect(managementClusterEgressRule.Destination.Nets).To(HaveLen(1)) + Expect(managementClusterEgressRule.Destination.Nets[0]).To(Equal(fmt.Sprintf("%s/32", expected.host))) + Expect(managementClusterEgressRule.Destination.Ports).To(Equal(networkpolicy.Ports(expected.port))) + } else { + Expect(managementClusterEgressRule.Destination.Domains).To(Equal([]string{expected.host})) + Expect(managementClusterEgressRule.Destination.Ports).To(Equal(networkpolicy.Ports(expected.port))) + } + + // Reconcile again. Verify that we do not cause any additional query for pods now that we have resolved the proxy. + _, err = r.Reconcile(ctx, reconcile.Request{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(objTrackerWithCalls.CallCount(podGVR, utils.ObjectTrackerCallList)).To(Equal(1)) + }) + }) + } + }) }) + Context("Reconcile for Condition status", func() { generation := int64(2) It("should reconcile with empty tigerastatus conditions ", func() { @@ -513,3 +635,152 @@ var _ = Describe("ManagementClusterConnection controller tests", func() { }) }) }) + +type proxyTestCase struct { + lowercase bool + target string + httpsProxy *string + noProxy *string +} + +type expectedProxyPolicy struct { + host string + port uint16 + hostIsIP bool + isProxied bool +} + +func generateCases(targetDomain, targetIP, proxyDomain, proxyIP, proxyPort string) []proxyTestCase { + var cases []proxyTestCase + // We will collect the cases by target type. Targets are in the form of ip:port or domain:port. + for _, target := range []string{targetDomain, targetIP} { + var casesByTargetType []proxyTestCase + // Generate the proxy strings. They can be http or https, use a domain or IP host, and can optionally specify a port. + var proxyStrings []*string + for _, scheme := range []string{"http", "https"} { + for _, host := range []string{proxyDomain, proxyIP} { + for _, port := range []string{"", proxyPort} { + proxyString := fmt.Sprintf("%s://%s", scheme, host) + if port != "" { + proxyString = fmt.Sprintf("%s:%s", proxyString, port) + } + proxyStrings = append(proxyStrings, ptr.ToPtr[string](proxyString)) + } + } + } + // Add base cases: proxy is unset, or an empty proxy is set. + proxyStrings = append(proxyStrings, nil, ptr.ToPtr[string]("")) + + // Generate the "no proxy" strings. 
They can either match or not match the target, can list one or many exemptions, + // and can optionally specify a port. + var noProxyStrings []*string + for _, matchesTarget := range []bool{true, false} { + noProxyContainsPort := []bool{false} + if matchesTarget { + noProxyContainsPort = append(noProxyContainsPort, true) + } + for _, containsPort := range noProxyContainsPort { + for _, multipleExemptions := range []bool{true, false} { + host, port, err := net.SplitHostPort(target) + Expect(err).NotTo(HaveOccurred()) + matchString := host + if containsPort { + matchString = fmt.Sprintf("%s:%s", matchString, port) + } + + var noProxyString string + if matchesTarget { + noProxyString = matchString + } else { + noProxyString = "nomatch.com" + } + + if multipleExemptions { + noProxyString = fmt.Sprintf("1.1.1.1,%s,nobueno.com", noProxyString) + } + + noProxyStrings = append(noProxyStrings, ptr.ToPtr[string](noProxyString)) + } + } + } + // Add base cases: no-proxy is unset, or an empty no-proxy is set. + noProxyStrings = append(noProxyStrings, nil, ptr.ToPtr[string]("")) + + // Create the cases based on the generated combinations of proxy strings. + // The env vars can be set as either lowercase or uppercase on the container, we express that possibility here. + for _, lowercase := range []bool{true, false} { + for _, proxyString := range proxyStrings { + for _, noProxyString := range noProxyStrings { + casesByTargetType = append(casesByTargetType, proxyTestCase{ + lowercase: lowercase, + target: target, + httpsProxy: proxyString, + noProxy: noProxyString, + }) + } + } + } + cases = append(cases, casesByTargetType...) + } + return cases +} + +func getExpectedProxyPolicyFromCase(c proxyTestCase) expectedProxyPolicy { + var isProxied bool + if c.httpsProxy != nil && *c.httpsProxy != "" { + if c.noProxy == nil || *c.noProxy == "" { + isProxied = true + } else { + var proxyIsExempt bool + for _, noProxySubstring := range strings.Split(*c.noProxy, ",") { + if strings.Contains(c.target, noProxySubstring) { + proxyIsExempt = true + break + } + } + if !proxyIsExempt { + isProxied = true + } + } + } + + var host string + var port uint16 + if isProxied { + proxyURL, err := url.ParseRequestURI(*c.httpsProxy) + Expect(err).NotTo(HaveOccurred()) + + // Resolve port + hostSplit := strings.Split(proxyURL.Host, ":") + switch { + case len(hostSplit) == 2: + port64, err := strconv.ParseUint(hostSplit[1], 10, 16) + Expect(err).NotTo(HaveOccurred()) + host = hostSplit[0] + port = uint16(port64) + case proxyURL.Scheme == "https": + host = proxyURL.Host + port = 443 + default: + host = proxyURL.Host + port = 80 + } + + Expect(err).NotTo(HaveOccurred()) + } else { + var portString string + var err error + host, portString, err = net.SplitHostPort(c.target) + Expect(err).NotTo(HaveOccurred()) + port64, err := strconv.ParseUint(portString, 10, 16) + Expect(err).NotTo(HaveOccurred()) + port = uint16(port64) + } + + return expectedProxyPolicy{ + host: host, + port: port, + isProxied: isProxied, + hostIsIP: net.ParseIP(host) != nil, + } +} diff --git a/pkg/controller/utils/utils.go b/pkg/controller/utils/utils.go index dc7f481a3a..0f1b5e7feb 100644 --- a/pkg/controller/utils/utils.go +++ b/pkg/controller/utils/utils.go @@ -24,6 +24,10 @@ import ( esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/stringsutil" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/testing" 
"github.com/go-logr/logr" @@ -919,3 +923,77 @@ func RemoveInstallationFinalizer(i *operatorv1.Installation, finalizer string) { i.SetFinalizers(stringsutil.RemoveStringInSlice(finalizer, i.GetFinalizers())) } } + +type ObjectTrackerCall string + +const ( + ObjectTrackerCallGet ObjectTrackerCall = "get" + ObjectTrackerCallCreate ObjectTrackerCall = "create" + ObjectTrackerCallUpdate ObjectTrackerCall = "update" + ObjectTrackerCallList ObjectTrackerCall = "list" + ObjectTrackerCallDelete ObjectTrackerCall = "delete" + ObjectTrackerCallWatch ObjectTrackerCall = "watch" +) + +func NewObjectTrackerWithCalls(clientScheme testing.ObjectScheme) ObjectTrackerWithCalls { + return ObjectTrackerWithCalls{ + ObjectTracker: testing.NewObjectTracker(clientScheme, scheme.Codecs.UniversalDecoder()), + callsByGVR: make(map[schema.GroupVersionResource]map[ObjectTrackerCall]int), + } +} + +// ObjectTrackerWithCalls wraps the default implementation of testing.ObjectTracker to track the calls made. +type ObjectTrackerWithCalls struct { + testing.ObjectTracker + callsByGVR map[schema.GroupVersionResource]map[ObjectTrackerCall]int +} + +func (o *ObjectTrackerWithCalls) Add(obj runtime.Object) error { + return o.ObjectTracker.Add(obj) +} + +func (o *ObjectTrackerWithCalls) inc(gvr schema.GroupVersionResource, call ObjectTrackerCall) { + if o.callsByGVR == nil { + o.callsByGVR = make(map[schema.GroupVersionResource]map[ObjectTrackerCall]int) + } + + if o.callsByGVR[gvr] == nil { + o.callsByGVR[gvr] = make(map[ObjectTrackerCall]int) + } + + o.callsByGVR[gvr][call]++ +} + +func (o *ObjectTrackerWithCalls) CallCount(gvr schema.GroupVersionResource, call ObjectTrackerCall) int { + return o.callsByGVR[gvr][call] +} + +func (o *ObjectTrackerWithCalls) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) { + o.inc(gvr, ObjectTrackerCallGet) + return o.ObjectTracker.Get(gvr, ns, name) +} + +func (o *ObjectTrackerWithCalls) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { + o.inc(gvr, ObjectTrackerCallCreate) + return o.ObjectTracker.Create(gvr, obj, ns) +} + +func (o *ObjectTrackerWithCalls) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { + o.inc(gvr, ObjectTrackerCallUpdate) + return o.ObjectTracker.Update(gvr, obj, ns) +} + +func (o *ObjectTrackerWithCalls) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) { + o.inc(gvr, ObjectTrackerCallList) + return o.ObjectTracker.List(gvr, gvk, ns) +} + +func (o *ObjectTrackerWithCalls) Delete(gvr schema.GroupVersionResource, ns, name string) error { + o.inc(gvr, ObjectTrackerCallDelete) + return o.ObjectTracker.Delete(gvr, ns, name) +} + +func (o *ObjectTrackerWithCalls) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { + o.inc(gvr, ObjectTrackerCallWatch) + return o.ObjectTracker.Watch(gvr, ns) +} diff --git a/pkg/crds/calico/crd.projectcalico.org_bgpfilters.yaml b/pkg/crds/calico/crd.projectcalico.org_bgpfilters.yaml index 49dc53f8e8..09db5e3eff 100644 --- a/pkg/crds/calico/crd.projectcalico.org_bgpfilters.yaml +++ b/pkg/crds/calico/crd.projectcalico.org_bgpfilters.yaml @@ -49,6 +49,19 @@ spec: type: string matchOperator: type: string + prefixLength: + properties: + max: + format: int32 + maximum: 32 + minimum: 0 + type: integer + min: + format: int32 + maximum: 32 + minimum: 0 + type: integer + type: object source: type: string required: @@ -70,6 +83,19 @@ spec: type: string matchOperator: type: string + 
prefixLength: + properties: + max: + format: int32 + maximum: 128 + minimum: 0 + type: integer + min: + format: int32 + maximum: 128 + minimum: 0 + type: integer + type: object source: type: string required: @@ -91,6 +117,19 @@ spec: type: string matchOperator: type: string + prefixLength: + properties: + max: + format: int32 + maximum: 32 + minimum: 0 + type: integer + min: + format: int32 + maximum: 32 + minimum: 0 + type: integer + type: object source: type: string required: @@ -112,6 +151,19 @@ spec: type: string matchOperator: type: string + prefixLength: + properties: + max: + format: int32 + maximum: 128 + minimum: 0 + type: integer + min: + format: int32 + maximum: 128 + minimum: 0 + type: integer + type: object source: type: string required: diff --git a/pkg/crds/calico/crd.projectcalico.org_globalnetworkpolicies.yaml b/pkg/crds/calico/crd.projectcalico.org_globalnetworkpolicies.yaml index bf4420ca1b..039119087d 100644 --- a/pkg/crds/calico/crd.projectcalico.org_globalnetworkpolicies.yaml +++ b/pkg/crds/calico/crd.projectcalico.org_globalnetworkpolicies.yaml @@ -794,10 +794,10 @@ spec: order: description: Order is an optional field that specifies the order in which the policy is applied. Policies with higher "order" are applied - after those with lower order. If the order is omitted, it may be - considered to be "infinite" - i.e. the policy will be applied last. Policies - with identical order will be applied in alphanumerical order based - on the Policy "Name". + after those with lower order within the same tier. If the order + is omitted, it may be considered to be "infinite" - i.e. the policy + will be applied last. Policies with identical order will be applied + in alphanumerical order based on the Policy "Name" within the tier. type: number performanceHints: description: "PerformanceHints contains a list of hints to Calico's @@ -839,6 +839,14 @@ spec: description: ServiceAccountSelector is an optional field for an expression used to select a pod based on service accounts. type: string + tier: + description: The name of the tier that this policy belongs to. If + this is omitted, the default tier (name is "default") is assumed. The + specified tier must exist in order to create security policies within + the tier, the "default" tier is created automatically if it does + not exist, this means for deployments requiring only a single Tier, + the tier name may be omitted on all policy management requests. + type: string types: description: "Types indicates whether this policy applies to ingress, or to egress, or to both. When not explicitly specified (and so diff --git a/pkg/crds/calico/crd.projectcalico.org_networkpolicies.yaml b/pkg/crds/calico/crd.projectcalico.org_networkpolicies.yaml index 6a92754520..b2a4c07797 100644 --- a/pkg/crds/calico/crd.projectcalico.org_networkpolicies.yaml +++ b/pkg/crds/calico/crd.projectcalico.org_networkpolicies.yaml @@ -779,10 +779,10 @@ spec: order: description: Order is an optional field that specifies the order in which the policy is applied. Policies with higher "order" are applied - after those with lower order. If the order is omitted, it may be - considered to be "infinite" - i.e. the policy will be applied last. Policies - with identical order will be applied in alphanumerical order based - on the Policy "Name". + after those with lower order within the same tier. If the order + is omitted, it may be considered to be "infinite" - i.e. the policy + will be applied last. 
Policies with identical order will be applied + in alphanumerical order based on the Policy "Name" within the tier. type: number performanceHints: description: "PerformanceHints contains a list of hints to Calico's @@ -820,6 +820,14 @@ spec: description: ServiceAccountSelector is an optional field for an expression used to select a pod based on service accounts. type: string + tier: + description: The name of the tier that this policy belongs to. If + this is omitted, the default tier (name is "default") is assumed. The + specified tier must exist in order to create security policies within + the tier, the "default" tier is created automatically if it does + not exist, this means for deployments requiring only a single Tier, + the tier name may be omitted on all policy management requests. + type: string types: description: "Types indicates whether this policy applies to ingress, or to egress, or to both. When not explicitly specified (and so diff --git a/pkg/crds/calico/crd.projectcalico.org_tiers.yaml b/pkg/crds/calico/crd.projectcalico.org_tiers.yaml new file mode 100644 index 0000000000..323e092467 --- /dev/null +++ b/pkg/crds/calico/crd.projectcalico.org_tiers.yaml @@ -0,0 +1,54 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: (devel) + creationTimestamp: null + name: tiers.crd.projectcalico.org +spec: + group: crd.projectcalico.org + names: + kind: Tier + listKind: TierList + plural: tiers + singular: tier + scope: Cluster + versions: + - name: v1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: TierSpec contains the specification for a security policy + tier resource. + properties: + order: + description: Order is an optional field that specifies the order in + which the tier is applied. Tiers with higher "order" are applied + after those with lower order. If the order is omitted, it may be + considered to be "infinite" - i.e. the tier will be applied last. Tiers + with identical order will be applied in alphanumerical order based + on the Tier "Name". + type: number + type: object + type: object + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/pkg/render/guardian.go b/pkg/render/guardian.go index 0a1d1d6a4e..d108acdb09 100644 --- a/pkg/render/guardian.go +++ b/pkg/render/guardian.go @@ -18,6 +18,10 @@ package render import ( "net" + "net/url" + + operatorurl "github.com/tigera/operator/pkg/url" + "golang.org/x/net/http/httpproxy" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -82,6 +86,7 @@ func GuardianPolicy(cfg *GuardianConfiguration) (Component, error) { // GuardianConfiguration contains all the config information needed to render the component. 
type GuardianConfiguration struct { URL string + HTTPProxyConfig *httpproxy.Config PullSecrets []*corev1.Secret OpenShift bool Installation *operatorv1.InstallationSpec @@ -378,8 +383,36 @@ func guardianAllowTigeraPolicy(cfg *GuardianConfiguration) (*v3.NetworkPolicy, e }, }...) - // Assumes address has the form "host:port", required by net.Dial for TCP. - host, port, err := net.SplitHostPort(cfg.URL) + var proxyURL *url.URL + var err error + if cfg.HTTPProxyConfig != nil && cfg.HTTPProxyConfig.HTTPSProxy != "" { + targetURL := &url.URL{ + // The scheme should be HTTPS, as we are establishing an mTLS session with the target. + Scheme: "https", + + // We expect `target` to be of the form host:port. + Host: cfg.URL, + } + + proxyURL, err = cfg.HTTPProxyConfig.ProxyFunc()(targetURL) + if err != nil { + return nil, err + } + } + + var tunnelDestinationHostPort string + if proxyURL != nil { + proxyHostPort, err := operatorurl.ParseHostPortFromHTTPProxyURL(proxyURL) + if err != nil { + return nil, err + } + tunnelDestinationHostPort = proxyHostPort + } else { + // cfg.URL has host:port form + tunnelDestinationHostPort = cfg.URL + } + + host, port, err := net.SplitHostPort(tunnelDestinationHostPort) if err != nil { return nil, err } diff --git a/pkg/render/guardian_test.go b/pkg/render/guardian_test.go index 19eb39a020..e8725796e2 100644 --- a/pkg/render/guardian_test.go +++ b/pkg/render/guardian_test.go @@ -15,10 +15,14 @@ package render_test import ( + "fmt" + "net" + "net/url" + "strconv" + . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" - v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -47,7 +51,7 @@ var _ = Describe("Rendering tests", func() { var g render.Component var resources []client.Object - createGuardianConfig := func(i operatorv1.InstallationSpec, addr string, openshift bool) *render.GuardianConfiguration { + createGuardianConfig := func(i operatorv1.InstallationSpec, addr string, proxyAddr string, openshift bool) *render.GuardianConfiguration { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ @@ -86,7 +90,7 @@ var _ = Describe("Rendering tests", func() { Context("Guardian component", func() { renderGuardian := func(i operatorv1.InstallationSpec) { - cfg = createGuardianConfig(i, "127.0.0.1:1234", false) + cfg = createGuardianConfig(i, "127.0.0.1:1234", "", false) g = render.Guardian(cfg) Expect(g.ResolveImages(nil)).To(BeNil()) resources, _ = g.Objects() @@ -193,8 +197,8 @@ var _ = Describe("Rendering tests", func() { guardianPolicy := testutils.GetExpectedPolicyFromFile("./testutils/expected_policies/guardian.json") guardianPolicyForOCP := testutils.GetExpectedPolicyFromFile("./testutils/expected_policies/guardian_ocp.json") - renderGuardianPolicy := func(addr string, openshift bool) { - cfg := createGuardianConfig(operatorv1.InstallationSpec{Registry: "my-reg/"}, addr, openshift) + renderGuardianPolicy := func(addr string, proxyAddr string, openshift bool) { + cfg := createGuardianConfig(operatorv1.InstallationSpec{Registry: "my-reg/"}, addr, proxyAddr, openshift) g, err := render.GuardianPolicy(cfg) Expect(err).NotTo(HaveOccurred()) resources, _ = g.Objects() @@ -213,7 +217,7 @@ var _ = Describe("Rendering tests", func() { DescribeTable("should render allow-tigera policy", func(scenario testutils.AllowTigeraScenario) { - renderGuardianPolicy("127.0.0.1:1234", scenario.OpenShift) + 
renderGuardianPolicy("127.0.0.1:1234", "", scenario.OpenShift) policy := testutils.GetAllowTigeraPolicyFromResources(policyName, resources) expectedPolicy := getExpectedPolicy(policyName, scenario) Expect(policy).To(Equal(expectedPolicy)) @@ -224,13 +228,72 @@ var _ = Describe("Rendering tests", func() { // The test matrix above validates against an IP-based management cluster address. // Validate policy adaptation for domain-based management cluster address here. - It("should adapt Guardian policy if ManagementClusterAddr is domain-based", func() { - renderGuardianPolicy("mydomain.io:8080", false) + + DescribeTable("should adapt Guardian policy based on destination type", func(destAddr, proxyAddr string) { + renderGuardianPolicy(destAddr, proxyAddr, false) policy := testutils.GetAllowTigeraPolicyFromResources(policyName, resources) + Expect(policy.Spec.Egress).To(HaveLen(7)) managementClusterEgressRule := policy.Spec.Egress[5] - Expect(managementClusterEgressRule.Destination.Domains).To(Equal([]string{"mydomain.io"})) - Expect(managementClusterEgressRule.Destination.Ports).To(Equal(networkpolicy.Ports(8080))) - }) + + isProxied := proxyAddr != "" + var expectedHost string + var expectedPortString string + if isProxied { + uri, err := url.ParseRequestURI(proxyAddr) + Expect(err).NotTo(HaveOccurred()) + expectedPortString = uri.Port() + if expectedPortString == "" { + expectedHost = uri.Host + if uri.Scheme == "http" { + expectedPortString = "80" + } else if uri.Scheme == "https" { + expectedPortString = "443" + } + } else { + host, port, err := net.SplitHostPort(uri.Host) + Expect(err).NotTo(HaveOccurred()) + expectedHost = host + expectedPortString = port + } + } else { + host, port, err := net.SplitHostPort(destAddr) + Expect(err).NotTo(HaveOccurred()) + expectedHost = host + expectedPortString = port + } + expectedPort, err := strconv.ParseUint(expectedPortString, 10, 16) + Expect(err).NotTo(HaveOccurred()) + + if net.ParseIP(expectedHost) != nil { + // Host is an IP. + Expect(managementClusterEgressRule.Destination.Nets).To(HaveLen(1)) + Expect(managementClusterEgressRule.Destination.Nets[0]).To(Equal(fmt.Sprintf("%s/32", expectedHost))) + Expect(managementClusterEgressRule.Destination.Ports).To(Equal(networkpolicy.Ports(uint16(expectedPort)))) + } else { + // Host is a domain name. 
+ Expect(managementClusterEgressRule.Destination.Domains).To(Equal([]string{expectedHost})) + Expect(managementClusterEgressRule.Destination.Ports).To(Equal(networkpolicy.Ports(uint16(expectedPort)))) + } + }, + Entry("domain host:port, no proxy", "mydomain.io:8080", ""), + Entry("domain host:port, http proxy domain host", "mydomain.io:8080", "http://myproxy.io/"), + Entry("domain host:port, https proxy domain host", "mydomain.io:8080", "https://myproxy.io/"), + Entry("domain host:port, http proxy domain host:port", "mydomain.io:8080", "http://myproxy.io:9000/"), + Entry("domain host:port, https proxy domain host:port", "mydomain.io:8080", "https://myproxy.io:9000/"), + Entry("domain host:port, http proxy ip host", "mydomain.io:8080", "http://10.0.0.1/"), + Entry("domain host:port, https proxy ip host", "mydomain.io:8080", "https://10.0.0.1/"), + Entry("domain host:port, http proxy ip host:port", "mydomain.io:8080", "http://10.0.0.1:9000/"), + Entry("domain host:port, https proxy ip host:port", "mydomain.io:8080", "https://10.0.0.1:9000/"), + Entry("ip host:port, no proxy", "192.168.0.01:8080", ""), + Entry("ip host:port, http proxy domain host", "192.168.0.01:8080", "http://myproxy.io/"), + Entry("ip host:port, https proxy domain host", "192.168.0.01:8080", "https://myproxy.io/"), + Entry("ip host:port, http proxy domain host:port", "192.168.0.01:8080", "http://myproxy.io:9000/"), + Entry("ip host:port, https proxy domain host:port", "192.168.0.01:8080", "https://myproxy.io:9000/"), + Entry("ip host:port, http proxy ip host", "192.168.0.01:8080", "http://10.0.0.1/"), + Entry("ip host:port, https proxy ip host", "192.168.0.01:8080", "https://10.0.0.1/"), + Entry("ip host:port, http proxy ip host:port", "192.168.0.01:8080", "http://10.0.0.1:9000/"), + Entry("ip host:port, https proxy ip host:port", "192.168.0.01:8080", "https://10.0.0.1:9000/"), + ) }) }) }) diff --git a/pkg/url/url.go b/pkg/url/url.go index 8bec1a5851..126f2f8373 100644 --- a/pkg/url/url.go +++ b/pkg/url/url.go @@ -1,7 +1,22 @@ +// Copyright (c) 2021-2024 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package url import ( "fmt" + "net" "net/url" "strings" ) @@ -18,3 +33,31 @@ func ParseEndpoint(endpoint string) (string, string, string, error) { } return url.Scheme, splits[0], splits[1], nil } + +func ParseHostPortFromHTTPProxyString(proxyURL string) (string, error) { + parsedProxyURL, err := url.ParseRequestURI(proxyURL) + if err != nil { + return "", err + } + + return ParseHostPortFromHTTPProxyURL(parsedProxyURL) +} + +func ParseHostPortFromHTTPProxyURL(url *url.URL) (string, error) { + parsedScheme := url.Scheme + if parsedScheme == "" || (parsedScheme != "http" && parsedScheme != "https") { + return "", fmt.Errorf("unexpected scheme for HTTP proxy URL: %s", parsedScheme) + } + + if url.Port() != "" { + // Host is already in host:port form. + return url.Host, nil + } + + // Scheme is either http or https at this point. 
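+	// No explicit port was given, so fall back to the scheme's default port.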
+	if url.Scheme == "http" {
+		return net.JoinHostPort(url.Host, "80"), nil
+	} else {
+		return net.JoinHostPort(url.Host, "443"), nil
+	}
+}
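
Reviewer note (appended after the patch, not part of it): a minimal, self-contained sketch of how an httpproxy.Config resolved from Guardian's environment maps to the tunnel destination targeted by the egress rule. When ProxyFunc selects a proxy, the destination is the proxy's host:port, defaulting to 80 or 443 by scheme; otherwise it is the management cluster address itself. Host names such as proxy.example and voltron.example are illustrative only; the actual logic lives in guardianAllowTigeraPolicy and pkg/url above.

package main

import (
	"fmt"
	"net"
	"net/url"

	"golang.org/x/net/http/httpproxy"
)

// resolveTunnelHostPort sketches the behaviour above: if the proxy config
// selects a proxy for the target, the egress destination is the proxy's
// host:port (defaulting to 80 or 443 by scheme); otherwise the target itself,
// which is already in host:port form, is used.
func resolveTunnelHostPort(target string, cfg *httpproxy.Config) (string, error) {
	if cfg != nil && cfg.HTTPSProxy != "" {
		// Guardian dials the management cluster over TLS, so the proxy decision
		// is evaluated against an https URL whose host is the host:port target.
		proxyURL, err := cfg.ProxyFunc()(&url.URL{Scheme: "https", Host: target})
		if err != nil {
			return "", err
		}
		if proxyURL != nil {
			if proxyURL.Port() != "" {
				return proxyURL.Host, nil
			}
			if proxyURL.Scheme == "http" {
				return net.JoinHostPort(proxyURL.Hostname(), "80"), nil
			}
			return net.JoinHostPort(proxyURL.Hostname(), "443"), nil
		}
	}
	return target, nil
}

func main() {
	cfg := &httpproxy.Config{HTTPSProxy: "http://proxy.example:8080", NoProxy: "internal.example"}
	fmt.Println(resolveTunnelHostPort("voltron.example:9000", cfg))  // proxy.example:8080 <nil>
	fmt.Println(resolveTunnelHostPort("internal.example:9000", cfg)) // internal.example:9000 <nil>
}

As the controller change above shows, the resolved config is cached on the reconciler (r.resolvedProxy), so the Guardian pods are only listed until a proxy decision has been reached.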