diff --git a/.github/workflows/dapr-pubsub.yaml b/.github/workflows/dapr-export.yaml similarity index 86% rename from .github/workflows/dapr-pubsub.yaml rename to .github/workflows/dapr-export.yaml index bb5cd844540..e6f9e798f42 100644 --- a/.github/workflows/dapr-pubsub.yaml +++ b/.github/workflows/dapr-export.yaml @@ -1,18 +1,18 @@ -name: dapr-pubsub +name: dapr-export on: push: paths: - - "pkg/pubsub/dapr" - - "test/pubsub/**" + - "pkg/export/dapr" + - "test/export/**" pull_request: paths: - - "pkg/pubsub/dapr" - - "test/pubsub/**" + - "pkg/export/dapr" + - "test/export/**" permissions: read-all jobs: dapr_test: - name: "Dapr pubsub test" + name: "Dapr export test" runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -50,20 +50,20 @@ jobs: kind load docker-image --name kind gatekeeper-e2e:latest gatekeeper-crds:latest kubectl create ns gatekeeper-system make e2e-publisher-deploy - make e2e-helm-deploy HELM_REPO=gatekeeper-e2e HELM_CRD_REPO=gatekeeper-crds HELM_RELEASE=latest ENABLE_PUBSUB=true LOG_LEVEL=DEBUG - make test-e2e ENABLE_PUBSUB_TESTS=1 + make e2e-helm-deploy HELM_REPO=gatekeeper-e2e HELM_CRD_REPO=gatekeeper-crds HELM_RELEASE=latest ENABLE_EXPORT=true LOG_LEVEL=DEBUG + make test-e2e ENABLE_EXPORT_TESTS=1 - name: Save logs if: ${{ always() }} run: | kubectl logs -n fake-subscriber -l app=sub --tail=-1 > logs-audit-subscribe.json - kubectl logs -n gatekeeper-system -l control-plane=audit-controller --tail=-1 > logs-audit-publish.json + kubectl logs -n gatekeeper-system -l control-plane=audit-controller --tail=-1 > logs-audit-export.json - name: Upload artifacts uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 if: ${{ always() }} with: - name: pubsub-logs + name: export-logs path: | logs-*.json diff --git a/Makefile b/Makefile index 7d2ea03213b..9e3fb10d6b1 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ PUSH_TO_GHCR ?= false DEV_TAG ?= dev USE_LOCAL_IMG ?= false ENABLE_GENERATOR_EXPANSION ?= false -ENABLE_PUBSUB ?= false +ENABLE_EXPORT ?= false AUDIT_CONNECTION ?= "audit" AUDIT_CHANNEL ?= "audit" LOG_LEVEL ?= "INFO" @@ -203,7 +203,7 @@ e2e-helm-install: ./.staging/helm/linux-amd64/helm version --client e2e-helm-deploy: e2e-helm-install -ifeq ($(ENABLE_PUBSUB),true) +ifeq ($(ENABLE_EXPORT),true) ./.staging/helm/linux-amd64/helm install manifest_staging/charts/gatekeeper --name-template=gatekeeper \ --namespace ${GATEKEEPER_NAMESPACE} \ --debug --wait \ @@ -220,7 +220,7 @@ ifeq ($(ENABLE_PUBSUB),true) --set auditEventsInvolvedNamespace=true \ --set disabledBuiltins={http.send} \ --set logMutations=true \ - --set audit.enablePubsub=${ENABLE_PUBSUB} \ + --set enableViolationExport=${ENABLE_EXPORT} \ --set audit.connection=${AUDIT_CONNECTION} \ --set audit.channel=${AUDIT_CHANNEL} \ --set-string auditPodAnnotations.dapr\\.io/enabled=true \ @@ -292,17 +292,17 @@ e2e-helm-upgrade: --set mutationAnnotations=true;\ e2e-subscriber-build-load-image: - docker buildx build --platform="linux/amd64" -t ${FAKE_SUBSCRIBER_IMAGE} --load -f test/pubsub/fake-subscriber/Dockerfile test/pubsub/fake-subscriber + docker buildx build --platform="linux/amd64" -t ${FAKE_SUBSCRIBER_IMAGE} --load -f test/export/fake-subscriber/Dockerfile test/export/fake-subscriber kind load docker-image --name kind ${FAKE_SUBSCRIBER_IMAGE} e2e-subscriber-deploy: kubectl create ns fake-subscriber kubectl get secret redis --namespace=default -o yaml | sed 's/namespace: .*/namespace: fake-subscriber/' | kubectl apply -f - - kubectl apply -f test/pubsub/fake-subscriber/manifest/subscriber.yaml + kubectl apply -f test/export/fake-subscriber/manifest/subscriber.yaml e2e-publisher-deploy: kubectl get secret redis --namespace=default -o yaml | sed 's/namespace: .*/namespace: gatekeeper-system/' | kubectl apply -f - - kubectl apply -f test/pubsub/publish-components.yaml + kubectl apply -f test/export/publish-components.yaml # Build manager binary manager: generate diff --git a/cmd/build/helmify/kustomize-for-helm.yaml b/cmd/build/helmify/kustomize-for-helm.yaml index 3ed0fafd121..cbea03358ff 100644 --- a/cmd/build/helmify/kustomize-for-helm.yaml +++ b/cmd/build/helmify/kustomize-for-helm.yaml @@ -183,7 +183,7 @@ spec: - --operation=audit - --operation=status - --operation=generate - - HELMSUBST_DEPLOYMENT_AUDIT_PUBSUB_ARGS + - HELMSUBST_DEPLOYMENT_AUDIT_VIOLATION_EXPORT_ARGS - HELMSUBST_MUTATION_STATUS_ENABLED_ARG - --logtostderr - --health-addr=:HELMSUBST_DEPLOYMENT_AUDIT_HEALTH_PORT diff --git a/cmd/build/helmify/replacements.go b/cmd/build/helmify/replacements.go index 8748ba62433..c42541f40f6 100644 --- a/cmd/build/helmify/replacements.go +++ b/cmd/build/helmify/replacements.go @@ -118,8 +118,8 @@ var replacements = map[string]string{ - --default-wait-for-vapb-generation={{ .Values.defaultWaitForVAPBGeneration }} {{- end }}`, - "- HELMSUBST_DEPLOYMENT_AUDIT_PUBSUB_ARGS": `{{ if hasKey .Values.audit "enablePubsub" }} - - --enable-pub-sub={{ .Values.audit.enablePubsub }} + "- HELMSUBST_DEPLOYMENT_AUDIT_VIOLATION_EXPORT_ARGS": `{{ if hasKey .Values "enableViolationExport" }} + - --enable-violation-export={{ .Values.enableViolationExport }} {{- end }} {{ if hasKey .Values.audit "connection" }} - --audit-connection={{ .Values.audit.connection }} diff --git a/cmd/build/helmify/static/README.md b/cmd/build/helmify/static/README.md index ff24c055568..a58355a1e96 100644 --- a/cmd/build/helmify/static/README.md +++ b/cmd/build/helmify/static/README.md @@ -221,9 +221,9 @@ information._ | audit.readinessTimeout | Timeout in seconds for audit's readiness probe | `1` | | audit.livenessTimeout | Timeout in seconds for the audit's liveness probe | `1` | | audit.logLevel | The minimum log level for audit, takes precedence over `logLevel` when specified | `null` | -| audit.enablePubsub | (alpha) Enabled pubsub to publish messages | `false` | -| audit.connection | (alpha) Connection name for publishing audit violation messages | `audit-connection` | -| audit.channel | (alpha) Channel name for publishing audit violation messages | `audit-channel` | +| enableViolationExport | (alpha) Enable exporting violations to external systems | `false` | +| audit.connection | (alpha) Connection name for exporting audit violation messages | `audit-connection` | +| audit.channel | (alpha) Channel name for exporting audit violation messages | `audit-channel` | | replicas | The number of Gatekeeper replicas to deploy for the webhook | `3` | | podAnnotations | The annotations to add to the Gatekeeper pods | `container.seccomp.security.alpha.kubernetes.io/manager: runtime/default` | | podLabels | The labels to add to the Gatekeeper pods | `{}` | diff --git a/main.go b/main.go index 00964c9acdc..e7804aaa02a 100644 --- a/main.go +++ b/main.go @@ -47,11 +47,11 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/drivers/k8scel" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata" "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner" "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" @@ -435,7 +435,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness. mutationSystem := mutation.NewSystem(mutationOpts) expansionSystem := expansion.NewSystem(mutationSystem) - pubsubSystem := pubsub.NewSystem() + exportSystem := export.NewSystem() c := mgr.GetCache() dc, ok := c.(watch.RemovableCache) @@ -508,7 +508,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness. MutationSystem: mutationSystem, ExpansionSystem: expansionSystem, ProviderCache: providerCache, - PubsubSystem: pubsubSystem, + ExportSystem: exportSystem, } if err := controller.AddToManager(mgr, &opts); err != nil { @@ -538,7 +538,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness. ProcessExcluder: processExcluder, CacheLister: auditCache, ExpansionSystem: expansionSystem, - PubSubSystem: pubsubSystem, + ExportSystem: exportSystem, } if err := audit.AddToManager(mgr, &auditDeps); err != nil { setupLog.Error(err, "unable to register audit with the manager") diff --git a/manifest_staging/charts/gatekeeper/README.md b/manifest_staging/charts/gatekeeper/README.md index ff24c055568..a58355a1e96 100644 --- a/manifest_staging/charts/gatekeeper/README.md +++ b/manifest_staging/charts/gatekeeper/README.md @@ -221,9 +221,9 @@ information._ | audit.readinessTimeout | Timeout in seconds for audit's readiness probe | `1` | | audit.livenessTimeout | Timeout in seconds for the audit's liveness probe | `1` | | audit.logLevel | The minimum log level for audit, takes precedence over `logLevel` when specified | `null` | -| audit.enablePubsub | (alpha) Enabled pubsub to publish messages | `false` | -| audit.connection | (alpha) Connection name for publishing audit violation messages | `audit-connection` | -| audit.channel | (alpha) Channel name for publishing audit violation messages | `audit-channel` | +| enableViolationExport | (alpha) Enable exporting violations to external systems | `false` | +| audit.connection | (alpha) Connection name for exporting audit violation messages | `audit-connection` | +| audit.channel | (alpha) Channel name for exporting audit violation messages | `audit-channel` | | replicas | The number of Gatekeeper replicas to deploy for the webhook | `3` | | podAnnotations | The annotations to add to the Gatekeeper pods | `container.seccomp.security.alpha.kubernetes.io/manager: runtime/default` | | podLabels | The labels to add to the Gatekeeper pods | `{}` | diff --git a/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-deployment.yaml b/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-deployment.yaml index e525d64e21e..552beb1fa7a 100644 --- a/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-deployment.yaml +++ b/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-deployment.yaml @@ -71,8 +71,8 @@ spec: - --operation=audit - --operation=status - --operation=generate - {{ if hasKey .Values.audit "enablePubsub" }} - - --enable-pub-sub={{ .Values.audit.enablePubsub }} + {{ if hasKey .Values "enableViolationExport" }} + - --enable-violation-export={{ .Values.enableViolationExport }} {{- end }} {{ if hasKey .Values.audit "connection" }} - --audit-connection={{ .Values.audit.connection }} diff --git a/pkg/audit/controller.go b/pkg/audit/controller.go index 97f1bba7692..d01b9bf9bca 100644 --- a/pkg/audit/controller.go +++ b/pkg/audit/controller.go @@ -16,7 +16,7 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -25,7 +25,7 @@ type Dependencies struct { ProcessExcluder *process.Excluder CacheLister *CacheLister ExpansionSystem *expansion.System - PubSubSystem *pubsub.System + ExportSystem *export.System } // AddToManager adds audit manager to the Manager. diff --git a/pkg/audit/manager.go b/pkg/audit/manager.go index d7db33b1585..87fface6a6e 100644 --- a/pkg/audit/manager.go +++ b/pkg/audit/manager.go @@ -18,11 +18,11 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - pubsubController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub" + exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" corev1 "k8s.io/api/core/v1" @@ -67,8 +67,8 @@ var ( auditEventsInvolvedNamespace = flag.Bool("audit-events-involved-namespace", false, "emit audit events for each violation in the involved objects namespace, the default (false) generates events in the namespace Gatekeeper is installed in. Audit events from cluster-scoped resources will still follow the default behavior") auditMatchKindOnly = flag.Bool("audit-match-kind-only", false, "only use kinds specified in all constraints for auditing cluster resources. if kind is not specified in any of the constraints, it will audit all resources (same as setting this flag to false)") apiCacheDir = flag.String("api-cache-dir", defaultAPICacheDir, "The directory where audit from api server cache are stored, defaults to /tmp/audit") - auditConnection = flag.String("audit-connection", defaultConnection, "(alpha) Connection name for publishing audit violation messages. Defaults to audit-connection") - auditChannel = flag.String("audit-channel", defaultChannel, "(alpha) Channel name for publishing audit violation messages. Defaults to audit-channel") + auditConnection = flag.String("audit-connection", defaultConnection, "(alpha) Connection name for exporting audit violation messages. Defaults to audit-connection") + auditChannel = flag.String("audit-channel", defaultChannel, "(alpha) Channel name for exporting audit violation messages. Defaults to audit-channel") emptyAuditResults = newLimitQueue(0) logStatsAudit = flag.Bool("log-stats-audit", false, "(alpha) log stats metrics for the audit run") ) @@ -91,7 +91,7 @@ type Manager struct { auditCache *CacheLister expansionSystem *expansion.System - pubsubSystem *pubsub.System + exportSystem *export.System } // StatusViolation represents each violation under status. @@ -106,8 +106,8 @@ type StatusViolation struct { EnforcementActions []string `json:"enforcementActions,omitempty"` } -// ConstraintMsg represents publish message for each constraint. -type PubsubMsg struct { +// ExportMsg represents export message for each violation. +type ExportMsg struct { ID string `json:"id,omitempty"` Details interface{} `json:"details,omitempty"` EventType string `json:"eventType,omitempty"` @@ -269,7 +269,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) { gkNamespace: util.GetNamespace(), auditCache: deps.CacheLister, expansionSystem: deps.ExpansionSystem, - pubsubSystem: deps.PubSubSystem, + exportSystem: deps.ExportSystem, } return am, nil } @@ -902,10 +902,10 @@ func (am *Manager) addAuditResponsesToUpdateLists( details := r.Metadata["details"] labels := r.obj.GetLabels() logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels) - if *pubsubController.PubsubEnabled { - err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)) + if *exportController.ExportEnabled { + err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)) if err != nil { - am.log.Error(err, "pubsub audit Publishing") + am.log.Error(err, "error exporting audit violation") } } if *emitAuditEvents { @@ -1162,7 +1162,7 @@ func violationMsg(constraint *unstructured.Unstructured, enforcementAction util. userConstraintAnnotations := constraint.GetAnnotations() delete(userConstraintAnnotations, "kubectl.kubernetes.io/last-applied-configuration") - return PubsubMsg{ + return ExportMsg{ Message: message, Details: details, ID: timestamp, diff --git a/pkg/controller/add_pubsub.go b/pkg/controller/add_export.go similarity index 83% rename from pkg/controller/add_pubsub.go rename to pkg/controller/add_export.go index 52904f36d50..feb9cb30e1c 100644 --- a/pkg/controller/add_pubsub.go +++ b/pkg/controller/add_export.go @@ -16,9 +16,9 @@ limitations under the License. package controller import ( - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export" ) func init() { - Injectors = append(Injectors, &pubsub.Adder{}) + Injectors = append(Injectors, &export.Adder{}) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 97b6044a523..add363bfa49 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -28,9 +28,9 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" @@ -56,8 +56,8 @@ type GetPodInjector interface { InjectGetPod(func(context.Context) (*corev1.Pod, error)) } -type PubsubInjector interface { - InjectPubsubSystem(pubsubSystem *pubsub.System) +type ExportInjector interface { + InjectExportSystem(exportSystem *export.System) } type DataClientInjector interface { @@ -101,7 +101,7 @@ type Dependencies struct { MutationSystem *mutation.System ExpansionSystem *expansion.System ProviderCache *externaldata.ProviderCache - PubsubSystem *pubsub.System + ExportSystem *export.System SyncEventsCh chan event.GenericEvent CacheMgr *cm.CacheManager } @@ -212,8 +212,8 @@ func AddToManager(m manager.Manager, deps *Dependencies) error { if a2, ok := a.(GetPodInjector); ok { a2.InjectGetPod(deps.GetPod) } - if a2, ok := a.(PubsubInjector); ok { - a2.InjectPubsubSystem(deps.PubsubSystem) + if a2, ok := a.(ExportInjector); ok { + a2.InjectExportSystem(deps.ExportSystem) } if a2, ok := a.(CacheManagerInjector); ok { // this is used by the config controller to sync diff --git a/pkg/controller/pubsub/pubsub_config_controller.go b/pkg/controller/export/export_config_controller.go similarity index 76% rename from pkg/controller/pubsub/pubsub_config_controller.go rename to pkg/controller/export/export_config_controller.go index 012e0ba0b07..5bb93a62b15 100644 --- a/pkg/controller/pubsub/pubsub_config_controller.go +++ b/pkg/controller/export/export_config_controller.go @@ -1,4 +1,4 @@ -package pubsub +package export import ( "context" @@ -6,8 +6,8 @@ import ( "flag" "fmt" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" corev1 "k8s.io/api/core/v1" @@ -25,36 +25,36 @@ import ( ) var ( - PubsubEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages") - log = logf.Log.WithName("controller").WithValues(logging.Process, "pubsub_controller") + ExportEnabled = flag.Bool("enable-violation-export", false, "(alpha) Enable exporting violations to external systems") + log = logf.Log.WithName("controller").WithValues(logging.Process, "export_controller") ) type Adder struct { - PubsubSystem *pubsub.System + ExportSystem *export.System } func (a *Adder) Add(mgr manager.Manager) error { - if !*PubsubEnabled { + if !*ExportEnabled { return nil } - log.Info("Warning: Alpha flag enable-pub-sub is set to true. This flag may change in the future.") - r := newReconciler(mgr, a.PubsubSystem) + log.Info("Warning: Alpha flag enable-violation-export is set to true. This flag may change in the future.") + r := newReconciler(mgr, a.ExportSystem) return add(mgr, r) } func (a *Adder) InjectTracker(_ *readiness.Tracker) {} -func (a *Adder) InjectPubsubSystem(pubsubSystem *pubsub.System) { - a.PubsubSystem = pubsubSystem +func (a *Adder) InjectExportSystem(exportSystem *export.System) { + a.ExportSystem = exportSystem } type Reconciler struct { client.Client scheme *runtime.Scheme - system *pubsub.System + system *export.System } -func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler { +func newReconciler(mgr manager.Manager, system *export.System) *Reconciler { return &Reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), @@ -63,7 +63,7 @@ func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler { } func add(mgr manager.Manager, r reconcile.Reconciler) error { - c, err := controller.New("pubsub-config-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } @@ -111,10 +111,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( } if len(cfg.Data) == 0 { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName)) + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure exporter", request.NamespacedName)) } - if _, ok := cfg.Data["provider"]; !ok { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName)) + if _, ok := cfg.Data["driver"]; !ok { + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in configmap %s, unable to configure exporter", request.NamespacedName)) } var config interface{} err = json.Unmarshal([]byte(cfg.Data["config"]), &config) @@ -122,11 +122,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, err } - err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"]) + err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["driver"]) if err != nil { return reconcile.Result{}, err } - log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"]) + log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"]) return reconcile.Result{}, nil } diff --git a/pkg/controller/pubsub/pubsub_config_controller_test.go b/pkg/controller/export/export_config_controller_test.go similarity index 88% rename from pkg/controller/pubsub/pubsub_config_controller_test.go rename to pkg/controller/export/export_config_controller_test.go index 258b092c309..1a7d521b8ce 100644 --- a/pkg/controller/pubsub/pubsub_config_controller_test.go +++ b/pkg/controller/export/export_config_controller_test.go @@ -1,4 +1,4 @@ -package pubsub +package export import ( "context" @@ -6,7 +6,7 @@ import ( "fmt" "testing" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -25,7 +25,7 @@ func TestReconcile(t *testing.T) { t.Fatalf("Unexpected error parsing flag: %v", err) } - err = flag.CommandLine.Parse([]string{"--enable-pub-sub", "true"}) + err = flag.CommandLine.Parse([]string{"--enable-violation-export", "true"}) if err != nil { t.Fatalf("Unexpected error parsing flag: %v", err) } @@ -48,7 +48,7 @@ func TestReconcile(t *testing.T) { }, }, wantErr: true, - errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName), + errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure exporter", request.NamespacedName), }, } for _, tc := range testCases { diff --git a/pkg/export/dapr/dapr.go b/pkg/export/dapr/dapr.go new file mode 100644 index 00000000000..ba4ca8e4fdd --- /dev/null +++ b/pkg/export/dapr/dapr.go @@ -0,0 +1,88 @@ +package dapr + +import ( + "context" + "encoding/json" + "fmt" + + daprClient "github.com/dapr/go-sdk/client" +) + +type Connection struct { + // Name of the component object to use in Dapr + component string + + client daprClient.Client +} + +// Dapr represents driver to use Dapr. +type Dapr struct { + openConnections map[string]Connection +} + +const ( + Name = "dapr" +) + +var Connections = &Dapr{ + openConnections: make(map[string]Connection), +} + +func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{}, topic string) error { + jsonData, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("error marshaling data: %w", err) + } + + conn, ok := r.openConnections[connectionName] + if !ok { + return fmt.Errorf("connection not found: %s for Dapr driver", connectionName) + } + err = conn.client.PublishEvent(context.Background(), conn.component, topic, jsonData) + if err != nil { + return fmt.Errorf("error publishing message to dapr: %w", err) + } + + return nil +} + +func (r *Dapr) CloseConnection(connectionName string) error { + delete(r.openConnections, connectionName) + return nil +} + +func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + cfg, ok := config.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + component, ok := cfg["component"].(string) + if !ok { + return fmt.Errorf("failed to get value of component") + } + conn := r.openConnections[connectionName] + conn.component = component + r.openConnections[connectionName] = conn + return nil +} + +func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error { + var conn Connection + cfg, ok := config.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + conn.component, ok = cfg["component"].(string) + if !ok { + return fmt.Errorf("failed to get value of component") + } + + tmp, err := daprClient.NewClient() + if err != nil { + return err + } + + conn.client = tmp + r.openConnections[connectionName] = conn + return nil +} diff --git a/pkg/export/dapr/dapr_test.go b/pkg/export/dapr/dapr_test.go new file mode 100644 index 00000000000..d53e2e819ed --- /dev/null +++ b/pkg/export/dapr/dapr_test.go @@ -0,0 +1,163 @@ +package dapr + +import ( + "context" + "os" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" + "github.com/stretchr/testify/assert" +) + +var testClient driver.Driver + +func TestMain(m *testing.M) { + c, f := FakeConnection() + testClient = c + r := m.Run() + f() + + if r != 0 { + os.Exit(r) + } +} + +func TestCreate(t *testing.T) { + tests := []struct { + name string + config interface{} + expectedConnections int + errorMsg string + }{ + { + name: "invalid config", + config: "test", + expectedConnections: 1, + errorMsg: "invalid type assertion, config is not in expected format", + }, + { + name: "config with missing component", + config: map[string]interface{}{"enableBatching": true}, + expectedConnections: 1, + errorMsg: "failed to get value of component", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := testClient.CreateConnection(context.TODO(), "another-test", tc.config) + tmp, ok := testClient.(*Dapr) + if !ok { + t.Errorf("failed to type assert") + } + assert.Equal(t, tc.expectedConnections, len(tmp.openConnections)) + assert.EqualError(t, err, tc.errorMsg) + }) + } +} + +func TestDapr_Publish(t *testing.T) { + ctx := context.Background() + + type args struct { + ctx context.Context + data interface{} + topic string + connectionName string + } + + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "test publish", + args: args{ + ctx: ctx, + data: map[string]interface{}{ + "test": "test", + }, + topic: "test", + connectionName: "test", + }, + wantErr: false, + }, + { + name: "test publish without data", + args: args{ + ctx: ctx, + data: nil, + topic: "test", + connectionName: "test", + }, + wantErr: false, + }, + { + name: "test publish without topic", + args: args{ + ctx: ctx, + data: map[string]interface{}{ + "test": "test", + }, + topic: "", + connectionName: "test", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := testClient + if err := r.Publish(tt.args.ctx, tt.args.connectionName, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr { + t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestDapr_Update(t *testing.T) { + tests := []struct { + name string + config interface{} + connectionName string + wantErr bool + }{ + { + name: "test update connection", + config: map[string]interface{}{ + "component": "foo", + }, + wantErr: false, + connectionName: "test", + }, + { + name: "test update connection with invalid config", + config: map[string]interface{}{ + "foo": "bar", + }, + connectionName: "test", + wantErr: true, + }, + { + name: "test update connection with nil config", + config: nil, + connectionName: "test", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := testClient + if err := r.UpdateConnection(context.Background(), tt.connectionName, tt.config); (err != nil) != tt.wantErr { + t.Errorf("Dapr.Update() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr { + cmp, ok := tt.config.(map[string]interface{})["component"].(string) + assert.True(t, ok) + tmp, ok := r.(*Dapr) + assert.True(t, ok) + assert.Equal(t, cmp, tmp.openConnections[tt.connectionName].component) + } + }) + } +} diff --git a/pkg/pubsub/dapr/fake_dapr_client.go b/pkg/export/dapr/fake_dapr_client.go similarity index 87% rename from pkg/pubsub/dapr/fake_dapr_client.go rename to pkg/export/dapr/fake_dapr_client.go index 4bd36da5ecd..62236c1c74f 100644 --- a/pkg/pubsub/dapr/fake_dapr_client.go +++ b/pkg/export/dapr/fake_dapr_client.go @@ -17,7 +17,7 @@ import ( pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" "github.com/golang/protobuf/ptypes/empty" "github.com/google/uuid" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -326,66 +326,76 @@ func (s *testDaprServer) BulkPublishEventAlpha1(_ context.Context, req *pb.BulkP return &pb.BulkPublishResponse{FailedEntries: failedEntries}, nil } -func FakeConnection() (connection.Connection, func()) { +func FakeConnection() (driver.Driver, func()) { ctx := context.Background() c, f := getTestClient(ctx) return &Dapr{ - client: c, - pubSubComponent: "test", + openConnections: map[string]Connection{ + "test": { + client: c, + component: "test", + }, + }, }, f } -type FakeDapr struct { - // Array of clients to talk to different endpoints - client daprClient.Client - - // Name of the pubsub component - pubSubComponent string +type FakeDaprConnection struct { + component string + client daprClient.Client // closing function f func() } -func (r *FakeDapr) Publish(_ context.Context, _ interface{}, _ string) error { +type FakeDapr struct { + openConnections map[string]FakeDaprConnection +} + +func (r *FakeDapr) Publish(_ context.Context, _ string, _ interface{}, _ string) error { return nil } -func (r *FakeDapr) CloseConnection() error { - r.f() +func (r *FakeDapr) CloseConnection(connectionName string) error { + if len(r.openConnections) == 1 { + r.openConnections[connectionName].f() + } + delete(r.openConnections, connectionName) return nil } -func (r *FakeDapr) UpdateConnection(_ context.Context, config interface{}) error { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) +func (r *FakeDapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + cfg, ok := config.(map[string]interface{}) if !ok { return fmt.Errorf("invalid type assertion, config is not in expected format") } - cfg.Component, ok = m["component"].(string) + component, ok := cfg["component"].(string) if !ok { return fmt.Errorf("failed to get value of component") } - r.pubSubComponent = cfg.Component + conn := r.openConnections[connectionName] + conn.component = component + r.openConnections[connectionName] = conn return nil } -// Returns a fake client for dapr. -func FakeNewConnection(ctx context.Context, config interface{}) (connection.Connection, error) { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) +func (r *FakeDapr) CreateConnection(ctx context.Context, connectionName string, config interface{}) error { + var conn FakeDaprConnection + cfg, ok := config.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("invalid type assertion, config is not in expected format") + return fmt.Errorf("invalid type assertion, config is not in expected format") } - cfg.Component, ok = m["component"].(string) + conn.component, ok = cfg["component"].(string) if !ok { - return nil, fmt.Errorf("failed to get value of component") + return fmt.Errorf("failed to get value of component") } c, f := getTestClient(ctx) + conn.client = c + conn.f = f + r.openConnections[connectionName] = conn + return nil +} - return &FakeDapr{ - client: c, - pubSubComponent: cfg.Component, - f: f, - }, nil +var FakeConn = &FakeDapr{ + openConnections: map[string]FakeDaprConnection{}, } diff --git a/pkg/export/driver/driver.go b/pkg/export/driver/driver.go new file mode 100644 index 00000000000..3b5a4b561d4 --- /dev/null +++ b/pkg/export/driver/driver.go @@ -0,0 +1,19 @@ +package driver + +import ( + "context" +) + +type Driver interface { + // Publish publishes single message with specific subject using a connection + Publish(ctx context.Context, connectionName string, data interface{}, subject string) error + + // CloseConnection closes a connection + CloseConnection(connectionName string) error + + // UpdateConnection updates an existing connection + UpdateConnection(ctx context.Context, connectionName string, config interface{}) error + + // CreateConnection creates new connection + CreateConnection(ctx context.Context, connectionName string, config interface{}) error +} diff --git a/pkg/export/system.go b/pkg/export/system.go new file mode 100644 index 00000000000..d9863c35a65 --- /dev/null +++ b/pkg/export/system.go @@ -0,0 +1,81 @@ +package export + +import ( + "context" + "fmt" + "sync" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" +) + +var SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.Connections, +} + +type System struct { + mux sync.RWMutex + connectionToDriver map[string]string +} + +func NewSystem() *System { + return &System{ + connectionToDriver: map[string]string{}, + } +} + +func (s *System) Publish(_ context.Context, connectionName string, subject string, msg interface{}) error { + s.mux.RLock() + defer s.mux.RUnlock() + if dName, ok := s.connectionToDriver[connectionName]; ok { + return SupportedDrivers[dName].Publish(context.Background(), connectionName, msg, subject) + } + return fmt.Errorf("connection is not initialized, name: %s ", connectionName) +} + +func (s *System) UpsertConnection(ctx context.Context, config interface{}, connectionName string, newDriver string) error { + s.mux.Lock() + defer s.mux.Unlock() + // Check if the connection already exists. + if oldDriver, ok := s.connectionToDriver[connectionName]; ok { + // If the provider is the same, update the existing connection. + if oldDriver == newDriver { + return SupportedDrivers[newDriver].UpdateConnection(ctx, connectionName, config) + } + } + // Check if the provider is supported. + if d, ok := SupportedDrivers[newDriver]; ok { + err := d.CreateConnection(ctx, connectionName, config) + if err != nil { + return err + } + + // Close the existing connection after successfully creating the new one. + if err := s.closeConnection(connectionName); err != nil { + return err + } + // Add the new connection and provider to the maps. + s.connectionToDriver[connectionName] = newDriver + return nil + } + return fmt.Errorf("driver %s is not supported", newDriver) +} + +func (s *System) CloseConnection(connectionName string) error { + s.mux.Lock() + defer s.mux.Unlock() + return s.closeConnection(connectionName) +} + +func (s *System) closeConnection(connectionName string) error { + if c, ok := s.connectionToDriver[connectionName]; ok { + if conn, ok := SupportedDrivers[c]; ok { + err := conn.CloseConnection(connectionName) + if err != nil { + return err + } + } + delete(s.connectionToDriver, connectionName) + } + return nil +} diff --git a/pkg/export/system_test.go b/pkg/export/system_test.go new file mode 100644 index 00000000000..df9c75f7e72 --- /dev/null +++ b/pkg/export/system_test.go @@ -0,0 +1,241 @@ +package export + +import ( + "context" + "os" + "sync" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/testdriver" + "github.com/stretchr/testify/assert" +) + +var testSystem *System + +func TestMain(m *testing.M) { + ctx := context.Background() + SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.FakeConn, + } + testSystem = NewSystem() + cfg := map[string]interface{}{ + dapr.Name: map[string]interface{}{ + "component": "pubsub", + }, + } + for name, fakeConn := range SupportedDrivers { + testSystem.connectionToDriver[name] = name + _ = fakeConn.CreateConnection(ctx, name, cfg[name]) + } + r := m.Run() + for name, fakeConn := range testSystem.connectionToDriver { + _ = SupportedDrivers[fakeConn].CloseConnection(name) + } + + if r != 0 { + os.Exit(r) + } +} + +func TestNewSystem(t *testing.T) { + tests := []struct { + name string + input string + want *System + }{ + { + name: "requesting system", + want: &System{ + connectionToDriver: map[string]string{}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ret := NewSystem() + assert.Equal(t, ret, tc.want) + }) + } +} + +func TestSystem_UpsertConnection(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + config interface{} + connectionName string + newDriver string + setup func(*System) error + wantErr bool + }{ + { + name: "new connection with supported driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn1", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver = map[string]string{} + SupportedDrivers[dapr.Name] = dapr.FakeConn + return nil + }, + wantErr: false, + }, + { + name: "update existing connection with same driver", + config: map[string]interface{}{"component": "pubsub1"}, + connectionName: "conn1", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver["conn1"] = dapr.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + return SupportedDrivers[dapr.Name].CreateConnection(ctx, "conn1", map[string]interface{}{"component": "pubsub"}) + }, + wantErr: false, + }, + { + name: "new connection with unsupported driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn3", + newDriver: "unsupportedDriver", + setup: func(_ *System) error { return nil }, + wantErr: true, + }, + { + name: "update existing connection with different driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn4", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver["conn4"] = testdriver.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + SupportedDrivers[testdriver.Name] = testdriver.FakeConn + return SupportedDrivers[testdriver.Name].CreateConnection(ctx, "conn4", "config4") + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + system := NewSystem() + if err := tt.setup(system); err != nil { + t.Fatalf("failed to setup test: %v", err) + } + + err := system.UpsertConnection(ctx, tt.config, tt.connectionName, tt.newDriver) + if (err != nil) != tt.wantErr { + t.Errorf("UpsertConnection() error = %v, wantErr %v", err, tt.wantErr) + } + + if !tt.wantErr { + if driver, ok := system.connectionToDriver[tt.connectionName]; !ok || driver != tt.newDriver { + t.Errorf("connection %s not found or driver mismatch: got %v, want %v", tt.connectionName, driver, tt.newDriver) + } + } + }) + } +} + +func TestSystem_CloseConnection(t *testing.T) { + tests := []struct { + name string + setup func(*System) + connectionName string + wantErr bool + }{ + { + name: "close existing connection", + setup: func(s *System) { + s.connectionToDriver["test-connection"] = dapr.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + _ = dapr.FakeConn.CreateConnection(context.TODO(), "test-connection", map[string]interface{}{"component": "pubsub"}) + }, + connectionName: "test-connection", + wantErr: false, + }, + { + name: "close non-existing connection", + setup: func(s *System) { + // No setup needed for non-existing connection + s.connectionToDriver = map[string]string{} + }, + connectionName: "non-existing-connection", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := NewSystem() + if tt.setup != nil { + tt.setup(s) + } + + err := s.CloseConnection(tt.connectionName) + if (err != nil) != tt.wantErr { + t.Errorf("CloseConnection() error = %v, wantErr %v", err, tt.wantErr) + } + + if _, exists := s.connectionToDriver[tt.connectionName]; exists && !tt.wantErr { + t.Errorf("connection %s still exists after CloseConnection", tt.connectionName) + } + }) + } +} + +func TestSystem_Publish(t *testing.T) { + type fields struct { + connections map[string]string + } + type args struct { + ctx context.Context + connection string + topic string + msg interface{} + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "There are no connections established", + fields: fields{ + connections: nil, + }, + args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil}, + wantErr: true, + }, + { + name: "Exporting to a connection that does not exist", + fields: fields{ + connections: map[string]string{"audit": dapr.Name}, + }, + args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil}, + wantErr: true, + }, + { + name: "Exporting to a connection that does exist", + fields: fields{ + connections: testSystem.connectionToDriver, + }, + args: args{ctx: context.Background(), connection: "dapr", topic: "test", msg: nil}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &System{ + mux: sync.RWMutex{}, + connectionToDriver: tt.fields.connections, + } + if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr { + t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/export/testdriver/testdriver.go b/pkg/export/testdriver/testdriver.go new file mode 100644 index 00000000000..4283a256a5c --- /dev/null +++ b/pkg/export/testdriver/testdriver.go @@ -0,0 +1,48 @@ +package testdriver + +import ( + "context" + "fmt" +) + +const Name = "testdriver" + +var FakeConn = &Connection{ + openConnections: make(map[string]FakeConnection), +} + +// Connection represents driver to use testdriver. +type Connection struct { + openConnections map[string]FakeConnection +} + +type FakeConnection struct { + name string +} + +func (r *Connection) Publish(_ context.Context, _ string, _ interface{}, _ string) error { + return nil +} + +func (r *Connection) CloseConnection(connectionName string) error { + delete(r.openConnections, connectionName) + return nil +} + +func (r *Connection) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + name, ok := config.(string) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + r.openConnections[connectionName] = FakeConnection{name: name} + return nil +} + +func (r *Connection) CreateConnection(_ context.Context, connectionName string, config interface{}) error { + name, ok := config.(string) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + r.openConnections[connectionName] = FakeConnection{name: name} + return nil +} diff --git a/pkg/pubsub/connection/connection.go b/pkg/pubsub/connection/connection.go deleted file mode 100644 index 0edb6a74daf..00000000000 --- a/pkg/pubsub/connection/connection.go +++ /dev/null @@ -1,17 +0,0 @@ -package connection - -import ( - "context" -) - -// PubSub is the interface that wraps pubsub methods. -type Connection interface { - // Publish single message over a specific topic/channel - Publish(ctx context.Context, data interface{}, topic string) error - - // Close connections - CloseConnection() error - - // Update connection - UpdateConnection(ctx context.Context, data interface{}) error -} diff --git a/pkg/pubsub/dapr/dapr.go b/pkg/pubsub/dapr/dapr.go deleted file mode 100644 index 0db60445494..00000000000 --- a/pkg/pubsub/dapr/dapr.go +++ /dev/null @@ -1,83 +0,0 @@ -package dapr - -import ( - "context" - "encoding/json" - "fmt" - - daprClient "github.com/dapr/go-sdk/client" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" -) - -type ClientConfig struct { - // Name of the component to be used for pub sub messaging - Component string `json:"component"` -} - -// Dapr represents driver for interacting with pub sub using dapr. -type Dapr struct { - // Array of clients to talk to different endpoints - client daprClient.Client - - // Name of the pubsub component - pubSubComponent string -} - -const ( - Name = "dapr" -) - -func (r *Dapr) Publish(_ context.Context, data interface{}, topic string) error { - jsonData, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("error marshaling data: %w", err) - } - - err = r.client.PublishEvent(context.Background(), r.pubSubComponent, topic, jsonData) - if err != nil { - return fmt.Errorf("error publishing message to dapr: %w", err) - } - - return nil -} - -func (r *Dapr) CloseConnection() error { - return nil -} - -func (r *Dapr) UpdateConnection(_ context.Context, config interface{}) error { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) - if !ok { - return fmt.Errorf("invalid type assertion, config is not in expected format") - } - cfg.Component, ok = m["component"].(string) - if !ok { - return fmt.Errorf("failed to get value of component") - } - r.pubSubComponent = cfg.Component - return nil -} - -// Returns a new client for dapr. -func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("invalid type assertion, config is not in expected format") - } - cfg.Component, ok = m["component"].(string) - if !ok { - return nil, fmt.Errorf("failed to get value of component") - } - - tmp, err := daprClient.NewClient() - if err != nil { - return nil, err - } - - return &Dapr{ - client: tmp, - pubSubComponent: cfg.Component, - }, nil -} diff --git a/pkg/pubsub/dapr/dapr_test.go b/pkg/pubsub/dapr/dapr_test.go deleted file mode 100644 index 5a2e72615b1..00000000000 --- a/pkg/pubsub/dapr/dapr_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package dapr - -import ( - "context" - "os" - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/stretchr/testify/assert" -) - -var testClient connection.Connection - -func TestMain(m *testing.M) { - c, f := FakeConnection() - testClient = c - r := m.Run() - f() - - if r != 0 { - os.Exit(r) - } -} - -func TestNewConnection(t *testing.T) { - tests := []struct { - name string - config interface{} - expected connection.Connection - errorMsg string - }{ - { - name: "invalid config", - config: "test", - expected: nil, - errorMsg: "invalid type assertion, config is not in expected format", - }, - { - name: "config with missing component", - config: map[string]interface{}{"enableBatching": true}, - expected: nil, - errorMsg: "failed to get value of component", - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ret, err := NewConnection(context.TODO(), tc.config) - assert.Equal(t, ret, tc.expected) - assert.EqualError(t, err, tc.errorMsg) - }) - } -} - -func TestDapr_Publish(t *testing.T) { - ctx := context.Background() - - type args struct { - ctx context.Context - data interface{} - topic string - } - - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "test publish", - args: args{ - ctx: ctx, - data: map[string]interface{}{ - "test": "test", - }, - topic: "test", - }, - wantErr: false, - }, - { - name: "test publish without data", - args: args{ - ctx: ctx, - data: nil, - topic: "test", - }, - wantErr: false, - }, - { - name: "test publish without topic", - args: args{ - ctx: ctx, - data: map[string]interface{}{ - "test": "test", - }, - topic: "", - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := testClient - if err := r.Publish(tt.args.ctx, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr { - t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestDapr_UpdateConnection(t *testing.T) { - tests := []struct { - name string - config interface{} - wantErr bool - }{ - { - name: "test update connection", - config: map[string]interface{}{ - "component": "foo", - }, - wantErr: false, - }, - { - name: "test update connection with invalid config", - config: map[string]interface{}{ - "foo": "bar", - }, - wantErr: true, - }, - { - name: "test update connection with nil config", - config: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := testClient - if err := r.UpdateConnection(context.Background(), tt.config); (err != nil) != tt.wantErr { - t.Errorf("Dapr.UpdateConnection() error = %v, wantErr %v", err, tt.wantErr) - } - if !tt.wantErr { - cmp, ok := tt.config.(map[string]interface{})["component"].(string) - assert.True(t, ok) - tmp, ok := r.(*Dapr) - assert.True(t, ok) - assert.Equal(t, cmp, tmp.pubSubComponent) - } - }) - } -} diff --git a/pkg/pubsub/provider/fake_provider.go b/pkg/pubsub/provider/fake_provider.go deleted file mode 100644 index 6e1173eb9b7..00000000000 --- a/pkg/pubsub/provider/fake_provider.go +++ /dev/null @@ -1,11 +0,0 @@ -package provider - -import ( - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -func FakeProviders() { - pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.FakeNewConnection, - }) -} diff --git a/pkg/pubsub/provider/provider.go b/pkg/pubsub/provider/provider.go deleted file mode 100644 index 5e1d0601014..00000000000 --- a/pkg/pubsub/provider/provider.go +++ /dev/null @@ -1,39 +0,0 @@ -package provider - -import ( - "context" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -var pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, -}, -) - -type pubSubSet struct { - supportedPubSub map[string]InitiateConnection -} - -// returns new client for pub sub tool. -type InitiateConnection func(ctx context.Context, config interface{}) (connection.Connection, error) - -func newPubSubSet(pubSubs map[string]InitiateConnection) *pubSubSet { - supported := make(map[string]InitiateConnection) - set := &pubSubSet{ - supportedPubSub: supported, - } - for name := range pubSubs { - set.supportedPubSub[name] = pubSubs[name] - } - return set -} - -func List() map[string]InitiateConnection { - ret := make(map[string]InitiateConnection) - for name, new := range pubSubs.supportedPubSub { - ret[name] = new - } - return ret -} diff --git a/pkg/pubsub/provider/provider_test.go b/pkg/pubsub/provider/provider_test.go deleted file mode 100644 index c81602525ac..00000000000 --- a/pkg/pubsub/provider/provider_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package provider - -import ( - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -func Test_newPubSubSet(t *testing.T) { - tests := []struct { - name string - pubSubs map[string]InitiateConnection - wantKey string - }{ - { - name: "only one provider is available", - pubSubs: map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, - }, - wantKey: dapr.Name, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := newPubSubSet(tt.pubSubs) - if _, ok := got.supportedPubSub[tt.wantKey]; !ok { - t.Errorf("newPubSubSet() = %#v, want key %#v", got.supportedPubSub, tt.wantKey) - } - }) - } -} - -func TestList(t *testing.T) { - tests := []struct { - name string - wantKey string - }{ - { - name: "only one provider is available", - wantKey: dapr.Name, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := List() - if _, ok := got[tt.wantKey]; !ok { - t.Errorf("List() = %#v, want key %#v", got, tt.wantKey) - } - }) - } -} diff --git a/pkg/pubsub/system.go b/pkg/pubsub/system.go deleted file mode 100644 index da60a1be8e6..00000000000 --- a/pkg/pubsub/system.go +++ /dev/null @@ -1,87 +0,0 @@ -package pubsub - -import ( - "context" - "fmt" - "sync" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - prvd "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/provider" -) - -type System struct { - mux sync.RWMutex - connections map[string]connection.Connection - providers map[string]string -} - -func NewSystem() *System { - return &System{} -} - -func (s *System) Publish(_ context.Context, connection string, topic string, msg interface{}) error { - s.mux.RLock() - defer s.mux.RUnlock() - if len(s.connections) > 0 { - if c, ok := s.connections[connection]; ok { - return c.Publish(context.Background(), msg, topic) - } - return fmt.Errorf("connection is not initialized, name: %s ", connection) - } - return fmt.Errorf("No connections are established") -} - -func (s *System) UpsertConnection(ctx context.Context, config interface{}, name string, provider string) error { - s.mux.Lock() - defer s.mux.Unlock() - // Check if the connection already exists. - if conn, ok := s.connections[name]; ok { - // If the provider is the same, update the existing connection. - if s.providers[name] == provider { - return conn.UpdateConnection(ctx, config) - } - } - // Check if the provider is supported. - if newConnFunc, ok := prvd.List()[provider]; ok { - newConn, err := newConnFunc(ctx, config) - if err != nil { - return err - } - - // Close the existing connection after successfully creating the new one. - if err := s.closeConnection(name); err != nil { - return err - } - // Add the new connection and provider to the maps. - if s.connections == nil { - s.connections = map[string]connection.Connection{} - } - if s.providers == nil { - s.providers = map[string]string{} - } - s.connections[name] = newConn - s.providers[name] = provider - return nil - } - return fmt.Errorf("pub-sub provider %s is not supported", provider) -} - -func (s *System) CloseConnection(connection string) error { - s.mux.Lock() - defer s.mux.Unlock() - return s.closeConnection(connection) -} - -func (s *System) closeConnection(connection string) error { - if len(s.connections) > 0 { - if c, ok := s.connections[connection]; ok { - err := c.CloseConnection() - if err != nil { - return err - } - delete(s.connections, connection) - delete(s.providers, connection) - } - } - return nil -} diff --git a/pkg/pubsub/system_test.go b/pkg/pubsub/system_test.go deleted file mode 100644 index a58e43c8cb7..00000000000 --- a/pkg/pubsub/system_test.go +++ /dev/null @@ -1,256 +0,0 @@ -package pubsub - -import ( - "context" - "os" - "sync" - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/provider" - "github.com/stretchr/testify/assert" -) - -var testSystem *System - -func TestMain(m *testing.M) { - ctx := context.Background() - provider.FakeProviders() - tmp := provider.List() - testSystem = NewSystem() - testSystem.connections = make(map[string]connection.Connection) - testSystem.providers = make(map[string]string) - cfg := map[string]interface{}{ - dapr.Name: map[string]interface{}{ - "component": "pubsub", - }, - } - for name, fakeConn := range tmp { - testSystem.providers[name] = name - testSystem.connections[name], _ = fakeConn(ctx, cfg[name]) - } - r := m.Run() - for _, fakeConn := range testSystem.connections { - _ = fakeConn.CloseConnection() - } - - if r != 0 { - os.Exit(r) - } -} - -func TestNewSystem(t *testing.T) { - tests := []struct { - name string - input string - want *System - }{ - { - name: "requesting system", - want: &System{}, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ret := NewSystem() - assert.Equal(t, ret, tc.want) - }) - } -} - -func TestSystem_UpsertConnection(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - s *System - } - type args struct { - ctx context.Context - config interface{} - name string - provider string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - match bool - }{ - { - name: "Create a new connection with dapr provider", - fields: fields{ - connections: testSystem.connections, - providers: testSystem.providers, - s: &System{}, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "pubsub", - }, - name: "dapr", - provider: "dapr", - }, - wantErr: false, - match: true, - }, - { - name: "Update a connection to use test provider", - fields: fields{ - connections: nil, - providers: map[string]string{"audit": "dapr"}, - s: &System{ - mux: sync.RWMutex{}, - providers: map[string]string{"audit": "dapr"}, - }, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "pubsub", - }, - name: "audit", - provider: "test", - }, - wantErr: true, - match: true, - }, - { - name: "Update a connection using same provider", - fields: fields{ - connections: testSystem.connections, - providers: map[string]string{"dapr": "dapr"}, - s: &System{ - mux: sync.RWMutex{}, - providers: testSystem.providers, - connections: testSystem.connections, - }, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "test", - }, - name: "audit", - provider: "dapr", - }, - wantErr: false, - match: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := tt.fields.s.UpsertConnection(tt.args.ctx, tt.args.config, tt.args.name, tt.args.provider); (err != nil) != tt.wantErr { - t.Errorf("System.UpsertConnection() error = %v, wantErr %v", err, tt.wantErr) - } - assert.NotEqual(t, nil, tt.fields.s.connections) - if tt.match { - assert.Equal(t, tt.fields.providers, tt.fields.s.providers) - } else { - assert.NotEqual(t, tt.fields.providers, tt.fields.s.providers) - } - }) - } -} - -func TestSystem_CloseConnection(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - } - type args struct { - connection string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "closing connection", - fields: fields{ - connections: map[string]connection.Connection{"audit": &dapr.Dapr{}}, - providers: map[string]string{"audit": "dapr"}, - }, - args: args{connection: "audit"}, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &System{ - mux: sync.RWMutex{}, - connections: tt.fields.connections, - providers: tt.fields.providers, - } - if err := s.CloseConnection(tt.args.connection); (err != nil) != tt.wantErr { - t.Errorf("System.CloseConnection() error = %v, wantErr %v", err, tt.wantErr) - _, ok := s.connections[tt.args.connection] - assert.False(t, ok) - } - }) - } -} - -func TestSystem_Publish(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - } - type args struct { - ctx context.Context - connection string - topic string - msg interface{} - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "There are no connections established", - fields: fields{ - connections: nil, - providers: nil, - }, - args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil}, - wantErr: true, - }, - { - name: "Publishing to a connection that does not exist", - fields: fields{ - connections: map[string]connection.Connection{"audit": &dapr.Dapr{}}, - providers: map[string]string{"audit": "dapr"}, - }, - args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil}, - wantErr: true, - }, - { - name: "Publishing to a connection that does exist", - fields: fields{ - connections: testSystem.connections, - providers: testSystem.providers, - }, - args: args{ctx: context.Background(), connection: "dapr", topic: "test", msg: nil}, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &System{ - mux: sync.RWMutex{}, - connections: tt.fields.connections, - providers: tt.fields.providers, - } - if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr { - t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/test/bats/test.bats b/test/bats/test.bats index 708b0dacaeb..1f2a29698a0 100644 --- a/test/bats/test.bats +++ b/test/bats/test.bats @@ -661,23 +661,23 @@ __expansion_audit_test() { run kubectl delete --ignore-not-found -f test/expansion/expand_pod_cronjob.yaml } -@test "gatekeeper pubsub test" { - if [ -z $ENABLE_PUBSUB_TESTS ]; then - skip "skipping pubsub tests" +@test "gatekeeper export_violation test" { + if [ -z $ENABLE_EXPORT_TESTS ]; then + skip "skipping export tests" fi run kubectl create ns nginx - run kubectl create -f test/pubsub/nginx_deployment.yaml + run kubectl create -f test/export/nginx_deployment.yaml - run kubectl apply -f test/pubsub/k8srequiredlabels_ct.yaml - run kubectl apply -f test/pubsub/pod_must_have_test.yaml + run kubectl apply -f test/export/k8srequiredlabels_ct.yaml + run kubectl apply -f test/export/pod_must_have_test.yaml wait_for_process ${WAIT_TIME} ${SLEEP_TIME} "constraint_enforced k8srequiredlabels pod-must-have-test" wait_for_process ${WAIT_TIME} ${SLEEP_TIME} "total_violations" - run kubectl delete -f test/pubsub/k8srequiredlabels_ct.yaml --ignore-not-found - run kubectl delete -f test/pubsub/pod_must_have_test.yaml --ignore-not-found - run kubectl delete -f test/pubsub/nginx_deployment.yaml --ignore-not-found + run kubectl delete -f test/export/k8srequiredlabels_ct.yaml --ignore-not-found + run kubectl delete -f test/export/pod_must_have_test.yaml --ignore-not-found + run kubectl delete -f test/export/nginx_deployment.yaml --ignore-not-found run kubectl delete ns nginx --ignore-not-found } diff --git a/test/pubsub/fake-subscriber/Dockerfile b/test/export/fake-subscriber/Dockerfile similarity index 83% rename from test/pubsub/fake-subscriber/Dockerfile rename to test/export/fake-subscriber/Dockerfile index 05b9cb0e837..fc922cb76cb 100644 --- a/test/pubsub/fake-subscriber/Dockerfile +++ b/test/export/fake-subscriber/Dockerfile @@ -12,7 +12,7 @@ ENV GO111MODULE=on \ GOARCH=${TARGETARCH} \ GOARM=${TARGETVARIANT} -WORKDIR /go/src/github.com/open-policy-agent/gatekeeper/test/pubsub/fake-subscriber +WORKDIR /go/src/github.com/open-policy-agent/gatekeeper/test/export/fake-subscriber COPY . . @@ -24,7 +24,7 @@ FROM gcr.io/distroless/static-debian12@sha256:8dd8d3ca2cf283383304fd45a5c9c74d5f WORKDIR / -COPY --from=builder /go/src/github.com/open-policy-agent/gatekeeper/test/pubsub/fake-subscriber/main . +COPY --from=builder /go/src/github.com/open-policy-agent/gatekeeper/test/export/fake-subscriber/main . USER 65532:65532 diff --git a/test/pubsub/fake-subscriber/main.go b/test/export/fake-subscriber/main.go similarity index 98% rename from test/pubsub/fake-subscriber/main.go rename to test/export/fake-subscriber/main.go index fadd3aac2c9..96271c8a4d7 100644 --- a/test/pubsub/fake-subscriber/main.go +++ b/test/export/fake-subscriber/main.go @@ -11,7 +11,7 @@ import ( daprd "github.com/dapr/go-sdk/service/http" ) -type PubsubMsg struct { +type ExportMsg struct { ID string `json:"id,omitempty"` Details interface{} `json:"details,omitempty"` EventType string `json:"eventType,omitempty"` @@ -52,7 +52,7 @@ func main() { } func eventHandler(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - var msg PubsubMsg + var msg ExportMsg jsonInput, err := strconv.Unquote(string(e.RawData)) if err != nil { log.Fatalf("error unquoting %v", err) diff --git a/test/pubsub/fake-subscriber/manifest/subscriber.yaml b/test/export/fake-subscriber/manifest/subscriber.yaml similarity index 100% rename from test/pubsub/fake-subscriber/manifest/subscriber.yaml rename to test/export/fake-subscriber/manifest/subscriber.yaml diff --git a/test/pubsub/k8srequiredlabels_ct.yaml b/test/export/k8srequiredlabels_ct.yaml similarity index 100% rename from test/pubsub/k8srequiredlabels_ct.yaml rename to test/export/k8srequiredlabels_ct.yaml diff --git a/test/pubsub/nginx_deployment.yaml b/test/export/nginx_deployment.yaml similarity index 100% rename from test/pubsub/nginx_deployment.yaml rename to test/export/nginx_deployment.yaml diff --git a/test/pubsub/pod_must_have_test.yaml b/test/export/pod_must_have_test.yaml similarity index 100% rename from test/pubsub/pod_must_have_test.yaml rename to test/export/pod_must_have_test.yaml diff --git a/test/pubsub/publish-components.yaml b/test/export/publish-components.yaml similarity index 96% rename from test/pubsub/publish-components.yaml rename to test/export/publish-components.yaml index 9686935dd01..e623acabe6f 100644 --- a/test/pubsub/publish-components.yaml +++ b/test/export/publish-components.yaml @@ -21,7 +21,7 @@ metadata: name: audit namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" diff --git a/website/docs/audit.md b/website/docs/audit.md index f73b9ae0649..9ce938be687 100644 --- a/website/docs/audit.md +++ b/website/docs/audit.md @@ -133,14 +133,14 @@ In addition to violations, these other audit events may be useful (all uniquely All of these events (including `violation_audited`) are marked with the same `audit_id` for a given audit run. -### Pubsub channel +### Export violations -This feature uses publish and subscribe (pubsub) model that allows Gatekeeper to export audit violations over a broker that can be consumed by a subscriber independently. Therefore, pubsub violations are not subject to reporting limits. Please refer to [this](pubsub.md) guide to configure audit to push violations over a channel. +This feature allows plugging in different backends that allows Gatekeeper to export audit violations. Therefore, violations are not subject to reporting limits. Please refer to [this](export.md) guide to configure audit to push violations via this feature. -Limitations/drawbacks of getting violations using pubsub channel: +Limitations/drawbacks of exporting violations: -- There is an inherent risk of messages getting dropped. You might not receive all the published violations. -- Additional dependency on pubsub broker. +- There is a risk of messages getting dropped. You might not receive all the exported violations. This depends on the type of backend you are using for delivery. For example, using a network as backend to export violation has the risk of messages getting dropped. +- Additional dependency depending on what is plugged in. For example, using pubsub tools to export violations. ## Running Audit For more details on how to deploy audit and @@ -148,7 +148,7 @@ number of instances to run, please refer to [operations audit](operations.md#aud ## Configuring Audit -- Audit violations per constraint: set `--constraint-violations-limit=123` (defaults to `20`). NOTE: This flag only impacts when gathering audit results using the constraint status model. If you are gathering audit results using the pubsub model, please refer to the [pubsub](pubsub.md) guide. Both approaches for getting audit violations can be configured independently and work simultaneously without any interference. +- Audit violations per constraint: set `--constraint-violations-limit=123` (defaults to `20`). NOTE: This flag only impacts when gathering audit results using the constraint status model. If you want to gather audit results via other means, please refer to the [exporting audit results](export.md) guide. Both approaches for getting audit violations can be configured independently and work simultaneously without any interference. - Audit chunk size: set `--audit-chunk-size=400` (defaults to `500`, `0` = infinite) Lower chunk size can reduce memory consumption of the auditing `Pod` but can increase the number requests to the Kubernetes API server. - Audit interval: set `--audit-interval=123` (defaults to every `60` seconds). Disable audit interval by setting `--audit-interval=0` - Audit api server cache write to disk (Gatekeeper v3.7.0+): Starting from v3.7.0, by default, audit writes api server cache to the disk attached to the node. This reduces the memory consumption of the audit `pod`. If there are concerns with high IOPS, then switch audit to write cache to a tmpfs ramdisk instead. NOTE: write to ramdisk will increase memory footprint of the audit `pod`. diff --git a/website/docs/export-driver-walkthrough.md b/website/docs/export-driver-walkthrough.md new file mode 100644 index 00000000000..4a79c1bbe58 --- /dev/null +++ b/website/docs/export-driver-walkthrough.md @@ -0,0 +1,100 @@ +--- +id: export-driver +title: Export Interface/Driver walkthrough +--- + +This guide provides an overview of the driver interface, including details of its structure and functionality. Additionally, it offers instructions on adding a new driver and utilizing different backends to export audit violations. + +## Driver interface + +```go +type Driver interface { + // Publish publishes single message with specific subject using a connection + Publish(ctx context.Context, connectionName string, data interface{}, subject string) error + + // CloseConnection closes a connection + CloseConnection(connectionName string) error + + // UpdateConnection updates an existing connection + UpdateConnection(ctx context.Context, connectionName string, config interface{}) error + + // CreateConnection creates new connection + CreateConnection(ctx context.Context, connectionName string, config interface{}) error +} +``` + +As an example, the Dapr driver implements these methods to publish message and manage connection to do so. Please refer to [dapr.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/export/dapr/dapr.go) to understand the logic that goes in each of these methods. + +### How to add new driver to export audit violations to foo backend + +A driver must maintain a map of open connections associated with backend `foo`. + +```go +type Connection struct { + // properties needed for individual connection +} + +type Foo struct { + openConnections map[string]Connection +} + +const ( + Name = "foo" +) + +var Connections = &Foo{ + openConnections: make(map[string]Connection), +} + +``` + +A driver must implement the `Driver` interface. + +```go +func (r *Foo) Publish(ctx context.Context, connectionName string, data interface{}, subject string) error { + ... +} + +func (r *Foo) loseConnection(connectionName string) error { + ... +} + +func (r *Foo) UpdateConnection(ctx context.Context, connectionName string, config interface{}) error { + ... +} + +func (r *Foo) CreateConnection(ctx context.Context, connectionName string, config interface{}) error { + ... +} +``` + +This newly added driver's `Connections` exported variable must be added to the map of `SupportedDrivers` in [system.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/export/provider/system.go). For example, + +```go +var SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.Connections, + foo.Name: foo.Connections, +} +``` + +And thats it! Exporter system will take the newly added driver into account and whenever a configMap to establish connection to export message is created. + +### How to establish connections to different backend + +To enable audit to use this driver to publish messages, a connection configMap with appropriate `config` and `driver` is needed. For example, + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: audit + namespace: gatekeeper-system +data: + driver: "foo" + config: | + { + + } +``` + +> The `data.driver` field must exist and must match one of the keys of the `SupportedDrivers` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `{"component": "pubsub"}`. diff --git a/website/docs/pubsub.md b/website/docs/export.md similarity index 82% rename from website/docs/pubsub.md rename to website/docs/export.md index 8c1df5fb3c0..307a0dbbd35 100644 --- a/website/docs/pubsub.md +++ b/website/docs/export.md @@ -1,6 +1,6 @@ --- -id: pubsub -title: Consuming violations using Pubsub +id: export +title: Exporting violations --- `Feature State`: Gatekeeper version v3.13+ (alpha) @@ -9,7 +9,7 @@ title: Consuming violations using Pubsub ## Description -This feature pushes audit violations to a pubsub service. Users can subscribe to pubsub service to consume violations. +This feature exports audit violations to a backend from where users can consume violations. > To gain insights into different methods of obtaining audit violations and the respective trade-offs for each approach, please refer to [Reading Audit Results](audit.md#reading-audit-results). @@ -17,11 +17,11 @@ This feature pushes audit violations to a pubsub service. Users can subscribe to Install prerequisites such as a pubsub tool, a message broker etc. -### Setting up audit with pubsub enabled +### Setting up audit to export violations -In the audit deployment, set the `--enable-pub-sub` flag to `true` to publish audit violations. Additionally, use `--audit-connection` (defaults to `audit-connection`) and `--audit-channel`(defaults to `audit-channel`) flags to allow audit to publish violations using desired connection onto desired channel. `--audit-connection` must be set to the name of the connection config, and `--audit-channel` must be set to name of the channel where violations should get published. +In the audit deployment, set the `--enable-violation-export` flag to `true` to export audit violations. Additionally, use `--audit-connection` (defaults to `audit-connection`) and `--audit-channel`(defaults to `audit-channel`) flags to allow audit to export violations using desired connection onto desired channel. `--audit-connection` must be set to the name of the connection config, and `--audit-channel` must be set to name of the channel where violations should get published. -A ConfigMap that contains `provider` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to publish messages: +A ConfigMap that contains `driver` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to export messages: ```yaml apiVersion: v1 @@ -30,20 +30,20 @@ metadata: name: audit-connection namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" } ``` -- `provider` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr` +- `driver` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr` - `config` field is a json object that configures how the connection is made. E.g. which queue messages should be sent to. -#### Available Pubsub drivers +#### Available drivers Dapr: https://dapr.io/ -### Quick start with publishing violations using Dapr and Redis +### Quick start with exporting violations using Dapr and Redis #### Prerequisites @@ -128,9 +128,9 @@ Dapr: https://dapr.io/ ``` > [!IMPORTANT] - > Please make sure `fake-subscriber` image is built and available in your cluster. Dockerfile to build image for `fake-subscriber` is under [gatekeeper/test/fake-subscriber](https://github.com/open-policy-agent/gatekeeper/tree/master/test/pubsub/fake-subscriber). + > Please make sure `fake-subscriber` image is built and available in your cluster. Dockerfile to build image for `fake-subscriber` is under [gatekeeper/test/fake-subscriber](https://github.com/open-policy-agent/gatekeeper/tree/master/test/export/fake-subscriber). -#### Configure Gatekeeper with Pubsub enabled +#### Configure Gatekeeper with Export enabled 1. Create Gatekeeper namespace, and create Dapr pubsub component and Redis secret in Gatekeeper's namespace (`gatekeeper-system` by default). Please make sure to update `gatekeeper-system` namespace for the next steps if your cluster's Gatekeeper namespace is different. @@ -156,13 +156,13 @@ Dapr: https://dapr.io/ EOF ``` -2. To upgrade or install Gatekeeper with `--enable-pub-sub` set to `true`, `--audit-connection` set to `audit-connection`, `--audit-channel` set to `audit-channel` on audit pod. +2. To upgrade or install Gatekeeper with `--enable-violation-export` set to `true`, `--audit-connection` set to `audit-connection`, `--audit-channel` set to `audit-channel` on audit pod. ```shell # auditPodAnnotations is used to add annotations required by Dapr to inject sidecar to audit pod echo 'auditPodAnnotations: {dapr.io/enabled: "true", dapr.io/app-id: "audit", dapr.io/metrics-port: "9999", dapr.io/sidecar-seccomp-profile-type: "RuntimeDefault"}' > /tmp/annotations.yaml helm upgrade --install gatekeeper gatekeeper/gatekeeper --namespace gatekeeper-system \ - --set audit.enablePubsub=true \ + --set enableViolationExport=true \ --set audit.connection=audit-connection \ --set audit.channel=audit-channel \ --values /tmp/annotations.yaml @@ -180,7 +180,7 @@ Dapr: https://dapr.io/ name: audit-connection namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" @@ -188,7 +188,7 @@ Dapr: https://dapr.io/ EOF ``` - **Note:** Name of the connection configMap must match the value of `--audit-connection` for it to be used by audit to publish violation. At the moment, only one connection config can exists for audit. + **Note:** Name of the connection configMap must match the value of `--audit-connection` for it to be used by audit to export violation. At the moment, only one connection config can exists for audit. 4. Create the constraint templates and constraints, and make sure audit ran by checking constraints. If constraint status is updated with information such as `auditTimeStamp` or `totalViolations`, then audit has ran at least once. Additionally, populated `TOTAL-VIOLATIONS` field for all constraints while listing constraints also indicates that audit has ran at least once. @@ -203,12 +203,12 @@ Dapr: https://dapr.io/ ```log kubectl logs -l app=sub -c go-sub -n fake-subscriber 2023/07/18 20:16:41 Listening... - 2023/07/18 20:37:20 main.PubsubMsg{ID:"2023-07-18T20:37:19Z", Details:map[string]interface {}{"missing_labels":[]interface {}{"test"}}, EventType:"violation_audited", Group:"constraints.gatekeeper.sh", Version:"v1beta1", Kind:"K8sRequiredLabels", Name:"pod-must-have-test", Namespace:"", Message:"you must provide labels: {\"test\"}", EnforcementAction:"deny", ConstraintAnnotations:map[string]string(nil), ResourceGroup:"", ResourceAPIVersion:"v1", ResourceKind:"Pod", ResourceNamespace:"nginx", ResourceName:"nginx-deployment-58899467f5-j85bs", ResourceLabels:map[string]string{"app":"nginx", "owner":"admin", "pod-template-hash":"58899467f5"}} + 2023/07/18 20:37:20 main.ExportMsg{ID:"2023-07-18T20:37:19Z", Details:map[string]interface {}{"missing_labels":[]interface {}{"test"}}, EventType:"violation_audited", Group:"constraints.gatekeeper.sh", Version:"v1beta1", Kind:"K8sRequiredLabels", Name:"pod-must-have-test", Namespace:"", Message:"you must provide labels: {\"test\"}", EnforcementAction:"deny", ConstraintAnnotations:map[string]string(nil), ResourceGroup:"", ResourceAPIVersion:"v1", ResourceKind:"Pod", ResourceNamespace:"nginx", ResourceName:"nginx-deployment-58899467f5-j85bs", ResourceLabels:map[string]string{"app":"nginx", "owner":"admin", "pod-template-hash":"58899467f5"}} ``` ### Violations -The audit pod publishes violations in following format: +The audit pod exports violations in following format: ```json { diff --git a/website/docs/pubsub-driver-walkthrough.md b/website/docs/pubsub-driver-walkthrough.md deleted file mode 100644 index 4d598946640..00000000000 --- a/website/docs/pubsub-driver-walkthrough.md +++ /dev/null @@ -1,61 +0,0 @@ ---- -id: pubsub-driver -title: Pubsub Interface/Driver walkthrough ---- - -This guide provides an overview of the pubsub interface, including details on its structure and functionality. Additionally, it offers instructions on adding a new driver and utilizing providers other than the default provider Dapr. - -## Pubsub interface and Driver walkthrough - -Pubsub's connection interface looks like -```go -// Connection is the interface that wraps pubsub methods. -type Connection interface { - // Publish single message over a specific topic/channel - Publish(ctx context.Context, message interface{}, topic string) error - - // Close connections - CloseConnection() error - - // Update an existing connection with new configuration - UpdateConnection(ctx context.Context, config interface{}) error -} -``` - -As an example, the Dapr driver implements these three methods to publish message, close connection, and update connection respectively. Please refer to [dapr.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/pubsub/dapr/dapr.go) to understand the logic that goes in each of these methods. Additionally, the Dapr driver also implements `func NewConnection(_ context.Context, config interface{}) (connection.Connection, error)` method that returns a new client for dapr. - -### How to add new drivers - -**Note:** For example, if we want to add a new driver to use `foo` instead of Dapr as a tool to publish violations. - -A driver must implement the `Connection` interface and a new `func NewConnection(_ context.Context, config interface{}) (connection.Connection, error)` method that returns a client for the respective tool. - -This newly added driver's `NewConnection` method must be used to create a new `pubSubs` object in [provider.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/pubsub/provider/provider.go). For example, - -```go -var pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, - "foo": foo.NewConnection, -}, -) -``` - -### How to use different providers - -To enable audit to use this driver to publish messages, a connection configMap with appropriate `config` and `provider` is needed. For example, - -```yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: audit - namespace: gatekeeper-system -data: - provider: "foo" - config: | - { - - } -``` - -> The `data.provider` field must exist and must match one of the keys of the `pubSubs` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `{"component": "pubsub"}`. diff --git a/website/sidebars.js b/website/sidebars.js index f81ac8de716..5b558445f3c 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -34,7 +34,7 @@ module.exports = { 'expansion', 'gator', 'workload-resources', - 'pubsub', + 'export', 'validating-admission-policy', 'enforcement-points' ], @@ -66,7 +66,7 @@ module.exports = { 'developers', 'help', 'security', - 'pubsub-driver' + 'export-driver' ], } ]