Skip to content

Commit

Permalink
Merge pull request #767 from Project-MONAI/AC-2127
Browse files Browse the repository at this point in the history
adding stats update for timeout(canceled) tasks
  • Loading branch information
neildsouth authored Apr 26, 2023
2 parents 9afe61c + 0191191 commit 62a3dc2
Show file tree
Hide file tree
Showing 19 changed files with 341 additions and 120 deletions.
1 change: 1 addition & 0 deletions src/Shared/Shared/Wrappers/StatsPagedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class StatsPagedResponse<T> : PagedResponse<T>
public DateTime PeriodEnd { get; set; }
public long TotalExecutions { get; set; }
public long TotalFailures { get; set; }
public long TotalInprogress { get; set; }
public double AverageTotalExecutionSeconds { get; set; }
public double AverageArgoExecutionSeconds { get; set; }

Expand Down
10 changes: 10 additions & 0 deletions src/TaskManager/API/Models/TaskExecutionStats.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,15 @@ public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
TaskId = taskUpdateEvent.TaskId;
Status = taskUpdateEvent.Status.ToString();
}

public TaskExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
{
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
CorrelationId = correlationId;
WorkflowInstanceId = taskCanceledEvent.WorkflowInstanceId;
ExecutionId = taskCanceledEvent.ExecutionId;
TaskId = taskCanceledEvent.TaskId;
Status = TaskExecutionStatus.Failed.ToString();
}
}
}
27 changes: 21 additions & 6 deletions src/TaskManager/Database/ITaskExecutionStatsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,29 @@ public interface ITaskExecutionStatsRepository
/// Creates a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to create.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
/// <returns></returns>
Task CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo);

/// <summary>
/// Updates user accounts of a task dispatch event in the database.
/// Updates status of a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
/// <returns></returns>
Task UpdateExecutionStatsAsync(TaskUpdateEvent taskUpdateEvent);

/// <summary>
/// Updates status of a task now its been canceled.
/// </summary>
/// <param name="TaskCanceledException">A TaskCanceledException to update.</param>
/// <returns></returns
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId);

/// <summary>
/// Returns paged entries between the two given dates.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>a paged view of entried in range</returns>
/// <returns>a collections of stats</returns>
Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "");

/// <summary>
Expand All @@ -49,14 +56,22 @@ public interface ITaskExecutionStatsRepository
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");

/// <summary>
/// Return the count of the entries with this status, or all if no status given
/// </summary>
/// <param name="start">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="status">the status to get count of, or string.empty</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "");
/// <summary>
/// Returns all stats in Failed or PartialFail status.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>All stats NOT of that status</returns>
/// <returns>All stats that failed or partially failed</returns>
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");

/// <summary>
Expand Down
49 changes: 34 additions & 15 deletions src/TaskManager/Database/TaskExecutionStatsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,29 @@ await _taskExecutionStatsCollection.UpdateOneAsync(o =>
}
}

public async Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId)
{
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");

try
{
var updateMe = new TaskExecutionStats(taskCanceledEvent, correlationId);
var duration = updateMe.CompletedAtUTC == default ? 0 : (updateMe.CompletedAtUTC - updateMe.StartedUTC).TotalMilliseconds / 1000;
await _taskExecutionStatsCollection.UpdateOneAsync(o =>
o.ExecutionId == updateMe.ExecutionId,
Builders<TaskExecutionStats>.Update
.Set(w => w.Status, updateMe.Status)
.Set(w => w.LastUpdatedUTC, DateTime.UtcNow)
.Set(w => w.CompletedAtUTC, updateMe.CompletedAtUTC)
.Set(w => w.DurationSeconds, duration)

, new UpdateOptions { IsUpsert = true }).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.DatabaseException(nameof(CreateAsync), e);
}
}
public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "")
{
startTime = startTime.ToUniversalTime();
Expand All @@ -128,12 +151,13 @@ public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startT
T.StartedUTC >= startTime &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
(taskIdNull || T.TaskId == taskId) &&
(
T.Status == TaskExecutionStatus.Succeeded.ToString()
|| T.Status == TaskExecutionStatus.Failed.ToString()
|| T.Status == TaskExecutionStatus.PartialFail.ToString()
)
(taskIdNull || T.TaskId == taskId)
//&&
//(
// T.Status == TaskExecutionStatus.Succeeded.ToString()
// || T.Status == TaskExecutionStatus.Failed.ToString()
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
// )
)
.Limit(PageSize)
.Skip((PageNumber - 1) * PageSize)
Expand Down Expand Up @@ -173,24 +197,19 @@ private static TaskExecutionStats ExposeExecutionStats(TaskExecutionStats taskEx
}
return taskExecutionStats;
}

