Skip to content

Commit

Permalink
refactor agent setup/control loop from main function to enable reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino committed Sep 23, 2023
1 parent 2278ebe commit c38988b
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 68 deletions.
2 changes: 1 addition & 1 deletion agent/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ help:
##@ Build

build:
go build -o bin/deployment-agent main.go
go build -o bin/deployment-agent cmd/main.go

docker-build:
docker build -t ${IMG} .
Expand Down
106 changes: 106 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package agent

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2/klogr"

"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/engine"

"github.com/pluralsh/deployment-operator/agent/pkg/client"
"github.com/pluralsh/deployment-operator/agent/pkg/manifests"
deploysync "github.com/pluralsh/deployment-operator/agent/pkg/sync"
)

var (
log = klogr.New()
)

type Agent struct {
consoleClient *client.Client
engine *deploysync.Engine
deathChan chan interface{}
svcChan chan string
cleanup engine.StopFunc
refresh time.Duration
}

func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl, deployToken string) (*Agent, error) {
config, err := clientConfig.ClientConfig()
if err != nil {
return nil, err
}
consoleClient := client.New(consoleUrl, deployToken)
svcCache := client.NewCache(consoleClient, refresh)
manifestCache := manifests.NewCache(refresh)

svcChan := make(chan string)
deathChan := make(chan interface{})

// we should enable SSA if kubernetes version supports it
clusterCache := cache.NewClusterCache(config,
cache.SetLogr(log),
cache.SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
svcId := un.GetAnnotations()[deploysync.SyncAnnotation]
sha := un.GetAnnotations()[deploysync.SyncShaAnnotation]
info = deploysync.NewResource(svcId, sha)
// cache resources that have the current annotation
cacheManifest = svcId != ""
return
}),
)

gitOpsEngine := engine.NewEngine(config, clusterCache, engine.WithLogr(log))
cleanup, err := gitOpsEngine.Run()
if err != nil {
return nil, err
}

engine := deploysync.New(gitOpsEngine, clusterCache, consoleClient, svcChan, svcCache, manifestCache)
engine.RegisterHandlers()
engine.AddHealthCheck(deathChan)

return &Agent{
consoleClient: consoleClient,
engine: engine,
deathChan: deathChan,
svcChan: svcChan,
cleanup: cleanup,
refresh: refresh,
}, nil
}

func (agent *Agent) Run() {
defer agent.cleanup()
go func() {
for {
go agent.engine.ControlLoop()
failure := <-agent.deathChan
fmt.Printf("recovered from panic %v\n", failure)
}
}()

for {
svcs, err := agent.consoleClient.GetServices()
if err != nil {
log.Error(err, "failed to fetch service list from deployments service")
time.Sleep(agent.refresh)
continue
}

for _, svc := range svcs {
agent.svcChan <- svc.ID
}

// TODO: fetch kubernetes version properly
if err := agent.consoleClient.Ping("1.24"); err != nil {
log.Error(err, "failed to ping cluster after scheduling syncs")
}

time.Sleep(agent.refresh)
}
}
71 changes: 4 additions & 67 deletions agent/main.go → agent/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,16 @@ package main

import (
"fmt"
"net/http"
"os"
"time"

"github.com/go-logr/logr"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2/klogr"

"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/engine"

"github.com/pluralsh/deployment-operator/agent/pkg/client"
"github.com/pluralsh/deployment-operator/agent/pkg/manifests"
deploysync "github.com/pluralsh/deployment-operator/agent/pkg/sync"

"net/http"
"github.com/pluralsh/deployment-operator/agent"
)

func main() {
Expand Down Expand Up @@ -56,65 +49,9 @@ func newCmd(log logr.Logger) *cobra.Command {
refresh, err := time.ParseDuration(refreshInterval)
checkError(err, log)

config, err := clientConfig.ClientConfig()
checkError(err, log)
consoleClient := client.New(consoleUrl, deployToken)
svcCache := client.NewCache(consoleClient, refresh)
manifestCache := manifests.NewCache(refresh)

svcChan := make(chan string)
deathChan := make(chan interface{})

// we should enable SSA if kubernetes version supports it
clusterCache := cache.NewClusterCache(config,
cache.SetLogr(log),
cache.SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
svcId := un.GetAnnotations()[deploysync.SyncAnnotation]
sha := un.GetAnnotations()[deploysync.SyncShaAnnotation]
info = deploysync.NewResource(svcId, sha)
// cache resources that have the current annotation
cacheManifest = svcId != ""
return
}),
)

gitOpsEngine := engine.NewEngine(config, clusterCache, engine.WithLogr(log))
checkError(err, log)

cleanup, err := gitOpsEngine.Run()
a, err := agent.New(clientConfig, refresh, consoleUrl, deployToken)
checkError(err, log)
defer cleanup()

engine := deploysync.New(gitOpsEngine, clusterCache, consoleClient, svcChan, svcCache, manifestCache)
engine.RegisterHandlers()
engine.AddHealthCheck(deathChan)
go func() {
for {
go engine.ControlLoop()
failure := <-deathChan
fmt.Printf("recovered from panic %v\n", failure)
}
}()

for {
svcs, err := consoleClient.GetServices()
if err != nil {
log.Error(err, "failed to fetch service list from deployments service")
time.Sleep(refresh)
continue
}

for _, svc := range svcs {
svcChan <- svc.ID
}

// TODO: fetch kubernetes version properly
if err := consoleClient.Ping("1.24"); err != nil {
log.Error(err, "failed to ping cluster after scheduling syncs")
}

time.Sleep(refresh)
}
a.Run()
},
}
clientConfig = addKubectlFlagsToCmd(&cmd)
Expand Down

0 comments on commit c38988b

Please sign in to comment.