Skip to content

Commit

Permalink
Added traffic control capabilities.
Browse files Browse the repository at this point in the history
  • Loading branch information
brett19 committed Nov 27, 2023
1 parent 1958d45 commit 076b5b3
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 0 deletions.
29 changes: 29 additions & 0 deletions cmd/chaos-allowtraffic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cmd

import (
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var chaosAllowTrafficCmd = &cobra.Command{
Use: "allow-traffic",
Short: "Allows inter-node traffic to a specific node",
Args: cobra.MinimumNArgs(2),
Run: func(cmd *cobra.Command, args []string) {
helper := CmdHelper{}
logger := helper.GetLogger()
ctx := helper.GetContext()

_, deployer, cluster := helper.IdentifyCluster(ctx, args[0])
node := helper.IdentifyNode(ctx, cluster, args[1])

err := deployer.AllowNodeTraffic(ctx, cluster.GetID(), node.GetID())
if err != nil {
logger.Fatal("failed to allow node traffic", zap.Error(err))
}
},
}

func init() {
chaosCmd.AddCommand(chaosAllowTrafficCmd)
}
29 changes: 29 additions & 0 deletions cmd/chaos-blocktraffic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cmd

import (
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var chaosBlockTrafficCmd = &cobra.Command{
Use: "block-traffic",
Short: "Blocks inter-node traffic to a specific node",
Args: cobra.MinimumNArgs(2),
Run: func(cmd *cobra.Command, args []string) {
helper := CmdHelper{}
logger := helper.GetLogger()
ctx := helper.GetContext()

_, deployer, cluster := helper.IdentifyCluster(ctx, args[0])
node := helper.IdentifyNode(ctx, cluster, args[1])

err := deployer.BlockNodeTraffic(ctx, cluster.GetID(), node.GetID())
if err != nil {
logger.Fatal("failed to block node traffic", zap.Error(err))
}
},
}

func init() {
chaosCmd.AddCommand(chaosBlockTrafficCmd)
}
15 changes: 15 additions & 0 deletions cmd/chaos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cmd

import (
"github.com/spf13/cobra"
)

var chaosCmd = &cobra.Command{
Use: "chaos",
Short: "Provides chaos tools for the system",
Run: nil,
}

func init() {
rootCmd.AddCommand(chaosCmd)
}
52 changes: 52 additions & 0 deletions cmd/cmdhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,58 @@ func (h *CmdHelper) IdentifyCluster(ctx context.Context, userInput string) (stri
return "", nil, nil
}

func (h *CmdHelper) IdentifyNode(
ctx context.Context,
cluster deployment.ClusterInfo,
userInput string,
) deployment.ClusterNodeInfo {
logger := h.GetLogger()
logger.Info("attempting to identify node",
zap.String("clusterId", cluster.GetID()),
zap.String("input", userInput))

nodes := cluster.GetNodes()

// check if we have an id exact match
for _, node := range nodes {
if node.GetID() == userInput {
return node
}
}

// check if we have an resource id exact match
for _, node := range nodes {
if node.GetResourceID() == userInput {
return node
}
}

// check if we have an IP exact match
for _, node := range nodes {
if node.GetIPAddress() == userInput {
return node
}
}

// check if we have an id partial match
for _, node := range nodes {
if strings.HasPrefix(node.GetID(), userInput) {
return node
}
}

// check if we have a resource id partial match
for _, node := range nodes {
if strings.HasPrefix(node.GetResourceID(), userInput) {
return node
}
}

logger.Fatal("failed to identify node using specified identifier",
zap.String("identifier", userInput))
return nil
}

func (h *CmdHelper) OutputJson(value interface{}) {
out, _ := json.Marshal(value)
fmt.Printf("%s\n", out)
Expand Down
8 changes: 8 additions & 0 deletions deployment/clouddeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,3 +1111,11 @@ func (d *Deployer) DeleteScope(ctx context.Context, clusterID string, bucketName
func (d *Deployer) DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error {
return errors.New("clouddeploy does not support deleting collections")
}

func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
return errors.New("clouddeploy does not support traffic control")
}

func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
return errors.New("clouddeploy does not support traffic control")
}
2 changes: 2 additions & 0 deletions deployment/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ type Deployer interface {
CreateCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error
DeleteScope(ctx context.Context, clusterID string, bucketName, scopeName string) error
DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error
BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
}
87 changes: 87 additions & 0 deletions deployment/dockerdeploy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,90 @@ func (c *Controller) RemoveNode(ctx context.Context, containerID string) error {

return nil
}

func (c *Controller) execCmd(ctx context.Context, containerID string, cmd []string) error {
c.Logger.Debug("executing cmd",
zap.String("containerID", containerID),
zap.Strings("cmd", cmd))

return dockerExecAndPipe(ctx, c.Logger, c.DockerCli, containerID, cmd)
}

func (c *Controller) execIptables(ctx context.Context, containerID string, args []string) error {
err := c.execCmd(ctx, containerID, append([]string{"iptables"}, args...))
if err != nil {
// if the iptables command fails initially, we attempt to install iptables first
c.Logger.Debug("failed to execute iptables, attempting to install")

err := c.execCmd(ctx, containerID, []string{"apt-get", "update"})
if err != nil {
return errors.Wrap(err, "failed to update apt")
}

err = c.execCmd(ctx, containerID, []string{"apt-get", "-y", "install", "iptables"})
if err != nil {
return errors.Wrap(err, "failed to install iptables")
}

// try it again after installing iptables
err = c.execCmd(ctx, containerID, append([]string{"iptables"}, args...))
if err != nil {
return errors.Wrap(err, "failed to execute iptables command")
}
}

return nil
}

func (c *Controller) SetTrafficControl(ctx context.Context, containerID string, blocked bool) error {
logger := c.Logger.With(zap.String("container", containerID))
logger.Debug("setting up traffic control", zap.Bool("blocked", blocked))

netInfo, err := c.DockerCli.NetworkInspect(ctx, c.NetworkName, types.NetworkInspectOptions{})
if err != nil {
return errors.Wrap(err, "failed to inspect network")
}

if len(netInfo.IPAM.Config) < 1 {
return errors.New("more than one ipam config, cannot identify node subnet")
}
ipamConfig := netInfo.IPAM.Config[0]

gatewayIP := ipamConfig.Gateway
ipRange := ipamConfig.Subnet
if ipamConfig.IPRange != "" {
ipRange = ipamConfig.IPRange
}

if ipRange == "" || gatewayIP == "" {
return errors.New("failed to identify subnet or gateway ip")
}

err = c.execIptables(ctx, containerID, []string{"-F"})
if err != nil {
return errors.Wrap(err, "failed to clear iptables")
}

if blocked {
// reject from the rest of that subnet
err = c.execIptables(ctx, containerID, []string{"-I", "INPUT", "-s", ipRange, "-j", "DROP"})
if err != nil {
return errors.Wrap(err, "failed to create iptables rule")
}

// always accept from the gateway
err = c.execIptables(ctx, containerID, []string{"-I", "INPUT", "-s", gatewayIP, "-j", "ACCEPT"})
if err != nil {
return errors.Wrap(err, "failed to create iptables rule")
}
}

err = c.execIptables(ctx, containerID, []string{"-S"})
if err != nil {
c.Logger.Debug("failed to print iptables state", zap.Error(err))
}

logger.Debug("traffic control has been set up!")

return nil
}
47 changes: 47 additions & 0 deletions deployment/dockerdeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,3 +1033,50 @@ func (d *Deployer) DeleteCollection(ctx context.Context, clusterID string, bucke

return nil
}

func (d *Deployer) getNode(ctx context.Context, clusterID, nodeID string) (*NodeInfo, error) {
nodes, err := d.controller.ListNodes(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to list nodes")
}

var foundNode *NodeInfo
for _, node := range nodes {
if node.ClusterID == clusterID && node.NodeID == nodeID {
foundNode = node
}
}
if foundNode == nil {
return nil, fmt.Errorf("failed to find node with id `%s`", nodeID)
}

return foundNode, nil
}

func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
node, err := d.getNode(ctx, clusterID, nodeID)
if err != nil {
return errors.Wrap(err, "failed to get node")
}

err = d.controller.SetTrafficControl(ctx, node.ContainerID, true)
if err != nil {
return errors.Wrap(err, "failed to block traffic")
}

return nil
}

func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
node, err := d.getNode(ctx, clusterID, nodeID)
if err != nil {
return errors.Wrap(err, "failed to get node")
}

err = d.controller.SetTrafficControl(ctx, node.ContainerID, false)
if err != nil {
return errors.Wrap(err, "failed to allow traffic")
}

return nil
}
35 changes: 35 additions & 0 deletions deployment/dockerdeploy/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"

