Skip to content

Commit

Permalink
fix: various in-process fixes, e2e tests (#189)
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
Co-authored-by: Florian Bacher <[email protected]>
  • Loading branch information
toddbaert and bacherfl authored Apr 30, 2024
1 parent 6108d45 commit f2d096f
Show file tree
Hide file tree
Showing 28 changed files with 560 additions and 1,332 deletions.
11 changes: 3 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ jobs:
services:
# flagd-testbed for flagd RPC provider e2e tests
flagd:
image: ghcr.io/open-feature/flagd-testbed:v0.5.0
image: ghcr.io/open-feature/flagd-testbed:v0.5.4
ports:
- 8013:8013
# sync-testbed for flagd in-process provider e2e tests
sync:
image: ghcr.io/open-feature/sync-testbed:v0.5.1
image: ghcr.io/open-feature/sync-testbed:v0.5.4
ports:
- 9090:9090
steps:
Expand All @@ -76,13 +76,8 @@ jobs:
8.0.x
source-url: https://nuget.pkg.github.com/open-feature/index.json

- name: Copy Gherkin
run: |
cp spec/specification/assets/gherkin/evaluation.feature test/OpenFeature.Contrib.Providers.Flagd.E2e.RpcTest/Features
cp spec/specification/assets/gherkin/evaluation.feature test/OpenFeature.Contrib.Providers.Flagd.E2e.ProcessTest/Features
- name: Test
run: E2E=true dotnet test --logger GitHubActions
run: dotnet build && E2E=true dotnet test --logger GitHubActions

packaging:
needs: build
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "spec"]
path = spec
url = https://github.com/open-feature/spec.git
[submodule "src/OpenFeature.Contrib.Providers.Flagd/flagd-testbed"]
path = src/OpenFeature.Contrib.Providers.Flagd/flagd-testbed
url = https://github.com/open-feature/flagd-testbed.git
2 changes: 1 addition & 1 deletion build/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
Please sort alphabetically.
Refer to https://docs.microsoft.com/nuget/concepts/package-versioning for semver syntax.
-->
<!-- 0.5+ -->
<!-- 1.5+ -->
<OpenFeatureVer>[1.5,)</OpenFeatureVer>
</PropertyGroup>

Expand Down
10 changes: 9 additions & 1 deletion src/OpenFeature.Contrib.Providers.Flagd/FlagdConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,18 @@ public string SocketPath

/// <summary>
/// Maximum number of times the connection to the event stream should be re-attempted
/// 0 = infinite
/// </summary>
public int MaxEventStreamRetries
{
get => _maxEventStreamRetries;
get
{
if (_maxEventStreamRetries == 0)
{
return int.MaxValue;
}
return _maxEventStreamRetries;
}
set => _maxEventStreamRetries = value;
}

Expand Down
5 changes: 3 additions & 2 deletions src/OpenFeature.Contrib.Providers.Flagd/FlagdProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Metadata = OpenFeature.Model.Metadata;
using Value = OpenFeature.Model.Value;
using OpenFeature.Constant;
using System.Diagnostics.Tracing;

