-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
- Loading branch information
1 parent
c06f4a2
commit a15a4eb
Showing
107 changed files
with
3,459 additions
and
383 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
// Copyright © 2024 EPAM Systems | ||
|
||
using Confluent.Kafka; | ||
using Confluent.Kafka.Admin; | ||
using Confluent.SchemaRegistry; | ||
using Epam.Kafka.Internals; | ||
using Epam.Kafka.Options; | ||
|
||
using Microsoft.Extensions.Diagnostics.HealthChecks; | ||
using Microsoft.Extensions.Options; | ||
|
||
namespace Epam.Kafka.HealthChecks; | ||
|
||
internal sealed class ClusterHealthCheck : IHealthCheck, IObserver<Error> | ||
{ | ||
private readonly IKafkaFactory _kafkaFactory; | ||
private readonly IOptionsMonitor<ClusterHealthCheckOptions> _optionsMonitor; | ||
private readonly IOptionsMonitor<KafkaClusterOptions> _clusterOptionsMonitor; | ||
private readonly List<Error> _errors = new(); | ||
|
||
public const string NamePrefix = "Epam.Kafka.Cluster."; | ||
|
||
public ClusterHealthCheck( | ||
IKafkaFactory kafkaFactory, | ||
IOptionsMonitor<ClusterHealthCheckOptions> optionsMonitor, | ||
IOptionsMonitor<KafkaClusterOptions> clusterOptionsMonitor) | ||
{ | ||
this._kafkaFactory = kafkaFactory ?? throw new ArgumentNullException(nameof(kafkaFactory)); | ||
this._optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor)); | ||
this._clusterOptionsMonitor = clusterOptionsMonitor ?? throw new ArgumentNullException(nameof(clusterOptionsMonitor)); | ||
} | ||
|
||
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, | ||
CancellationToken cancellationToken = new CancellationToken()) | ||
{ | ||
string description = "Not used by application."; | ||
HealthStatus status = HealthStatus.Healthy; | ||
|
||
string name = context.Registration.Name.Substring(NamePrefix.Length); | ||
|
||
ClusterHealthCheckOptions options = this._optionsMonitor.Get(name); | ||
|
||
if (options.IncludeUnused || this._kafkaFactory is not KafkaFactory kf || kf.UsedClusters.Contains(name)) | ||
{ | ||
description = "AdminClient: "; | ||
|
||
if (options.SkipAdminClient) | ||
{ | ||
description += "check skipped."; | ||
} | ||
else | ||
{ | ||
#pragma warning disable CA1031 // Not applicable for this health checks | ||
|
||
try | ||
{ | ||
using IClient client = this._kafkaFactory.GetOrCreateClient(name); | ||
|
||
using IDisposable subscription = ((IObservable<Error>)client).Subscribe(this); | ||
|
||
using IAdminClient adminClient = client.CreateDependentAdminClient(); | ||
|
||
await adminClient | ||
.DescribeClusterAsync(new DescribeClusterOptions | ||
{ RequestTimeout = context.Registration.Timeout }) | ||
.ConfigureAwait(false); | ||
|
||
if (this._errors.Count > 0) | ||
{ | ||
status = HealthStatus.Degraded; | ||
description += string.Join(", ", this._errors) + "."; | ||
} | ||
else | ||
{ | ||
description += "OK."; | ||
} | ||
} | ||
catch (Exception e) | ||
{ | ||
status = context.Registration.FailureStatus; | ||
|
||
if (this._errors.Count > 0) | ||
{ | ||
description += string.Join(", ", this._errors) + "."; | ||
} | ||
else | ||
{ | ||
description += $"{e.Message}."; | ||
} | ||
} | ||
} | ||
|
||
if (options.SkipSchemaRegistry) | ||
{ | ||
description += " SchemaRegistry: check skipped."; | ||
} | ||
else | ||
{ | ||
try | ||
{ | ||
if (this._clusterOptionsMonitor.Get(name).SchemaRegistryConfig.Any()) | ||
{ | ||
ISchemaRegistryClient sr = this._kafkaFactory.GetOrCreateSchemaRegistryClient(name); | ||
await sr.GetCompatibilityAsync().ConfigureAwait(false); | ||
description += " SchemaRegistry: OK."; | ||
} | ||
else | ||
{ | ||
description += " SchemaRegistry: not configured."; | ||
} | ||
} | ||
catch (Exception e) | ||
{ | ||
status = context.Registration.FailureStatus; | ||
description += $" SchemaRegistry: {e.Message}."; | ||
} | ||
} | ||
|
||
#pragma warning restore CA1031 | ||
} | ||
|
||
return new HealthCheckResult(status, description); | ||
} | ||
|
||
public void OnNext(Error value) | ||
{ | ||
this._errors.Add(value); | ||
} | ||
|
||
public void OnError(Exception error) | ||
{ | ||
} | ||
|
||
public void OnCompleted() | ||
{ | ||
this._errors.Clear(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// Copyright © 2024 EPAM Systems | ||
|
||
using Microsoft.Extensions.Options; | ||
|
||
namespace Epam.Kafka.HealthChecks; | ||
|
||
/// <summary> | ||
/// Options for kafka cluster health checks. | ||
/// </summary> | ||
public sealed class ClusterHealthCheckOptions : IOptions<ClusterHealthCheckOptions> | ||
{ | ||
/// <summary> | ||
/// Whether schema registry check should be skipped. Default <code>false</code>. | ||
/// </summary> | ||
public bool SkipSchemaRegistry { get; set; } | ||
|
||
/// <summary> | ||
/// Whether admin client check should be skipped. Default <code>false</code>. | ||
/// </summary> | ||
public bool SkipAdminClient { get; set; } | ||
|
||
/// <summary> | ||
/// Whether cluster will be checked even if was not used at least 1 time by default <see cref="IKafkaFactory"/> implementation. Default <code>false</code>. | ||
/// </summary> | ||
public bool IncludeUnused { get; set; } | ||
|
||
ClusterHealthCheckOptions IOptions<ClusterHealthCheckOptions>.Value => this; | ||
} |
20 changes: 20 additions & 0 deletions
20
src/Epam.Kafka.HealthChecks/Epam.Kafka.HealthChecks.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFrameworks>net8.0;net6.0;netstandard2.0;net462</TargetFrameworks> | ||
<Description>Health check extensions for [Epam.Kafka](https://www.nuget.org/packages/Epam.Kafka) package.</Description> | ||
</PropertyGroup> | ||
|
||
|
||
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' or '$(TargetFramework)' == 'net462' or '$(TargetFramework)' == 'netstandard2.0'"> | ||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.0" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'"> | ||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.0" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\Epam.Kafka\Epam.Kafka.csproj" /> | ||
</ItemGroup> | ||
</Project> |
Oops, something went wrong.