Skip to content

Commit

Permalink
Feat: started implementing reentrancelock
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Mar 20, 2023
1 parent bbab455 commit 91372a8
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"

"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/reentrantmutex"
"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/weight"
"github.com/iotaledger/hive.go/ds/advancedset"
"github.com/iotaledger/hive.go/lo"
Expand All @@ -15,7 +16,7 @@ import (
type Conflict[ConflictID, ResourceID IDType] struct {
// PreferredInsteadUpdated is triggered whenever preferred conflict is updated. It carries two values:
// the new preferred conflict and a set of conflicts visited
PreferredInsteadUpdated *event.Event2[*Conflict[ConflictID, ResourceID], TriggerContext[ConflictID]]
PreferredInsteadUpdated *event.Event2[*Conflict[ConflictID, ResourceID], reentrantmutex.ThreadID]

id ConflictID
parents *advancedset.AdvancedSet[ConflictID]
Expand All @@ -30,7 +31,7 @@ type Conflict[ConflictID, ResourceID IDType] struct {

func New[ConflictID, ResourceID IDType](id ConflictID, parents *advancedset.AdvancedSet[ConflictID], conflictSets map[ResourceID]*Set[ConflictID, ResourceID], initialWeight *weight.Weight) *Conflict[ConflictID, ResourceID] {
c := &Conflict[ConflictID, ResourceID]{
PreferredInsteadUpdated: event.New2[*Conflict[ConflictID, ResourceID], TriggerContext[ConflictID]](),
PreferredInsteadUpdated: event.New2[*Conflict[ConflictID, ResourceID], reentrantmutex.ThreadID](),
id: id,
parents: parents,
children: advancedset.New[*Conflict[ConflictID, ResourceID]](),
Expand All @@ -39,9 +40,9 @@ func New[ConflictID, ResourceID IDType](id ConflictID, parents *advancedset.Adva
}

c.conflictingConflicts = NewSortedSet[ConflictID, ResourceID](c)
c.conflictingConflicts.HeaviestPreferredMemberUpdated.Hook(func(eventConflict *Conflict[ConflictID, ResourceID], visitedConflicts TriggerContext[ConflictID]) {
fmt.Println(c.ID(), "prefers", eventConflict.ID())
c.PreferredInsteadUpdated.Trigger(eventConflict, visitedConflicts)
c.conflictingConflicts.HeaviestPreferredMemberUpdated.Hook(func(eventConflict *Conflict[ConflictID, ResourceID], threadID reentrantmutex.ThreadID) {
fmt.Println(c.ID(), "prefers", eventConflict.ID(), threadID)
c.PreferredInsteadUpdated.Trigger(eventConflict, threadID)
})

// add existing conflicts first, so we can correctly determine the preferred instead flag
Expand Down Expand Up @@ -98,15 +99,12 @@ func (c *Conflict[ConflictID, ResourceID]) Compare(other *Conflict[ConflictID, R
return bytes.Compare(lo.PanicOnErr(c.id.Bytes()), lo.PanicOnErr(other.id.Bytes()))
}

func (c *Conflict[ConflictID, ResourceID]) PreferredInstead() *Conflict[ConflictID, ResourceID] {
c.mutex.RLock()
defer c.mutex.RUnlock()

return c.conflictingConflicts.HeaviestPreferredConflict()
func (c *Conflict[ConflictID, ResourceID]) PreferredInstead(optThreadID ...reentrantmutex.ThreadID) *Conflict[ConflictID, ResourceID] {
return c.conflictingConflicts.HeaviestPreferredConflict(optThreadID...)
}

func (c *Conflict[ConflictID, ResourceID]) IsPreferred() bool {
return c.PreferredInstead() == c
func (c *Conflict[ConflictID, ResourceID]) IsPreferred(optThreadID ...reentrantmutex.ThreadID) bool {
return c.PreferredInstead(optThreadID...) == c
}

func (c *Conflict[ConflictID, ResourceID]) String() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package conflict

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"

"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/reentrantmutex"
"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/weight"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ds/types"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/hive.go/stringify"
Expand All @@ -17,7 +16,7 @@ import (
// SortedSet is a set of Conflicts that is sorted by their weight.
type SortedSet[ConflictID, ResourceID IDType] struct {
// HeaviestPreferredMemberUpdated is triggered when the heaviest preferred member of the SortedSet changes.
HeaviestPreferredMemberUpdated *event.Event2[*Conflict[ConflictID, ResourceID], TriggerContext[ConflictID]]
HeaviestPreferredMemberUpdated *event.Event2[*Conflict[ConflictID, ResourceID], reentrantmutex.ThreadID]

// owner is the Conflict that owns this SortedSet.
owner *Conflict[ConflictID, ResourceID]
Expand Down Expand Up @@ -47,17 +46,18 @@ type SortedSet[ConflictID, ResourceID IDType] struct {
isShutdown atomic.Bool

// mutex is used to synchronize access to the SortedSet.
mutex sync.RWMutex
mutex *reentrantmutex.ReEntrantMutex
}

// NewSortedSet creates a new SortedSet that is owned by the given Conflict.
func NewSortedSet[ConflictID, ResourceID IDType](owner *Conflict[ConflictID, ResourceID]) *SortedSet[ConflictID, ResourceID] {
s := &SortedSet[ConflictID, ResourceID]{
HeaviestPreferredMemberUpdated: event.New2[*Conflict[ConflictID, ResourceID], TriggerContext[ConflictID]](),
HeaviestPreferredMemberUpdated: event.New2[*Conflict[ConflictID, ResourceID], reentrantmutex.ThreadID](),
owner: owner,
members: shrinkingmap.New[ConflictID, *sortedSetMember[ConflictID, ResourceID]](),
pendingWeightUpdates: shrinkingmap.New[ConflictID, *sortedSetMember[ConflictID, ResourceID]](),
pendingWeightUpdatesCounter: syncutils.NewCounter(),
mutex: reentrantmutex.New(owner.ID().String()),
}
s.pendingWeightUpdatesSignal = sync.NewCond(&s.pendingWeightUpdatesMutex)

Expand All @@ -70,9 +70,13 @@ func NewSortedSet[ConflictID, ResourceID IDType](owner *Conflict[ConflictID, Res
}

// Add adds the given Conflict to the SortedSet.
func (s *SortedSet[ConflictID, ResourceID]) Add(conflict *Conflict[ConflictID, ResourceID]) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *SortedSet[ConflictID, ResourceID]) Add(conflict *Conflict[ConflictID, ResourceID], optThreadID ...reentrantmutex.ThreadID) {
if len(optThreadID) == 0 {
optThreadID = []reentrantmutex.ThreadID{reentrantmutex.NewThreadID()}
}

s.mutex.Lock(optThreadID[0])
defer s.mutex.UnLock(optThreadID[0])

newMember, isNew := s.members.GetOrCreate(conflict.id, func() *sortedSetMember[ConflictID, ResourceID] {
return newSortedSetMember[ConflictID, ResourceID](s, conflict)
Expand Down Expand Up @@ -119,17 +123,21 @@ func (s *SortedSet[ConflictID, ResourceID]) Add(conflict *Conflict[ConflictID, R
}
}

if conflict.IsPreferred() && newMember.Compare(s.heaviestPreferredMember) == weight.Heavier {
if conflict.IsPreferred(optThreadID[0]) && newMember.Compare(s.heaviestPreferredMember) == weight.Heavier {
s.heaviestPreferredMember = newMember

s.HeaviestPreferredMemberUpdated.Trigger(conflict, NewTriggerContext(conflict.ID()))
s.HeaviestPreferredMemberUpdated.Trigger(conflict, optThreadID[0])
}
}

// ForEach iterates over all Conflicts of the SortedSet and calls the given callback for each of them.
func (s *SortedSet[ConflictID, ResourceID]) ForEach(callback func(*Conflict[ConflictID, ResourceID]) error) error {
s.mutex.RLock()
defer s.mutex.RUnlock()
func (s *SortedSet[ConflictID, ResourceID]) ForEach(callback func(*Conflict[ConflictID, ResourceID]) error, optThreadID ...reentrantmutex.ThreadID) error {
if len(optThreadID) == 0 {
optThreadID = []reentrantmutex.ThreadID{reentrantmutex.NewThreadID()}
}

s.mutex.RLock(optThreadID[0])
defer s.mutex.RUnlock(optThreadID[0])

for currentMember := s.heaviestMember; currentMember != nil; currentMember = currentMember.lighterMember {
if err := callback(currentMember.Conflict); err != nil {
Expand All @@ -141,9 +149,13 @@ func (s *SortedSet[ConflictID, ResourceID]) ForEach(callback func(*Conflict[Conf
}

// HeaviestConflict returns the heaviest Conflict of the SortedSet.
func (s *SortedSet[ConflictID, ResourceID]) HeaviestConflict() *Conflict[ConflictID, ResourceID] {
s.mutex.RLock()
defer s.mutex.RUnlock()
func (s *SortedSet[ConflictID, ResourceID]) HeaviestConflict(optThreadID ...reentrantmutex.ThreadID) *Conflict[ConflictID, ResourceID] {
if len(optThreadID) == 0 {
optThreadID = []reentrantmutex.ThreadID{reentrantmutex.NewThreadID()}
}

s.mutex.RLock(optThreadID[0])
defer s.mutex.RUnlock(optThreadID[0])

if s.heaviestMember == nil {
return nil
Expand All @@ -153,14 +165,16 @@ func (s *SortedSet[ConflictID, ResourceID]) HeaviestConflict() *Conflict[Conflic
}

// HeaviestPreferredConflict returns the heaviest preferred Conflict of the SortedSet.
func (s *SortedSet[ConflictID, ResourceID]) HeaviestPreferredConflict() *Conflict[ConflictID, ResourceID] {
a := rand.Float64()
func (s *SortedSet[ConflictID, ResourceID]) HeaviestPreferredConflict(optThreadID ...reentrantmutex.ThreadID) *Conflict[ConflictID, ResourceID] {
if len(optThreadID) == 0 {
optThreadID = []reentrantmutex.ThreadID{reentrantmutex.NewThreadID()}
}

fmt.Println("HeaviestPreferreConflict", s.owner.ID(), a)
defer fmt.Println("unlocked HeaviestPreferreConflict", s.owner.ID(), a)
fmt.Println("HeaviestPreferreConflict", s.owner.ID(), optThreadID[0])
defer fmt.Println("unlocked HeaviestPreferreConflict", s.owner.ID(), optThreadID[0])

s.mutex.RLock()
defer s.mutex.RUnlock()
s.mutex.RLock(optThreadID[0])
defer s.mutex.RUnlock(optThreadID[0])

if s.heaviestPreferredMember == nil {
return nil
Expand Down Expand Up @@ -200,17 +214,17 @@ func (s *SortedSet[ConflictID, ResourceID]) notifyPendingWeightUpdate(member *so
}

// notifyPreferredInsteadUpdate notifies the SortedSet about a member that changed its preferred instead flag.
func (s *SortedSet[ConflictID, ResourceID]) notifyPreferredInsteadUpdate(member *sortedSetMember[ConflictID, ResourceID], preferred bool, visitedConflicts TriggerContext[ConflictID]) {
fmt.Println("Write-Lock", s.owner.ID(), "notifyPreferredInsteadUpdate(", member.ID(), ",", preferred, ",", visitedConflicts, ")")
defer fmt.Println("Write-Unlock", s.owner.ID(), "notifyPreferredInsteadUpdate(", member.ID(), ",", preferred, ",", visitedConflicts, ")")
func (s *SortedSet[ConflictID, ResourceID]) notifyPreferredInsteadUpdate(member *sortedSetMember[ConflictID, ResourceID], preferred bool, threadID reentrantmutex.ThreadID) {
fmt.Println("Write-Lock", s.owner.ID(), "notifyPreferredInsteadUpdate(", member.ID(), ",", preferred, ",", threadID, ")")
defer fmt.Println("Write-Unlock", s.owner.ID(), "notifyPreferredInsteadUpdate(", member.ID(), ",", preferred, ",", threadID, ")")

s.mutex.Lock()
defer s.mutex.Unlock()
s.mutex.Lock(threadID)
defer s.mutex.UnLock(threadID)

if preferred {
if member.Compare(s.heaviestPreferredMember) == weight.Heavier {
s.heaviestPreferredMember = member
s.HeaviestPreferredMemberUpdated.Trigger(member.Conflict, visitedConflicts)
s.HeaviestPreferredMemberUpdated.Trigger(member.Conflict, threadID)
}

return
Expand All @@ -221,12 +235,12 @@ func (s *SortedSet[ConflictID, ResourceID]) notifyPreferredInsteadUpdate(member
}

currentMember := member.lighterMember
for currentMember.Conflict != s.owner && !currentMember.IsPreferred() && currentMember.PreferredInstead() != member.Conflict {
for currentMember.Conflict != s.owner && !currentMember.IsPreferred(threadID) && currentMember.PreferredInstead(threadID) != member.Conflict {
currentMember = currentMember.lighterMember
}

s.heaviestPreferredMember = currentMember
s.HeaviestPreferredMemberUpdated.Trigger(currentMember.Conflict, visitedConflicts)
s.HeaviestPreferredMemberUpdated.Trigger(currentMember.Conflict, threadID)
}

// nextPendingWeightUpdate returns the next member that needs to be updated (or nil if the shutdown flag is set).
Expand Down Expand Up @@ -260,21 +274,24 @@ func (s *SortedSet[ConflictID, ResourceID]) fixMemberPositionWorker() {

// fixMemberPosition fixes the position of the given member in the SortedSet.
func (s *SortedSet[ConflictID, ResourceID]) fixMemberPosition(member *sortedSetMember[ConflictID, ResourceID]) {
threadID := reentrantmutex.NewThreadID()

fmt.Println("Write-Lock", s.owner.ID(), "fixMemberPosition(", member.ID(), ")")
defer fmt.Println("Write-Unlock", s.owner.ID(), "fixMemberPosition(", member.ID(), ")")

s.mutex.Lock()
defer s.mutex.Unlock()
s.mutex.Lock(threadID)
defer s.mutex.UnLock(threadID)

preferredMember := s.preferredInstead(member)
preferredMember := member.PreferredInstead(threadID)

// the member needs to be moved up in the list
for currentMember := member.heavierMember; currentMember != nil && currentMember.Compare(member) == weight.Lighter; currentMember = member.heavierMember {
s.swapNeighbors(member, currentMember)

if currentMember.ID() == preferredMember.ID() {
s.heaviestPreferredMember = member
s.HeaviestPreferredMemberUpdated.Trigger(member.Conflict, NewTriggerContext(s.owner.ID()))
fmt.Println("TRIGGER1", threadID)
s.HeaviestPreferredMemberUpdated.Trigger(member.Conflict, threadID)
}
}

Expand All @@ -283,27 +300,16 @@ func (s *SortedSet[ConflictID, ResourceID]) fixMemberPosition(member *sortedSetM
for currentMember := member.lighterMember; currentMember != nil && currentMember.Compare(member) == weight.Heavier; currentMember = member.lighterMember {
s.swapNeighbors(currentMember, member)

if memberIsHeaviestPreferred && s.isPreferred(currentMember) {
if memberIsHeaviestPreferred && currentMember.IsPreferred(threadID) {
s.heaviestPreferredMember = currentMember
s.HeaviestPreferredMemberUpdated.Trigger(currentMember.Conflict, TriggerContext[ConflictID]{s.owner.ID(): types.Void})
fmt.Println("TRIGGER2", threadID)
s.HeaviestPreferredMemberUpdated.Trigger(currentMember.Conflict, threadID)

memberIsHeaviestPreferred = false
}
}
}

func (s *SortedSet[ConflictID, ResourceID]) preferredInstead(member *sortedSetMember[ConflictID, ResourceID]) *Conflict[ConflictID, ResourceID] {
if member.Conflict == s.owner {
return s.heaviestPreferredMember.Conflict
}

return member.PreferredInstead()
}

func (s *SortedSet[ConflictID, ResourceID]) isPreferred(member *sortedSetMember[ConflictID, ResourceID]) bool {
return s.preferredInstead(member) == member.Conflict
}

// swapNeighbors swaps the given members in the SortedSet.
func (s *SortedSet[ConflictID, ResourceID]) swapNeighbors(heavierMember, lighterMember *sortedSetMember[ConflictID, ResourceID]) {
if heavierMember.lighterMember != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package conflict

import (
"bytes"
"fmt"
"sync"

"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/reentrantmutex"
"github.com/iotaledger/goshimmer/packages/protocol/engine/ledger/mempool/newconflictdag/weight"
"github.com/iotaledger/hive.go/ds/types"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/event"
)
Expand Down Expand Up @@ -35,7 +34,7 @@ type sortedSetMember[ConflictID, ResourceID IDType] struct {
onUpdateHook *event.Hook[func(weight.Value)]

// onPreferredUpdatedHook is the hook that is triggered when the preferredInstead value of the Conflict is updated.
onPreferredUpdatedHook *event.Hook[func(*Conflict[ConflictID, ResourceID], TriggerContext[ConflictID])]
onPreferredUpdatedHook *event.Hook[func(*Conflict[ConflictID, ResourceID], reentrantmutex.ThreadID)]

// Conflict is the wrapped Conflict.
*Conflict[ConflictID, ResourceID]
Expand All @@ -53,8 +52,8 @@ func newSortedSetMember[ConflictID, ResourceID IDType](set *SortedSet[ConflictID

// do not attach to event from ourselves
if set.owner != conflict {
s.onPreferredUpdatedHook = conflict.PreferredInsteadUpdated.Hook(func(newPreferredConflict *Conflict[ConflictID, ResourceID], visitedConflicts TriggerContext[ConflictID]) {
s.notifyPreferredInsteadUpdate(newPreferredConflict, visitedConflicts)
s.onPreferredUpdatedHook = conflict.PreferredInsteadUpdated.Hook(func(newPreferredConflict *Conflict[ConflictID, ResourceID], threadID reentrantmutex.ThreadID) {
s.notifyPreferredInsteadUpdate(newPreferredConflict, threadID)
})
}

Expand Down Expand Up @@ -113,13 +112,6 @@ func (s *sortedSetMember[ConflictID, ResourceID]) weightUpdateApplied() bool {
}

// notifyPreferredInsteadUpdate notifies the sortedSet that the preferred instead flag of the Conflict was updated.
func (s *sortedSetMember[ConflictID, ResourceID]) notifyPreferredInsteadUpdate(newPreferredConflict *Conflict[ConflictID, ResourceID], visitedConflicts TriggerContext[ConflictID]) {
if _, exists := visitedConflicts[s.sortedSet.owner.ID()]; !exists {
visitedConflicts[s.ID()] = types.Void
fmt.Println("notify", s.sortedSet.owner.ID(), "that", s.ID(), "prefers", newPreferredConflict.ID(), "with visited conflicts", visitedConflicts)

s.sortedSet.notifyPreferredInsteadUpdate(s, newPreferredConflict == s.Conflict, visitedConflicts)
} else {
fmt.Println("do not notify", s.sortedSet.owner.ID(), "that", s.ID(), "prefers", newPreferredConflict.ID(), "with visited conflicts", visitedConflicts)
}
func (s *sortedSetMember[ConflictID, ResourceID]) notifyPreferredInsteadUpdate(newPreferredConflict *Conflict[ConflictID, ResourceID], threadID reentrantmutex.ThreadID) {
s.sortedSet.notifyPreferredInsteadUpdate(s, newPreferredConflict == s.Conflict, threadID)
}
Loading

0 comments on commit 91372a8

Please sign in to comment.