Commit

Rename modules in cmd, add goreplay image configurations

lwolf committed Jul 22, 2018
1 parent 2c921ed commit 7f8ae64

Showing 12 changed files with 127 additions and 36 deletions.
.goreleaser.yml (8 changes: 6 additions & 2 deletions)

@@ -10,7 +10,9 @@ project_name: kubereplay
 
 builds:
 - binary: kubereplay-controller
-  main: ./cmd/controller-manager/main.go
+  main: ./cmd/controller/main.go
+  env:
+  - CGO_ENABLED=0
   goos:
   # - darwin
   - linux
@@ -27,7 +29,9 @@ builds:
 # - goos: darwin
 #   goarch: arm64
 - binary: kubereplay-initializer
-  main: ./cmd/initializer-controller/main.go
+  main: ./cmd/initializer/main.go
+  env:
+  - CGO_ENABLED=0
   goos:
   # - darwin
   - linux
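Note: CGO_ENABLED=0 produces statically linked binaries for these pure-Go programs, which is what lets them run in minimal base images such as scratch or alpine (see the Dockerfile.initializer change below).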
Dockerfile.initializer (2 changes: 1 addition & 1 deletion)

@@ -1,3 +1,3 @@
-FROM scratch
+FROM alpine:3.7
 COPY kubereplay-initializer /
 ENTRYPOINT ["/kubereplay-initializer"]
README.md (8 changes: 4 additions & 4 deletions)

@@ -8,16 +8,16 @@ Kubereplay aims to make integration of [Goreplay](https://github.com/buger/goreplay)
 
 # Current status
 
-This is an early alpha version. It is *not* meant to run in production.
+This is an early alpha version. It is *not* meant to run in production yet.
 
 # About
 
-Kubereplay is Kubernetes add-on to automate capturing and redirection of traffic using [Goreplay](https://github.com/buger/goreplay).
-It consist of 2 parts that need to run in the cluster - controller-manager and initializer-controller.
+Kubereplay is a Kubernetes add-on to automate capturing and redirection of traffic using [Goreplay](https://github.com/buger/goreplay).
+It consists of 2 parts that need to run in the cluster - the controller and the initializer.
 
 ## How it works:
 
-Kubereplay creates and manage 2 CRDs: Harvesters and Refineries.
+Kubereplay creates and manages 2 CRDs: Harvesters and Refineries.
 
 Refinery - is responsible for managing a dedicated GoReplay deployment used for receiving data from workloads (harvesters).
 It listens to traffic on a tcp socket and then sends it to the configured output (stdout, elasticsearch, kafka, http).
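Concretely, per the code later in this commit, the injected sidecar runs goreplay with `-input-raw :<app_port> -output-tcp refinery-<refinery>.<namespace>:28020` (see GenerateSidecar below), and the refinery Deployment receives on TCP port 28020 and forwards traffic to whichever outputs are enabled (stdout, file, http, elasticsearch, kafka).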
cmd/controller-manager/main.go → cmd/controller/main.go: file renamed without changes.
cmd/initializer-controller/main.go → cmd/initializer/main.go (51 changes: 39 additions & 12 deletions)

@@ -13,6 +13,7 @@ import (
 
 	"github.com/heptiolabs/healthcheck"
 	"k8s.io/api/apps/v1beta1"
+	apiv1 "k8s.io/api/core/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -46,17 +47,25 @@
 
 var harvesterGVK = v1alpha1.SchemeGroupVersion.WithKind("Harvester")
 
-func GenerateSidecar(refinerySvc string, port uint32) *corev1.Container {
-	return &corev1.Container{
-		Name:  "goreplay",
-		Image: "buger/goreplay:latest",
-		Args: []string{
-			"-input-raw",
-			fmt.Sprintf(":%d", port),
-			"-output-tcp",
-			fmt.Sprintf("%s:28020", refinerySvc),
-		},
-		Resources: corev1.ResourceRequirements{
+func GenerateSidecar(refinerySvc string, hs v1alpha1.HarvesterSpec) *corev1.Container {
+	var image string
+	var imagePullPolicy apiv1.PullPolicy
+	var resources corev1.ResourceRequirements
+
+	if hs.Goreplay != nil && hs.Goreplay.Image != "" {
+		image = hs.Goreplay.Image
+	} else {
+		image = "buger/goreplay:latest"
+	}
+	if hs.Goreplay != nil && hs.Goreplay.ImagePullPolicy != "" {
+		imagePullPolicy = hs.Goreplay.ImagePullPolicy
+	} else {
+		imagePullPolicy = apiv1.PullAlways
+	}
+	if hs.Resources != nil {
+		resources = *hs.Resources
+	} else {
+		resources = corev1.ResourceRequirements{
 			Requests: corev1.ResourceList{
 				corev1.ResourceCPU:    resource.MustParse("10m"),
 				corev1.ResourceMemory: resource.MustParse("64Mi"),
@@ -65,7 +74,20 @@ func GenerateSidecar(refinerySvc string, port uint32) *corev1.Container {
 			Limits: corev1.ResourceList{
 				corev1.ResourceCPU:    resource.MustParse("10m"),
 				corev1.ResourceMemory: resource.MustParse("64Mi"),
 			},
-		},
-	}
-}
+		}
+	}
+
+	return &corev1.Container{
+		Name:            "goreplay",
+		Image:           image,
+		ImagePullPolicy: imagePullPolicy,
+		Args: []string{
+			"-input-raw",
+			fmt.Sprintf(":%d", hs.AppPort),
+			"-output-tcp",
+			fmt.Sprintf("%s:28020", refinerySvc),
+		},
+		Resources: resources,
+	}
+}
@@ -167,7 +189,7 @@ func initializeDeployment(deployment *v1beta1.Deployment, clientset *kubernetes.
 		sidecar := GenerateSidecar(
 			fmt.Sprintf("refinery-%s.%s", harvester.Spec.Refinery, harvester.Namespace),
 			// todo: remove port from harvester spec, get it directly from deployment
-			harvester.Spec.AppPort,
+			harvester.Spec,
 		)
 
 		_, err = clientset.AppsV1beta1().Deployments(deployment.Namespace).Update(initializedDeploymentGreen)
@@ -177,6 +199,11 @@
 	}
 
 	// Modify the Deployment's Pod template to include the Gor container
+	if harvester.Spec.Goreplay != nil && harvester.Spec.Goreplay.ImagePullSecrets != nil {
+		for _, ips := range harvester.Spec.Goreplay.ImagePullSecrets {
+			initializedDeploymentBlue.Spec.Template.Spec.ImagePullSecrets = append(initializedDeploymentBlue.Spec.Template.Spec.ImagePullSecrets, ips)
+		}
+	}
 	initializedDeploymentBlue.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, *sidecar)
 	// Creating new deployment in a go routine, otherwise it will block and timeout
 	go createShadowDeployment(initializedDeploymentBlue, clientset)
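For orientation, here is a minimal usage sketch of the new signature (hypothetical values; it assumes the v1alpha1 types and defaults introduced in this commit):

    // Sketch only: assumes the v1alpha1 and corev1 imports used in this file.
    func exampleSidecar() *corev1.Container {
    	hs := v1alpha1.HarvesterSpec{
    		AppPort:  8080, // hypothetical application port
    		Refinery: "example",
    		Goreplay: &v1alpha1.GoreplayImage{
    			Image:           "buger/goreplay:latest",
    			ImagePullPolicy: corev1.PullIfNotPresent,
    		},
    		// Resources left nil: GenerateSidecar falls back to the
    		// 10m CPU / 64Mi memory defaults shown above.
    	}
    	// Resulting args: -input-raw :8080 -output-tcp refinery-example.default:28020
    	return GenerateSidecar("refinery-example.default", hs)
    }

Leaving Goreplay or Resources unset keeps the previously hardcoded behaviour, so existing Harvester specs remain valid.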
docs/refinery.md (2 changes: 1 addition & 1 deletion)

@@ -39,7 +39,7 @@ spec:
     # Used for internal communication between Gor instances. Example:
     uri: "replay.kubernetes:28020" # -output-tcp value
 
-  stdout: # --output-stdout
+  stdout: # -output-stdout
     enabled: true
 
   http:
helpers/generator.go (47 changes: 42 additions & 5 deletions)

@@ -9,6 +9,8 @@ import (
 	kubereplayv1alpha1 "github.com/lwolf/kubereplay/pkg/apis/kubereplay/v1alpha1"
 	appsv1 "k8s.io/api/apps/v1beta2"
 	apiv1 "k8s.io/api/core/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 )
@@ -123,11 +125,11 @@ func mergeArgs(newArgs []string, args []string) []string {
 	return args
 }
 
-func GenerateService(name string, spec *kubereplayv1alpha1.RefinerySpec) *apiv1.Service {
+func GenerateService(name string, namespace string, spec *kubereplayv1alpha1.RefinerySpec) *apiv1.Service {
 	return &apiv1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      fmt.Sprintf("refinery-%s", name),
-			Namespace: "default",
+			Namespace: namespace,
 			Labels:    AppLabels(name),
 		},
 		Spec: apiv1.ServiceSpec{
@@ -220,6 +222,11 @@ func GenerateDeployment(name string, r *kubereplayv1alpha1.Refinery) *appsv1.Deployment {
 	if &r.Spec == nil || r.Spec.Storage == nil {
 		return nil
 	}
+	var imagePullSecrets []corev1.LocalObjectReference
+	var image string
+	var imagePullPolicy apiv1.PullPolicy
+	var resources corev1.ResourceRequirements
+
 	args := argsFromSpec(&r.Spec)
 
 	ownerReferences := []metav1.OwnerReference{
@@ -230,6 +237,33 @@
 			APIVersion: kubereplayv1alpha1.SchemeGroupVersion.String(),
 		},
 	}
+	if r.Spec.Goreplay != nil && r.Spec.Goreplay.ImagePullSecrets != nil {
+		imagePullSecrets = r.Spec.Goreplay.ImagePullSecrets
+	}
+	if r.Spec.Goreplay != nil && r.Spec.Goreplay.Image != "" {
+		image = r.Spec.Goreplay.Image
+	} else {
+		image = "buger/goreplay:latest"
+	}
+	if r.Spec.Goreplay != nil && r.Spec.Goreplay.ImagePullPolicy != "" {
+		imagePullPolicy = r.Spec.Goreplay.ImagePullPolicy
+	} else {
+		imagePullPolicy = apiv1.PullAlways
+	}
+	if r.Spec.Resources != nil {
+		resources = *r.Spec.Resources
+	} else {
+		resources = corev1.ResourceRequirements{
+			Requests: corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("10m"),
+				corev1.ResourceMemory: resource.MustParse("64Mi"),
+			},
+			Limits: corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("10m"),
+				corev1.ResourceMemory: resource.MustParse("64Mi"),
+			},
+		}
+	}
+
 	deployment := &appsv1.Deployment{
@@ -248,18 +282,21 @@
 				Labels: AppLabels(name),
 			},
 			Spec: apiv1.PodSpec{
+				ImagePullSecrets: imagePullSecrets,
 				Containers: []apiv1.Container{
 					{
-						Name:  "goreplay",
-						Image: "buger/goreplay:latest",
-						Args:  *args,
+						Name:            "goreplay",
+						Image:           image,
+						ImagePullPolicy: imagePullPolicy,
+						Args:            *args,
 						Ports: []apiv1.ContainerPort{
 							{
								Name:          "tcp",
 								Protocol:      apiv1.ProtocolTCP,
 								ContainerPort: 28020,
 							},
 						},
+						Resources: resources,
 					},
 				},
 			},
pkg/apis/kubereplay/v1alpha1/harvester_types.go (11 changes: 7 additions & 4 deletions)

@@ -1,15 +1,18 @@
 package v1alpha1
 
 import (
+	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 // HarvesterSpec defines the desired state of Harvester
 type HarvesterSpec struct {
-	Selector    map[string]string `json:"selector,omitempty"`
-	AppPort     uint32            `json:"app_port,omitempty"`
-	Refinery    string            `json:"refinery,omitempty"`
-	SegmentSize uint32            `json:"segment,omitempty"`
+	Selector    map[string]string        `json:"selector,omitempty"`
+	AppPort     uint32                   `json:"app_port,omitempty"`
+	Refinery    string                   `json:"refinery,omitempty"`
+	SegmentSize uint32                   `json:"segment,omitempty"`
+	Goreplay    *GoreplayImage           `json:"goreplay,omitempty"`
+	Resources   *v1.ResourceRequirements `json:"resources,omitempty"`
 }
 
 // HarvesterStatus defines the observed state of Harvester
pkg/apis/kubereplay/v1alpha1/refinery_types.go (17 changes: 13 additions & 4 deletions)

@@ -1,6 +1,7 @@
 package v1alpha1
 
 import (
+	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -11,9 +12,17 @@
 
 // RefinerySpec defines the desired state of Refinery
 type RefinerySpec struct {
-	Workers int32            `json:"workers,omitempty"`
-	Timeout string           `json:"timeout,omitempty"`
-	Storage *RefineryStorage `json:"output,omitempty"`
+	Workers   int32                    `json:"workers,omitempty"`
+	Timeout   string                   `json:"timeout,omitempty"`
+	Storage   *RefineryStorage         `json:"output,omitempty"`
+	Goreplay  *GoreplayImage           `json:"goreplay,omitempty"`
+	Resources *v1.ResourceRequirements `json:"resources,omitempty"`
+}
+
+type GoreplayImage struct {
+	Image            string                    `json:"image,omitempty"`
+	ImagePullPolicy  v1.PullPolicy             `json:"image_pull_policy,omitempty"`
+	ImagePullSecrets []v1.LocalObjectReference `json:"image_pull_secrets,omitempty"`
 }
 
 // RefineryStatus defines various storages available for Refinery
@@ -30,7 +39,7 @@ type FileSilo struct {
 	Enabled       bool   `json:"enabled,omitempty"`
 	Filename      string `json:"filename,omitempty"`
 	Append        bool   `json:"append,omitempty"`
-	FlushInterval string `json:"flushinterval,omitempty"`
+	FlushInterval string `json:"flush_interval,omitempty"`
 	QueueSize     int32  `json:"queuesize,omitempty"`
 	FileLimit     string `json:"filelimit,omitempty"`
 }
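Given the json tags above, a manifest that overrides both the image settings and the default resource requirements would look roughly as follows (a sketch: the secret name is hypothetical, and the resource values simply mirror the in-code defaults):

    spec:
      goreplay:
        image: buger/goreplay:latest
        image_pull_policy: IfNotPresent
        image_pull_secrets:
        - name: my-registry-secret  # hypothetical
      resources:
        requests:
          cpu: 10m
          memory: 64Mi
        limits:
          cpu: 10m
          memory: 64Mi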
pkg/controller/refinery/controller.go (2 changes: 1 addition & 1 deletion)

@@ -50,7 +50,7 @@ func (bc *RefineryController) Reconcile(k types.ReconcileKey) error {
 	}
 
 	sClient := bc.kubernetesclient.CoreV1().Services(r.Namespace)
-	service := helpers.GenerateService(r.Name, &r.Spec)
+	service := helpers.GenerateService(r.Name, r.Namespace, &r.Spec)
 	_, err = sClient.Get(service.Name, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
 		_, err := sClient.Create(service)
sample/harvester.yaml (6 changes: 6 additions & 0 deletions)

@@ -14,3 +14,9 @@ spec:
     module: test
   # Name of the Refinery to send traffic to
   refinery: "refinery-example"
+
+  goreplay:
+    image: buger/goreplay:latest
+    image_pull_policy: Never
+    image_pull_secrets: []
+  resources: {}
sample/refinery.yaml (9 changes: 7 additions & 2 deletions)

@@ -8,6 +8,11 @@ spec:
   workers: 1 # -output-http-workers int
   # Specify HTTP request/response timeout. By default 5s. Example: -output-http-timeout 30s (default 5s)
   timeout: "5s" # -output-http-timeout duration
+  goreplay:
+    image: buger/goreplay:latest
+    image_pull_policy: IfNotPresent
+    image_pull_secrets: []
+  resources: {}
   output:
     file:
       enabled: false
@@ -18,7 +23,7 @@
       append: true # -output-file-append
 
       # Interval for forcing buffer flush to the file, default: 1s. (default 1s)
-      flushInterval: "1s" # -output-file-flush-interval duration
+      flush_interval: "1s" # -output-file-flush-interval duration
 
       # The length of the chunk queue. Default: 256 (default 256)
       queueSize: 256 # -output-file-queue-limit int
@@ -43,7 +48,7 @@
      debug: true # -output-http-debug
 
       # HTTP response buffer size, all data after this size will be discarded.
-      responseBuffer: 1 # -output-http-response-buffer int
+      response_buffer: 1 # -output-http-response-buffer int
 
     elasticsearch:
       enabled: false
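With both samples updated, the new fields can be exercised by applying them with kubectl apply -f sample/refinery.yaml and then kubectl apply -f sample/harvester.yaml, assuming the CRDs and both controllers are already installed in the cluster.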
