Skip to content

Commit

Permalink
refactor: nested api/worker (#567)
Browse files Browse the repository at this point in the history
  • Loading branch information
plyr4 authored Mar 19, 2024
1 parent 5fede35 commit d872e67
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 69 deletions.
36 changes: 18 additions & 18 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"
"sync"
"time"

"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
"github.com/go-vela/worker/version"
"github.com/sirupsen/logrus"
)

// exec is a helper function to poll the queue
// and execute Vela pipelines for the Worker.
//
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker
func (w *Worker) exec(index int, config *library.Worker) error {
func (w *Worker) exec(index int, config *api.Worker) error {
var err error

// setup the version
Expand Down Expand Up @@ -103,14 +103,14 @@ func (w *Worker) exec(index int, config *library.Worker) error {
"version": v.Semantic(),
})

// lock and append the build to the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()
// lock and append the build to the list
w.RunningBuildsMutex.Lock()

w.RunningBuildIDs = append(w.RunningBuildIDs, strconv.FormatInt(item.Build.GetID(), 10))
w.RunningBuilds = append(w.RunningBuilds, item.Build)

config.SetRunningBuildIDs(w.RunningBuildIDs)
config.SetRunningBuilds(w.RunningBuilds)

w.RunningBuildIDsMutex.Unlock()
w.RunningBuildsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
Expand Down Expand Up @@ -208,18 +208,18 @@ func (w *Worker) exec(index int, config *library.Worker) error {

logger.Info("completed build")

// lock and remove the build from the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()
// lock and remove the build from the list
w.RunningBuildsMutex.Lock()

for i, v := range w.RunningBuildIDs {
if v == strconv.FormatInt(item.Build.GetID(), 10) {
w.RunningBuildIDs = append(w.RunningBuildIDs[:i], w.RunningBuildIDs[i+1:]...)
for i, v := range w.RunningBuilds {
if v.GetID() == item.Build.GetID() {
w.RunningBuilds = append(w.RunningBuilds[:i], w.RunningBuilds[i+1:]...)
}
}

config.SetRunningBuildIDs(w.RunningBuildIDs)
config.SetRunningBuilds(w.RunningBuilds)

w.RunningBuildIDsMutex.Unlock()
w.RunningBuildsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
Expand Down Expand Up @@ -303,8 +303,8 @@ func (w *Worker) exec(index int, config *library.Worker) error {

// getWorkerStatusFromConfig is a helper function
// to determine the appropriate worker status.
func (w *Worker) getWorkerStatusFromConfig(config *library.Worker) string {
switch rb := len(config.GetRunningBuildIDs()); {
func (w *Worker) getWorkerStatusFromConfig(config *api.Worker) string {
switch rb := len(config.GetRunningBuilds()); {
case rb == 0:
return constants.WorkerStatusIdle
case rb < w.Config.Build.Limit:
Expand Down
19 changes: 14 additions & 5 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"time"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
)

// operate is a helper function to initiate all
Expand All @@ -27,7 +27,7 @@ func (w *Worker) operate(ctx context.Context) error {
executors, gctx := errgroup.WithContext(ctx)
// Define the database representation of the worker
// and register itself in the database
registryWorker := new(library.Worker)
registryWorker := new(api.Worker)
registryWorker.SetHostname(w.Config.API.Address.Hostname())
registryWorker.SetAddress(w.Config.API.Address.String())
registryWorker.SetRoutes(w.Config.Queue.Routes)
Expand Down Expand Up @@ -118,13 +118,16 @@ func (w *Worker) operate(ctx context.Context) error {

continue
}

w.QueueCheckedIn, err = w.queueCheckIn(gctx, registryWorker)

if err != nil {
// queue check in failed, retry
logrus.Errorf("unable to ping queue %v", err)
logrus.Info("retrying check-in...")

time.Sleep(5 * time.Second)

continue
}

Expand Down Expand Up @@ -166,19 +169,22 @@ func (w *Worker) operate(ctx context.Context) error {
if !w.CheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("worker not checked in, skipping queue read")

continue
}
// do not pull from queue unless queue setup is done and connected
if !w.QueueCheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("queue ping failed, skipping queue read")

continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
"id": id,
}).Info("completed looping on worker executor")

return nil
default:
logrus.WithFields(logrus.Fields{
Expand All @@ -197,16 +203,19 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Errorf("failing worker executor: %v", err)
registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
}
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"fmt"
"net/http"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types/constants"
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
func (w *Worker) checkIn(config *api.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

Expand Down Expand Up @@ -48,7 +49,7 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
}

// register is a helper function to register the worker with the server.
func (w *Worker) register(config *library.Worker) (bool, string, error) {
func (w *Worker) register(config *api.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

// status Idle will be set for worker upon first time registration
Expand All @@ -68,7 +69,7 @@ func (w *Worker) register(config *library.Worker) (bool, string, error) {
}

// queueCheckIn is a helper function to phone home to the redis.
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) {
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *api.Worker) (bool, error) {
pErr := w.Queue.Ping(ctx)
if pErr != nil {
logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr)
Expand All @@ -86,7 +87,7 @@ func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worke

// updateWorkerStatus is a helper function to update worker status
// logs the error if it can't update status.
func (w *Worker) updateWorkerStatus(config *library.Worker, status string) {
func (w *Worker) updateWorkerStatus(config *api.Worker, status string) {
config.SetStatus(status)
_, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config)

Expand Down
13 changes: 6 additions & 7 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ import (
"net/url"

"github.com/gin-gonic/gin"

"github.com/go-vela/server/queue"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"

"github.com/sirupsen/logrus"

"github.com/urfave/cli/v2"

_ "github.com/joho/godotenv/autoload"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
)

// run executes the worker based
Expand Down Expand Up @@ -137,7 +136,7 @@ func run(c *cli.Context) error {

RegisterToken: make(chan string, 1),

RunningBuildIDs: make([]string, 0),
RunningBuilds: make([]*library.Build, 0),
}

// set the worker address if no flag was provided
Expand Down
21 changes: 11 additions & 10 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
)
Expand Down Expand Up @@ -61,15 +62,15 @@ type (
// Worker represents all configuration and
// system processes for the worker.
Worker struct {
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
QueueCheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
RunningBuilds []*library.Build
QueueCheckedIn bool
RunningBuildsMutex sync.Mutex
}
)
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/docker/docker v24.0.9+incompatible
github.com/docker/go-units v0.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-vela/sdk-go v0.23.2
github.com/go-vela/server v0.23.2
github.com/go-vela/sdk-go v0.23.3-0.20240319181130-4a7c245c93ae
github.com/go-vela/server v0.23.4-0.20240319161125-1809638e7e72
github.com/go-vela/types v0.23.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/go-cmp v0.6.0
Expand All @@ -33,7 +33,7 @@ require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis/v2 v2.31.1 // indirect
github.com/alicebob/miniredis/v2 v2.32.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buildkite/yaml v0.0.0-20230306222819-0e4e032d4835 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
Expand Down Expand Up @@ -79,7 +79,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
Expand All @@ -102,8 +102,8 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
go.starlark.net v0.0.0-20240311180835-efac67204ba7 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.starlark.net v0.0.0-20240314022150-ee8ed142361c // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/mod v0.14.0 // indirect
Expand All @@ -115,7 +115,7 @@ require (
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit d872e67

Please sign in to comment.