public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "")
public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "")
{
var statusNull = string.IsNullOrWhiteSpace(status);
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
var taskIdNull = string.IsNullOrWhiteSpace(taskId);

return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
T.StartedUTC >= startTime.ToUniversalTime() &&
T.StartedUTC >= start.ToUniversalTime() &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
(taskIdNull || T.TaskId == taskId) &&
(
T.Status == TaskExecutionStatus.Succeeded.ToString() ||
T.Status == TaskExecutionStatus.Failed.ToString() ||
T.Status == TaskExecutionStatus.PartialFail.ToString())
);
(statusNull || T.Status == status));
}

public async Task<long> GetStatsStatusFailedCountAsync(DateTime start, DateTime endTime, string workflowInstanceId = "", string taskId = "")
{
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
Expand Down
21 changes: 13 additions & 8 deletions src/TaskManager/TaskManager/Controllers/TaskStatsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Configuration;
using Monai.Deploy.WorkflowManager.ControllersShared;
using Monai.Deploy.WorkflowManager.Shared.Filter;
Expand Down Expand Up @@ -78,16 +79,18 @@ public async Task<IActionResult> GetOverviewAsync([FromQuery] DateTime startTime
try
{
var fails = _repository.GetStatsStatusFailedCountAsync(startTime, endTime);
var rangeCount = _repository.GetStatsCountAsync(startTime, endTime);
var running = _repository.GetStatsStatusCountAsync(startTime, endTime, TaskExecutionStatus.Accepted.ToString());
var rangeCount = _repository.GetStatsStatusCountAsync(startTime, endTime);
var stats = _repository.GetAverageStats(startTime, endTime);

await Task.WhenAll(fails, rangeCount, stats);
await Task.WhenAll(fails, rangeCount, stats, running);
return Ok(new
{
PeriodStart = startTime,
PeriodEnd = endTime,
TotalExecutions = (int)rangeCount.Result,
TotalFailures = (int)fails.Result,
TotalInprogress = running.Result,
AverageTotalExecutionSeconds = Math.Round(stats.Result.avgTotalExecution, 2),
AverageArgoExecutionSeconds = Math.Round(stats.Result.avgArgoExecution, 2),
});
Expand All @@ -102,7 +105,7 @@ public async Task<IActionResult> GetOverviewAsync([FromQuery] DateTime startTime
[ProducesResponseType(typeof(StatsPagedResponse<List<ExecutionStatDTO>>), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status500InternalServerError)]
[HttpGet("stats")]
public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, string workflowId, string taskId)
public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, string? workflowId = "", string? taskId = "")
{

if ((string.IsNullOrWhiteSpace(workflowId) && string.IsNullOrWhiteSpace(taskId)) is false
Expand Down Expand Up @@ -130,12 +133,13 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st

try
{
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId, taskId);
var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);
var rangeCount = _repository.GetStatsCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);
var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId, taskId);
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId ?? string.Empty, taskId ?? string.Empty);
var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
var rangeCount = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, string.Empty, workflowId ?? string.Empty, taskId ?? string.Empty);
var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
var running = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, TaskExecutionStatus.Accepted.ToString());

await Task.WhenAll(allStats, fails, rangeCount, stats);
await Task.WhenAll(allStats, fails, rangeCount, stats, running);

ExecutionStatDTO[] statsDto;

Expand All @@ -150,6 +154,7 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st
res.PeriodEnd = filter.EndTime;
res.TotalExecutions = rangeCount.Result;
res.TotalFailures = fails.Result;
res.TotalInprogress = running.Result;
res.AverageTotalExecutionSeconds = Math.Round(stats.Result.avgTotalExecution, 2);
res.AverageArgoExecutionSeconds = Math.Round(stats.Result.avgArgoExecution, 2);
return Ok(res);
Expand Down
64 changes: 37 additions & 27 deletions src/TaskManager/TaskManager/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,6 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

private void SubscribeToEvents()
{
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskDispatchRequest, _options.Value.Messaging.Topics.TaskDispatchRequest, TaskDispatchEventReceivedCallback);
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCallbackRequest, _options.Value.Messaging.Topics.TaskCallbackRequest, TaskCallbackEventReceivedCallback);
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCancellationRequest, _options.Value.Messaging.Topics.TaskCancellationRequest, TaskCancelationEventCallback);
}

