Skip to content

Commit

Permalink
feat(scheduler): node placement by labels (#10)
Browse files Browse the repository at this point in the history
* feat(scheduler): place by labels

* feat(scheduler): return selected candidate nodes

* feat(scheduler): pick N amount of nodes

* feat: handle healthchecks
  • Loading branch information
zze0s authored Oct 27, 2024
1 parent 05092bc commit 6da16fd
Show file tree
Hide file tree
Showing 13 changed files with 571 additions and 300 deletions.
10 changes: 3 additions & 7 deletions config_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ http:
agent:
nodeName: test-node1
clientAddr: http://localhost:7430
annotations:
- hetzner
- ssd
labels:
- speed=1g
network: 1G
disktype: ssd

manager:
addr: http://localhost:7422
Expand Down Expand Up @@ -38,6 +36,4 @@ clients:
torrents:
maxActiveDownloads: 3
maxTotalDownloads: 3
maxTotalTorrents: 100

maxActiveDownloads: 3
maxTotalTorrents: 100
93 changes: 32 additions & 61 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package agent

import (
"context"
"github.com/autobrr/distribrr/pkg/server/client"
"os"
"os/signal"
"syscall"
"time"

"github.com/autobrr/distribrr/pkg/server/client"
"github.com/autobrr/distribrr/pkg/stats"
"github.com/autobrr/distribrr/pkg/task"

Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *Service) registerAgentWithServer(tickerDuration time.Duration) error {
// Create a new context that will be done after tickerDuration
ctx, cancel := context.WithTimeout(context.Background(), tickerDuration)
defer cancel()
if err := s.Join(ctx, s.cfg.Manager.Addr, s.cfg.Manager.Token, s.cfg.Agent.NodeName, s.cfg.Agent.ClientAddr); err != nil {
if err := s.Join(ctx, s.cfg.Manager.Addr, s.cfg.Manager.Token, s.cfg.Agent); err != nil {
return err
}

Expand All @@ -144,10 +144,14 @@ func (s *Service) Deregister() error {
}

// Join worker to manager
func (s *Service) Join(ctx context.Context, addr string, token string, name string, clientAddr string) error {
func (s *Service) Join(ctx context.Context, addr string, token string, agent Agent) error {
log.Info().Msgf("sending join request to: %s", addr)

nodeName := name
if s.serverClient == nil {
s.serverClient = serverclient.NewClient(addr, token)
}

nodeName := agent.NodeName
if nodeName == "" {
h, err := os.Hostname()
if err != nil {
Expand All @@ -159,16 +163,14 @@ func (s *Service) Join(ctx context.Context, addr string, token string, name stri
}
}

//s.serverClient = serverclient.NewClient(addr, token)

joinReq := serverclient.JoinRequest{
NodeName: nodeName,
ClientAddr: clientAddr,
ClientAddr: agent.ClientAddr,
Labels: agent.Labels,
}

//if s.serverClient == nil {
// s.serverClient = serverclient.NewClient(addr, token)
//}
s.serverClient = serverclient.NewClient(addr, token)

if err := s.serverClient.JoinRequest(ctx, joinReq); err != nil {
return err
}
Expand All @@ -185,6 +187,7 @@ func (s *Service) Join(ctx context.Context, addr string, token string, name stri
}

func (s *Service) Healthcheck(ctx context.Context) error {
// TODO check client(s) and report status
return nil
}

Expand Down Expand Up @@ -251,15 +254,13 @@ func (s *Service) StartTask(t task.Task) error {
opts["tags"] = t.Tags
}

for _, c := range s.clients {
c := c

for _, client := range s.clients {
sender.Go(func() error {
log.Debug().Msgf("add torrent %s to %s", t.Name, c.Name)
log.Debug().Msgf("add torrent %s to %s", t.Name, client.Name)

// send downloads
if err := c.Client.AddTorrentFromUrlCtx(ctx, t.DownloadURL, opts); err != nil {
log.Error().Err(err).Msgf("error adding torrent from file %s to qbit: %s", t.Name, c.Name)
if err := client.Client.AddTorrentFromUrlCtx(ctx, t.DownloadURL, opts); err != nil {
log.Error().Err(err).Msgf("error adding torrent from file %s to qbit: %s", t.Name, client.Name)
return err
}

Expand All @@ -274,8 +275,8 @@ func (s *Service) StartTask(t task.Task) error {
// MaxAttempts: 50,
// DeleteOnFailure: false,
// }
// if err := c.Client.ReannounceTorrentWithRetry(context.Background(), req.InfoHash, &options); err != nil {
// log.Error().Err(err).Msgf("error re-announcing torrent %s on qbit: %s", req.InfoHash, c.Name)
// if err := client.Client.ReannounceTorrentWithRetry(context.Background(), req.InfoHash, &options); err != nil {
// log.Error().Err(err).Msgf("error re-announcing torrent %s on qbit: %s", req.InfoHash, client.Name)
// }
// }(req)
//}
Expand Down Expand Up @@ -329,55 +330,20 @@ func (s *Service) CollectStats() {
}
}

func (s *Service) GetStatsFull() *stats.Stats {
func (s *Service) GetStatsFull(ctx context.Context) *stats.Stats {
s.stats = stats.GetStats()
s.GetClientStats()
s.GetClientStats(ctx)
return s.stats
}

func (s *Service) GetStats() *stats.Stats {
log.Trace().Msg("collecting stats")
s.stats = stats.GetStats()

// TODO use errgroup
//for _, client := range s.clients {
// l := log.With().Str("client", client.Name).Logger()
//
//
// l.Trace().Msg("check disk per path for client")
//
// for _, storage := range client.Rules.Storage {
// l.Trace().Msgf("check disk for path %q", storage.Path)
//
// s.stats.DiskPathStats[storage.Path] = stats.GetDiskInfoByPath(storage.Path)
// }
//
// l.Trace().Msg("get active torrents for client")
//
// t, err := client.Client.GetTorrentsActiveDownloadsCtx(context.Background())
// if err != nil {
// l.Error().Err(err).Msgf("could not load active torrents for client: %q", client.Name)
// continue
// }
//
// l.Trace().Msgf("found %d active torrents for client", len(t))
//
// ct := stats.ClientStats{
// ClientActiveDownloads: len(t),
// ClientReady: len(t) < client.Rules.Torrents.MaxActiveDownloads,
// }
//
// l.Trace().Msgf("client ready: %t", ct.ClientReady)
//
// s.stats.ClientStats[client.Name] = ct
//}
//
//s.taskCount = s.stats.TaskCount

return s.stats
}

func (s *Service) GetClientStats() *stats.Stats {
func (s *Service) GetClientStats(ctx context.Context) *stats.Stats {
log.Trace().Msg("collecting stats")
//s.stats = stats.GetStats()

Expand All @@ -395,20 +361,21 @@ func (s *Service) GetClientStats() *stats.Stats {

l.Trace().Msg("get active torrents for client")

t, err := client.Client.GetTorrentsActiveDownloadsCtx(context.Background())
activeDownloads, err := client.Client.GetTorrentsActiveDownloadsCtx(ctx)
if err != nil {
l.Error().Err(err).Msgf("could not load active torrents for client")
continue
}

l.Trace().Msgf("found %d active torrents for client", len(t))
l.Trace().Msgf("found %d active torrents for client", len(activeDownloads))

ct := stats.ClientStats{
ClientActiveDownloads: len(t),
ClientReady: len(t) < client.Rules.Torrents.MaxActiveDownloads,
ActiveDownloads: len(activeDownloads),
Ready: len(activeDownloads) < client.Rules.Torrents.MaxActiveDownloads,
}

l.Trace().Msgf("client ready: %t", ct.ClientReady)
l.Trace().Msgf("[%d/%d] active downloads, status ready: %t", len(activeDownloads), client.Rules.Torrents.MaxActiveDownloads, ct.Ready)
l.Debug().Msgf("client ready: %t", ct.Ready)

s.stats.ClientStats[name] = ct
}
Expand All @@ -417,3 +384,7 @@ func (s *Service) GetClientStats() *stats.Stats {

return s.stats
}

func (s *Service) GetLabels() map[string]string {
return s.cfg.Agent.Labels
}
29 changes: 22 additions & 7 deletions pkg/agent/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,19 @@ func (s *APIServer) Handler() http.Handler {
r.Use(mw.RequestLogger)

r.Route("/api/v1/", func(r chi.Router) {
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
if err := s.service.Healthcheck(r.Context()); err != nil {
render.Status(r, http.StatusFailedDependency)
return
}
r.Route("/healthz", func(r chi.Router) {
r.Get("/liveness", func(w http.ResponseWriter, r *http.Request) {
render.Status(r, http.StatusOK)
})

r.Get("/readiness", func(w http.ResponseWriter, r *http.Request) {
if err := s.service.Healthcheck(r.Context()); err != nil {
render.Status(r, http.StatusFailedDependency)
return
}

render.Status(r, http.StatusOK)
render.Status(r, http.StatusOK)
})
})

r.Group(func(r chi.Router) {
Expand Down Expand Up @@ -96,7 +102,7 @@ func (s *APIServer) Handler() http.Handler {

r.Route("/stats", func(r chi.Router) {
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
s := s.service.GetStatsFull()
s := s.service.GetStatsFull(r.Context())
if s == nil {
render.Status(r, http.StatusInternalServerError)
return
Expand All @@ -106,6 +112,15 @@ func (s *APIServer) Handler() http.Handler {
render.JSON(w, r, s)
})
})

r.Route("/labels", func(r chi.Router) {
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
label := s.service.GetLabels()

render.Status(r, http.StatusOK)
render.JSON(w, r, label)
})
})
})

})
Expand Down
64 changes: 61 additions & 3 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/autobrr/distribrr/pkg/version"
"github.com/rs/xid"
"net/http"
"net/url"
"time"

"github.com/autobrr/distribrr/pkg/stats"
"github.com/autobrr/distribrr/pkg/task"
"github.com/autobrr/distribrr/pkg/version"

"github.com/pkg/errors"
"github.com/rs/xid"
)

const DefaultTimeout = 15 * time.Second
Expand All @@ -39,10 +39,68 @@ func NewClient(addr, name, token string) *Client {
}

func (c *Client) HealthCheck(ctx context.Context) error {
// TODO ping clients
reqUrl, err := c.buildUrl(c.addr, "healthz/readiness", nil)
if err != nil {
return errors.Wrapf(err, "could not build URL: %s", c.name)
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl.String(), nil)
if err != nil {
return errors.Wrapf(err, "could not create request for node: %s", c.name)
}

c.setHeaders(ctx, req)

resp, err := c.http.Do(req)
if err != nil {
return errors.Wrapf(err, "error during request for node: %s", c.name)
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {

Check failure on line 61 in pkg/agent/client.go

View workflow job for this annotation

GitHub Actions / Nilaway Linter

error: Potential nil panic detected. Observed nil flow from source to dereference point:
return fmt.Errorf("node: %s healthcheck unexpected status: %d", c.name, resp.StatusCode)
}

return nil
}

func (c *Client) GetLabels(ctx context.Context) (map[string]string, error) {
return c.getLabels(ctx)
}

func (c *Client) getLabels(ctx context.Context) (map[string]string, error) {
reqUrl, err := c.buildUrl(c.addr, "labels", nil)
if err != nil {
return nil, errors.Wrapf(err, "could not build URL: %s", c.name)
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl.String(), nil)
if err != nil {
return nil, errors.Wrapf(err, "could not create request for node: %s", c.name)
}

c.setHeaders(ctx, req)

resp, err := c.http.Do(req)
if err != nil {
return nil, errors.Wrapf(err, "error during request for node: %s", c.name)
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("node: %s unexpected status: %d", c.name, resp.StatusCode)
}

var data map[string]string
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}

return data, nil
}

func (c *Client) GetStats(ctx context.Context) (*stats.Stats, error) {
return c.getStats(ctx)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type Config struct {
}

type Agent struct {
NodeName string `yaml:"nodeName"`
ClientAddr string `yaml:"clientAddr"`
Annotations []string `yaml:"annotations"`
NodeName string `yaml:"nodeName"`
ClientAddr string `yaml:"clientAddr"`
Labels map[string]string `yaml:"labels"`
}

type Manager struct {
Expand Down Expand Up @@ -51,6 +51,11 @@ func (c *Config) Defaults() {
Token: "",
}
c.Manager = Manager{}
c.Agent = Agent{
NodeName: "",
ClientAddr: "",
Labels: map[string]string{},
}
c.Clients = map[string]*QbitClient{}
//c.Clients = make(map[string]*QbitClient, 0)
}
Expand Down
Loading

0 comments on commit 6da16fd

Please sign in to comment.