"github.com/docker/docker/api/types"
Expand Down Expand Up @@ -78,3 +79,37 @@ func dockerPullAndPipe(ctx context.Context, logger *zap.Logger, cli *client.Clie

return nil
}

func dockerExecAndPipe(ctx context.Context, logger *zap.Logger, cli *client.Client, containerID string, cmd []string) error {
execID, err := cli.ContainerExecCreate(ctx, containerID, types.ExecConfig{
AttachStdout: true,
AttachStderr: true,
Cmd: cmd,
})
if err != nil {
return errors.Wrap(err, "failed to create exec")
}

resp, err := cli.ContainerExecAttach(ctx, execID.ID, types.ExecStartCheck{})
if err != nil {
return errors.Wrap(err, "failed to start exec")
}

scanner := bufio.NewScanner(resp.Reader)
for scanner.Scan() {
line := scanner.Text()

logger.Debug("docker exec output", zap.String("text", line))
}

res, err := cli.ContainerExecInspect(ctx, execID.ID)
if err != nil {
return errors.Wrap(err, "failed to inspect exec")
}

if res.ExitCode != 0 {
return fmt.Errorf("failed to execute process (exit code: %d)", res.ExitCode)
}

return nil
}
8 changes: 8 additions & 0 deletions deployment/localdeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,11 @@ func (d *Deployer) DeleteScope(ctx context.Context, clusterID string, bucketName
func (d *Deployer) DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error {
return errors.New("localdeploy does not support deleting collections")
}

func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
return errors.New("localdeploy does not support traffic control")
}

func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
return errors.New("localdeploy does not support traffic control")
}

0 comments on commit 076b5b3

Please sign in to comment.