public Task StopAsync(CancellationToken cancellationToken)
{
_logger.ServiceStopping(ServiceName);
Expand All @@ -121,6 +114,39 @@ public Task StopAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(
JsonMessage<T> message,
string executionId,
string workflowInstanceId,
string taskId,
ExecutionStatus executionStatus,
List<Messaging.Common.Storage>? outputs = null)
{
Guard.Against.Null(message, nameof(message));
Guard.Against.Null(executionStatus, nameof(executionStatus));

var body = new TaskUpdateEvent
{
CorrelationId = message.CorrelationId,
ExecutionId = executionId,
Reason = executionStatus.FailureReason,
Status = executionStatus.Status,
ExecutionStats = executionStatus.Stats,
WorkflowInstanceId = workflowInstanceId,
TaskId = taskId,
Message = executionStatus.Errors,
Outputs = outputs ?? new List<Messaging.Common.Storage>(),
};
return new JsonMessage<TaskUpdateEvent>(body, TaskManagerApplicationId, message.CorrelationId);
}

private void SubscribeToEvents()
{
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskDispatchRequest, _options.Value.Messaging.Topics.TaskDispatchRequest, TaskDispatchEventReceivedCallback);
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCallbackRequest, _options.Value.Messaging.Topics.TaskCallbackRequest, TaskCallbackEventReceivedCallback);
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCancellationRequest, _options.Value.Messaging.Topics.TaskCancellationRequest, TaskCancelationEventCallback);
}

private async Task TaskCallbackEventReceivedCallback(MessageReceivedEventArgs args)
{
await TaskCallBackGeneric<TaskCallbackEvent>(args, HandleTaskCallback);
Expand Down Expand Up @@ -200,8 +226,12 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
{
throw new InvalidOperationException("Task Event data not found.");
}

var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
await taskRunner.HandleTimeout(message.Body.Identity);

await _taskExecutionStatsRepository.UpdateExecutionStatsAsync(message.Body, message.CorrelationId);
AcknowledgeMessage(message);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -515,26 +545,6 @@ private void AcknowledgeMessage<T>(JsonMessage<T> message)
}
}

private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(JsonMessage<T> message, string executionId, string WorkflowInstanceId, string taskId, ExecutionStatus executionStatus, List<Messaging.Common.Storage> outputs = null)
{
Guard.Against.Null(message, nameof(message));
Guard.Against.Null(executionStatus, nameof(executionStatus));

var body = new TaskUpdateEvent
{
CorrelationId = message.CorrelationId,
ExecutionId = executionId,
Reason = executionStatus.FailureReason,
Status = executionStatus.Status,
ExecutionStats = executionStatus.Stats,
WorkflowInstanceId = WorkflowInstanceId,
TaskId = taskId,
Message = executionStatus.Errors,
Outputs = outputs ?? new List<Messaging.Common.Storage>(),
};
return new JsonMessage<TaskUpdateEvent>(body, TaskManagerApplicationId, message.CorrelationId);
}

//TODO: gh-100 implement retry logic
private async Task SendUpdateEvent(JsonMessage<TaskUpdateEvent> message)
{
Expand Down
6 changes: 3 additions & 3 deletions src/TaskManager/TaskManager/appsettings.Local.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"WorkloadManagerDatabase": {
"ConnectionString": "mongodb://root:rootpassword@localhost:30017",
"ConnectionString": "mongodb://root:rootpassword@localhost:27017",
"DatabaseName": "WorkloadManager"
},
"WorkflowManager": {
Expand Down Expand Up @@ -43,7 +43,7 @@
"endpoint": "localhost",
"username": "admin",
"password": "admin",
"port": "30072",
"port": "5672",
"virtualHost": "monaideploy",
"exchange": "monaideploy",
"deadLetterExchange": "monaideploy-dead-letter",
Expand All @@ -55,7 +55,7 @@
"endpoint": "localhost",
"username": "admin",
"password": "admin",
"port": "30072",
"port": "5672",
"virtualHost": "monaideploy",
"exchange": "monaideploy",
"exportRequestQueue": "export_tasks"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

using System.Globalization;
using System.Linq;
using System.Runtime.CompilerServices;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{
"periodStart": "2023-01-01T00:00:00",
"periodEnd": "2023-04-11T10:13:29.9717784+01:00",
"periodEnd": "2023-04-26T15:01:00.9582693+01:00",
"totalExecutions": 0,
"totalFailures": 0,
"totalInprogress": 1,
"averageTotalExecutionSeconds": 0.0,
"averageArgoExecutionSeconds": 0.0,
"pageNumber": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{
"periodStart": "2023-01-01T00:00:00",
"periodEnd": "2023-04-11T10:13:26.6294812+01:00",
"periodEnd": "2023-04-26T15:01:00.678874+01:00",
"totalExecutions": 0,
"totalFailures": 0,
"totalInprogress": 1,
"averageTotalExecutionSeconds": 0.0,
"averageArgoExecutionSeconds": 0.0,
"pageNumber": 1,
Expand Down
Loading

0 comments on commit 62a3dc2

Please sign in to comment.