Skip to content

Commit

Permalink
scheduler queue, db accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
Cufee committed Jun 7, 2024
1 parent 2f74102 commit 9f360c4
Show file tree
Hide file tree
Showing 35 changed files with 1,609 additions and 289 deletions.
9 changes: 5 additions & 4 deletions cmds/core/scheduler/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"time"

"github.com/cufee/aftermath/cmds/core"
"github.com/cufee/aftermath/cmds/core/scheduler/tasks"
"github.com/go-co-op/gocron"
"github.com/rs/zerolog/log"
)

func StartCronJobs(client core.Client) {
log.Info().Msg("starting cron jobs")
func StartCronJobs(client core.Client, queue *tasks.Queue) {
defer log.Info().Msg("started cron scheduler")

c := gocron.NewScheduler(time.UTC)
// Tasks
c.Cron("* * * * *").Do(runTasksWorker(client))
c.Cron("0 * * * *").Do(restartTasksWorker(client))
c.Cron("* * * * *").Do(runTasksWorker(queue))
c.Cron("0 * * * *").Do(restartTasksWorker(queue))

// Glossary - Do it around the same time WG releases game updates
c.Cron("0 10 * * *").Do(UpdateGlossaryWorker(client))
Expand Down
17 changes: 17 additions & 0 deletions cmds/core/scheduler/tasks/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package tasks

import (
"github.com/cufee/aftermath/cmds/core"
"github.com/cufee/aftermath/internal/database"
)

type TaskHandler struct {
process func(client core.Client, task database.Task) (string, error)
shouldRetry func(task *database.Task) bool
}

var defaultHandlers = make(map[database.TaskType]TaskHandler)

func DefaultHandlers() map[database.TaskType]TaskHandler {
return defaultHandlers
}
125 changes: 125 additions & 0 deletions cmds/core/scheduler/tasks/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package tasks

import (
"context"
"sync"
"time"

"github.com/cufee/aftermath/cmds/core"
"github.com/cufee/aftermath/internal/database"
"github.com/rs/zerolog/log"
)

type Queue struct {
limiter chan struct{}
concurrencyLimit int
lastTaskRun time.Time

handlers map[database.TaskType]TaskHandler
core core.Client
}

func (q *Queue) ConcurrencyLimit() int {
return q.concurrencyLimit
}

func (q *Queue) ActiveWorkers() int {
return len(q.limiter)
}

func (q *Queue) LastTaskRun() time.Time {
return q.lastTaskRun
}

func NewQueue(client core.Client, handlers map[database.TaskType]TaskHandler, concurrencyLimit int) *Queue {
return &Queue{
core: client,
handlers: handlers,
concurrencyLimit: concurrencyLimit,
limiter: make(chan struct{}, concurrencyLimit),
}
}

func (q *Queue) Process(callback func(error), tasks ...database.Task) {
var err error
if callback != nil {
defer callback(err)
}
if len(tasks) == 0 {
log.Debug().Msg("no tasks to process")
return
}

log.Debug().Msgf("processing %d tasks", len(tasks))

var wg sync.WaitGroup
q.lastTaskRun = time.Now()
processedTasks := make(chan database.Task, len(tasks))
for _, task := range tasks {
wg.Add(1)
go func(t database.Task) {
q.limiter <- struct{}{}
defer func() {
processedTasks <- t
wg.Done()
<-q.limiter
log.Debug().Msgf("finished processing task %s", t.ID)
}()
log.Debug().Msgf("processing task %s", t.ID)

handler, ok := q.handlers[t.Type]
if !ok {
t.Status = database.TaskStatusFailed
t.LogAttempt(database.TaskLog{
Targets: t.Targets,
Timestamp: time.Now(),
Error: "missing task type handler",
})
return
}

attempt := database.TaskLog{
Targets: t.Targets,
Timestamp: time.Now(),
}

message, err := handler.process(nil, t)
attempt.Comment = message
if err != nil {
attempt.Error = err.Error()
t.Status = database.TaskStatusFailed
} else {
t.Status = database.TaskStatusComplete
}
t.LogAttempt(attempt)
}(task)
}

wg.Wait()
close(processedTasks)

rescheduledCount := 0
processedSlice := make([]database.Task, 0, len(processedTasks))
for task := range processedTasks {
handler, ok := q.handlers[task.Type]
if !ok {
continue
}

if task.Status == database.TaskStatusFailed && handler.shouldRetry(&task) {
rescheduledCount++
task.Status = database.TaskStatusScheduled
}
processedSlice = append(processedSlice, task)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

err = q.core.Database().UpdateTasks(ctx, processedSlice...)
if err != nil {
return
}

log.Debug().Msgf("processed %d tasks, %d rescheduled", len(processedSlice), rescheduledCount)
}
88 changes: 88 additions & 0 deletions cmds/core/scheduler/tasks/sessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package tasks

import (
"context"
"errors"
"strings"
"time"

"github.com/cufee/aftermath/cmds/core"
"github.com/cufee/aftermath/internal/database"
)

func init() {
defaultHandlers[database.TaskTypeRecordSessions] = TaskHandler{
process: func(client core.Client, task database.Task) (string, error) {
if task.Data == nil {
return "no data provided", errors.New("no data provided")
}
realm, ok := task.Data["realm"].(string)
if !ok {
return "invalid realm", errors.New("invalid realm")
}

return "did nothing for a task on realm " + realm, nil

// accountErrs, err := cache.RefreshSessionsAndAccounts(models.SessionTypeDaily, nil, realm, task.Targets...)
// if err != nil {
// return "failed to refresh sessions on all account", err
// }

// var failedAccounts []int
// for accountId, err := range accountErrs {
// if err != nil && accountId != 0 {
// failedAccounts = append(failedAccounts, accountId)
// }
// }
// if len(failedAccounts) == 0 {
// return "finished session update on all accounts", nil
// }

// // Retry failed accounts
// task.Targets = failedAccounts
// return "retrying failed accounts", errors.New("some accounts failed")
},
shouldRetry: func(task *database.Task) bool {
triesLeft, ok := task.Data["triesLeft"].(int32)
if !ok {
return false
}
if triesLeft <= 0 {
return false
}

triesLeft -= 1
task.Data["triesLeft"] = triesLeft
task.ScheduledAfter = time.Now().Add(5 * time.Minute) // Backoff for 5 minutes to avoid spamming
return true
},
}
}

func CreateSessionUpdateTasks(client core.Client) func(realm string) error {
return func(realm string) error {
realm = strings.ToUpper(realm)
task := database.Task{
Type: database.TaskTypeRecordSessions,
ReferenceID: "realm_" + realm,
Data: map[string]any{
"realm": realm,
"triesLeft": int32(3),
},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

accounts, err := client.Database().GetRealmAccounts(ctx, realm)
if err != nil {
return err
}
if len(accounts) < 1 {
return nil
}

// This update requires (2 + n) requests per n players
return client.Database().CreateTasks(ctx, splitTaskByTargets(task, 50)...)
}
}
24 changes: 24 additions & 0 deletions cmds/core/scheduler/tasks/split.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tasks

import "github.com/cufee/aftermath/internal/database"

func splitTaskByTargets(task database.Task, batchSize int) []database.Task {
if len(task.Targets) <= batchSize {
return []database.Task{task}
}

var tasks []database.Task
subTasks := len(task.Targets) / batchSize

for i := 0; i <= subTasks; i++ {
subTask := task
if len(task.Targets) > batchSize*(i+1) {
subTask.Targets = (task.Targets[batchSize*i : batchSize*(i+1)])
} else {
subTask.Targets = (task.Targets[batchSize*i:])
}
tasks = append(tasks, subTask)
}

return tasks
}
5 changes: 3 additions & 2 deletions cmds/core/scheduler/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler

import (
"github.com/cufee/aftermath/cmds/core"
"github.com/cufee/aftermath/cmds/core/scheduler/tasks"
)

func rotateBackgroundPresetsWorker(client core.Client) func() {
Expand Down Expand Up @@ -29,7 +30,7 @@ func createSessionTasksWorker(client core.Client, realm string) func() {
}
}

func runTasksWorker(client core.Client) func() {
func runTasksWorker(queue *tasks.Queue) func() {
return func() {
// if tasks.DefaultQueue.ActiveWorkers() > 0 {
// return
Expand Down Expand Up @@ -57,7 +58,7 @@ func runTasksWorker(client core.Client) func() {
}
}

func restartTasksWorker(client core.Client) func() {
func restartTasksWorker(queue *tasks.Queue) func() {
return func() {
// _, err := tasks.RestartAbandonedTasks(nil)
// if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions cmds/discord/commands/ping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package commands

import (
"github.com/cufee/aftermath/cmds/discord/commands/builder"
"github.com/cufee/aftermath/cmds/discord/common"
)

func init() {
Loaded.add(
builder.NewCommand("ping").
Params(builder.SetDescKey("Pong!")).
Handler(func(ctx *common.Context) error {
return ctx.Reply("Pong!")
}),
)
}
13 changes: 8 additions & 5 deletions cmds/discord/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,27 @@ func (router *Router) HTTPHandler() (http.HandlerFunc, error) {
command, err := router.routeInteraction(data)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Err(err).Msg("failed to route interaction")
log.Err(err).Str("id", data.ID).Msg("failed to route interaction")
return
}

// ack the interaction
err = writeDeferredInteractionResponseAck(w, data.Type, command.Ephemeral)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Err(err).Msg("failed to ack an interaction")
log.Err(err).Str("id", data.ID).Msg("failed to ack an interaction")
return
}

// route the interaction to a proper handler for a later reply
state := &interactionState{mx: &sync.Mutex{}, acked: true}
state := &interactionState{mx: &sync.Mutex{}}
state.mx.Lock()
go func() {
// unlock once this context is done and the ack is delivered
<-r.Context().Done()
log.Debug().Str("id", data.ID).Msg("sent an interaction ack")

state.acked = true
state.mx.Unlock()
}()

Expand Down Expand Up @@ -190,7 +193,7 @@ func sendPingReply(w http.ResponseWriter) {
}

func (router *Router) startInteractionHandlerAsync(interaction discordgo.Interaction, state *interactionState, command *builder.Command) {
log.Debug().Str("id", interaction.ID).Msg("started handling an interaction")
log.Info().Str("id", interaction.ID).Msg("started handling an interaction")

// create a timer for the interaction response
responseTimer := time.NewTimer(interactionTimeout)
Expand Down Expand Up @@ -220,7 +223,7 @@ func (router *Router) startInteractionHandlerAsync(interaction discordgo.Interac
// we are done, there is nothing else we should do here
// lock in case responseCh is still busy sending some data over
defer state.mx.Unlock()
log.Debug().Str("id", interaction.ID).Msg("finished handling an interaction")
log.Info().Str("id", interaction.ID).Msg("finished handling an interaction")
return

case data := <-responseCh:
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ require (
github.com/go-co-op/gocron v1.37.0
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0
github.com/joho/godotenv v1.5.1
github.com/nlpodyssey/gopickle v0.3.0
github.com/rs/zerolog v1.33.0
github.com/shopspring/decimal v1.4.0
github.com/steebchen/prisma-client-go v0.37.0
github.com/stretchr/testify v1.9.0
go.dedis.ch/protobuf v1.0.11
golang.org/x/exp v0.0.0-20240530194437-404ba88c7ed0
golang.org/x/image v0.16.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.15.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
Loading

0 comments on commit 9f360c4

Please sign in to comment.