Skip to content

Commit

Permalink
[Disk Manager] rename MissedEstimatesUntilHanging to MissedEstimatesUntilTaskIsHanging; refactor ExecutionContext.GetDeadline to ExecutionContext.IsHanging (#2016)
Browse files Browse the repository at this point in the history
  • Loading branch information
SvartMetal authored Sep 12, 2024
1 parent 4884287 commit 8764d22
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 130 deletions.
7 changes: 4 additions & 3 deletions cloud/tasks/config/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ message TasksConfig {
repeated string ZoneIds = 23;

optional string HearbeatInterval = 24 [default = "30s"];
// The time window within which the node is considered alive.
// The time window within which the node is considered alive.
optional string LivenessWindow = 25 [default = "30s"];

// Feature flag for enabling node eviction policy based
// on the health of the node and the number of running nodes.
// on the health of the node and the number of running nodes.
optional bool NodeEvictionEnabled = 26 [default = false];

map<string, int64> InflightTaskPerNodeLimits = 27; // by task type

optional string CollectListerMetricsTaskScheduleInterval = 28 [default = "10m"];
optional string ListerMetricsCollectionInterval = 29 [default = "1m"];
optional int64 MaxHangingTaskIDsToReport = 30 [default = 1000];
optional uint64 MissedEstimatesUntilHanging = 31 [default = 2];
// Number of missed estimated durations until task is considered hanging.
optional uint64 MissedEstimatesUntilTaskIsHanging = 31 [default = 2];
}
44 changes: 24 additions & 20 deletions cloud/tasks/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ExecutionContext interface {
// Dependencies are automatically added by Scheduler.WaitTask.
AddTaskDependency(ctx context.Context, taskID string) error

GetDeadline() time.Time
IsHanging() bool

SetEstimate(estimatedDuration time.Duration)

Expand All @@ -45,13 +45,14 @@ type ExecutionContext interface {
////////////////////////////////////////////////////////////////////////////////

type executionContext struct {
task Task
storage storage.Storage
taskState storage.TaskState
taskStateMutex sync.Mutex
finished bool
hangingTaskTimeout time.Duration
missedEstimatesUntilHanging uint64
task Task
storage storage.Storage
taskState storage.TaskState
taskStateMutex sync.Mutex
finished bool

hangingTaskTimeout time.Duration
missedEstimatesUntilTaskIsHanging uint64
}

// HACK from https://github.com/stretchr/testify/pull/694/files to avoid fake race detection
Expand Down Expand Up @@ -116,26 +117,28 @@ func (c *executionContext) AddTaskDependency(
})
}

func (c *executionContext) GetDeadline() time.Time {
func (c *executionContext) IsHanging() bool {
c.taskStateMutex.Lock()
defer c.taskStateMutex.Unlock()

var estimatedDuration time.Duration
now := time.Now()
defaultDeadline := c.taskState.CreatedAt.Add(c.hangingTaskTimeout)

var estimatedDuration time.Duration
if c.taskState.EstimatedTime.After(c.taskState.CreatedAt) {
estimatedDuration = c.taskState.EstimatedTime.Sub(c.taskState.CreatedAt)
} else {
return defaultDeadline
return defaultDeadline.After(now)
}

deadline := c.taskState.CreatedAt.Add(
estimatedDuration * time.Duration(c.missedEstimatesUntilHanging),
estimatedDuration * time.Duration(c.missedEstimatesUntilTaskIsHanging),
)
if deadline.Before(defaultDeadline) {
return defaultDeadline
return defaultDeadline.After(now)
}

return deadline
return deadline.After(now)
}

func (c *executionContext) SetEstimate(estimatedDuration time.Duration) {
Expand Down Expand Up @@ -359,15 +362,16 @@ func newExecutionContext(
storage storage.Storage,
taskState storage.TaskState,
hangingTaskTimeout time.Duration,
missedEstimatesUntilHanging uint64,
missedEstimatesUntilTaskIsHanging uint64,
) *executionContext {

return &executionContext{
task: task,
storage: storage,
taskState: taskState,
hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilHanging: missedEstimatesUntilHanging,
task: task,
storage: storage,
taskState: taskState,

hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging,
}
}

Expand Down
6 changes: 3 additions & 3 deletions cloud/tasks/mocks/execution_context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func (c *ExecutionContextMock) AddTaskDependency(
return args.Error(0)
}

func (c *ExecutionContextMock) GetDeadline() time.Time {
c.Called()
return time.Time{}
func (c *ExecutionContextMock) IsHanging() bool {
args := c.Called()
return args.Bool(0)
}

func (c *ExecutionContextMock) SetEstimate(estimatedDuration time.Duration) {
Expand Down
114 changes: 59 additions & 55 deletions cloud/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,19 @@ type runner interface {
////////////////////////////////////////////////////////////////////////////////

type runnerForRun struct {
storage storage.Storage
registry *Registry
metrics runnerMetrics
channel *channel
pingPeriod time.Duration
pingTimeout time.Duration
host string
id string
maxRetriableErrorCount uint64
maxPanicCount uint64
hangingTaskTimeout time.Duration
missedEstimatesUntilHanging uint64
storage storage.Storage
registry *Registry
metrics runnerMetrics
channel *channel
pingPeriod time.Duration
pingTimeout time.Duration
host string
id string
maxRetriableErrorCount uint64
maxPanicCount uint64

hangingTaskTimeout time.Duration
missedEstimatesUntilTaskIsHanging uint64
}

func (r *runnerForRun) receiveTask(
Expand Down Expand Up @@ -319,23 +320,24 @@ func (r *runnerForRun) lockAndExecuteTask(
r,
taskInfo,
r.hangingTaskTimeout,
r.missedEstimatesUntilHanging,
r.missedEstimatesUntilTaskIsHanging,
)
}

////////////////////////////////////////////////////////////////////////////////

type runnerForCancel struct {
storage storage.Storage
registry *Registry
metrics runnerMetrics
channel *channel
pingPeriod time.Duration
pingTimeout time.Duration
host string
id string
hangingTaskTimeout time.Duration
missedEstimatesUntilHanging uint64
storage storage.Storage
registry *Registry
metrics runnerMetrics
channel *channel
pingPeriod time.Duration
pingTimeout time.Duration
host string
id string

hangingTaskTimeout time.Duration
missedEstimatesUntilTaskIsHanging uint64
}

func (r *runnerForCancel) receiveTask(
Expand Down Expand Up @@ -445,7 +447,7 @@ func (r *runnerForCancel) lockAndExecuteTask(
r,
taskInfo,
r.hangingTaskTimeout,
r.missedEstimatesUntilHanging,
r.missedEstimatesUntilTaskIsHanging,
)
}

Expand Down Expand Up @@ -512,7 +514,7 @@ func lockAndExecuteTask(
runner runner,
taskInfo storage.TaskInfo,
hangingTaskTimeout time.Duration,
missedEstimatesUntilHanging uint64,
missedEstimatesUntilTaskIsHanging uint64,
) error {

taskState, err := runner.lockTask(ctx, taskInfo)
Expand Down Expand Up @@ -581,7 +583,7 @@ func lockAndExecuteTask(
taskStorage,
taskState,
hangingTaskTimeout,
missedEstimatesUntilHanging,
missedEstimatesUntilTaskIsHanging,
)

pingCtx, cancelPing := context.WithCancel(ctx)
Expand Down Expand Up @@ -643,7 +645,7 @@ func startRunner(
idForCancel string,
maxRetriableErrorCount uint64,
maxPanicCount uint64,
missedEstimatesUntilHanging uint64,
missedEstimatesUntilTaskIsHanging uint64,
) error {

// TODO: More granular control on runners and cancellers.
Expand All @@ -656,18 +658,19 @@ func startRunner(
)

go runnerLoop(ctx, registry, &runnerForRun{
storage: taskStorage,
registry: registry,
metrics: runnerForRunMetrics,
channel: channelForRun,
pingPeriod: pingPeriod,
pingTimeout: pingTimeout,
host: host,
id: idForRun,
maxRetriableErrorCount: maxRetriableErrorCount,
maxPanicCount: maxPanicCount,
hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilHanging: missedEstimatesUntilHanging,
storage: taskStorage,
registry: registry,
metrics: runnerForRunMetrics,
channel: channelForRun,
pingPeriod: pingPeriod,
pingTimeout: pingTimeout,
host: host,
id: idForRun,
maxRetriableErrorCount: maxRetriableErrorCount,
maxPanicCount: maxPanicCount,

hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging,
})

runnerForCancelMetrics := newRunnerMetrics(
Expand All @@ -678,16 +681,17 @@ func startRunner(
)

go runnerLoop(ctx, registry, &runnerForCancel{
storage: taskStorage,
registry: registry,
metrics: runnerForCancelMetrics,
channel: channelForCancel,
pingPeriod: pingPeriod,
pingTimeout: pingTimeout,
host: host,
id: idForCancel,
hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilHanging: missedEstimatesUntilHanging,
storage: taskStorage,
registry: registry,
metrics: runnerForCancelMetrics,
channel: channelForCancel,
pingPeriod: pingPeriod,
pingTimeout: pingTimeout,
host: host,
id: idForCancel,

hangingTaskTimeout: hangingTaskTimeout,
missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging,
})

return nil
Expand All @@ -708,7 +712,7 @@ func startRunners(
host string,
maxRetriableErrorCount uint64,
maxPanicCount uint64,
missedEstimatesUntilHanging uint64,
missedEstimatesUntilTaskIsHanging uint64,
) error {

for i := uint64(0); i < runnerCount; i++ {
Expand All @@ -728,7 +732,7 @@ func startRunners(
fmt.Sprintf("cancel_%v", i),
maxRetriableErrorCount,
maxPanicCount,
missedEstimatesUntilHanging,
missedEstimatesUntilTaskIsHanging,
)
if err != nil {
return fmt.Errorf("failed to start runner #%d: %w", i, err)
Expand All @@ -753,7 +757,7 @@ func startStalkingRunners(
host string,
maxRetriableErrorCount uint64,
maxPanicCount uint64,
missedEstimatesUntilHanging uint64,
missedEstimatesUntilTaskIsHanging uint64,
) error {

for i := uint64(0); i < runnerCount; i++ {
Expand All @@ -773,7 +777,7 @@ func startStalkingRunners(
fmt.Sprintf("stalker_cancel_%v", i),
maxRetriableErrorCount,
maxPanicCount,
missedEstimatesUntilHanging,
missedEstimatesUntilTaskIsHanging,
)
if err != nil {
return fmt.Errorf("failed to start stalking runner #%d: %w", i, err)
Expand Down Expand Up @@ -924,7 +928,7 @@ func StartRunners(
host,
config.GetMaxRetriableErrorCount(),
config.GetMaxPanicCount(),
config.GetMissedEstimatesUntilHanging(),
config.GetMissedEstimatesUntilTaskIsHanging(),
)
if err != nil {
return err
Expand Down Expand Up @@ -976,7 +980,7 @@ func StartRunners(
host,
config.GetMaxRetriableErrorCount(),
config.GetMaxPanicCount(),
config.GetMissedEstimatesUntilHanging(),
config.GetMissedEstimatesUntilTaskIsHanging(),
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 8764d22

Please sign in to comment.