diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs index d9f82f6c52..ded2d9feba 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs @@ -1,9 +1,10 @@ -using System; +using System; using Orleans; using Microsoft.Extensions.DependencyInjection; using Orleans.Hosting; using Orleans.Messaging; using Orleans.Clustering.Redis; +using StackExchange.Redis; namespace Microsoft.Extensions.Hosting { @@ -25,7 +26,7 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, Act } services - .AddRedis() + .AddRedisClustering() .AddSingleton(); }); } @@ -33,15 +34,14 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, Act /// /// Configures Redis as the clustering provider. /// - public static IClientBuilder UseRedisClustering(this IClientBuilder builder, string redisConnectionString, int db = 0) + public static IClientBuilder UseRedisClustering(this IClientBuilder builder, string redisConnectionString) { return builder.ConfigureServices(services => services .Configure(opt => { - opt.ConnectionString = redisConnectionString; - opt.Database = db; + opt.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); }) - .AddRedis() + .AddRedisClustering() .AddSingleton()); } diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs index 5ae281fed9..906bae176f 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs @@ -1,8 +1,9 @@ -using System; +using System; using Orleans; using Microsoft.Extensions.DependencyInjection; using Orleans.Hosting; using Orleans.Clustering.Redis; +using StackExchange.Redis; namespace Microsoft.Extensions.Hosting { @@ -23,23 +24,27 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action< services.Configure(configuration); } - services.AddRedis(); + services.AddRedisClustering(); }); } /// /// Configures Redis as the clustering provider. /// - public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString, int db = 0) + public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString) { return builder.ConfigureServices(services => services - .Configure(options => { options.Database = db; options.ConnectionString = redisConnectionString; }) - .AddRedis()); + .Configure(options => + { + options.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); + }) + .AddRedisClustering()); } - internal static IServiceCollection AddRedis(this IServiceCollection services) + internal static IServiceCollection AddRedisClustering(this IServiceCollection services) { services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(sp => sp.GetRequiredService()); return services; } diff --git a/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj b/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj index 2bd5989d2e..74c237e7c4 100644 --- a/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj +++ b/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Clustering $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs index 63ae47f835..86f1448dbb 100644 --- a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs +++ b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs @@ -1,3 +1,4 @@ +using Orleans.Runtime; using StackExchange.Redis; using System; using System.Threading.Tasks; @@ -10,26 +11,55 @@ namespace Orleans.Clustering.Redis public class RedisClusteringOptions { /// - /// Specifies the database identi + /// Gets or sets the Redis client configuration. /// - public int Database { get; set; } + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } /// - /// The connection string. + /// The delegate used to create a Redis connection multiplexer. /// - public string ConnectionString { get; set; } = "localhost:6379"; + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; /// - /// The delegate used to create a Redis connection multiplexer. + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause entries to be deleted after some period of time. /// - public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + public TimeSpan? EntryExpiry { get; set; } = null; /// /// The default multiplexer creation delegate. /// public static async Task DefaultCreateMultiplexer(RedisClusteringOptions options) { - return await ConnectionMultiplexer.ConnectAsync(options.ConnectionString); + return await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); + } + + /// + /// Configuration validator for . + /// + public class RedisClusteringOptionsValidator : IConfigurationValidator + { + private readonly RedisClusteringOptions _options; + + public RedisClusteringOptionsValidator(RedisClusteringOptions options) + { + _options = options; + } + + /// + public void ValidateConfiguration() + { + if (_options.ConfigurationOptions == null) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisMembershipTable)}. {nameof(RedisClusteringOptions)}.{nameof(_options.ConfigurationOptions)} is required."); + } } } } \ No newline at end of file diff --git a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs index 2a9c9981c7..c7d92f3245 100644 --- a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs +++ b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs @@ -6,8 +6,8 @@ using Newtonsoft.Json; using System.Linq; using Microsoft.Extensions.Options; -using System.Runtime.CompilerServices; using System.Globalization; +using System.Text; namespace Orleans.Clustering.Redis { @@ -26,7 +26,7 @@ public RedisMembershipTable(IOptions redisOptions, IOpti { _redisOptions = redisOptions.Value; _clusterOptions = clusterOptions.Value; - _clusterKey = $"{_clusterOptions.ServiceId}/{_clusterOptions.ClusterId}"; + _clusterKey = Encoding.UTF8.GetBytes($"{_clusterOptions.ServiceId}/members/{_clusterOptions.ClusterId}"); _jsonSerializerSettings = JsonSettings.JsonSerializerSettings; } @@ -40,11 +40,16 @@ public async Task DeleteMembershipTableEntries(string clusterId) public async Task InitializeMembershipTable(bool tryInitTableVersion) { _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); - _db = _muxer.GetDatabase(_redisOptions.Database); + _db = _muxer.GetDatabase(); if (tryInitTableVersion) { await _db.HashSetAsync(_clusterKey, TableVersionKey, SerializeVersion(DefaultTableVersion), When.NotExists); + + if (_redisOptions.EntryExpiry is { } expiry) + { + await _db.KeyExpireAsync(_clusterKey, expiry); + } } this.IsInitialized = true; diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs index 6566c03a17..92dc69f845 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs @@ -8,10 +8,13 @@ namespace Orleans.Hosting { + /// + /// Extensions for configuring Redis as a grain directory provider. + /// public static class RedisGrainDirectoryExtensions { /// - /// Use a Redis data-store as the default Grain Directory + /// Adds a default grain directory which persists entries in Redis. /// public static ISiloBuilder UseRedisGrainDirectoryAsDefault( this ISiloBuilder builder, @@ -21,7 +24,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault( } /// - /// Use a Redis data-store as the default Grain Directory + /// Adds a default grain directory which persists entries in Redis. /// public static ISiloBuilder UseRedisGrainDirectoryAsDefault( this ISiloBuilder builder, @@ -31,7 +34,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault( } /// - /// Add a Redis data-store as a named Grain Directory + /// Adds a named grain directory which persists entries in Redis. /// public static ISiloBuilder AddRedisGrainDirectory( this ISiloBuilder builder, @@ -42,7 +45,7 @@ public static ISiloBuilder AddRedisGrainDirectory( } /// - /// Add a Redis data-store as a named Grain Directory + /// Adds a named grain directory which persists entries in Redis. /// public static ISiloBuilder AddRedisGrainDirectory( this ISiloBuilder builder, @@ -59,7 +62,7 @@ private static IServiceCollection AddRedisGrainDirectory( { configureOptions.Invoke(services.AddOptions(name)); services - .AddTransient(sp => new RedisGrainDirectoryOptionsValidator(sp.GetRequiredService>().Get(name))) + .AddTransient(sp => new RedisGrainDirectoryOptionsValidator(sp.GetRequiredService>().Get(name), name)) .ConfigureNamedOptionForLogging(name) .AddSingletonNamedService(name, (sp, name) => ActivatorUtilities.CreateInstance(sp, sp.GetOptionsByName(name))) .AddSingletonNamedService>(name, (s, n) => (ILifecycleParticipant)s.GetRequiredServiceByName(n)); diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs index 7220bb65f3..1f1f84daba 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Orleans.GrainDirectory.Redis; using Orleans.Runtime; using StackExchange.Redis; @@ -11,37 +12,53 @@ namespace Orleans.Configuration public class RedisGrainDirectoryOptions { /// - /// Configure the Redis client + /// Gets or sets the Redis client configuration. /// [RedactRedisConfigurationOptions] public ConfigurationOptions ConfigurationOptions { get; set; } + /// + /// The delegate used to create a Redis connection multiplexer. + /// + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + /// /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). /// Setting a value different from null will cause duplicate activations in the cluster. /// public TimeSpan? EntryExpiry { get; set; } = null; + + /// + /// The default multiplexer creation delegate. + /// + public static async Task DefaultCreateMultiplexer(RedisGrainDirectoryOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); } - public class RedactRedisConfigurationOptions : RedactAttribute + internal class RedactRedisConfigurationOptions : RedactAttribute { public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); } + /// + /// Configuration validator for . + /// public class RedisGrainDirectoryOptionsValidator : IConfigurationValidator { - private readonly RedisGrainDirectoryOptions options; + private readonly RedisGrainDirectoryOptions _options; + private readonly string _name; - public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options) + public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options, string name) { - this.options = options; + _options = options; + _name = name; } + /// public void ValidateConfiguration() { - if (this.options.ConfigurationOptions == null) + if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisGrainDirectory)} with name {_name}. {nameof(RedisGrainDirectoryOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj b/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj index 5359bef72e..3ece13b004 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj +++ b/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Grain Directory $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs b/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..0fc65c7673 --- /dev/null +++ b/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Tester.Redis")] diff --git a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs index c6100da49a..e6ede0960f 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -13,13 +14,25 @@ namespace Orleans.GrainDirectory.Redis { public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant { + private const string DeleteScript = + """ + local cur = redis.call('GET', KEYS[1]) + if cur ~= false then + local typedCur = cjson.decode(cur) + if typedCur.ActivationId == ARGV[1] then + return redis.call('DEL', KEYS[1]) + end + end + return 0 + """; + private readonly RedisGrainDirectoryOptions directoryOptions; private readonly ClusterOptions clusterOptions; private readonly ILogger logger; + private readonly RedisKey _keyPrefix; - private ConnectionMultiplexer redis; + private IConnectionMultiplexer redis; private IDatabase database; - private LuaScript deleteScript; public RedisGrainDirectory( RedisGrainDirectoryOptions directoryOptions, @@ -29,6 +42,7 @@ public RedisGrainDirectory( this.directoryOptions = directoryOptions; this.logger = logger; this.clusterOptions = clusterOptions.Value; + _keyPrefix = Encoding.UTF8.GetBytes($"{this.clusterOptions.ClusterId}/directory/"); } public async Task Lookup(GrainId grainId) @@ -91,7 +105,10 @@ public async Task Unregister(GrainAddress address) { try { - var result = (int) await this.database.ScriptEvaluateAsync(this.deleteScript, new { key = GetKey(address.GrainId), val = address.ActivationId.ToParsableString() }); + var result = (int) await this.database.ScriptEvaluateAsync( + DeleteScript, + keys: new RedisKey[] { GetKey(address.GrainId) }, + values: new RedisValue[] { address.ActivationId.ToParsableString() }); if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("Unregister {GrainId} ({Address}): {Result}", address.GrainId, JsonSerializer.Serialize(address), (result != 0) ? "OK" : "Conflict"); @@ -121,7 +138,7 @@ public void Participate(ISiloLifecycle lifecycle) public async Task Initialize(CancellationToken ct = default) { - this.redis = await ConnectionMultiplexer.ConnectAsync(this.directoryOptions.ConfigurationOptions); + this.redis = await directoryOptions.CreateMultiplexer(directoryOptions); // Configure logging this.redis.ConnectionRestored += this.LogConnectionRestored; @@ -130,18 +147,6 @@ public async Task Initialize(CancellationToken ct = default) this.redis.InternalError += this.LogInternalError; this.database = this.redis.GetDatabase(); - - this.deleteScript = LuaScript.Prepare( - @" -local cur = redis.call('GET', @key) -if cur ~= false then - local typedCur = cjson.decode(cur) - if typedCur.ActivationId == @val then - return redis.call('DEL', @key) - end -end -return 0 - "); } private async Task Uninitialize(CancellationToken arg) @@ -155,7 +160,7 @@ private async Task Uninitialize(CancellationToken arg) } } - private string GetKey(GrainId grainId) => $"{this.clusterOptions.ClusterId}-{grainId}"; + private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); #region Logging private void LogConnectionRestored(object sender, ConnectionFailedEventArgs e) diff --git a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs index 7c69e5fa9e..e79b50813c 100644 --- a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs @@ -17,7 +17,7 @@ namespace Orleans.Hosting public static class RedisGrainStorageServiceCollectionExtensions { /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceCollection services, Action configureOptions) { @@ -25,7 +25,7 @@ public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceColl } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static IServiceCollection AddRedisGrainStorage(this IServiceCollection services, string name, Action configureOptions) { @@ -33,7 +33,7 @@ public static IServiceCollection AddRedisGrainStorage(this IServiceCollection se } /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceCollection services, Action> configureOptions = null) { @@ -41,7 +41,7 @@ public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceColl } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static IServiceCollection AddRedisGrainStorage(this IServiceCollection services, string name, Action> configureOptions = null) diff --git a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs index 45d948d0c0..a5b3d0b586 100644 --- a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs @@ -11,7 +11,7 @@ namespace Orleans.Hosting public static class RedisSiloBuilderExtensions { /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) { @@ -19,7 +19,7 @@ public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder build } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) { @@ -27,7 +27,7 @@ public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, strin } /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) { @@ -35,7 +35,7 @@ public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder build } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) { diff --git a/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj b/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj index 7ad009fe77..cbdaf41d30 100644 --- a/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj +++ b/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj @@ -1,12 +1,13 @@ - Microsoft.Orleans.Persistance.Redis - Microsoft Orleans Persistance Redis Provider - Microsoft Orleans Persistance implementation that uses Redis - $(PackageTags) Redis Persistance + Microsoft.Orleans.Persistence.Redis + Microsoft Orleans Persistence Redis Provider + Microsoft Orleans Persistence implementation that uses Redis + $(PackageTags) Redis Persistence $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs index 9fb324278a..a37f21de65 100644 --- a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs @@ -1,4 +1,7 @@ -using Orleans.Storage; +using System; +using System.Threading.Tasks; +using Orleans.Storage; +using StackExchange.Redis; namespace Orleans.Persistence { @@ -7,21 +10,11 @@ namespace Orleans.Persistence /// public class RedisStorageOptions : IStorageProviderSerializerOptions { - /// - /// The connection string. - /// - public string ConnectionString { get; set; } = "localhost:6379"; - /// /// Whether or not to delete state during a clear operation. /// public bool DeleteOnClear { get; set; } - /// - /// The database number. - /// - public int? DatabaseNumber { get; set; } - /// /// Stage of silo lifecycle where storage should be initialized. Storage must be initialized prior to use. /// @@ -29,5 +22,32 @@ public class RedisStorageOptions : IStorageProviderSerializerOptions /// public IGrainStorageSerializer GrainStorageSerializer { get; set; } + + /// + /// Gets or sets the Redis client configuration. + /// + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } + + /// + /// The delegate used to create a Redis connection multiplexer. + /// + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + + /// + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause duplicate activations in the cluster. + /// + public TimeSpan? EntryExpiry { get; set; } = null; + + /// + /// The default multiplexer creation delegate. + /// + public static async Task DefaultCreateMultiplexer(RedisStorageOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); } } \ No newline at end of file diff --git a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs index 0dfd8a1bfc..32b55712d9 100644 --- a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs +++ b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs @@ -1,4 +1,4 @@ -using Orleans.Runtime; +using Orleans.Runtime; namespace Orleans.Persistence { @@ -9,27 +9,15 @@ internal class RedisStorageOptionsValidator : IConfigurationValidator public RedisStorageOptionsValidator(RedisStorageOptions options, string name) { - this._options = options; - this._name = name; + _options = options; + _name = name; } public void ValidateConfiguration() { - var msg = $"Configuration for {nameof(RedisGrainStorage)} - {_name} is invalid"; - - if (_options == null) - { - throw new OrleansConfigurationException($"{msg} - {nameof(RedisStorageOptions)} is null"); - } - - if (string.IsNullOrWhiteSpace(_options.ConnectionString)) - { - throw new OrleansConfigurationException($"{msg} - {nameof(_options.ConnectionString)} is null or empty"); - } - - if (!_options.ConnectionString.Contains(':')) // host:port delimiter + if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"{msg} - {nameof(_options.ConnectionString)} invalid format: {_options.ConnectionString}, should contain host and port delimited by ':'"); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisGrainStorage)} with name {_name}. {nameof(RedisStorageOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs index 763c43c2b6..cb3ae7e1e1 100644 --- a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs +++ b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs @@ -1,6 +1,8 @@ -using System; +using System; using System.Diagnostics; +using System.Globalization; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -17,22 +19,18 @@ namespace Orleans.Persistence /// /// Redis-based grain storage provider /// - public class RedisGrainStorage : IGrainStorage, ILifecycleParticipant + internal class RedisGrainStorage : IGrainStorage, ILifecycleParticipant { - private const string WriteScript = "local etag = redis.call('HGET', KEYS[1], 'etag')\nif etag == false or etag == ARGV[1] then return redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) else return false end"; - private const int ReloadWriteScriptMaxCount = 3; - private readonly string _serviceId; + private readonly RedisValue _ttl; + private readonly RedisKey _keyPrefix; private readonly string _name; private readonly ILogger _logger; private readonly RedisStorageOptions _options; private readonly IGrainStorageSerializer _grainStorageSerializer; - private ConnectionMultiplexer _connection; + private IConnectionMultiplexer _connection; private IDatabase _db; - private ConfigurationOptions _redisOptions; - private LuaScript _preparedWriteScript; - private byte[] _preparedWriteScriptHash; /// /// Creates a new instance of the type. @@ -49,6 +47,8 @@ public RedisGrainStorage( _options = options; _grainStorageSerializer = options.GrainStorageSerializer ?? grainStorageSerializer; _serviceId = clusterOptions.Value.ServiceId; + _ttl = options.EntryExpiry is { } ts ? ts.TotalSeconds.ToString(CultureInfo.InvariantCulture) : "-1"; + _keyPrefix = Encoding.UTF8.GetBytes($"{_serviceId}/state/"); } /// @@ -67,27 +67,14 @@ private async Task Init(CancellationToken cancellationToken) if (_logger.IsEnabled(LogLevel.Debug)) { _logger.LogDebug( - "RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DatabaseNumber={DatabaseNumber} DeleteOnClear={DeleteOnClear}", + "RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DeleteOnClear={DeleteOnClear}", _name, _serviceId, - _options.DatabaseNumber, _options.DeleteOnClear); } - _redisOptions = ConfigurationOptions.Parse(_options.ConnectionString); - _connection = await ConnectionMultiplexer.ConnectAsync(_redisOptions).ConfigureAwait(false); - - if (_options.DatabaseNumber.HasValue) - { - _db = _connection.GetDatabase(_options.DatabaseNumber.Value); - } - else - { - _db = _connection.GetDatabase(); - } - - _preparedWriteScript = LuaScript.Prepare(WriteScript); - _preparedWriteScriptHash = await LoadWriteScriptAsync().ConfigureAwait(false); + _connection = await _options.CreateMultiplexer(_options).ConfigureAwait(false); + _db = _connection.GetDatabase(); if (_logger.IsEnabled(LogLevel.Debug)) { @@ -108,33 +95,15 @@ private async Task Init(CancellationToken cancellationToken) _name, _serviceId, timer.Elapsed.TotalMilliseconds.ToString("0.00")); - throw; - } - } - - private async Task LoadWriteScriptAsync() - { - Debug.Assert(_connection is not null); - Debug.Assert(_preparedWriteScript is not null); - Debug.Assert(_redisOptions.EndPoints.Count > 0); - - System.Net.EndPoint[] endPoints = _connection.GetEndPoints(); - var loadTasks = new Task[endPoints.Length]; - for (int i = 0; i < endPoints.Length; i++) - { - var endpoint = endPoints.ElementAt(i); - var server = _connection.GetServer(endpoint); - loadTasks[i] = _preparedWriteScript.LoadAsync(server); + throw new RedisStorageException(Invariant($"{ex.GetType()}: {ex.Message}")); } - await Task.WhenAll(loadTasks).ConfigureAwait(false); - return loadTasks[0].Result.Hash; } /// public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainState grainState) { - var key = grainId.ToString(); + var key = GetKey(grainId); try { @@ -147,10 +116,12 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta grainState.State = _grainStorageSerializer.Deserialize(valueEntry.Value); grainState.ETag = etagEntry.Value; + grainState.RecordExists = true; } else { - grainState.ETag = Guid.NewGuid().ToString(); + grainState.ETag = null; + grainState.RecordExists = false; } } catch (Exception e) @@ -167,22 +138,40 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta /// public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) { - var etag = grainState.ETag ?? "null"; - var key = grainId.ToString(); - var newEtag = Guid.NewGuid().ToString(); + const string WriteScript = + """ + local etag = redis.call('HGET', KEYS[1], 'etag') + if (not etag and (ARGV[1] == nil or ARGV[1] == '')) or etag == ARGV[1] then + redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) + if ARGV[4] ~= '-1' then + redis.call('EXPIRE', KEYS[1], ARGV[4]) + end + return 0 + else + return -1 + end + """; + + var key = GetKey(grainId); + RedisValue etag = grainState.ETag ?? ""; + RedisValue newEtag = Guid.NewGuid().ToString("N"); - RedisValue payload = default; - RedisResult writeWithScriptResponse = null; try { - payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); - writeWithScriptResponse = await WriteToRedisUsingPreparedScriptAsync(payload, - etag: etag, - key: key, - newEtag: newEtag) - .ConfigureAwait(false); + var payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); + var keys = new RedisKey[] { key }; + var args = new RedisValue[] { etag, newEtag, payload, _ttl }; + var response = await _db.ScriptEvaluateAsync(WriteScript, keys, args).ConfigureAwait(false); + + if (response is not null && (int)response == -1) + { + throw new InconsistentStateException($"Version conflict ({nameof(WriteStateAsync)}): ServiceId={_serviceId} ProviderName={_name} GrainType={grainType} GrainId={grainId} ETag={grainState.ETag}."); + } + + grainState.ETag = newEtag; + grainState.RecordExists = true; } - catch (Exception e) + catch (Exception exception) when (exception is not InconsistentStateException) { _logger.LogError( "Failed to write grain state for {GrainType} grain with ID: {GrainId} with redis key {Key}.", @@ -190,52 +179,42 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt grainId, key); throw new RedisStorageException( - Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}."), e); + Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}. {exception.GetType()}: {exception.Message}")); } - - if (writeWithScriptResponse is not null && writeWithScriptResponse.IsNull) - { - throw new InconsistentStateException($"ETag mismatch - tried with ETag: {grainState.ETag}"); - } - - grainState.ETag = newEtag; } - private Task WriteToRedisUsingPreparedScriptAsync(RedisValue payload, string etag, string key, string newEtag) - { - var keys = new RedisKey[] { key }; - var args = new RedisValue[] { etag, newEtag, payload }; - return WriteToRedisUsingPreparedScriptAsync(attemptNum: 0); - + private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); - async Task WriteToRedisUsingPreparedScriptAsync(int attemptNum) + /// + public async Task ClearStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + const string ClearScript = + """ + local etag = redis.call('HGET', KEYS[1], 'etag') + if (not etag and not ARGV[1]) or etag == ARGV[1] then + redis.call('DEL', KEYS[1]) + return 0 + else + return -1 + end + """; + try { - try - { - return await _db.ScriptEvaluateAsync(_preparedWriteScriptHash, keys, args).ConfigureAwait(false); - } - catch (RedisServerException rse) when (rse.Message is not null && rse.Message.StartsWith("NOSCRIPT ", StringComparison.Ordinal)) + RedisValue etag = grainState.ETag ?? ""; + var response = await _db.ScriptEvaluateAsync(ClearScript, keys: new[] { GetKey(grainId) }, values: new[] { etag }).ConfigureAwait(false); + + if (response is not null && (int)response == -1) { - // EVALSHA returned error 'NOSCRIPT No matching script. Please use EVAL.'. - // This means that SHA1 cache of Lua scripts is cleared at server side, possibly because of Redis server rebooted after Init() method was called. Need to reload Lua script. - // Several attempts are made just in case (e.g. if Redis server is rebooted right after previous script reload). - if (attemptNum >= ReloadWriteScriptMaxCount) - { - throw; - } - - await LoadWriteScriptAsync().ConfigureAwait(false); - return await WriteToRedisUsingPreparedScriptAsync(attemptNum: attemptNum + 1) - .ConfigureAwait(false); + throw new InconsistentStateException($"Version conflict ({nameof(ClearStateAsync)}): ServiceId={_serviceId} ProviderName={_name} GrainType={grainType} GrainId={grainId} ETag={grainState.ETag}."); } - } - } - /// - public async Task ClearStateAsync(string grainType, GrainId grainId, IGrainState grainState) - { - var key = grainId.ToString(); - await _db.KeyDeleteAsync(key).ConfigureAwait(false); + grainState.ETag = null; + grainState.RecordExists = false; + } + catch (Exception exception) when (exception is not InconsistentStateException) + { + throw new RedisStorageException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } private async Task Close(CancellationToken cancellationToken) diff --git a/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs b/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs index 34a9a11fd2..4ea2dcc4f0 100644 --- a/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs +++ b/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs @@ -1,7 +1,7 @@ -using System; +using System; using Microsoft.Extensions.DependencyInjection; - +using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Hosting; using Orleans.Reminders.Redis; @@ -47,6 +47,7 @@ public static IServiceCollection UseRedisReminderService(this IServiceCollection { services.AddSingleton(); services.Configure(configure); + services.AddSingleton(); services.ConfigureFormatter(); return services; } diff --git a/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj b/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj index e817a7e072..55290b343c 100644 --- a/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj +++ b/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Reminders $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs index 0cf0d8412b..576d9e65ac 100644 --- a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs +++ b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs @@ -1,6 +1,7 @@ -using System; +using System; using System.Threading.Tasks; - +using Orleans.Reminders.Redis; +using Orleans.Runtime; using StackExchange.Redis; namespace Orleans.Configuration @@ -10,29 +11,52 @@ namespace Orleans.Configuration /// public class RedisReminderTableOptions { - /// - /// The connection string. + /// Gets or sets the Redis client options. /// - public string ConnectionString { get; set; } = "localhost:6379"; + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } /// - /// The database number. + /// The delegate used to create a Redis connection multiplexer. /// - public int? DatabaseNumber { get; set; } + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; /// - /// The delegate used to create a Redis connection multiplexer. + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause reminder entries to be deleted after some period of time. /// - public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + public TimeSpan? EntryExpiry { get; set; } = null; /// /// The default multiplexer creation delegate. /// - public static async Task DefaultCreateMultiplexer(RedisReminderTableOptions options) + public static async Task DefaultCreateMultiplexer(RedisReminderTableOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); + } + + /// + /// Configuration validator for . + /// + public class RedisReminderTableOptionsValidator : IConfigurationValidator + { + private readonly RedisReminderTableOptions _options; + + public RedisReminderTableOptionsValidator(RedisReminderTableOptions options) { - return await ConnectionMultiplexer.ConnectAsync(options.ConnectionString); + _options = options; } + public void ValidateConfiguration() + { + if (_options.ConfigurationOptions == null) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisReminderTable)}. {nameof(RedisReminderTableOptions)}.{nameof(_options.ConfigurationOptions)} is required."); + } + } } } diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs index 4ca6e81b78..90873c3e0f 100644 --- a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs @@ -1,7 +1,9 @@ -using System; +using System; +using System.Buffers; using System.Collections.Generic; using System.Globalization; using System.Linq; +using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -13,12 +15,13 @@ using Orleans.Runtime; using StackExchange.Redis; +using static System.FormattableString; namespace Orleans.Reminders.Redis { internal class RedisReminderTable : IReminderTable { - private readonly RedisKey RemindersRedisKey; + private readonly RedisKey _hashSetKey; private readonly RedisReminderTableOptions _redisOptions; private readonly ClusterOptions _clusterOptions; private readonly ILogger _logger; @@ -42,104 +45,156 @@ public RedisReminderTable( _clusterOptions = clusterOptions.Value; _logger = logger; - RemindersRedisKey = $"{_clusterOptions.ServiceId}_Reminders"; + _hashSetKey = Encoding.UTF8.GetBytes($"{_clusterOptions.ServiceId}/reminders"); } public async Task Init() { - _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); - _db = _redisOptions.DatabaseNumber.HasValue - ? _muxer.GetDatabase(_redisOptions.DatabaseNumber.Value) - : _muxer.GetDatabase(); + try + { + _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); + _db = _muxer.GetDatabase(); + + if (_redisOptions.EntryExpiry is { } expiry) + { + await _db.KeyExpireAsync(_hashSetKey, expiry); + } + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task ReadRow(GrainId grainId, string reminderName) { - (string from, string to) = GetFilter(grainId, reminderName); - RedisValue[] values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); - if (values.Length == 0) + try { - return null; + var (from, to) = GetFilter(grainId, reminderName); + RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + if (values.Length == 0) + { + return null; + } + else + { + return ConvertToEntry(values.SingleOrDefault()); + } } - else + catch (Exception exception) { - return ConvertToEntry(values.SingleOrDefault()); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } } public async Task ReadRows(GrainId grainId) { - (string from, string to) = GetFilter(grainId); - RedisValue[] values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); - IEnumerable records = values.Select(v => ConvertToEntry(v)); - return new ReminderTableData(records); + try + { + var (from, to) = GetFilter(grainId); + RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + IEnumerable records = values.Select(static v => ConvertToEntry(v)); + return new ReminderTableData(records); + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task ReadRows(uint begin, uint end) { - (string _, string from) = GetFilter(begin); - (string _, string to) = GetFilter(end); - IEnumerable values; - if (begin < end) + try { - // -----begin******end----- - values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); + var (_, from) = GetFilter(begin); + var (_, to) = GetFilter(end); + IEnumerable values; + if (begin < end) + { + // -----begin******end----- + values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + } + else + { + // *****end------begin***** + RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, "\"FFFFFFFF\",#"); + RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(_hashSetKey, "\"00000000\",\"", to); + values = values1.Concat(values2); + } + + IEnumerable records = values.Select(static v => ConvertToEntry(v)); + return new ReminderTableData(records); } - else + catch (Exception exception) { - // *****end------begin***** - RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, "[\"FFFFFFFF\",#"); - RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, "[\"00000000\",\"", to); - values = values1.Concat(values2); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } - - IEnumerable records = values.Select(v => ConvertToEntry(v)); - return new ReminderTableData(records); } public async Task RemoveRow(GrainId grainId, string reminderName, string eTag) { - (RedisValue from, RedisValue to) = GetFilter(grainId, reminderName, eTag); - long removed = await _db.SortedSetRemoveRangeByValueAsync(RemindersRedisKey, from, to); - return removed > 0; + try + { + var (from, to) = GetFilter(grainId, reminderName, eTag); + long removed = await _db.SortedSetRemoveRangeByValueAsync(_hashSetKey, from, to); + return removed > 0; + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task TestOnlyClearTable() { - await _db.ExecuteAsync("FLUSHDB"); + try + { + await _db.KeyDeleteAsync(_hashSetKey); + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task UpsertRow(ReminderEntry entry) { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag); - } + const string UpsertScript = + """ + local key = KEYS[1] + local from = '[' .. ARGV[1] -- start of the conditional (with etag) key range + local to = '[' .. ARGV[2] -- end of the conditional (with etag) key range + local value = ARGV[3] - (string etag, string value) = ConvertFromEntry(entry); - (string from, string to) = GetFilter(entry.GrainId, entry.ReminderName); + -- Remove all entries for this reminder + local remRes = redis.call('ZREMRANGEBYLEX', key, from, to); - ITransaction tx = _db.CreateTransaction(); - _db.SortedSetRemoveRangeByValueAsync(RemindersRedisKey, from, to).Ignore(); - _db.SortedSetAddAsync(RemindersRedisKey, value, 0).Ignore(); - bool success = await tx.ExecuteAsync(); - if (success) + -- Add the new reminder entry + local addRes = redis.call('ZADD', key, 0, value); + return { key, from, to, value, remRes, addRes } + """; + + try { - return etag; + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag); + } + + var (newETag, value) = ConvertFromEntry(entry); + var (from, to) = GetFilter(entry.GrainId, entry.ReminderName); + var res = await _db.ScriptEvaluateAsync(UpsertScript, keys: new[] { _hashSetKey }, values: new[] { from, to, value }); + return newETag; } - else + catch (Exception exception) when (exception is not ReminderException) { - _logger.LogWarning( - (int)ErrorCode.ReminderServiceBase, - "Intermediate error updating entry {Entry} to Redis.", - entry); - throw new ReminderException("Failed to upsert reminder"); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } } - private ReminderEntry ConvertToEntry(string reminderValue) + private static ReminderEntry ConvertToEntry(string reminderValue) { - string[] segments = JsonConvert.DeserializeObject(reminderValue); + string[] segments = JsonConvert.DeserializeObject($"[{reminderValue}]"); return new ReminderEntry { @@ -151,37 +206,33 @@ private ReminderEntry ConvertToEntry(string reminderValue) }; } - private (string from, string to) GetFilter(uint grainHash) + private (RedisValue from, RedisValue to) GetFilter(uint grainHash) { return GetFilter(grainHash.ToString("X8")); } - private (string from, string to) GetFilter(GrainId grainId) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString()); } - private (string from, string to) GetFilter(GrainId grainId, string reminderName) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId, string reminderName) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString(), reminderName); } - private (string from, string to) GetFilter(GrainId grainId, string reminderName, string eTag) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId, string reminderName, string eTag) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString(), reminderName, eTag); } - private (string from, string to) GetFilter(params string[] segments) + private (RedisValue from, RedisValue to) GetFilter(params string[] segments) { string prefix = JsonConvert.SerializeObject(segments, _jsonSettings); - prefix = prefix.Remove(prefix.Length - 1); - string from = prefix + ",\""; - string to = prefix + ",#"; - return (from, to); + return ($"{prefix[1..^1]},\"", $"{prefix[1..^1]},#"); } - - private (string eTag, string value) ConvertFromEntry(ReminderEntry entry) + private (RedisValue eTag, RedisValue value) ConvertFromEntry(ReminderEntry entry) { string grainHash = entry.GrainId.GetUniformHashCode().ToString("X8"); string eTag = Guid.NewGuid().ToString(); @@ -195,7 +246,7 @@ private ReminderEntry ConvertToEntry(string reminderValue) entry.Period.ToString() }; - return (eTag, JsonConvert.SerializeObject(segments, _jsonSettings)); + return (eTag, JsonConvert.SerializeObject(segments, _jsonSettings)[1..^1]); } } } diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs new file mode 100644 index 0000000000..c0eb26dcf9 --- /dev/null +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs @@ -0,0 +1,43 @@ +using System; +using System.Runtime.Serialization; + +namespace Orleans.Reminders.Redis +{ + /// + /// Exception thrown from . + /// + [GenerateSerializer] + public class RedisRemindersException : Exception + { + /// + /// Initializes a new instance of . + /// + public RedisRemindersException() + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + public RedisRemindersException(string message) : base(message) + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. + public RedisRemindersException(string message, Exception inner) : base(message, inner) + { + } + + /// + protected RedisRemindersException( + SerializationInfo info, + StreamingContext context) : base(info, context) + { + } + } +} \ No newline at end of file diff --git a/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs b/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs index ffa4d3bbfd..fdae4a4fc5 100644 --- a/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs +++ b/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs @@ -6,6 +6,7 @@ using UnitTests.MembershipTests; using TestExtensions; using UnitTests; +using StackExchange.Redis; namespace Tester.Redis.Clustering { @@ -32,7 +33,11 @@ protected override IMembershipTable CreateMembershipTable(ILogger logger) TestUtils.CheckForRedis(); membershipTable = new RedisMembershipTable( - Options.Create(new RedisClusteringOptions() { ConnectionString = GetConnectionString().Result }), + Options.Create(new RedisClusteringOptions() + { + ConfigurationOptions = ConfigurationOptions.Parse(GetConnectionString().Result), + EntryExpiry = TimeSpan.FromHours(1) + }), this.clusterOptions); return membershipTable; diff --git a/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs b/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs index 511ad42148..f1882b0c75 100644 --- a/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs +++ b/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs @@ -1,16 +1,11 @@ -using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; +using Orleans.GrainDirectory; using Orleans.GrainDirectory.Redis; -using Orleans.Hosting; -using Orleans.TestingHost; using StackExchange.Redis; using Tester.Directories; -using Tester.Redis.Utility; using TestExtensions; -using UnitTests.Grains.Directories; -using Xunit; using Xunit.Abstractions; namespace Tester.Redis.GrainDirectory diff --git a/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs b/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs index e605eb5d85..9ab50c13ca 100644 --- a/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs +++ b/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs @@ -1,17 +1,10 @@ -using System; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Orleans.Configuration; using Orleans.GrainDirectory.Redis; -using Orleans.Hosting; using Orleans.TestingHost; using StackExchange.Redis; using Tester.Directories; -using Tester.Redis.Utility; using TestExtensions; using UnitTests.Grains.Directories; -using Xunit; -using Xunit.Abstractions; namespace Tester.Redis.GrainDirectory { diff --git a/test/Extensions/Tester.Redis/Persistence/GrainState.cs b/test/Extensions/Tester.Redis/Persistence/GrainState.cs index 9e7eee317c..85aac58bd7 100644 --- a/test/Extensions/Tester.Redis/Persistence/GrainState.cs +++ b/test/Extensions/Tester.Redis/Persistence/GrainState.cs @@ -1,4 +1,3 @@ -using System; using UnitTests.GrainInterfaces; namespace Tester.Redis.Persistence diff --git a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs index 3b6fc2640a..6ff524c1f1 100644 --- a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs +++ b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs @@ -1,12 +1,10 @@ -using System; -using System.Collections.Generic; +using System.Text.RegularExpressions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Orleans.Runtime; using Orleans.Storage; using Orleans.TestingHost; using StackExchange.Redis; -using Tester.Redis.Utility; using TestExtensions; using TestExtensions.Runners; using UnitTests.GrainInterfaces; @@ -18,8 +16,8 @@ namespace Tester.Redis.Persistence [TestCategory("Redis"), TestCategory("Persistence"), TestCategory("Functional")] public class RedisPersistenceGrainTests : GrainPersistenceTestsRunner, IClassFixture { - public static Guid ServiceId = Guid.NewGuid(); - public static string ConnectionStringKey = "ConnectionString"; + public static readonly string ServiceId = Guid.NewGuid().ToString("N"); + public const string ConnectionStringKey = "ConnectionString"; public class Fixture : BaseTestClusterFixture { protected override void ConfigureTestCluster(TestClusterBuilder builder) @@ -31,7 +29,7 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { {ConnectionStringKey, TestDefaultConfiguration.RedisConnectionString} })); - builder.Options.ServiceId = ServiceId.ToString(); + builder.Options.ServiceId = ServiceId; builder.AddSiloBuilderConfigurator(); builder.AddClientBuilderConfigurator(); } @@ -46,7 +44,8 @@ public void Configure(IHostBuilder hostBuilder) siloBuilder .AddRedisGrainStorage("GrainStorageForTest", options => { - options.ConnectionString = connectionString; + options.ConfigurationOptions = ConfigurationOptions.Parse(connectionString); + options.EntryExpiry = TimeSpan.FromHours(1); }) .AddMemoryGrainStorage("MemoryStore"); }); @@ -122,7 +121,18 @@ public async Task Redis_TestRedisScriptCacheClearBeforeGrainWriteState() { var grain = fixture.GrainFactory.GetGrain>(1111); - await database.ExecuteAsync("SCRIPT", "FLUSH", "SYNC"); + var info = (string)await database.ExecuteAsync("INFO"); + var versionString = Regex.Match(info, @"redis_version:[\s]*([^\s]+)").Groups[1].Value; + var version = Version.Parse(versionString); + if (version >= Version.Parse("6.2.0")) + { + await database.ExecuteAsync("SCRIPT", "FLUSH", "SYNC"); + } + else + { + await database.ExecuteAsync("SCRIPT", "FLUSH"); + } + await grain.DoWrite(state); var result = await grain.DoRead(); @@ -139,7 +149,7 @@ public async Task Redis_DoubleActivationETagConflictSimulation() var grain = fixture.GrainFactory.GetGrain>(54321); var data = await grain.DoRead(); - var key = grain.GetGrainId().ToString(); + var key = $"{ServiceId}/state/{grain.GetGrainId()}"; await database.HashSetAsync(key, new[] { new HashEntry("etag", "derp") }); await Assert.ThrowsAsync(() => grain.DoWrite(state)); diff --git a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs index 6fdce74698..8d41c5cc5b 100644 --- a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs +++ b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs @@ -3,6 +3,7 @@ using Xunit; using Orleans.Configuration; using Orleans.Runtime; +using StackExchange.Redis; namespace Tester.Redis.Persistence { @@ -11,9 +12,7 @@ public class RedisPersistenceSetupTests { [SkippableTheory] [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - [InlineData("123")] + [InlineData("localhost:1234")] public void StorageOptionsValidator(string connectionString) { TestUtils.CheckForRedis(); @@ -29,11 +28,17 @@ public void StorageOptionsValidator(string connectionString) .ConfigureEndpoints(siloAddress, siloPort, gatewayPort) .AddRedisGrainStorage("Redis", optionsBuilder => optionsBuilder.Configure(options => { - options.ConnectionString = connectionString; + if (connectionString is not null) + { + options.ConfigurationOptions = ConfigurationOptions.Parse(connectionString); + } })); }).Build(); - Assert.Throws(() => host.Start()); + if (string.IsNullOrWhiteSpace(connectionString)) + { + Assert.Throws(() => host.Start()); + } } } } \ No newline at end of file diff --git a/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs b/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs index 89d5b91e09..3dd4b0d242 100644 --- a/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs +++ b/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs @@ -1,16 +1,10 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Reminders.Redis; -using Orleans.Runtime; -using Orleans.TestingHost; -using Tester.Redis.Utility; +using StackExchange.Redis; using TestExtensions; using UnitTests; -using UnitTests.GrainInterfaces; using UnitTests.RemindersTest; using Xunit; @@ -37,8 +31,11 @@ protected override IReminderTable CreateRemindersTable() RedisReminderTable reminderTable = new( this.loggerFactory.CreateLogger(), this.clusterOptions, - Options.Create(new RedisReminderTableOptions() { ConnectionString = GetConnectionString().Result }) - ); + Options.Create(new RedisReminderTableOptions() + { + ConfigurationOptions = ConfigurationOptions.Parse(GetConnectionString().Result), + EntryExpiry = TimeSpan.FromHours(1) + })); if (reminderTable == null) { diff --git a/test/Extensions/Tester.Redis/Utility/TestExtensions.cs b/test/Extensions/Tester.Redis/Utility/TestExtensions.cs index d2671a3403..d2e1ce705a 100644 --- a/test/Extensions/Tester.Redis/Utility/TestExtensions.cs +++ b/test/Extensions/Tester.Redis/Utility/TestExtensions.cs @@ -1,8 +1,5 @@ using Orleans.Runtime; -using System; using System.Net; -using System.Threading; -using System.Threading.Tasks; namespace Tester.Redis.Utility { diff --git a/test/Grains/TestInternalGrains/PersistenceTestGrains.cs b/test/Grains/TestInternalGrains/PersistenceTestGrains.cs index 43ed3c58ac..9e0850cb91 100644 --- a/test/Grains/TestInternalGrains/PersistenceTestGrains.cs +++ b/test/Grains/TestInternalGrains/PersistenceTestGrains.cs @@ -356,7 +356,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -388,7 +388,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -427,7 +427,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -463,7 +463,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -495,7 +495,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -534,7 +534,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } diff --git a/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs b/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs index bfcfa84235..64f7217ac1 100644 --- a/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs +++ b/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs @@ -38,7 +38,7 @@ public async Task DoRead() public Task DoDelete() { - return this.persistentState.ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return this.persistentState.ClearStateAsync(); } } @@ -82,7 +82,7 @@ public async Task DoRead() public Task DoDelete() { - return this.persistentState.ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return this.persistentState.ClearStateAsync(); } } } \ No newline at end of file diff --git a/test/Tester/Directories/GrainDirectoryTests.cs b/test/Tester/Directories/GrainDirectoryTests.cs index ddd5ca2757..c354fe4cbb 100644 --- a/test/Tester/Directories/GrainDirectoryTests.cs +++ b/test/Tester/Directories/GrainDirectoryTests.cs @@ -30,7 +30,7 @@ public async Task RegisterLookupUnregisterLookup() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -50,7 +50,7 @@ public async Task DoNotOverrideEntry() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -84,7 +84,7 @@ public async Task DoNotDeleteDifferentActivationIdEntry() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -105,7 +105,7 @@ public async Task DoNotDeleteDifferentActivationIdEntry() [SkippableFact] public async Task LookupNotFound() { - Assert.Null(await this.grainDirectory.Lookup(GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")))); + Assert.Null(await this.grainDirectory.Lookup(GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")))); } } } diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index ac744d4fb6..81bfa5b428 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -369,16 +369,24 @@ protected async Task MembershipTable_UpdateRowInParallel(bool extendedProtocol = await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => { - bool done; + var done = false; do { var updatedTableData = await membershipTable.ReadAll(); var updatedRow = updatedTableData.TryGet(data.SiloAddress); - TableVersion tableVersion = updatedTableData.Version.Next(); - await Task.Delay(10); - try { done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); } catch { done = false; } + if (updatedRow is null) continue; + + TableVersion tableVersion = updatedTableData.Version.Next(); + try + { + done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); + } + catch + { + done = false; + } } while (!done); })).WithTimeout(TimeSpan.FromSeconds(30)); diff --git a/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs b/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs index dc81e1f95b..d09fd082db 100644 --- a/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs +++ b/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs @@ -110,7 +110,7 @@ protected async Task ReminderSimple() Assert.False(removeRowRes, "should have failed. reminder shouldn't exist"); } - protected async Task RemindersRange(int iterations=1000) + protected async Task RemindersRange(int iterations = 1000) { await Task.WhenAll(Enumerable.Range(1, iterations).Select(async i => { @@ -119,7 +119,7 @@ await Task.WhenAll(Enumerable.Range(1, iterations).Select(async i => await RetryHelper.RetryOnExceptionAsync(10, RetryOperation.Sigmoid, async () => { await remindersTable.UpsertRow(CreateReminder(grainRef, i.ToString())); - return Task.CompletedTask; + return 0; }); }));