From 61fd22d1d20e721910ee4b964854df2ffb37b0d6 Mon Sep 17 00:00:00 2001 From: Chris Doherty Date: Wed, 21 Jun 2023 21:04:41 -0500 Subject: [PATCH] Add file based agent transport Signed-off-by: Chris Doherty --- go.mod | 3 - go.sum | 4 - internal/agent/transport/file.go | 109 ++++++++++++++++++ internal/agent/transport/file_test.go | 97 ++++++++++++++++ internal/agent/transport/grpc_test.go | 5 - .../transport/testdata/workflows/workflow.yml | 23 ++++ internal/agent/workflow/workflow.go | 20 ++-- 7 files changed, 239 insertions(+), 22 deletions(-) create mode 100644 internal/agent/transport/file.go create mode 100644 internal/agent/transport/file_test.go create mode 100644 internal/agent/transport/testdata/workflows/workflow.yml diff --git a/go.mod b/go.mod index 7c6631dda..c384fccfa 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/go-logr/zerologr v1.2.3 github.com/google/go-cmp v0.5.9 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/kr/pretty v0.3.1 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.8 github.com/opencontainers/image-spec v1.1.0-rc.3 @@ -79,7 +78,6 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/text v0.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.12 // indirect @@ -96,7 +94,6 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index 469f3d586..b7655df4c 100644 --- a/go.sum +++ b/go.sum @@ -600,7 +600,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -693,7 +692,6 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -742,9 +740,7 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= diff --git a/internal/agent/transport/file.go b/internal/agent/transport/file.go new file mode 100644 index 000000000..aa8964930 --- /dev/null +++ b/internal/agent/transport/file.go @@ -0,0 +1,109 @@ +package transport + +import ( + "context" + "os" + "path/filepath" + "strings" + "time" + + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" + "gopkg.in/yaml.v2" +) + +// File is a transport implementation that monitors a directory for workflow files. When it finds +// files it has yet to process, it will parse them and offload them to its handler. It is intended +// for developmental use only. +type File struct { + // Log is a logger for debugging. + Log logr.Logger + + // Dir is the directory to be monitored. + Dir string + + // Tick defines the duration to wait before inspecting the directory for new workflow files. + // It defaulst to 5 seconds. + Tick time.Duration + + // cache is used to track handled workflows. + cache map[string]struct{} +} + +// Start begins watching f.Dir for files. When it finds a file it hasn't handled before, it +// attempts to parse it and offload to the handler. It will run workflows once where a workflow +// is determined by its file name. +func (f *File) Start(ctx context.Context, _ string, handler WorkflowHandler) error { + if f.Tick == 0 { + f.Tick = 5 * time.Second + } + + f.cache = map[string]struct{}{} + + ticker := time.NewTicker(f.Tick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + + case <-ticker.C: + entries, err := os.ReadDir(f.Dir) + if err != nil { + return err + } + + // When we experience an error with a particular file we don't want to return an + // error because we want to process as much as possible. Instead, we ignore the problem + // and hope the user fixes the issue in a future tick. + for _, e := range entries { + if e.IsDir() || !hasYAMLExt(e.Name()) { + continue + } + + // Ensure we haven't handled this file before. + if _, handled := f.cache[e.Name()]; handled { + continue + } + + f.Log.Info("Found new workflow file", "file", e.Name()) + + fh, err := os.Open(filepath.Join(f.Dir, e.Name())) + if err != nil { + f.Log.Error(err, "Could not open file", "file", e.Name()) + continue + } + + var wrkflow workflow.Workflow + if err := yaml.NewDecoder(fh).Decode(&wrkflow); err != nil { + f.Log.Error(err, "Invalid workflow YAML", "file", e.Name()) + continue + } + + // Add the file to the cache so we don't reprocess it on the next iteration. + f.cache[e.Name()] = struct{}{} + + handler.HandleWorkflow(ctx, wrkflow, f) + } + } + } +} + +func hasYAMLExt(path string) bool { + ext := strings.Trim(filepath.Ext(path), ".") + for _, v := range []string{"yml", "yaml"} { + if v == ext { + return true + } + } + return false +} + +func (f *File) RecordEvent(_ context.Context, e event.Event) error { + // Noop because we don't particularly care about events for File based transports. Maybe + // we'll record this in a dedicated file one day. + f.Log.Info("Recording event", "event", e.GetName()) + return nil +} diff --git a/internal/agent/transport/file_test.go b/internal/agent/transport/file_test.go new file mode 100644 index 000000000..87bb9138f --- /dev/null +++ b/internal/agent/transport/file_test.go @@ -0,0 +1,97 @@ +package transport_test + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/zerologr" + "github.com/google/go-cmp/cmp" + "github.com/rs/zerolog" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/transport" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +func TestFile(t *testing.T) { + logger := zerolog.New(zerolog.NewConsoleWriter()) + + expect := expectWorkflows{ + Workflows: []workflow.Workflow{ + { + ID: "test-workflow-id", + Actions: []workflow.Action{ + { + ID: "test-action-1", + Name: "my test action", + Image: "docker.io/hub/alpine", + Cmd: "sh -c", + Args: []string{"echo", "action 1"}, + Env: map[string]string{"foo": "bar"}, + Volumes: []string{"mount:/foo/bar:ro"}, + NetworkNamespace: "custom-namespace", + }, + { + ID: "test-action-2", + Name: "my test action", + Image: "docker.io/hub/alpine", + Cmd: "sh -c", + Args: []string{"echo", "action 2"}, + Env: map[string]string{"foo": "bar"}, + Volumes: []string{"mount:/foo/bar:ro"}, + NetworkNamespace: "custom-namespace", + }, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + handler := &transport.WorkflowHandlerMock{ + HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) { + if expect.End() { + t.Fatalf("Received unexpected workflow:\nid=%v", workflow.ID) + } + + next := expect.Next() + if !cmp.Equal(*next, workflow) { + t.Fatalf("Workflow diff:\n%v", cmp.Diff(next, workflow)) + } + }, + } + + f := transport.File{ + Log: zerologr.New(&logger), + Dir: "./testdata/workflows", + Tick: 1 * time.Second, + } + + err := f.Start(ctx, "agent_id", handler) + if err != nil { + t.Fatal(err) + } +} + +type expectWorkflows struct { + Workflows []workflow.Workflow + idx int +} + +func (e *expectWorkflows) Next() *workflow.Workflow { + if len(e.Workflows) == 0 { + return nil + } + + if len(e.Workflows) < e.idx { + return nil + } + + defer func() { e.idx++ }() + return &e.Workflows[e.idx] +} + +func (e *expectWorkflows) End() bool { + return len(e.Workflows) < e.idx +} diff --git a/internal/agent/transport/grpc_test.go b/internal/agent/transport/grpc_test.go index 42200ee45..80660a6cb 100644 --- a/internal/agent/transport/grpc_test.go +++ b/internal/agent/transport/grpc_test.go @@ -2,13 +2,11 @@ package transport_test import ( "context" - "fmt" "io" "sync" "testing" "github.com/go-logr/zerologr" - "github.com/kr/pretty" "github.com/rs/zerolog" "github.com/tinkerbell/tink/internal/agent/event" "github.com/tinkerbell/tink/internal/agent/transport" @@ -58,7 +56,6 @@ func TestGRPC(t *testing.T) { handler := &transport.WorkflowHandlerMock{ HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) { defer wg.Done() - fmt.Println("handling") close(responses) }, } @@ -71,6 +68,4 @@ func TestGRPC(t *testing.T) { } wg.Wait() - - pretty.Println(handler.HandleWorkflowCalls()) } diff --git a/internal/agent/transport/testdata/workflows/workflow.yml b/internal/agent/transport/testdata/workflows/workflow.yml new file mode 100644 index 000000000..2099ba986 --- /dev/null +++ b/internal/agent/transport/testdata/workflows/workflow.yml @@ -0,0 +1,23 @@ +id: "test-workflow-id" +actions: +- id: "test-action-1" + name: "my test action" + image: "docker.io/hub/alpine" + cmd: "sh -c" + args: ["echo", "action 1"] + env: + foo: bar + volumes: + - mount:/foo/bar:ro + networkNamespace: "custom-namespace" +- id: "test-action-2" + name: "my test action" + image: "docker.io/hub/alpine" + cmd: "sh -c" + args: ["echo", "action 2"] + env: + foo: bar + volumes: + - mount:/foo/bar:ro + networkNamespace: "custom-namespace" + diff --git a/internal/agent/workflow/workflow.go b/internal/agent/workflow/workflow.go index 3082962a0..38408084e 100644 --- a/internal/agent/workflow/workflow.go +++ b/internal/agent/workflow/workflow.go @@ -5,8 +5,8 @@ package workflow // Workflow represents a runnable workflow for the Handler. type Workflow struct { // Do we need a workflow name? Does that even come down in the proto definition? - ID string - Actions []Action + ID string `yaml:"id"` + Actions []Action `yaml:"actions"` } func (w Workflow) String() string { @@ -15,14 +15,14 @@ func (w Workflow) String() string { // Action represents an individually runnable action. type Action struct { - ID string - Name string - Image string - Cmd string - Args []string - Env map[string]string - Volumes []string - NetworkNamespace string + ID string `yaml:"id"` + Name string `yaml:"name"` + Image string `yaml:"image"` + Cmd string `yaml:"cmd"` + Args []string `yaml:"args"` + Env map[string]string `yaml:"env"` + Volumes []string `yaml:"volumes"` + NetworkNamespace string `yaml:"networkNamespace"` } func (a Action) String() string {