Skip to content

Commit

Permalink
Added support for instanced versions of MayInterleave predicates (#8548)
Browse files Browse the repository at this point in the history
* Added support for instanced versions of MayInterleave predicates

* Renamed predicate

* Changed ReentrantPredicate to a singleton

* Renamed IMayInterleavePredicateGrain to IMayInterleaveStaticPredicateGrain along with corresponding classes and tests

* Added IMayInterleaveInstancedPredicateGrain and corresponding tests

* Remove unnecessary generics

* Fixed MayInterleaveInstancedPredicateGrain grain to actually include instanced version of MayInterleave attribute

* Revert "Remove unnecessary generics"

This reverts commit ce74765.

Revert "Remove unnecessary generics"

This reverts commit ce74765.

* Avoid LINQ Expressions

* Minor simplification

---------

Co-authored-by: Reuben Bond <[email protected]>
  • Loading branch information
blazknuplez and ReubenBond authored Jul 24, 2023
1 parent edf4183 commit 46982e9
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 35 deletions.
75 changes: 61 additions & 14 deletions src/Orleans.Runtime/Activation/IGrainContextActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public interface IGrainContextActivatorProvider
/// <returns><see langword="true"/> if an appropriate activator was found, otherwise <see langword="false"/>.</returns>
bool TryGet(GrainType grainType, [NotNullWhen(true)] out IGrainContextActivator activator);
}

/// <summary>
/// Creates a grain context for the given grain address.
/// </summary>
Expand Down Expand Up @@ -289,7 +289,7 @@ public void Configure(GrainType grainType, GrainProperties properties, GrainType
shared.SetComponent<GrainCanInterleave>(component);
}

component.MayInterleavePredicates.Add(_ => true);
component.MayInterleavePredicates.Add(ReentrantPredicate.Instance);
}
}
}
Expand All @@ -305,11 +305,11 @@ public MayInterleaveConfiguratorProvider(GrainClassMap grainClassMap)

