Skip to content

Commit

Permalink
[YUNIKORN-2329] Ensure RMProxy events are processed in-order (apache#776
Browse files Browse the repository at this point in the history
)

Remove the unnecessary goroutine invocations in the UpdateApplication(),
UpdateAllcoation() and UpdateNode() functions, ensuring that events are
passed to the scheduler's HandleEvent() function in the order received.

Closes: apache#776

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
chenyulin0719 authored and craigcondit committed Jan 19, 2024
1 parent 1f5f1c9 commit bce75ba
Showing 1 changed file with 37 additions and 54 deletions.
91 changes: 37 additions & 54 deletions pkg/rmproxy/rmproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,37 +314,26 @@ func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
}
go func() {
// Update allocations
if len(request.Allocations) > 0 {
for _, alloc := range request.Allocations {
alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
}
}

// Update asks
if len(request.Asks) > 0 {
for _, ask := range request.Asks {
ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
}
}
// Update allocations
for _, alloc := range request.Allocations {
alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
}

// Update releases
if request.Releases != nil {
if len(request.Releases.AllocationsToRelease) > 0 {
for _, rel := range request.Releases.AllocationsToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
}
// Update asks
for _, ask := range request.Asks {
ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
}

if len(request.Releases.AllocationAsksToRelease) > 0 {
for _, rel := range request.Releases.AllocationAsksToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
}
// Update releases
if request.Releases != nil {
for _, rel := range request.Releases.AllocationsToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
}()
for _, rel := range request.Releases.AllocationAsksToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
return nil
}

Expand All @@ -353,40 +342,34 @@ func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" not registered", request.RmID)
}

go func() {
// Update New apps
if len(request.New) > 0 {
for _, app := range request.New {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}
}
// Update Remove apps
if len(request.Remove) > 0 {
for _, app := range request.Remove {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
}()
// Update New apps
for _, app := range request.New {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}

// Update Remove apps
for _, app := range request.Remove {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}

rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
return nil
}

func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not registered", request.RmID)
}
go func() {
if len(request.Nodes) > 0 {
for _, node := range request.Nodes {
if len(node.GetAttributes()) == 0 {
node.Attributes = map[string]string{}
}
partition := node.Attributes[siCommon.NodePartition]
node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
}

for _, node := range request.Nodes {
if len(node.GetAttributes()) == 0 {
node.Attributes = map[string]string{}
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
}()
partition := node.Attributes[siCommon.NodePartition]
node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
}

rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
return nil
}

Expand Down

0 comments on commit bce75ba

Please sign in to comment.