Skip to content

Commit

Permalink
Add file based agent transport
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Doherty <[email protected]>
  • Loading branch information
chrisdoherty4 committed Jun 29, 2023
1 parent 2946cc3 commit 61fd22d
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 22 deletions.
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/go-logr/zerologr v1.2.3
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kr/pretty v0.3.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/opencontainers/image-spec v1.1.0-rc.3
Expand Down Expand Up @@ -79,7 +78,6 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
Expand All @@ -96,7 +94,6 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -693,7 +692,6 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -742,9 +740,7 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
Expand Down
109 changes: 109 additions & 0 deletions internal/agent/transport/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package transport

import (
"context"
"os"
"path/filepath"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
"gopkg.in/yaml.v2"
)

// File is a transport implementation that monitors a directory for workflow files. When it finds
// files it has yet to process, it will parse them and offload them to its handler. It is intended
// for developmental use only.
type File struct {
// Log is a logger for debugging.
Log logr.Logger

// Dir is the directory to be monitored.
Dir string

// Tick defines the duration to wait before inspecting the directory for new workflow files.
// It defaulst to 5 seconds.
Tick time.Duration

// cache is used to track handled workflows.
cache map[string]struct{}
}

// Start begins watching f.Dir for files. When it finds a file it hasn't handled before, it
// attempts to parse it and offload to the handler. It will run workflows once where a workflow
// is determined by its file name.
func (f *File) Start(ctx context.Context, _ string, handler WorkflowHandler) error {
if f.Tick == 0 {
f.Tick = 5 * time.Second
}

f.cache = map[string]struct{}{}

ticker := time.NewTicker(f.Tick)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil

case <-ticker.C:
entries, err := os.ReadDir(f.Dir)
if err != nil {
return err
}

// When we experience an error with a particular file we don't want to return an
// error because we want to process as much as possible. Instead, we ignore the problem
// and hope the user fixes the issue in a future tick.
for _, e := range entries {
if e.IsDir() || !hasYAMLExt(e.Name()) {
continue
}

// Ensure we haven't handled this file before.
if _, handled := f.cache[e.Name()]; handled {
continue
}

f.Log.Info("Found new workflow file", "file", e.Name())

fh, err := os.Open(filepath.Join(f.Dir, e.Name()))
if err != nil {
f.Log.Error(err, "Could not open file", "file", e.Name())
continue
}

var wrkflow workflow.Workflow
if err := yaml.NewDecoder(fh).Decode(&wrkflow); err != nil {
f.Log.Error(err, "Invalid workflow YAML", "file", e.Name())
continue
}

// Add the file to the cache so we don't reprocess it on the next iteration.
f.cache[e.Name()] = struct{}{}

handler.HandleWorkflow(ctx, wrkflow, f)
}
}
}
}

func hasYAMLExt(path string) bool {
ext := strings.Trim(filepath.Ext(path), ".")
for _, v := range []string{"yml", "yaml"} {
if v == ext {
return true
}
}
return false
}

func (f *File) RecordEvent(_ context.Context, e event.Event) error {
// Noop because we don't particularly care about events for File based transports. Maybe
// we'll record this in a dedicated file one day.
f.Log.Info("Recording event", "event", e.GetName())
return nil
}
97 changes: 97 additions & 0 deletions internal/agent/transport/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package transport_test

import (
"context"
"testing"
"time"

"github.com/go-logr/zerologr"
"github.com/google/go-cmp/cmp"
"github.com/rs/zerolog"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/transport"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

func TestFile(t *testing.T) {
logger := zerolog.New(zerolog.NewConsoleWriter())

expect := expectWorkflows{
Workflows: []workflow.Workflow{
{
ID: "test-workflow-id",
Actions: []workflow.Action{
{
ID: "test-action-1",
Name: "my test action",
Image: "docker.io/hub/alpine",
Cmd: "sh -c",
Args: []string{"echo", "action 1"},
Env: map[string]string{"foo": "bar"},
Volumes: []string{"mount:/foo/bar:ro"},
NetworkNamespace: "custom-namespace",
},
{
ID: "test-action-2",
Name: "my test action",
Image: "docker.io/hub/alpine",
Cmd: "sh -c",
Args: []string{"echo", "action 2"},
Env: map[string]string{"foo": "bar"},
Volumes: []string{"mount:/foo/bar:ro"},
NetworkNamespace: "custom-namespace",
},
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

handler := &transport.WorkflowHandlerMock{
HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) {
if expect.End() {
t.Fatalf("Received unexpected workflow:\nid=%v", workflow.ID)
}

next := expect.Next()
if !cmp.Equal(*next, workflow) {
t.Fatalf("Workflow diff:\n%v", cmp.Diff(next, workflow))
}
},
}

f := transport.File{
Log: zerologr.New(&logger),
Dir: "./testdata/workflows",
Tick: 1 * time.Second,
}

err := f.Start(ctx, "agent_id", handler)
if err != nil {
t.Fatal(err)
}
}

type expectWorkflows struct {
Workflows []workflow.Workflow
idx int
}

func (e *expectWorkflows) Next() *workflow.Workflow {
if len(e.Workflows) == 0 {
return nil
}

if len(e.Workflows) < e.idx {
return nil
}

defer func() { e.idx++ }()
return &e.Workflows[e.idx]
}

func (e *expectWorkflows) End() bool {
return len(e.Workflows) < e.idx
}
5 changes: 0 additions & 5 deletions internal/agent/transport/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package transport_test

import (
"context"
"fmt"
"io"
"sync"
"testing"

"github.com/go-logr/zerologr"
"github.com/kr/pretty"
"github.com/rs/zerolog"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/transport"
Expand Down Expand Up @@ -58,7 +56,6 @@ func TestGRPC(t *testing.T) {
handler := &transport.WorkflowHandlerMock{
HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) {
defer wg.Done()
fmt.Println("handling")
close(responses)
},
}
Expand All @@ -71,6 +68,4 @@ func TestGRPC(t *testing.T) {
}

wg.Wait()

pretty.Println(handler.HandleWorkflowCalls())
}
23 changes: 23 additions & 0 deletions internal/agent/transport/testdata/workflows/workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
id: "test-workflow-id"
actions:
- id: "test-action-1"
name: "my test action"
image: "docker.io/hub/alpine"
cmd: "sh -c"
args: ["echo", "action 1"]
env:
foo: bar
volumes:
- mount:/foo/bar:ro
networkNamespace: "custom-namespace"
- id: "test-action-2"
name: "my test action"
image: "docker.io/hub/alpine"
cmd: "sh -c"
args: ["echo", "action 2"]
env:
foo: bar
volumes:
- mount:/foo/bar:ro
networkNamespace: "custom-namespace"

20 changes: 10 additions & 10 deletions internal/agent/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package workflow
// Workflow represents a runnable workflow for the Handler.
type Workflow struct {
// Do we need a workflow name? Does that even come down in the proto definition?
ID string
Actions []Action
ID string `yaml:"id"`
Actions []Action `yaml:"actions"`
}

func (w Workflow) String() string {
Expand All @@ -15,14 +15,14 @@ func (w Workflow) String() string {

// Action represents an individually runnable action.
type Action struct {
ID string
Name string
Image string
Cmd string
Args []string
Env map[string]string
Volumes []string
NetworkNamespace string
ID string `yaml:"id"`
Name string `yaml:"name"`
Image string `yaml:"image"`
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Env map[string]string `yaml:"env"`
Volumes []string `yaml:"volumes"`
NetworkNamespace string `yaml:"networkNamespace"`
}

func (a Action) String() string {
Expand Down

0 comments on commit 61fd22d

Please sign in to comment.