namespace OpenFeature.Contrib.Providers.Flagd
{
Expand Down Expand Up @@ -69,11 +70,11 @@ public FlagdProvider(FlagdConfig config)

if (_config.ResolverType == ResolverType.IN_PROCESS)
{
_resolver = new InProcessResolver(_config);
_resolver = new InProcessResolver(_config, EventChannel, _providerMetadata);
}
else
{
_resolver = new RpcResolver(config);
_resolver = new RpcResolver(config, EventChannel, _providerMetadata);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="8.0.0" />
<PackageReference Include="murmurhash" Version="1.0.3" />
<PackageReference Include="Semver" Version="2.3.0" />
<Protobuf Include="schemas\protobuf\schema\v1\schema.proto" GrpcServices="Client" />
<Protobuf Include="schemas\protobuf\flagd\evaluation\v1\evaluation.proto" GrpcServices="Client" />
<Protobuf Include="schemas\protobuf\flagd\sync\v1\sync.proto" GrpcServices="Client" />
<PackageReference Include="Google.Protobuf" Version="3.23.4" />
<PackageReference Include="Grpc.Net.Client" Version="2.59.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Runtime.InteropServices;
using JsonLogic.Net;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Newtonsoft.Json.Linq;
using OpenFeature.Error;
using OpenFeature.Model;

namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.CustomEvaluators
{
Expand All @@ -26,22 +29,45 @@ internal StringEvaluator()

internal object StartsWith(IProcessJsonLogic p, JToken[] args, object data)
{
// check if we have at least 2 arguments
if (args.Length < 2)
if (!isValid(p, args, data, out string operandA, out string operandB))
{
return false;
}
return p.Apply(args[0], data).ToString().StartsWith(p.Apply(args[1], data).ToString());
};
return Convert.ToString(operandA).StartsWith(Convert.ToString(operandB));
}

internal object EndsWith(IProcessJsonLogic p, JToken[] args, object data)
{
if (!isValid(p, args, data, out string operandA, out string operandB))
{
return false;
};
return operandA.EndsWith(operandB);
}

private bool isValid(IProcessJsonLogic p, JToken[] args, object data, out string operandA, out string operandB)
{
// check if we have at least 2 arguments
operandA = null;
operandB = null;

if (args.Length < 2)
{
return false;
}
return p.Apply(args[0], data).ToString().EndsWith(p.Apply(args[1], data).ToString());
operandA = p.Apply(args[0], data) as string;
operandB = p.Apply(args[1], data) as string;

if (!(operandA is string) || !(operandB is string))
{
// return false immediately if both operands are not strings.
return false;
}

Convert.ToString(operandA);
Convert.ToString(operandB);

return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,43 @@
using System.Threading;
using Grpc.Core;
using Value = OpenFeature.Model.Value;
using System.Diagnostics.Tracing;
using System.Threading.Channels;
using OpenFeature.Constant;

namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess
{
internal class InProcessResolver : Resolver
{
static readonly int EventStreamRetryBaseBackoff = 1;
static readonly int InitialEventStreamRetryBaseBackoff = 1;
static readonly int MaxEventStreamRetryBackoff = 60;
readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly FlagSyncService.FlagSyncServiceClient _client;
private readonly JsonEvaluator _evaluator;
private readonly Mutex _mtx;
private int _eventStreamRetryBackoff = EventStreamRetryBaseBackoff;
private int _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff;
private readonly FlagdConfig _config;
private Thread _handleEventsThread;
private GrpcChannel _channel;
private Channel<object> _eventChannel;
private Model.Metadata _providerMetadata;
private bool connected = false;

internal InProcessResolver(FlagdConfig config)
internal InProcessResolver(FlagdConfig config, Channel<object> eventChannel, Model.Metadata providerMetadata)
{
_eventChannel = eventChannel;
_providerMetadata = providerMetadata;
_config = config;
_client = BuildClient(config, channel => new FlagSyncService.FlagSyncServiceClient(channel));
_mtx = new Mutex();
_evaluator = new JsonEvaluator(config.SourceSelector);
}

internal InProcessResolver(FlagSyncService.FlagSyncServiceClient client, FlagdConfig config) : this(config)
internal InProcessResolver(FlagSyncService.FlagSyncServiceClient client, FlagdConfig config, Channel<object> eventChannel, Model.Metadata providerMetadata) : this(config, eventChannel, providerMetadata)
{
_client = client;
}


public Task Init()
{
return Task.Run(() =>
Expand All @@ -53,9 +60,9 @@ public Task Init()
};
_handleEventsThread.Start();
latch.Wait();
}).ContinueWith((t) =>
}).ContinueWith((task) =>
{
if (t.IsFaulted) throw t.Exception;
if (task.IsFaulted) throw task.Exception;
});
}

Expand Down Expand Up @@ -106,7 +113,7 @@ private async void HandleEvents(CountdownEvent latch)
try
{
// Read the response stream asynchronously
while (await call.ResponseStream.MoveNext(token))
while (!token.IsCancellationRequested && await call.ResponseStream.MoveNext(token))
{
var response = call.ResponseStream.Current;
_evaluator.Sync(FlagConfigurationUpdateType.ALL, response.FlagConfiguration);
Expand All @@ -115,6 +122,7 @@ private async void HandleEvents(CountdownEvent latch)
latch.Signal();
}
HandleProviderReadyEvent();
HandleProviderChangeEvent();
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
Expand All @@ -132,18 +140,38 @@ private async void HandleEvents(CountdownEvent latch)
private void HandleProviderReadyEvent()
{
_mtx.WaitOne();
_eventStreamRetryBackoff = EventStreamRetryBaseBackoff;
_eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff;
if (!connected)
{
connected = true;
_eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderReady, ProviderName = _providerMetadata.Name });
}
_mtx.ReleaseMutex();
}

private async Task HandleErrorEvent()
{
_mtx.WaitOne();
_eventStreamRetryBackoff = Math.Min(_eventStreamRetryBackoff * 2, MaxEventStreamRetryBackoff);
if (connected)
{
connected = false;
_eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderError, ProviderName = _providerMetadata.Name });
}
_mtx.ReleaseMutex();
await Task.Delay(_eventStreamRetryBackoff * 1000);
}

private void HandleProviderChangeEvent()
{
_mtx.WaitOne();
if (connected)
{
_eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderConfigurationChanged, ProviderName = _providerMetadata.Name });
}
_mtx.ReleaseMutex();
}

private T BuildClient<T>(FlagdConfig config, Func<GrpcChannel, T> constructorFunc)
{
var useUnixSocket = config.GetUri().ToString().StartsWith("unix://");
Expand Down
Loading

0 comments on commit f2d096f

Please sign in to comment.