Skip to content

Commit

Permalink
add multi concurrency support
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Oct 6, 2023
1 parent 6d937d7 commit 2f9b5e0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 65 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ replace (

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/alitto/pond v1.8.3
github.com/argoproj/gitops-engine v0.7.1-0.20230906152414-b0fffe419a0f
github.com/go-logr/logr v1.2.4
github.com/orcaman/concurrent-map/v2 v2.0.1
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down Expand Up @@ -541,12 +543,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pluralsh/console-client-go v0.0.5 h1:+L7I3QLMWNBiuZlWe/YJfUMMZGnpKhEAlbs9sL+hiSc=
github.com/pluralsh/console-client-go v0.0.5/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/console-client-go v0.0.8 h1:BwWOt1ggBX/fxzY2+01dk8sBTz1jqT57o2y1Iz9Zxzk=
github.com/pluralsh/console-client-go v0.0.8/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/console-client-go v0.0.11 h1:2fchZE6qlSQmHTeuH54hAzJJpgKpx2Kbl8HhJNugbns=
github.com/pluralsh/console-client-go v0.0.11/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/console-client-go v0.0.14 h1:vpvC6SR7A0MIrpeyR78hM6IreOLKgg+moRIEjyUnKZo=
github.com/pluralsh/console-client-go v0.0.14/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY=
Expand Down
39 changes: 18 additions & 21 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"fmt"
"time"

"github.com/alitto/pond"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/engine"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/manifests"
deploysync "github.com/pluralsh/deployment-operator/pkg/sync"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
Expand All @@ -26,7 +26,6 @@ type Agent struct {
discoveryClient *discovery.DiscoveryClient
engine *deploysync.Engine
deathChan chan interface{}
svcQueue workqueue.RateLimitingInterface
cleanup engine.StopFunc
refresh time.Duration
}
Expand Down Expand Up @@ -74,46 +73,44 @@ func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl,
consoleClient: consoleClient,
engine: engine,
deathChan: deathChan,
svcQueue: svcQueue,
cleanup: cleanup,
refresh: refresh,
}, nil
}

func (agent *Agent) Run() {
defer agent.cleanup()
defer agent.svcQueue.ShutDown()
defer agent.engine.WipeCache()
go func() {
for {
go agent.engine.ControlLoop()
failure := <-agent.deathChan
fmt.Printf("recovered from panic %v\n", failure)
}
}()
panicHandler := func(p interface{}) {
fmt.Printf("Task panicked: %v", p)
}

wait.PollInfinite(agent.refresh, func() (done bool, err error) {
for {
log.Info("fetching services for cluster")
svcs, err := agent.consoleClient.GetServices()
if err != nil {
log.Error(err, "failed to fetch service list from deployments service")
return false, nil
log.Error(err, "failed to fetch service list from deployments service %v", err)
time.Sleep(agent.refresh)
continue
}

pool := pond.New(20, 100, pond.MinWorkers(20), pond.PanicHandler(panicHandler))
for _, svc := range svcs {
log.Info("sending update for", "service", svc.ID)
agent.svcQueue.Add(svc.ID)
log.Info("sending update for", "service", svc.ID, "namespace", svc.Namespace, "name", svc.Name)
pool.TrySubmit(func() {
if err := agent.engine.ProcessItem(svc.ID); err != nil {
log.Error(err, "found unprocessable error")
}
})
}

pool.StopAndWait()
info, err := agent.discoveryClient.ServerVersion()
if err != nil {
log.Error(err, "failed to fetch cluster version")
return false, nil
}
v := fmt.Sprintf("%s.%s", info.Major, info.Minor)
if err := agent.consoleClient.Ping(v); err != nil {
log.Error(err, "failed to ping cluster after scheduling syncs")
}
return false, nil
})
time.Sleep(agent.refresh)
}
}
42 changes: 4 additions & 38 deletions pkg/sync/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,22 @@ package sync
import (
"context"
"fmt"
"runtime/debug"

"github.com/argoproj/gitops-engine/pkg/sync"
"github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
)

func (engine *Engine) ControlLoop() {
if engine.deathChan != nil {
defer func() {
if r := recover(); r != nil {
engine.deathChan <- r
fmt.Printf("panic: %s\n", string(debug.Stack()))
}
}()
}

engine.RegisterHandlers()

wait.PollInfinite(syncDelay, func() (done bool, err error) {
log.Info("Polling for new service updates")

item, shutdown := engine.svcQueue.Get()
if shutdown {
return true, nil
}

if err := engine.processItem(item); err != nil {
log.Error(err, "found unprocessable error")
}

engine.syncing = ""

return false, nil
})
}

func (engine *Engine) processItem(item interface{}) error {
defer engine.svcQueue.Done(item)
func (engine *Engine) ProcessItem(item interface{}) error {
id := item.(string)

if id == "" {
return nil
}

log.Info("attempting to sync service", "id", id)
engine.syncing = id
svc, err := engine.svcCache.Get(id)
if err != nil {
fmt.Printf("failed to fetch service from cache: %s, ignoring for now", err)
Expand All @@ -74,7 +40,7 @@ func (engine *Engine) processItem(item interface{}) error {

if manErr != nil {
if err := engine.updateStatus(svc.ID, results, errorAttributes("manifests", manErr)); err != nil {
log.Error(err, "Failed to update service status, ignoring for now")
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
}
log.Error(manErr, "failed to parse manifests")
return manErr
Expand All @@ -87,7 +53,7 @@ func (engine *Engine) processItem(item interface{}) error {
diff, err := engine.diff(manifests, svc.Namespace, svc.ID)
checkModifications := sync.WithResourceModificationChecker(true, diff)
if err != nil {
log.Error(err, "could not build diff list, ignoring for now")
log.Error(err, "could not build diff list for service, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
checkModifications = sync.WithResourceModificationChecker(false, nil)
}

Expand All @@ -113,7 +79,7 @@ func (engine *Engine) processItem(item interface{}) error {
}

if err := engine.updateStatus(svc.ID, results, errorAttributes("sync", err)); err != nil {
log.Error(err, "Failed to update service status, ignoring for now")
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
}

return nil
Expand Down

0 comments on commit 2f9b5e0

Please sign in to comment.