Skip to content

Commit

Permalink
Release 2.3 (#38)
Browse files Browse the repository at this point in the history
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
IharYakimush and dependabot[bot] committed Dec 2, 2024
1 parent 25e8159 commit 68f8b53
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,13 @@
<HintPath>packages\System.Text.Encodings.Web.8.0.0\lib\net462\System.Text.Encodings.Web.dll</HintPath>
<Private>True</Private>
</Reference>
<<<<<<< HEAD
<Reference Include="System.Text.Json, Version=8.0.0.5, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51">
<HintPath>packages\System.Text.Json.8.0.5\lib\net462\System.Text.Json.dll</HintPath>
=======
<Reference Include="System.Text.Json, Version=8.0.0.4, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51">
<HintPath>packages\System.Text.Json.8.0.4\lib\net462\System.Text.Json.dll</HintPath>
>>>>>>> Release 2.3 (#38)
<Private>True</Private>
</Reference>
<Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.1, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
Expand Down
4 changes: 4 additions & 0 deletions sample/Epam.Kafka.Sample.Net462/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
<package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net462" />
<package id="System.Text.Encodings.Web" version="8.0.0" targetFramework="net462" />
<<<<<<< HEAD
<package id="System.Text.Json" version="8.0.5" targetFramework="net462" />
=======
<package id="System.Text.Json" version="8.0.4" targetFramework="net462" />
>>>>>>> Release 2.3 (#38)
<package id="System.Threading.Tasks.Extensions" version="4.5.4" targetFramework="net462" />
<package id="System.ValueTuple" version="4.5.0" targetFramework="net462" />
</packages>
4 changes: 4 additions & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
<NeutralLanguage>en</NeutralLanguage>
<PackageReleaseNotes>https://github.com/epam/epam-kafka/releases</PackageReleaseNotes>
<VersionPrefix>0</VersionPrefix>
<<<<<<< HEAD
<Version>2.4.$(VersionPrefix)</Version>
=======
<Version>2.3.$(VersionPrefix)</Version>
>>>>>>> Release 2.3 (#38)
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ public static IQueryable<T> AsTracking<T>(this IQueryable<T> queryable)
return queryable;
}

<<<<<<< HEAD
public static int SaveChanges(this DbContext context, bool _)
=======
public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess)
>>>>>>> Release 2.3 (#38)
{
return context.SaveChanges();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ protected DbContextPublicationHandler(TContext context, ILogger logger) : base(l
protected override IEnumerable<TEntity> GetEntities(int count, bool transaction,
CancellationToken cancellationToken)
{
<<<<<<< HEAD
return this.OrderBy(this.GetTable().AsTracking().Where(this.IsQueued)).Take(count).ToArray();
=======
return this.OrderBy(this.GetTable().AsTracking().Where(this.IsQueued)).Take(count);
>>>>>>> Release 2.3 (#38)
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// Copyright © 2024 EPAM Systems

<<<<<<< HEAD
=======
using Confluent.Kafka;

>>>>>>> Release 2.3 (#38)
using Epam.Kafka.PubSub.Common.Pipeline;

namespace Epam.Kafka.PubSub.Subscription.Pipeline;
Expand Down
4 changes: 4 additions & 0 deletions src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
// existing assignment, check if offset reset
if (topic.Consumer.Assignment.Contains(item.TopicPartition))
{
<<<<<<< HEAD
if (topic.TryGetOffset(item.TopicPartition, out Offset tp) && tp != item.Offset)
=======
if (topic.TryGetOffset(item.TopicPartition, out var tp) && tp != item.Offset)
>>>>>>> Release 2.3 (#38)
{
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
}

public static void PauseOrReset<TKey, TValue>(
<<<<<<< HEAD
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
=======
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
>>>>>>> Release 2.3 (#38)
List<TopicPartition> pause,
List<TopicPartitionOffset> reset)
{
Expand Down Expand Up @@ -112,7 +117,11 @@ public static void CommitOffsetIfNeeded<TKey, TValue>(
}
else if (item.Offset == Offset.End)
{
<<<<<<< HEAD
WatermarkOffsets w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
=======
var w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
>>>>>>> Release 2.3 (#38)
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ protected override void ExecuteBatch(
this.Monitor.Batch.Update(BatchStatus.Reading);

bool unassignedBeforeRead = state.GetBatch(
<<<<<<< HEAD
topic, activitySpan, out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch, cancellationToken);
=======
topic, activitySpan, out var batch, cancellationToken);
>>>>>>> Release 2.3 (#38)

cancellationToken.ThrowIfCancellationRequested();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,

public Func<IReadOnlyCollection<TopicPartition>, IReadOnlyCollection<TopicPartitionOffset>>? ExternalState { get; set; }

<<<<<<< HEAD
public bool TryGetOffset(TopicPartition tp, out Offset result)
{
return this._offsets.TryGetValue(tp, out result);
}
=======
public bool TryGetOffset(TopicPartition tp, out Offset result) => this._offsets.TryGetValue(tp, out result);
>>>>>>> Release 2.3 (#38)

public void OnAssign(IReadOnlyCollection<TopicPartitionOffset> items)
{
Expand Down Expand Up @@ -230,6 +234,7 @@ public void OnCommit(IReadOnlyCollection<TopicPartitionOffset> committed)
}

public void OnReset(IReadOnlyCollection<TopicPartitionOffset> items)
<<<<<<< HEAD
{
if (items.Count > 0)
{
Expand Down Expand Up @@ -300,6 +305,81 @@ private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)

this.Logger.PartitionsPaused(this.Monitor.Name, result);

return this.CleanupBuffer(x => result.Any(v => v == x.TopicPartition), "partition paused");
=======
{
if (items.Count > 0)
{
List<TopicPartitionOffset> reset = new(items.Count);
List<TopicPartitionOffset> resume = new(items.Count);

foreach (var tpo in items)
{
if (this._paused.Remove(tpo.TopicPartition))
{
resume.Add(tpo);
}
else
{
reset.Add(tpo);
}
}

if (reset.Count > 0)
{
this.Logger.OffsetsReset(this.Monitor.Name, reset);
this.CleanupBuffer(x => reset.Any(v => v.TopicPartition == x.TopicPartition), "partition offset reset");
}

if (resume.Count > 0)
{
this.Consumer.Resume(resume.Select(x => x.TopicPartition));
this.Logger.PartitionsResumed(this.Monitor.Name, resume);
this.CleanupBuffer(x => resume.Any(v => v.TopicPartition == x.TopicPartition), "partition offset resume");
}
>>>>>>> Release 2.3 (#38)
}

return false;
}

public bool OnPause(IReadOnlyCollection<TopicPartition> items)
{
return items.Count > 0 && this.OnPauseEnumerate(items);
}

private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)
{
List<TopicPartition> result = new();

foreach (TopicPartition tp in items)
{
if (this.Consumer.Assignment.Contains(tp) && !this._paused.Contains(tp))
{
result.Add(tp);
}
}

if (result.Count > 0)
{
try
{
this.Consumer.Pause(result);
}
catch (Exception e)
{
e.DoNotRetryBatch();
throw;
}

foreach (var r in result)
{
this._paused.Add(r);
this._offsets[r] = ExternalOffset.Paused;
}

this.Logger.PartitionsPaused(this.Monitor.Name, result);

return this.CleanupBuffer(x => result.Any(v => v == x.TopicPartition), "partition paused");
}

Expand Down
39 changes: 39 additions & 0 deletions src/Epam.Kafka/Internals/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,21 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi

ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);

<<<<<<< HEAD
ObservableConsumer<TKey, TValue> consumer;

try
{
consumer = new ObservableConsumer<TKey, TValue>(builder);
=======
ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);

try
{
IConsumer<TKey, TValue> consumer = new ObservableConsumer<TKey, TValue>(builder);

fl.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
>>>>>>> Release 2.3 (#38)

logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
Expand Down Expand Up @@ -212,12 +222,21 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi
}
catch (InvalidOperationException)
{
<<<<<<< HEAD
// handler already set
if (clusterOptions.OauthHandlerThrow)
{
throw;
}
}
=======
} // handler already set
}

try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
>>>>>>> Release 2.3 (#38)
}

ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
Expand All @@ -226,7 +245,13 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi

try
{
<<<<<<< HEAD
producer = new(builder);
=======
ObservableProducer<TKey, TValue> producer = new(builder);

fl.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
>>>>>>> Release 2.3 (#38)

logger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
Expand All @@ -246,13 +271,23 @@ public IClient GetOrCreateClient(string? cluster = null)

KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);

<<<<<<< HEAD
if (!this._clients.TryGetValue(clusterOptions, out SharedClient? result))
=======
SharedClient? result;

lock (this._syncObj)
>>>>>>> Release 2.3 (#38)
{
lock (this._syncObj)
{
<<<<<<< HEAD
if (!this._clients.TryGetValue(clusterOptions, out result))
{
result = new SharedClient(this, cluster);
=======
result = new SharedClient(this, cluster);
>>>>>>> Release 2.3 (#38)

this._clients.Add(clusterOptions, result);
}
Expand Down Expand Up @@ -334,6 +369,7 @@ private KafkaClusterOptions GetAndValidateClusterOptions(string? cluster)
ValidateLogicalName(cluster, "cluster");

// save cluster name for further health check
<<<<<<< HEAD
// https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
// If you're only reading from a shared collection, then you can use the classes in the System.Collections.Generic namespace
if (!this.UsedClusters.Contains(cluster!))
Expand All @@ -343,6 +379,9 @@ private KafkaClusterOptions GetAndValidateClusterOptions(string? cluster)
this.UsedClusters.Add(cluster!);
}
}
=======
this.UsedClusters.Add(cluster!);
>>>>>>> Release 2.3 (#38)

try
{
Expand Down
4 changes: 4 additions & 0 deletions src/Epam.Kafka/Options/KafkaClusterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ public sealed class KafkaClusterOptions : IOptions<KafkaClusterOptions>

internal bool UsedByFactory { get; set; }

<<<<<<< HEAD
internal Action<IClient, string?>? OauthHandler { get; private set; }
internal bool OauthHandlerThrow { get; private set; }
=======
internal Action<IClient, string>? OauthHandler { get; private set; }
>>>>>>> Release 2.3 (#38)
internal IAuthenticationHeaderValueProvider? AuthenticationHeaderValueProvider { get; set; }

KafkaClusterOptions IOptions<KafkaClusterOptions>.Value => this;
Expand Down

0 comments on commit 68f8b53

Please sign in to comment.