Skip to content

Commit

Permalink
implement-inmemory-stream (#233)
Browse files Browse the repository at this point in the history
* Implement InMemoryStream not implemented methods
* update implementation
  • Loading branch information
WeihanLi authored Dec 15, 2024
1 parent 611a31e commit 339a0c4
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions src/WeihanLi.Common/Helpers/InMemoryStream.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using WeihanLi.Common.Models;
using WeihanLi.Extensions;

Expand Down Expand Up @@ -49,8 +47,9 @@ public interface IStream<T>

public sealed class InMemoryStream<T>(string name, IComparer<T>? comparer = null) : IStream<T>
{
private readonly PriorityQueue<StreamMessage<T>, T> _messages = new(comparer);
private readonly List<StreamMessage<T>> _messages = new();
private readonly ConcurrentDictionary<string, StreamGroupInfo<T>> _groups = new();
private readonly IComparer<T> _comparer = comparer ?? Comparer<T>.Default;

public string StreamName => name;

Expand All @@ -74,7 +73,7 @@ public Task AddAsync(T id, Dictionary<string, string> fields, DateTimeOffset? ti
Timestamp = timestamp ?? DateTimeOffset.Now,
Properties = properties ?? new()
};
_messages.Enqueue(message, id);
_messages.Add(message);

return Task.CompletedTask;
}
Expand All @@ -83,7 +82,7 @@ public Task AddGroupAsync(string groupName, T offset, CancellationToken cancella
{
if (_groups.ContainsKey(groupName))
{
throw new InvalidOperationException($"Group [{groupName}] not exists");
throw new InvalidOperationException($"Group [{groupName}] already exists");
}

_groups[groupName] = new StreamGroupInfo<T>()
Expand All @@ -96,13 +95,43 @@ public Task AddGroupAsync(string groupName, T offset, CancellationToken cancella

public Task<int> CountAsync(T? min = default, T? max = default, RangeInclusion inclusion = default, CancellationToken cancellationToken = default)
{
// TODO: support min/max filter
return _messages.Count.WrapTask();
var count = _messages.Count;
if (min != null || max != null)
{
count = _messages.Count(item =>
{
var id = item.Id;
var isInRange = true;
if (min != null)
{
isInRange = inclusion.HasFlag(RangeInclusion.IncludeLowerBound) ? _comparer.Compare(id, min) >= 0 : _comparer.Compare(id, min) > 0;
}
if (max != null)
{
isInRange = inclusion.HasFlag(RangeInclusion.IncludeUpperBound) ? _comparer.Compare(id, max) <= 0 : _comparer.Compare(id, max) < 0;
}
return isInRange;
});
}
return Task.FromResult(count);
}

public IAsyncEnumerable<StreamMessage<T>> FetchAsync(T lastId, int count, Ordering order = Ordering.Ascending, CancellationToken cancellationToken = default)
public async IAsyncEnumerable<StreamMessage<T>> FetchAsync(T lastId, int count, Ordering order = Ordering.Ascending, [EnumeratorCancellation] CancellationToken cancellationToken = default)

Check warning on line 119 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 119 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 119 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 119 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
throw new NotImplementedException();
var messages = order == Ordering.Ascending ? _messages.OrderBy(item => item.Id) : _messages.OrderByDescending(item => item.Id);
var fetchedCount = 0;
foreach (var message in messages)
{
if (fetchedCount >= count)
{
yield break;
}
if (_comparer.Compare(message.Id, lastId) > 0)
{
yield return message;
fetchedCount++;
}
}
}

public Task<StreamGroupInfo<T>?> GroupInfoAsync(string groupName, CancellationToken cancellationToken = default)
Expand All @@ -122,11 +151,14 @@ public Task<IReadOnlyCollection<StreamGroupInfo<T>>> GroupsAsync(CancellationTok

public Task<StreamInfo<T>> InfoAsync(CancellationToken cancellationToken = default)
{
var minMessage = _messages.MinBy(item => item.Id);
var maxMessage = _messages.MaxBy(item => item.Id);
var streamInfo = new StreamInfo<T>
{
// TODO: update min/max from messages
MinId = default,
MaxId = default,
MinId = minMessage?.Id ?? default,

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 158 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.
MinTimestamp = minMessage?.Timestamp ?? default,
MaxId = maxMessage?.Id ?? default,

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on ubuntu-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.

Check failure on line 160 in src/WeihanLi.Common/Helpers/InMemoryStream.cs

View workflow job for this annotation

GitHub Actions / Running tests on macOS-latest

'T' cannot be made nullable.
MaxTimestamp = maxMessage?.Timestamp ?? default,
Count = _messages.Count
};
return streamInfo.WrapTask();
Expand Down

0 comments on commit 339a0c4

Please sign in to comment.