Skip to content

Commit

Permalink
Add limit to purge with filter
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaslorentz committed Nov 11, 2023
1 parent 401a8a4 commit 994b280
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static IReadOnlyList<IEndpointConventionBuilder> MapOrchestrationEndpoint
{
var orchestrationServiceSearchClient = context.RequestServices.GetRequiredService<IOrchestrationServiceQueryClient>();
var query = context.ParseQuery<ExtendedOrchestrationQuery>();
var query = context.ParseQuery<OrchestrationQueryExtended>();
var result = await orchestrationServiceSearchClient.GetOrchestrationWithQueryAsync(query, context.RequestAborted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace LLL.DurableTask.Core;

public class ExtendedOrchestrationQuery : OrchestrationQuery
public class OrchestrationQueryExtended : OrchestrationQuery
{
public string NamePrefix { get; set; }
public DateTime? CompletedTimeFrom { get; set; }
Expand Down
18 changes: 18 additions & 0 deletions src/LLL.DurableTask.Core/PurgeInstanceFilterExtended.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using DurableTask.Core;

namespace LLL.DurableTask.Core;

public class PurgeInstanceFilterExtended : PurgeInstanceFilter
{
public PurgeInstanceFilterExtended(
DateTime createdTimeFrom,
DateTime? createdTimeTo,
IEnumerable<OrchestrationStatus> runtimeStatus)
: base(createdTimeFrom, createdTimeTo, runtimeStatus)
{
}

public int? Limit { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Query;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Entities;
Expand Down Expand Up @@ -148,12 +149,38 @@ public override IQueryable<Execution> CreateFilteredQueryable(
{
var queryable = base.CreateFilteredQueryable(dbContext, query);

if (query is not ExtendedOrchestrationQuery extendedQuery
if (query is not OrchestrationQueryExtended extendedQuery
|| !extendedQuery.Tags.Any())
{
queryable = queryable.WithStraightJoin();
}

return queryable;
}

public override async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
{
var limit = filter is PurgeInstanceFilterExtended filterExtended
? filterExtended.Limit
: null;

var parameters = new ParametersCollection();

return await dbContext.Database.ExecuteSqlRawAsync($@"
DELETE FROM Executions
WHERE ExecutionId IN(
SELECT ExecutionId FROM (
SELECT Executions.ExecutionId
FROM Executions
INNER JOIN Instances ON Executions.InstanceId = Instances.InstanceId
WHERE Executions.CreatedTime > {parameters.Add(filter.CreatedTimeFrom)}
{(filter.CreatedTimeTo != null ? $"AND Executions.CreatedTime < {parameters.Add(filter.CreatedTimeTo)}" : "")}
{(filter.RuntimeStatus.Any() ? $"AND Executions.Status IN ({string.Join(",", filter.RuntimeStatus.Select(s => parameters.Add(s.ToString())))})" : "")}
ORDER BY Executions.CreatedTime
{(limit != null ? $"LIMIT {parameters.Add(limit)}" : null)}
FOR UPDATE SKIP LOCKED
) T
);
", parameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Entities;
using Microsoft.EntityFrameworkCore;

Expand Down Expand Up @@ -138,4 +140,28 @@ FOR UPDATE SKIP LOCKED

return instance;
}

public override async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
{
var limit = filter is PurgeInstanceFilterExtended filterExtended
? filterExtended.Limit
: null;

var parameters = new ParametersCollection();

return await dbContext.Database.ExecuteSqlRawAsync($@"
DELETE FROM ""Executions""
WHERE ""ExecutionId"" IN(
SELECT ""Executions"".""ExecutionId""
FROM ""Executions""
INNER JOIN ""Instances"" ON ""Executions"".""InstanceId"" = ""Instances"".""InstanceId""
WHERE ""Executions"".""CreatedTime"" > {parameters.Add(filter.CreatedTimeFrom)}
{(filter.CreatedTimeTo != null ? $@"AND ""Executions"".""CreatedTime"" < {parameters.Add(filter.CreatedTimeTo)}" : "")}
{(filter.RuntimeStatus.Any() ? $@"AND ""Executions"".""Status"" IN ({string.Join(",", filter.RuntimeStatus.Select(s => parameters.Add(s.ToString())))})" : "")}
ORDER BY ""Executions"".""CreatedTime""
{(limit != null ? $"LIMIT {parameters.Add(limit)}" : null)}
FOR UPDATE SKIP LOCKED
);
", parameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Entities;
using Microsoft.EntityFrameworkCore;

Expand Down Expand Up @@ -132,4 +134,26 @@ WHERE Queue IN ({queuesParams})

return instance;
}

public override async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
{
var limit = filter is PurgeInstanceFilterExtended filterExtended
? filterExtended.Limit
: null;

var parameters = new ParametersCollection();

return await dbContext.Database.ExecuteSqlRawAsync($@"
DELETE FROM Executions
WHERE ExecutionId IN(
SELECT {(limit != null ? $"TOP ({parameters.Add(limit)})" : null)} Executions.ExecutionId
FROM Executions WITH (UPDLOCK, READPAST)
INNER JOIN Instances WITH (UPDLOCK, READPAST) ON Executions.InstanceId = Instances.InstanceId
WHERE Executions.CreatedTime > {parameters.Add(filter.CreatedTimeFrom)}
{(filter.CreatedTimeTo != null ? $"AND Executions.CreatedTime < {parameters.Add(filter.CreatedTimeTo)}" : "")}
{(filter.RuntimeStatus.Any() ? $"AND Executions.Status IN ({string.Join(",", filter.RuntimeStatus.Select(s => parameters.Add(s.ToString())))})" : "")}
ORDER BY Executions.CreatedTime
);
", parameters);
}
}
30 changes: 30 additions & 0 deletions src/LLL.DurableTask.EFCore/Helpers/ParametersCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Collections;
using System.Collections.Generic;

namespace LLL.DurableTask.EFCore;

public class ParametersCollection : IEnumerable<object>
{
private readonly List<object> _values = new();

public string Add(object value)
{
_values.Add(value);
return $"{{{_values.Count - 1}}}";
}

public object[] ToArray()
{
return _values.ToArray();
}

public IEnumerator<object> GetEnumerator()
{
return _values.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return _values.GetEnumerator();
}
}
32 changes: 20 additions & 12 deletions src/LLL.DurableTask.EFCore/OrchestrationDbContextExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract Task<ActivityMessage> TryLockNextActivityMessageAsync(
string[] queues,
TimeSpan lockTimeout);

public async Task PurgeOrchestrationHistoryAsync(
public virtual async Task PurgeOrchestrationHistoryAsync(
OrchestrationDbContext dbContext,
DateTime thresholdDateTimeUtc,
OrchestrationStateTimeRangeFilterType timeRangeFilterType)
Expand All @@ -66,7 +66,7 @@ public async Task PurgeOrchestrationHistoryAsync(
await ExecuteDeleteAsync(dbContext, query);
}

public async Task<int> PurgeInstanceHistoryAsync(
public virtual async Task<int> PurgeInstanceHistoryAsync(
OrchestrationDbContext dbContext,
string instanceId)
{
Expand All @@ -75,15 +75,6 @@ public async Task<int> PurgeInstanceHistoryAsync(
return await ExecuteDeleteAsync(dbContext, query);
}

public async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
{
var query = dbContext.Executions.Where(e => e.CreatedTime >= filter.CreatedTimeFrom
&& (filter.CreatedTimeTo == null || e.CreatedTime <= filter.CreatedTimeTo)
&& (filter.RuntimeStatus == null || filter.RuntimeStatus.Contains(e.Status)));

return await ExecuteDeleteAsync(dbContext, query);
}

protected virtual Task<int> ExecuteDeleteAsync<T>(OrchestrationDbContext dbContext, IQueryable<T> query)
where T : class
{
Expand All @@ -94,7 +85,7 @@ public virtual IQueryable<Execution> CreateFilteredQueryable(
OrchestrationDbContext dbContext,
OrchestrationQuery query)
{
var extendedQuery = query as ExtendedOrchestrationQuery;
var extendedQuery = query as OrchestrationQueryExtended;

var queryable = dbContext.Executions as IQueryable<Entities.Execution>;

Expand Down Expand Up @@ -149,4 +140,21 @@ public virtual IQueryable<Execution> CreateFilteredQueryable(

return queryable;
}

public virtual async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
{
var query = dbContext.Executions.Where(e => e.CreatedTime >= filter.CreatedTimeFrom
&& (filter.CreatedTimeTo == null || e.CreatedTime <= filter.CreatedTimeTo)
&& (filter.RuntimeStatus == null || filter.RuntimeStatus.Contains(e.Status)));

if (filter is PurgeInstanceFilterExtended filterExtended)
{
if (filterExtended.Limit != null)
{
query = query.Take(filterExtended.Limit.Value);
}
}

return await ExecuteDeleteAsync(dbContext, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(Orche
request.InstanceIdPrefix = query.InstanceIdPrefix;
request.FetchInputsAndOutputs = query.FetchInputsAndOutputs;

if (query is ExtendedOrchestrationQuery extendedQuery)
if (query is OrchestrationQueryExtended extendedQuery)
{
request.NamePrefix = extendedQuery.NamePrefix;
request.CompletedTimeFrom = ToTimestamp(extendedQuery.CompletedTimeFrom);
Expand Down Expand Up @@ -211,6 +211,14 @@ public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter filte
if (filter.RuntimeStatus != null)
request.RuntimeStatus.AddRange(filter.RuntimeStatus.Select(s => (int)s));

if (filter is PurgeInstanceFilterExtended filterExtended)
{
if (filterExtended.Limit != null)
{
request.Limit = filterExtended.Limit.Value;
}
}

var result = await _client.PurgeInstanceHistoryAsync(request);

return new PurgeResult(result.InstancesDeleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public override async Task<WaitForOrchestrationResponse> WaitForOrchestration(Wa

public override async Task<GetOrchestrationWithQueryResponse> GetOrchestrationWithQuery(GetOrchestrationWithQueryRequest request, ServerCallContext context)
{
var query = new ExtendedOrchestrationQuery();
var query = new OrchestrationQueryExtended();
query.RuntimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();
query.CreatedTimeFrom = request.CreatedTimeFrom?.ToDateTime();
query.CreatedTimeTo = request.CreatedTimeTo?.ToDateTime();
Expand Down Expand Up @@ -202,7 +202,10 @@ public override async Task<PurgeInstanceHistoryResponse> PurgeInstanceHistory(Pu
var createdTimeTo = request.CreatedTimeTo?.ToDateTime();
var runtimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();

var filter = new PurgeInstanceFilter(createdTimeFrom, createdTimeTo, runtimeStatus);
var filter = new PurgeInstanceFilterExtended(createdTimeFrom, createdTimeTo, runtimeStatus)
{
Limit = request.HasLimit ? request.Limit : null
};

result = await client.PurgeInstanceStateAsync(filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ message PurgeInstanceHistoryRequest {
google.protobuf.Timestamp created_time_from = 2;
google.protobuf.Timestamp created_time_to = 3;
repeated int32 runtime_status = 4;
optional int32 limit = 5;
}

message PurgeInstanceHistoryResponse {
Expand Down
41 changes: 39 additions & 2 deletions test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public async Task Tags_ShouldBeStoredAndRetrieved()

[Trait("Category", "Integration")]
[SkippableFact]
public async Task PurgeInstanceState_ShouldPurge()
public async Task PurgeInstanceStateWithInstanceId_ShouldPurge()
{
var taskHubClient = _host.Services.GetRequiredService<TaskHubClient>();
var purgeClient = _host.Services.GetService<IOrchestrationServicePurgeClient>();
Expand All @@ -309,6 +309,43 @@ public async Task PurgeInstanceState_ShouldPurge()
stateAfterPurge.Should().BeNull();
}

[Trait("Category", "Integration")]
[SkippableFact]
public async Task PurgeInstanceStateWithFilter_ShouldPurge()
{
var taskHubClient = _host.Services.GetRequiredService<TaskHubClient>();
var purgeClient = _host.Services.GetService<IOrchestrationServicePurgeClient>();
Skip.If(purgeClient == null, "Purge instance not supported");

var instance = await taskHubClient.CreateOrchestrationInstanceAsync(
EmptyOrchestration.Name,
EmptyOrchestration.Version,
string.Empty);

var state = await taskHubClient.GetOrchestrationStateAsync(instance.InstanceId);
state.Should().NotBeNull();

var filter = new PurgeInstanceFilterExtended(DateTime.UnixEpoch, DateTime.UtcNow, new[] {
OrchestrationStatus.Pending,
OrchestrationStatus.Running,
OrchestrationStatus.Completed,
OrchestrationStatus.ContinuedAsNew,
OrchestrationStatus.Failed,
OrchestrationStatus.Canceled,
OrchestrationStatus.Terminated,
OrchestrationStatus.Suspended,
})
{
Limit = 1000
};

var purgeResult = await purgeClient.PurgeInstanceStateAsync(filter);
purgeResult.DeletedInstanceCount.Should().BeGreaterThan(0);

var stateAfterPurge = await taskHubClient.GetOrchestrationStateAsync(instance.InstanceId);
stateAfterPurge.Should().BeNull();
}

[Trait("Category", "Integration")]
[SkippableFact]
public async Task GetOrchestrationsWithQuery_ShouldNotError()
Expand All @@ -326,7 +363,7 @@ public async Task GetOrchestrationsWithQuery_ShouldNotError()
{ "key-b", "value-b" }
});

var query = new ExtendedOrchestrationQuery
var query = new OrchestrationQueryExtended
{
Tags = {
{ "key-a", "value-a" },
Expand Down

0 comments on commit 994b280

Please sign in to comment.