Skip to content

Commit

Permalink
Prevent parallel allocation to ensure spreading and reservations. (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 authored May 27, 2024
1 parent d295a05 commit 10c1fc1
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 226 deletions.
13 changes: 11 additions & 2 deletions cmd/metal-api/internal/datastore/machine.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package datastore

import (
"context"
"errors"
"fmt"
"math"
"math/rand/v2"
"time"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -427,7 +429,7 @@ func (rs *RethinkStore) UpdateMachine(oldMachine *metal.Machine, newMachine *met
// FindWaitingMachine returns an available, not allocated, waiting and alive machine of given size within the given partition.
// TODO: the algorithm can be optimized / shortened by using a rethinkdb join command and then using .Sample(1)
// but current implementation should have a slightly better readability.
func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
q := *rs.machineTable()
q = q.Filter(map[string]interface{}{
"allocation": nil,
Expand All @@ -440,6 +442,11 @@ func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size m
"preallocated": false,
})

if err := rs.sharedMutex.lock(ctx, partitionid, 10*time.Second); err != nil {
return nil, fmt.Errorf("too many parallel machine allocations taking place, try again later")
}
defer rs.sharedMutex.unlock(ctx, partitionid)

var candidates metal.Machines
err := rs.searchEntities(&q, &candidates)
if err != nil {
Expand Down Expand Up @@ -635,7 +642,9 @@ func randomIndex(max int) int {
if max <= 0 {
return 0
}
return rand.N(max)
// golangci-lint has an issue with math/rand/v2
// here it provides sufficient randomness though because it's not used for cryptographic purposes
return rand.N(max) //nolint:gosec
}

func intersect[T comparable](a, b []T) []T {
Expand Down
115 changes: 115 additions & 0 deletions cmd/metal-api/internal/datastore/machine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
package datastore

import (
"context"
"log/slog"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-lib/pkg/pointer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

type machineTestable struct{}
Expand Down Expand Up @@ -950,3 +958,110 @@ func TestRethinkStore_UpdateMachine(t *testing.T) {
tests[i].run(t, tt)
}
}

func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {

var (
root = slog.Default()
wg sync.WaitGroup
size = metal.Size{Base: metal.Base{ID: "1"}}
count int
)

for _, initEntity := range []struct {
entity metal.Entity
table *r.Term
}{
{
table: sharedDS.machineTable(),
entity: &metal.Machine{
Base: metal.Base{
ID: "1",
},
PartitionID: "partition",
SizeID: size.ID,
State: metal.MachineState{
Value: metal.AvailableState,
},
Waiting: true,
PreAllocated: false,
},
},
{
table: sharedDS.eventTable(),
entity: &metal.ProvisioningEventContainer{
Base: metal.Base{
ID: "1",
},
Liveliness: metal.MachineLivelinessAlive,
},
},
} {
initEntity := initEntity

err := sharedDS.createEntity(initEntity.table, initEntity.entity)
require.NoError(t, err)

defer func() {
_, err := initEntity.table.Delete().RunWrite(sharedDS.session)
require.NoError(t, err)
}()
}

for i := 0; i < 100; i++ {
i := i
wg.Add(1)

log := root.With("worker", i)

go func() {
defer wg.Done()

for {
machine, err := sharedDS.FindWaitingMachine(context.Background(), "project", "partition", size, nil)
if err != nil {
if metal.IsConflict(err) {
t.Errorf("concurrent modification occurred, shared mutex is not working")
break
}

if strings.Contains(err.Error(), "no machine available") {
continue
}

if strings.Contains(err.Error(), "too many parallel") {
time.Sleep(10 * time.Millisecond)
continue
}

t.Errorf("unexpected error occurred: %s", err)
continue
}

log.Info("waiting machine found")

newMachine := *machine
newMachine.PreAllocated = false
if newMachine.Name == "" {
newMachine.Name = strconv.Itoa(0)
}

assert.Equal(t, strconv.Itoa(count), newMachine.Name, "concurrency occurred")
count++
newMachine.Name = strconv.Itoa(count)

err = sharedDS.updateEntity(sharedDS.machineTable(), &newMachine, machine)
if err != nil {
log.Error("unable to toggle back pre-allocation flag", "error", err)
t.Fail()
}

return
}
}()
}

wg.Wait()

assert.Equal(t, 100, count)
}
43 changes: 41 additions & 2 deletions cmd/metal-api/internal/datastore/rethinkdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datastore

import (
"context"
"fmt"
"log/slog"
"reflect"
Expand All @@ -18,9 +19,21 @@ const (
)

var tables = []string{
"image", "size", "partition", "machine", "switch", "switchstatus", "event", "network", "ip", "migration", "filesystemlayout", "sizeimageconstraint",
VRFIntegerPool.String(), VRFIntegerPool.String() + "info",
ASNIntegerPool.String(), ASNIntegerPool.String() + "info",
"event",
"filesystemlayout",
"image",
"ip",
"machine",
"migration",
"network",
"partition",
"sharedmutex",
"size",
"sizeimageconstraint",
"switch",
"switchstatus",
VRFIntegerPool.String(), VRFIntegerPool.String() + "info",
}

// A RethinkStore is the database access layer for rethinkdb.
Expand All @@ -40,6 +53,11 @@ type RethinkStore struct {
VRFPoolRangeMax uint
ASNPoolRangeMin uint
ASNPoolRangeMax uint

sharedMutexCtx context.Context
sharedMutexCancel context.CancelFunc
sharedMutex *sharedMutex
sharedMutexCheckInterval time.Duration
}

// New creates a new rethink store.
Expand All @@ -55,6 +73,8 @@ func New(log *slog.Logger, dbhost string, dbname string, dbuser string, dbpass s
VRFPoolRangeMax: DefaultVRFPoolRangeMax,
ASNPoolRangeMin: DefaultASNPoolRangeMin,
ASNPoolRangeMax: DefaultASNPoolRangeMax,

sharedMutexCheckInterval: defaultSharedMutexCheckInterval,
}
}

Expand Down Expand Up @@ -241,7 +261,13 @@ func (rs *RethinkStore) Close() error {
return err
}
}

if rs.sharedMutexCancel != nil {
rs.sharedMutexCancel()
}

rs.log.Info("Rethinkstore disconnected")

return nil
}

Expand All @@ -251,6 +277,13 @@ func (rs *RethinkStore) Connect() error {
rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, rs.dbuser, rs.dbpass)
rs.log.Info("Rethinkstore connected")
rs.session = rs.dbsession
rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
var err error
rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
if err != nil {
return err
}

return nil
}

Expand All @@ -262,8 +295,14 @@ func (rs *RethinkStore) Demote() error {
if err != nil {
return err
}

rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, DemotedUser, rs.dbpass)
rs.session = rs.dbsession
rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
if err != nil {
return err
}

rs.log.Info("rethinkstore connected with demoted user")
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"os"
"sort"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -45,12 +46,15 @@ func startRethinkInitialized() (container testcontainers.Container, ds *RethinkS
panic(err)
}

rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs := New(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})), c.IP+":"+c.Port, c.DB, c.User, c.Password)

rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
rs.ASNPoolRangeMin = 10000
rs.ASNPoolRangeMax = 10010

rs.sharedMutexCheckInterval = 3 * time.Second

err = rs.Connect()
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit 10c1fc1

Please sign in to comment.