diff --git a/cmd/metal-api/internal/datastore/machine.go b/cmd/metal-api/internal/datastore/machine.go
index f5dacd459..b1071a8ec 100644
--- a/cmd/metal-api/internal/datastore/machine.go
+++ b/cmd/metal-api/internal/datastore/machine.go
@@ -1,10 +1,12 @@
 package datastore

 import (
+	"context"
 	"errors"
 	"fmt"
 	"math"
 	"math/rand/v2"
+	"time"

 	"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
 	"golang.org/x/exp/slices"
@@ -427,7 +429,7 @@ func (rs *RethinkStore) UpdateMachine(oldMachine *metal.Machine, newMachine *met
 // FindWaitingMachine returns an available, not allocated, waiting and alive machine of given size within the given partition.
 // TODO: the algorithm can be optimized / shortened by using a rethinkdb join command and then using .Sample(1)
 // but current implementation should have a slightly better readability.
-func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
+func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
 	q := *rs.machineTable()
 	q = q.Filter(map[string]interface{}{
 		"allocation": nil,
@@ -440,6 +442,11 @@ func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size m
 		"preallocated": false,
 	})

+	if err := rs.sharedMutex.lock(ctx, partitionid, 10*time.Second); err != nil {
+		return nil, fmt.Errorf("too many parallel machine allocations taking place, try again later")
+	}
+	defer rs.sharedMutex.unlock(ctx, partitionid)
+
 	var candidates metal.Machines
 	err := rs.searchEntities(&q, &candidates)
 	if err != nil {
@@ -635,7 +642,9 @@ func randomIndex(max int) int {
 	if max <= 0 {
 		return 0
 	}
-	return rand.N(max)
+	// golangci-lint has an issue with math/rand/v2
+	// here it provides sufficient randomness though because it's not used for cryptographic purposes
+	return rand.N(max) //nolint:gosec
 }

 func intersect[T comparable](a, b []T) []T {
diff --git a/cmd/metal-api/internal/datastore/machine_integration_test.go b/cmd/metal-api/internal/datastore/machine_integration_test.go
index 4c49f4174..d15987c3a 100644
--- a/cmd/metal-api/internal/datastore/machine_integration_test.go
+++ b/cmd/metal-api/internal/datastore/machine_integration_test.go
@@ -4,11 +4,19 @@
 import (
+	"context"
+	"log/slog"
+	"strconv"
+	"strings"
+	"sync"
 	"testing"
+	"time"

 	"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
 	"github.com/metal-stack/metal-lib/pkg/pointer"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
 )

 type machineTestable struct{}
@@ -950,3 +958,110 @@ func TestRethinkStore_UpdateMachine(t *testing.T) {
 		tests[i].run(t, tt)
 	}
 }
+
+func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
+
+	var (
+		root  = slog.Default()
+		wg    sync.WaitGroup
+		size  = metal.Size{Base: metal.Base{ID: "1"}}
+		count int
+	)
+
+	for _, initEntity := range []struct {
+		entity metal.Entity
+		table  *r.Term
+	}{
+		{
+			table: sharedDS.machineTable(),
+			entity: &metal.Machine{
+				Base: metal.Base{
+					ID: "1",
+				},
+				PartitionID: "partition",
+				SizeID:      size.ID,
+				State: metal.MachineState{
+					Value: metal.AvailableState,
+				},
+				Waiting:      true,
+				PreAllocated: false,
+			},
+		},
+		{
+			table: sharedDS.eventTable(),
+			entity: &metal.ProvisioningEventContainer{
+				Base: metal.Base{
+					ID: "1",
+				},
+				Liveliness: metal.MachineLivelinessAlive,
+			},
+		},
+	} {
+		initEntity := initEntity
+
+		err := sharedDS.createEntity(initEntity.table, initEntity.entity)
+		require.NoError(t, err)
+
+		defer func() {
+			_, err := initEntity.table.Delete().RunWrite(sharedDS.session)
+			require.NoError(t, err)
+		}()
+	}
+
+	for i := 0; i < 100; i++ {
+		i := i
+		wg.Add(1)
+
+		log := root.With("worker", i)
+
+		go func() {
+			defer wg.Done()
+
+			for {
+				machine, err := sharedDS.FindWaitingMachine(context.Background(), "project", "partition", size, nil)
+				if err != nil {
+					if metal.IsConflict(err) {
+						t.Errorf("concurrent modification occurred, shared mutex is not working")
+						break
+					}
+
+					if strings.Contains(err.Error(), "no machine available") {
+						continue
+					}
+
+					if strings.Contains(err.Error(), "too many parallel") {
+						time.Sleep(10 * time.Millisecond)
+						continue
+					}
+
+					t.Errorf("unexpected error occurred: %s", err)
+					continue
+				}
+
+				log.Info("waiting machine found")
+
+				newMachine := *machine
+				newMachine.PreAllocated = false
+				if newMachine.Name == "" {
+					newMachine.Name = strconv.Itoa(0)
+				}
+
+				assert.Equal(t, strconv.Itoa(count), newMachine.Name, "concurrency occurred")
+				count++
+				newMachine.Name = strconv.Itoa(count)
+
+				err = sharedDS.updateEntity(sharedDS.machineTable(), &newMachine, machine)
+				if err != nil {
+					log.Error("unable to toggle back pre-allocation flag", "error", err)
+					t.Fail()
+				}
+
+				return
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	assert.Equal(t, 100, count)
+}
diff --git a/cmd/metal-api/internal/datastore/rethinkdb.go b/cmd/metal-api/internal/datastore/rethinkdb.go
index 00ce05ca9..82512f81e 100644
--- a/cmd/metal-api/internal/datastore/rethinkdb.go
+++ b/cmd/metal-api/internal/datastore/rethinkdb.go
@@ -1,6 +1,7 @@
 package datastore

 import (
+	"context"
 	"fmt"
 	"log/slog"
 	"reflect"
@@ -18,9 +19,21 @@ const (
 )

 var tables = []string{
-	"image", "size", "partition", "machine", "switch", "switchstatus", "event", "network", "ip", "migration", "filesystemlayout", "sizeimageconstraint",
-	VRFIntegerPool.String(), VRFIntegerPool.String() + "info", ASNIntegerPool.String(), ASNIntegerPool.String() + "info",
+	"event",
+	"filesystemlayout",
+	"image",
+	"ip",
+	"machine",
+	"migration",
+	"network",
+	"partition",
+	"sharedmutex",
+	"size",
+	"sizeimageconstraint",
+	"switch",
+	"switchstatus",
+	VRFIntegerPool.String(), VRFIntegerPool.String() + "info", ASNIntegerPool.String(), ASNIntegerPool.String() + "info",
 }

 // A RethinkStore is the database access layer for rethinkdb.
@@ -40,6 +53,11 @@ type RethinkStore struct {
 	VRFPoolRangeMax uint
 	ASNPoolRangeMin uint
 	ASNPoolRangeMax uint
+
+	sharedMutexCtx           context.Context
+	sharedMutexCancel        context.CancelFunc
+	sharedMutex              *sharedMutex
+	sharedMutexCheckInterval time.Duration
 }

 // New creates a new rethink store.
@@ -55,6 +73,8 @@ func New(log *slog.Logger, dbhost string, dbname string, dbuser string, dbpass s
 		VRFPoolRangeMax: DefaultVRFPoolRangeMax,
 		ASNPoolRangeMin: DefaultASNPoolRangeMin,
 		ASNPoolRangeMax: DefaultASNPoolRangeMax,
+
+		sharedMutexCheckInterval: defaultSharedMutexCheckInterval,
 	}
 }

@@ -241,7 +261,13 @@ func (rs *RethinkStore) Close() error {
 			return err
 		}
 	}
+
+	if rs.sharedMutexCancel != nil {
+		rs.sharedMutexCancel()
+	}
+
 	rs.log.Info("Rethinkstore disconnected")
+
 	return nil
 }

@@ -251,6 +277,13 @@ func (rs *RethinkStore) Connect() error {
 	rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, rs.dbuser, rs.dbpass)
 	rs.log.Info("Rethinkstore connected")
 	rs.session = rs.dbsession
+	rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
+	var err error
+	rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
+	if err != nil {
+		return err
+	}
+
 	return nil
 }

@@ -262,8 +295,14 @@ func (rs *RethinkStore) Demote() error {
 	if err != nil {
 		return err
 	}
+
 	rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, DemotedUser, rs.dbpass)
 	rs.session = rs.dbsession
+	rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
+	rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
+	if err != nil {
+		return err
+	}

 	rs.log.Info("rethinkstore connected with demoted user")
 	return nil
diff --git a/cmd/metal-api/internal/datastore/rethinkdb_integration_test.go b/cmd/metal-api/internal/datastore/rethinkdb_integration_test.go
index b04ccb830..655abfacb 100644
--- a/cmd/metal-api/internal/datastore/rethinkdb_integration_test.go
+++ b/cmd/metal-api/internal/datastore/rethinkdb_integration_test.go
@@ -8,6 +8,7 @@ import (
 	"log/slog"
 	"os"
 	"sort"
+	"time"

 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
@@ -45,12 +46,15 @@ func startRethinkInitialized() (container testcontainers.Container, ds *RethinkS
 		panic(err)
 	}

-	rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
+	rs := New(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})), c.IP+":"+c.Port, c.DB, c.User, c.Password)
+
 	rs.VRFPoolRangeMin = 10000
 	rs.VRFPoolRangeMax = 10010
 	rs.ASNPoolRangeMin = 10000
 	rs.ASNPoolRangeMax = 10010

+	rs.sharedMutexCheckInterval = 3 * time.Second
+
 	err = rs.Connect()
 	if err != nil {
 		panic(err)
diff --git a/cmd/metal-api/internal/datastore/rethinkdb_test.go b/cmd/metal-api/internal/datastore/rethinkdb_test.go
index bff2c2011..8a12407fc 100644
--- a/cmd/metal-api/internal/datastore/rethinkdb_test.go
+++ b/cmd/metal-api/internal/datastore/rethinkdb_test.go
@@ -2,11 +2,12 @@ package datastore

 import (
 	"log/slog"
-	"reflect"
 	"testing"

+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/metal-stack/metal-api/cmd/metal-api/internal/testdata"
-	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
+	"github.com/metal-stack/metal-lib/pkg/testcommon"
 )

 func TestNew(t *testing.T) {
@@ -44,41 +45,17 @@ func TestNew(t *testing.T) {
 			VRFPoolRangeMax: DefaultVRFPoolRangeMax,
 			ASNPoolRangeMin: DefaultASNPoolRangeMin,
 			ASNPoolRangeMax: DefaultASNPoolRangeMax,
-			},
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := New(tt.args.log, tt.args.dbhost, tt.args.dbname, tt.args.dbuser, tt.args.dbpass); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("New() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_db(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theDBTerm := r.DB("mockdb")
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "TestRethinkStore_db Test 1",
-			rs:   ds,
-			want: &theDBTerm,
+			sharedMutexCheckInterval: defaultSharedMutexCheckInterval,
+			},
 		},
 	}
 	for i := range tests {
 		tt := tests[i]
 		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.db(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.db() = %v, want %v", got, tt.want)
+			got := New(tt.args.log, tt.args.dbhost, tt.args.dbname, tt.args.dbuser, tt.args.dbpass)
+			if diff := cmp.Diff(got, tt.want, testcommon.IgnoreUnexported(), cmpopts.IgnoreTypes(slog.Logger{})); diff != "" {
+				t.Errorf("New() mismatch (-want +got):\n%s", diff)
 			}
 		})
 	}
@@ -108,166 +85,3 @@ func TestRethinkStore_Close(t *testing.T) {
 		})
 	}
 }
-
-func Test_connect(t *testing.T) {
-	type args struct {
-		hosts  []string
-		dbname string
-		user   string
-		pwd    string
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    *r.Term
-		wantErr bool
-	}{}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := connect(tt.args.hosts, tt.args.dbname, tt.args.user, tt.args.pwd)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("connect() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("connect() got = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_sizeTable(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theWantedTerm := r.DB("mockdb").Table("size")
-
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "TestRethinkStore_sizeTable Test 1",
-			rs:   ds,
-			want: &theWantedTerm,
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.sizeTable(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.sizeTable() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_imageTable(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theWantedTerm := r.DB("mockdb").Table("image")
-
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "TestRethinkStore_imageTable Test 1",
-			rs:   ds,
-			want: &theWantedTerm,
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.imageTable(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.imageTable() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_partitionTable(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theWantedTerm := r.DB("mockdb").Table("partition")
-
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "TestRethinkStore_partitionTable Test 1",
-			rs:   ds,
-			want: &theWantedTerm,
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.partitionTable(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.partitionTable() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_machineTable(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theWantedTerm := r.DB("mockdb").Table("machine")
-
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "Test 1",
-			rs:   ds,
-			want: &theWantedTerm,
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.machineTable(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.machineTable() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func TestRethinkStore_switchTable(t *testing.T) {
-	ds, mock := InitMockDB(t)
-	testdata.InitMockDBData(mock)
-
-	theWantedTerm := r.DB("mockdb").Table("switch")
-
-	tests := []struct {
-		name string
-		rs   *RethinkStore
-		want *r.Term
-	}{
-		{
-			name: "TestRethinkStore_switchTable Test 1",
-			rs:   ds,
-			want: &theWantedTerm,
-		},
-	}
-	for i := range tests {
-		tt := tests[i]
-		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.rs.switchTable(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("RethinkStore.switchTable() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
diff --git a/cmd/metal-api/internal/datastore/shared_mutex.go b/cmd/metal-api/internal/datastore/shared_mutex.go
new file mode 100644
index 000000000..47007d1ec
--- /dev/null
+++ b/cmd/metal-api/internal/datastore/shared_mutex.go
@@ -0,0 +1,194 @@
+package datastore
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
+)
+
+// sharedMutex implements a mutex using rethinkdb to guarantee atomic operations.
+// this can be helpful because RethinkDB has no transactions, but sometimes you want
+// to prevent concurrency issues across multiple metal-api replicas.
+// the performance of this is considerably worse than running code without this mutex, so
+// only make use of it when it really makes sense.
+type sharedMutex struct {
+	session       r.QueryExecutor
+	table         r.Term
+	checkinterval time.Duration
+	log           *slog.Logger
+}
+
+type sharedMutexDoc struct {
+	ID        string    `rethinkdb:"id"`
+	LockedAt  time.Time `rethinkdb:"locked_at"`
+	ExpiresAt time.Time `rethinkdb:"expires_at"`
+}
+
+const (
+	// defaultSharedMutexCheckInterval is the interval in which it is checked whether mutexes have expired. if they have expired, they will be released.
+	// this is a safety mechanism in case a mutex was forgotten to be released, preventing the whole machinery from locking up forever.
+	defaultSharedMutexCheckInterval = 30 * time.Second
+	// defaultSharedMutexAcquireTimeout defines the default timeout for the acquisition of the mutex.
+	defaultSharedMutexAcquireTimeout = 10 * time.Second
+)
+
+type mutexOpt any
+
+type mutexOptCheckInterval struct {
+	timeout time.Duration
+}
+
+func newMutexOptCheckInterval(t time.Duration) *mutexOptCheckInterval {
+	return &mutexOptCheckInterval{timeout: t}
+}
+
+func newSharedMutex(ctx context.Context, log *slog.Logger, session r.QueryExecutor, opts ...mutexOpt) (*sharedMutex, error) {
+	table := r.Table("sharedmutex")
+	timeout := defaultSharedMutexCheckInterval
+
+	for _, opt := range opts {
+		switch o := opt.(type) {
+		case *mutexOptCheckInterval:
+			timeout = o.timeout
+		default:
+			return nil, fmt.Errorf("unknown option: %T", opt)
+		}
+	}
+
+	m := &sharedMutex{
+		log:           log,
+		session:       session,
+		table:         table,
+		checkinterval: timeout,
+	}
+
+	go m.expireloop(ctx)
+
+	return m, nil
+}
+
+type lockOpt any
+
+type lockOptAcquireTimeout struct {
+	timeout time.Duration
+}
+
+func newLockOptAcquireTimeout(t time.Duration) *lockOptAcquireTimeout {
+	return &lockOptAcquireTimeout{timeout: t}
+}
+
+func (m *sharedMutex) lock(ctx context.Context, key string, expiration time.Duration, opts ...lockOpt) error {
+	timeout := defaultSharedMutexAcquireTimeout
+	for _, opt := range opts {
+		switch o := opt.(type) {
+		case *lockOptAcquireTimeout:
+			timeout = o.timeout
+		default:
+			return fmt.Errorf("unknown option: %T", opt)
+		}
+	}
+
+	_, err := m.table.Insert(m.newMutexDoc(key, expiration), r.InsertOpts{
+		Conflict:      "error",
+		Durability:    "soft",
+		ReturnChanges: "always",
+	}).RunWrite(m.session, r.RunOpts{Context: ctx})
+	if err == nil {
+		m.log.Debug("mutex acquired", "key", key)
+		return nil
+	}
+
+	if !r.IsConflictErr(err) {
+		return err
+	}
+
+	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	m.log.Debug("mutex is already locked, listening for changes", "key", key)
+
+	cursor, err := m.table.Get(key).Changes(r.ChangesOpts{
+		Squash: false,
+	}).Run(m.session, r.RunOpts{
+		Context: timeoutCtx,
+	})
+	if err != nil {
+		return err
+	}
+
+	changes := make(chan r.ChangeResponse)
+	cursor.Listen(changes)
+
+	for {
+		select {
+		case change := <-changes:
+			m.log.Debug("document change received", "key", key)
+
+			if change.NewValue != nil {
+				m.log.Debug("mutex was not yet released", "key", key)
+				continue
+			}
+
+			_, err = m.table.Insert(m.newMutexDoc(key, expiration), r.InsertOpts{
+				Conflict:   "error",
+				Durability: "soft",
+			}).RunWrite(m.session, r.RunOpts{Context: timeoutCtx})
+			if err != nil && r.IsConflictErr(err) {
+				continue
+			}
+			if err != nil {
+				return err
+			}
+
+			m.log.Debug("mutex acquired after waiting", "key", key)
+
+			return nil
+		case <-timeoutCtx.Done():
+			return fmt.Errorf("unable to acquire mutex: %s", key)
+		}
+	}
+}
+
+func (m *sharedMutex) unlock(ctx context.Context, key string) {
+	_, err := m.table.Get(key).Delete().RunWrite(m.session, r.RunOpts{Context: ctx})
+	if err != nil {
+		m.log.Error("unable to release shared mutex", "key", key, "error", err)
+	}
+}
+
+func (m *sharedMutex) newMutexDoc(key string, expiration time.Duration) *sharedMutexDoc {
+	now := time.Now()
+	return &sharedMutexDoc{
+		ID:        key,
+		LockedAt:  now,
+		ExpiresAt: now.Add(expiration),
+	}
+}
+
+func (m *sharedMutex) expireloop(ctx context.Context) {
+	ticker := time.NewTicker(m.checkinterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			m.log.Debug("checking for expired mutexes")
+
+			resp, err := m.table.Filter(func(row r.Term) r.Term {
+				return row.Field("expires_at").Lt(time.Now())
+			}).Delete().RunWrite(m.session, r.RunOpts{Context: ctx})
+			if err != nil {
+ m.log.Error("unable to release shared mutexes", "error", err) + continue + } + + m.log.Debug("searched for expiring mutexes in database", "deletion-count", resp.Deleted) + case <-ctx.Done(): + m.log.Info("stopped shared mutex expiration loop") + return + } + } +} diff --git a/cmd/metal-api/internal/datastore/shared_mutex_test.go b/cmd/metal-api/internal/datastore/shared_mutex_test.go new file mode 100644 index 000000000..888685e61 --- /dev/null +++ b/cmd/metal-api/internal/datastore/shared_mutex_test.go @@ -0,0 +1,131 @@ +//go:build integration +// +build integration + +package datastore + +import ( + "context" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + r "gopkg.in/rethinkdb/rethinkdb-go.v6" +) + +func Test_sharedMutex_reallyLocking(t *testing.T) { + defer mutexCleanup(t) + ctx := context.Background() + expiration := 10 * time.Second + + err := sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond)) + require.NoError(t, err) + + err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(5*time.Millisecond)) + require.Error(t, err) + require.ErrorContains(t, err, "unable to acquire mutex") + + err = sharedDS.sharedMutex.lock(ctx, "test2", expiration, newLockOptAcquireTimeout(10*time.Millisecond)) + require.NoError(t, err) + + err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond)) + require.Error(t, err) + require.ErrorContains(t, err, "unable to acquire mutex") + + sharedDS.sharedMutex.unlock(ctx, "test") + + err = sharedDS.sharedMutex.lock(ctx, "test2", expiration, newLockOptAcquireTimeout(10*time.Millisecond)) + require.Error(t, err) + require.ErrorContains(t, err, "unable to acquire mutex") + + err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond)) + require.NoError(t, err) +} + +func Test_sharedMutex_acquireAfterRelease(t *testing.T) { + defer mutexCleanup(t) + ctx := context.Background() + + err := sharedDS.sharedMutex.lock(ctx, "test", 3*time.Second, newLockOptAcquireTimeout(10*time.Millisecond)) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + err = sharedDS.sharedMutex.lock(ctx, "test", 1*time.Second, newLockOptAcquireTimeout(3*time.Second)) + assert.NoError(t, err) + }() + + time.Sleep(1 * time.Second) + + sharedDS.sharedMutex.unlock(ctx, "test") + + wg.Wait() +} + +func Test_sharedMutex_expires(t *testing.T) { + defer mutexCleanup(t) + ctx := context.Background() + + err := sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(10*time.Millisecond)) + require.NoError(t, err) + + err = sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(10*time.Millisecond)) + require.Error(t, err) + require.ErrorContains(t, err, "unable to acquire mutex") + + done := make(chan bool) + go func() { + err = sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(2*sharedDS.sharedMutex.checkinterval)) + if err != nil { + t.Errorf("mutex was not acquired: %s", err) + } + done <- true + }() + + timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*sharedDS.sharedMutex.checkinterval) + defer cancel() + + select { + case <-done: + case <-timeoutCtx.Done(): + t.Errorf("shared mutex has not expired") + } +} + +func Test_sharedMutex_stop(t *testing.T) { + defer mutexCleanup(t) + ctx, cancel := 
diff --git a/cmd/metal-api/internal/datastore/shared_mutex_test.go b/cmd/metal-api/internal/datastore/shared_mutex_test.go
new file mode 100644
index 000000000..888685e61
--- /dev/null
+++ b/cmd/metal-api/internal/datastore/shared_mutex_test.go
@@ -0,0 +1,131 @@
+//go:build integration
+// +build integration
+
+package datastore
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
+)
+
+func Test_sharedMutex_reallyLocking(t *testing.T) {
+	defer mutexCleanup(t)
+	ctx := context.Background()
+	expiration := 10 * time.Second
+
+	err := sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.NoError(t, err)
+
+	err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(5*time.Millisecond))
+	require.Error(t, err)
+	require.ErrorContains(t, err, "unable to acquire mutex")
+
+	err = sharedDS.sharedMutex.lock(ctx, "test2", expiration, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.NoError(t, err)
+
+	err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.Error(t, err)
+	require.ErrorContains(t, err, "unable to acquire mutex")
+
+	sharedDS.sharedMutex.unlock(ctx, "test")
+
+	err = sharedDS.sharedMutex.lock(ctx, "test2", expiration, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.Error(t, err)
+	require.ErrorContains(t, err, "unable to acquire mutex")
+
+	err = sharedDS.sharedMutex.lock(ctx, "test", expiration, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.NoError(t, err)
+}
+
+func Test_sharedMutex_acquireAfterRelease(t *testing.T) {
+	defer mutexCleanup(t)
+	ctx := context.Background()
+
+	err := sharedDS.sharedMutex.lock(ctx, "test", 3*time.Second, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.NoError(t, err)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+
+		err = sharedDS.sharedMutex.lock(ctx, "test", 1*time.Second, newLockOptAcquireTimeout(3*time.Second))
+		assert.NoError(t, err)
+	}()
+
+	time.Sleep(1 * time.Second)
+
+	sharedDS.sharedMutex.unlock(ctx, "test")
+
+	wg.Wait()
+}
+
+func Test_sharedMutex_expires(t *testing.T) {
+	defer mutexCleanup(t)
+	ctx := context.Background()
+
+	err := sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.NoError(t, err)
+
+	err = sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(10*time.Millisecond))
+	require.Error(t, err)
+	require.ErrorContains(t, err, "unable to acquire mutex")
+
+	done := make(chan bool)
+	go func() {
+		err = sharedDS.sharedMutex.lock(ctx, "test", 2*time.Second, newLockOptAcquireTimeout(2*sharedDS.sharedMutex.checkinterval))
+		if err != nil {
+			t.Errorf("mutex was not acquired: %s", err)
+		}
+		done <- true
+	}()
+
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*sharedDS.sharedMutex.checkinterval)
+	defer cancel()
+
+	select {
+	case <-done:
+	case <-timeoutCtx.Done():
+		t.Errorf("shared mutex has not expired")
+	}
+}
+
+func Test_sharedMutex_stop(t *testing.T) {
+	defer mutexCleanup(t)
+	ctx, cancel := context.WithCancel(context.Background())
+
+	mutex, err := newSharedMutex(context.Background(), slog.Default(), sharedDS.dbsession)
+	require.NoError(t, err)
+
+	done := make(chan bool)
+
+	go func() {
+		mutex.expireloop(ctx)
+		done <- true
+	}()
+
+	cancel()
+
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	select {
+	case <-done:
+	case <-timeoutCtx.Done():
+		t.Errorf("shared mutex expiration did not stop")
+	}
+}
+
+func mutexCleanup(t *testing.T) {
+	_, err := r.Table("sharedmutex").Delete().RunWrite(sharedDS.dbsession)
+	require.NoError(t, err)
+}
diff --git a/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go b/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go
index 3eab7fa00..69ead6cba 100644
--- a/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go
+++ b/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go
@@ -68,9 +68,9 @@ func TestWaitServer(t *testing.T) {
 	mm := [][]int{{10, 7}}
 	for _, a := range aa {
 		for _, m := range mm {
-			require.Greater(t, a, 0)
-			require.Greater(t, m[0], 0)
-			require.Greater(t, m[1], 0)
+			require.Positive(t, a)
+			require.Positive(t, m[0])
+			require.Positive(t, m[1])
 			require.GreaterOrEqual(t, m[0], m[1])
 			tt = append(tt, &test{
 				numberApiInstances: a,
@@ -253,7 +255,9 @@ func (t *test) startMachineInstances() {
 	}
 	for i := range t.numberMachineInstances {
 		machineID := strconv.Itoa(i)
-		port := 50005 + rand.N(t.numberApiInstances)
+		// golangci-lint has an issue with math/rand/v2
+		// here it provides sufficient randomness though because it's not used for cryptographic purposes
+		port := 50005 + rand.N(t.numberApiInstances) //nolint:gosec
 		ctx, cancel := context.WithCancel(context.Background())
 		conn, err := grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), opts...)
 		require.NoError(t, err)
@@ -327,7 +329,9 @@ func (t *test) allocateMachines() {
 }

 func (t *test) selectMachine(except []string) string {
-	machineID := strconv.Itoa(rand.N(t.numberMachineInstances))
+	// golangci-lint has an issue with math/rand/v2
+	// here it provides sufficient randomness though because it's not used for cryptographic purposes
+	machineID := strconv.Itoa(rand.N(t.numberMachineInstances)) //nolint:gosec
 	for _, id := range except {
 		if id == machineID {
 			return t.selectMachine(except)
diff --git a/cmd/metal-api/internal/service/firewall-service.go b/cmd/metal-api/internal/service/firewall-service.go
index 757cdd6de..50b8d06ce 100644
--- a/cmd/metal-api/internal/service/firewall-service.go
+++ b/cmd/metal-api/internal/service/firewall-service.go
@@ -216,7 +216,7 @@ func (r *firewallResource) allocateFirewall(request *restful.Request, response *
 		return
 	}

-	m, err := allocateMachine(r.logger(request), r.ds, r.ipamer, spec, r.mdc, r.actor, r.Publisher)
+	m, err := allocateMachine(request.Request.Context(), r.logger(request), r.ds, r.ipamer, spec, r.mdc, r.actor, r.Publisher)
 	if err != nil {
 		r.sendError(request, response, defaultError(err))
 		return
diff --git a/cmd/metal-api/internal/service/machine-service.go b/cmd/metal-api/internal/service/machine-service.go
index 52357a920..da902c83b 100644
--- a/cmd/metal-api/internal/service/machine-service.go
+++ b/cmd/metal-api/internal/service/machine-service.go
@@ -984,7 +984,7 @@ func (r *machineResource) allocateMachine(request *restful.Request, response *re
 		return
 	}

-	m, err := allocateMachine(r.logger(request), r.ds, r.ipamer, spec, r.mdc, r.actor, r.Publisher)
+	m, err := allocateMachine(request.Request.Context(), r.logger(request), r.ds, r.ipamer, spec, r.mdc, r.actor, r.Publisher)
 	if err != nil {
 		r.sendError(request, response, defaultError(err))
 		return
@@ -1157,7 +1157,7 @@ func createMachineAllocationSpec(ds *datastore.RethinkStore, machineRequest v1.M
 	}, nil
 }

-func allocateMachine(logger *slog.Logger, ds *datastore.RethinkStore, ipamer ipam.IPAMer, allocationSpec *machineAllocationSpec, mdc mdm.Client, actor *asyncActor, publisher bus.Publisher) (*metal.Machine, error) {
+func allocateMachine(ctx context.Context, logger *slog.Logger, ds *datastore.RethinkStore, ipamer ipam.IPAMer, allocationSpec *machineAllocationSpec, mdc mdm.Client, actor *asyncActor, publisher bus.Publisher) (*metal.Machine, error) {
 	err := validateAllocationSpec(allocationSpec)
 	if err != nil {
 		return nil, err
@@ -1169,7 +1169,7 @@ func allocateMachine(logger *slog.Logger, ds *datastore.RethinkStore, ipamer ipa
 	}

 	projectID := allocationSpec.ProjectID
-	p, err := mdc.Project().Get(context.Background(), &mdmv1.ProjectGetRequest{Id: projectID})
+	p, err := mdc.Project().Get(ctx, &mdmv1.ProjectGetRequest{Id: projectID})
 	if err != nil {
 		return nil, err
 	}
@@ -1206,20 +1206,7 @@ func allocateMachine(logger *slog.Logger, ds *datastore.RethinkStore, ipamer ipa
 		}
 	}

-	var machineCandidate *metal.Machine
-	err = retry.Do(
-		func() error {
-			var err2 error
-			machineCandidate, err2 = findMachineCandidate(ds, allocationSpec)
-			return err2
-		},
-		retry.Attempts(10),
-		retry.RetryIf(func(err error) bool {
-			return metal.IsConflict(err)
-		}),
-		retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
-		retry.LastErrorOnly(true),
-	)
+	machineCandidate, err := findMachineCandidate(ctx, ds, allocationSpec)
 	if err != nil {
 		return nil, err
 	}
@@ -1361,12 +1348,12 @@ func validateAllocationSpec(allocationSpec *machineAllocationSpec) error {
 	return nil
 }

-func findMachineCandidate(ds *datastore.RethinkStore, allocationSpec *machineAllocationSpec) (*metal.Machine, error) {
+func findMachineCandidate(ctx context.Context, ds *datastore.RethinkStore, allocationSpec *machineAllocationSpec) (*metal.Machine, error) {
 	var err error
 	var machine *metal.Machine
 	if allocationSpec.Machine == nil {
 		// requesting allocation of an arbitrary ready machine in partition with given size
-		machine, err = findWaitingMachine(ds, allocationSpec)
+		machine, err = findWaitingMachine(ctx, ds, allocationSpec)
 		if err != nil {
 			return nil, err
 		}
@@ -1387,7 +1374,7 @@ func findMachineCandidate(ds *datastore.RethinkStore, allocationSpec *machineAll
 	return machine, err
 }

-func findWaitingMachine(ds *datastore.RethinkStore, allocationSpec *machineAllocationSpec) (*metal.Machine, error) {
+func findWaitingMachine(ctx context.Context, ds *datastore.RethinkStore, allocationSpec *machineAllocationSpec) (*metal.Machine, error) {
 	size, err := ds.FindSize(allocationSpec.Size.ID)
 	if err != nil {
 		return nil, fmt.Errorf("size cannot be found: %w", err)
@@ -1397,7 +1384,7 @@ func findWaitingMachine(ds *datastore.RethinkStore, allocationSpec *machineAlloc
 		return nil, fmt.Errorf("partition cannot be found: %w", err)
 	}

-	machine, err := ds.FindWaitingMachine(allocationSpec.ProjectID, partition.ID, *size, allocationSpec.PlacementTags)
+	machine, err := ds.FindWaitingMachine(ctx, allocationSpec.ProjectID, partition.ID, *size, allocationSpec.PlacementTags)
 	if err != nil {
 		return nil, err
 	}
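With the retry.Do wrapper removed from allocateMachine, FindWaitingMachine now serializes candidate selection itself: it blocks for up to the acquire timeout on the partition-scoped mutex and then fails fast with the "too many parallel machine allocations" error, which callers can treat as retryable, as the integration test above does. A rough caller-side sketch, assuming the usual imports (context, fmt, strings, time and the datastore/metal packages); the helper name and retry budget are illustrative, not part of this change:

func findWaitingMachineWithRetry(ctx context.Context, ds *datastore.RethinkStore, projectID, partitionID string, size metal.Size) (*metal.Machine, error) {
	for i := 0; i < 10; i++ {
		m, err := ds.FindWaitingMachine(ctx, projectID, partitionID, size, nil)
		if err == nil {
			return m, nil
		}
		if !strings.Contains(err.Error(), "too many parallel") {
			return nil, err
		}
		// another replica currently holds the partition mutex, back off briefly and retry
		time.Sleep(10 * time.Millisecond)
	}
	return nil, fmt.Errorf("no waiting machine became available")
}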