public bool TryGetConfigurator(GrainType grainType, GrainProperties properties, out IConfigureGrainContext configurator)
{
if (properties.Properties.TryGetValue(WellKnownGrainTypeProperties.MayInterleavePredicate, out var value)
if (properties.Properties.TryGetValue(WellKnownGrainTypeProperties.MayInterleavePredicate, out _)
&& _grainClassMap.TryGetGrainClass(grainType, out var grainClass))
{
var predicate = GetMayInterleavePredicate(grainClass);
configurator = new MayInterleaveConfigurator(message => predicate(message.BodyObject as IInvokable));
configurator = new MayInterleaveConfigurator(predicate);
return true;
}

Expand All @@ -321,20 +321,21 @@ public bool TryGetConfigurator(GrainType grainType, GrainProperties properties,
/// Returns interleave predicate depending on whether class is marked with <see cref="MayInterleaveAttribute"/> or not.
/// </summary>
/// <param name="grainType">Grain class.</param>
private static Func<IInvokable, bool> GetMayInterleavePredicate(Type grainType)
private static IMayInterleavePredicate GetMayInterleavePredicate(Type grainType)
{
var attribute = grainType.GetCustomAttribute<MayInterleaveAttribute>();
if (attribute is null)
{
return null;
}

// here
var callbackMethodName = attribute.CallbackMethodName;
var method = grainType.GetMethod(callbackMethodName, BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy);
var method = grainType.GetMethod(callbackMethodName, BindingFlags.Public | BindingFlags.Static | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
if (method == null)
{
throw new InvalidOperationException(
$"Class {grainType.FullName} doesn't declare public static method " +
$"Class {grainType.FullName} doesn't declare public method " +
$"with name {callbackMethodName} specified in MayInterleave attribute");
}

Expand All @@ -345,18 +346,64 @@ private static Func<IInvokable, bool> GetMayInterleavePredicate(Type grainType)
throw new InvalidOperationException(
$"Wrong signature of callback method {callbackMethodName} " +
$"specified in MayInterleave attribute for grain class {grainType.FullName}. \n" +
$"Expected: public static bool {callbackMethodName}(IInvokable req)");
$"Expected: public bool {callbackMethodName}(IInvokable req)");
}

if (method.IsStatic)
{
return new MayInterleaveStaticPredicate(method.CreateDelegate<Func<IInvokable, bool>>());
}

return method.CreateDelegate<Func<IInvokable, bool>>();
var predicateType = typeof(MayInterleaveInstancedPredicate<>).MakeGenericType(grainType);
return (IMayInterleavePredicate)Activator.CreateInstance(predicateType, method);
}
}

internal interface IMayInterleavePredicate
{
bool Invoke(object instance, IInvokable bodyObject);
}

internal class ReentrantPredicate : IMayInterleavePredicate
{
private ReentrantPredicate()
{
}

public static ReentrantPredicate Instance { get; } = new();

public bool Invoke(object _, IInvokable bodyObject) => true;
}

internal class MayInterleaveStaticPredicate : IMayInterleavePredicate
{
private readonly Func<IInvokable, bool> _mayInterleavePredicate;

public MayInterleaveStaticPredicate(Func<IInvokable, bool> mayInterleavePredicate)
{
_mayInterleavePredicate = mayInterleavePredicate;
}

public bool Invoke(object _, IInvokable bodyObject) => _mayInterleavePredicate(bodyObject);
}

internal class MayInterleaveInstancedPredicate<T> : IMayInterleavePredicate where T : class
{
private readonly Func<T, IInvokable, bool> _mayInterleavePredicate;

public MayInterleaveInstancedPredicate(MethodInfo mayInterleavePredicateInfo)
{
_mayInterleavePredicate = mayInterleavePredicateInfo.CreateDelegate<Func<T, IInvokable, bool>>();
}

public bool Invoke(object instance, IInvokable bodyObject) => _mayInterleavePredicate(instance as T, bodyObject);
}

internal class MayInterleaveConfigurator : IConfigureGrainContext
{
private readonly Func<Message, bool> _mayInterleavePredicate;
private readonly IMayInterleavePredicate _mayInterleavePredicate;

public MayInterleaveConfigurator(Func<Message, bool> mayInterleavePredicate)
public MayInterleaveConfigurator(IMayInterleavePredicate mayInterleavePredicate)
{
_mayInterleavePredicate = mayInterleavePredicate;
}
Expand All @@ -376,12 +423,12 @@ public void Configure(IGrainContext context)

internal class GrainCanInterleave
{
public List<Func<Message, bool>> MayInterleavePredicates { get; } = new List<Func<Message, bool>>();
public bool MayInterleave(Message message)
public List<IMayInterleavePredicate> MayInterleavePredicates { get; } = new List<IMayInterleavePredicate>();
public bool MayInterleave(object instance, Message message)
{
foreach (var predicate in this.MayInterleavePredicates)
{
if (predicate(message)) return true;
if (predicate.Invoke(instance, message.BodyObject as IInvokable)) return true;
}

return false;
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ bool MayInvokeRequest(Message incoming)
{
try
{
return canInterleave.MayInterleave(incoming);
return canInterleave.MayInterleave(GrainInstance, incoming);
}
catch (Exception exception)
{
Expand Down
19 changes: 17 additions & 2 deletions test/Grains/TestGrainInterfaces/IReentrancyGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface INonReentrantGrain : IGrainWithIntegerKey
Task SetSelf(INonReentrantGrain self);
}

public interface IMayInterleavePredicateGrain : IGrainWithIntegerKey
public interface IMayInterleaveStaticPredicateGrain : IGrainWithIntegerKey
{
Task<string> One(string arg); // this interleaves only when arg == "reentrant"

Expand All @@ -35,7 +35,22 @@ public interface IMayInterleavePredicateGrain : IGrainWithIntegerKey
Task SubscribeToStream();
Task PushToStream(string item);

Task SetSelf(IMayInterleavePredicateGrain self);
Task SetSelf(IMayInterleaveStaticPredicateGrain self);
}

public interface IMayInterleaveInstancedPredicateGrain : IGrainWithIntegerKey
{
Task<string> One(string arg); // this interleaves only when arg == "reentrant"

Task<string> Two();
Task<string> TwoReentrant();

Task Exceptional();

Task SubscribeToStream();
Task PushToStream(string item);

Task SetSelf(IMayInterleaveInstancedPredicateGrain self);
}

[Unordered]
Expand Down
96 changes: 90 additions & 6 deletions test/Grains/TestGrains/ReentrantGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public Task SetSelf(INonReentrantGrain self)
}

[MayInterleave(nameof(MayInterleave))]
public class MayInterleavePredicateGrain : Grain, IMayInterleavePredicateGrain
public class MayInterleaveStaticPredicateGrain : Grain, IMayInterleaveStaticPredicateGrain
{
private readonly ILogger logger;

public MayInterleavePredicateGrain(ILoggerFactory loggerFactory)
public MayInterleaveStaticPredicateGrain(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.IdentityString}");
}
Expand Down Expand Up @@ -111,9 +111,9 @@ public static bool MayInterleave(IInvokable req)

static object UnwrapImmutable(object item) => item is Immutable<object> ? ((Immutable<object>)item).Value : item;

private IMayInterleavePredicateGrain Self { get; set; }
private IMayInterleaveStaticPredicateGrain Self { get; set; }

// this interleaves only when arg == "reentrant"
// this interleaves only when arg == "reentrant"
// and test predicate will throw when arg = "err"
public Task<string> One(string arg)
{
Expand Down Expand Up @@ -151,10 +151,94 @@ public Task PushToStream(string item)
return GetStream().OnNextAsync(item);
}

IAsyncStream<string> GetStream() =>
IAsyncStream<string> GetStream() =>
this.GetStreamProvider("sms").GetStream<string>("test-stream-interleave", Guid.Empty);

public Task SetSelf(IMayInterleavePredicateGrain self)
public Task SetSelf(IMayInterleaveStaticPredicateGrain self)
{
Self = self;
return Task.CompletedTask;
}
}

[MayInterleave(nameof(MayInterleave))]
public class MayInterleaveInstancedPredicateGrain : Grain, IMayInterleaveInstancedPredicateGrain
{
private readonly ILogger logger;

public MayInterleaveInstancedPredicateGrain(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.IdentityString}");
}

public bool MayInterleave(IInvokable req)
{
// not interested
if (req.GetArgumentCount() == 0)
return false;

string arg = null;

// assume single argument message
if (req.GetArgumentCount() == 1)
arg = (string)UnwrapImmutable(req.GetArgument(0));

// assume stream message
if (req.GetArgumentCount() == 2)
arg = (string)UnwrapImmutable(req.GetArgument(1));

if (arg == "err")
throw new ApplicationException("boom");

return arg == "reentrant";
}

static object UnwrapImmutable(object item) => item is Immutable<object> ? ((Immutable<object>)item).Value : item;

private IMayInterleaveInstancedPredicateGrain Self { get; set; }

// this interleaves only when arg == "reentrant"
// and test predicate will throw when arg = "err"
public Task<string> One(string arg)
{
return Task.FromResult("one");
}

public async Task<string> Two()
{
return await Self.One("") + " two";
}

public async Task<string> TwoReentrant()
{
return await Self.One("reentrant") + " two";
}

public Task Exceptional()
{
return Self.One("err");
}

public async Task SubscribeToStream()
{
var stream = GetStream();

await stream.SubscribeAsync((item, _) =>
{
logger.LogInformation("Received stream item: {Item}", item);
return Task.CompletedTask;
});
}

public Task PushToStream(string item)
{
return GetStream().OnNextAsync(item);
}

IAsyncStream<string> GetStream() =>
this.GetStreamProvider("sms").GetStream<string>("test-stream-interleave", Guid.Empty);

public Task SetSelf(IMayInterleaveInstancedPredicateGrain self)
{
Self = self;
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public void NonReentrantGrain(bool performDeadlockDetection)
this.logger.LogInformation("Reentrancy NonReentrantGrain Test finished OK.");
}

public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
public void NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
{
var grain = this.grainFactory.GetGrain<IMayInterleavePredicateGrain>(OrleansTestingBase.GetRandomGrainId());
var grain = this.grainFactory.GetGrain<IMayInterleaveStaticPredicateGrain>(OrleansTestingBase.GetRandomGrainId());
grain.SetSelf(grain).Wait();
bool timeout = false;
bool deadlock = false;
Expand All @@ -69,6 +69,31 @@ public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFal
this.logger.LogInformation("Reentrancy NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse Test finished OK.");
}

public void NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
{
var grain = this.grainFactory.GetGrain<IMayInterleaveInstancedPredicateGrain>(OrleansTestingBase.GetRandomGrainId());
grain.SetSelf(grain).Wait();
bool timeout = false;
bool deadlock = false;
try
{
timeout = !grain.Two().Wait(2000);
}
catch (Exception exc)
{
Assert.True(false, string.Format("Unexpected exception {0}: {1}", exc.Message, exc.StackTrace));
}
if (performDeadlockDetection)
{
Assert.True(deadlock, "Non-reentrant grain should deadlock when MayInterleave predicate returns false");
}
else
{
Assert.True(timeout, "Non-reentrant grain should timeout when MayInterleave predicate returns false");
}
this.logger.LogInformation("Reentrancy NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse Test finished OK.");
}

public void UnorderedNonReentrantGrain(bool performDeadlockDetection)
{
IUnorderedNonReentrantGrain unonreentrant = this.grainFactory.GetGrain<IUnorderedNonReentrantGrain>(OrleansTestingBase.GetRandomGrainId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ public void NonReentrantGrain()
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse()
public void NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse()
{
this.runner.NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse(false);
this.runner.NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse(false);
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
public void NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse()
{
this.runner.NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse(false);
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
Expand Down
Loading

0 comments on commit 46982e9

Please sign in to comment.