diff --git a/src/WeihanLi.Common/Helpers/InMemoryStream.cs b/src/WeihanLi.Common/Helpers/InMemoryStream.cs index 068ed560..4c265f14 100644 --- a/src/WeihanLi.Common/Helpers/InMemoryStream.cs +++ b/src/WeihanLi.Common/Helpers/InMemoryStream.cs @@ -1,7 +1,5 @@ -// Copyright (c) Weihan Li. All rights reserved. -// Licensed under the Apache license. - using System.Collections.Concurrent; +using System.Runtime.CompilerServices; using WeihanLi.Common.Models; using WeihanLi.Extensions; @@ -49,8 +47,9 @@ public interface IStream public sealed class InMemoryStream(string name, IComparer? comparer = null) : IStream { - private readonly PriorityQueue, T> _messages = new(comparer); + private readonly List> _messages = new(); private readonly ConcurrentDictionary> _groups = new(); + private readonly IComparer _comparer = comparer ?? Comparer.Default; public string StreamName => name; @@ -74,7 +73,7 @@ public Task AddAsync(T id, Dictionary fields, DateTimeOffset? ti Timestamp = timestamp ?? DateTimeOffset.Now, Properties = properties ?? new() }; - _messages.Enqueue(message, id); + _messages.Add(message); return Task.CompletedTask; } @@ -83,7 +82,7 @@ public Task AddGroupAsync(string groupName, T offset, CancellationToken cancella { if (_groups.ContainsKey(groupName)) { - throw new InvalidOperationException($"Group [{groupName}] not exists"); + throw new InvalidOperationException($"Group [{groupName}] already exists"); } _groups[groupName] = new StreamGroupInfo() @@ -96,13 +95,43 @@ public Task AddGroupAsync(string groupName, T offset, CancellationToken cancella public Task CountAsync(T? min = default, T? max = default, RangeInclusion inclusion = default, CancellationToken cancellationToken = default) { - // TODO: support min/max filter - return _messages.Count.WrapTask(); + var count = _messages.Count; + if (min != null || max != null) + { + count = _messages.Count(item => + { + var id = item.Id; + var isInRange = true; + if (min != null) + { + isInRange = inclusion.HasFlag(RangeInclusion.IncludeLowerBound) ? _comparer.Compare(id, min) >= 0 : _comparer.Compare(id, min) > 0; + } + if (max != null) + { + isInRange = inclusion.HasFlag(RangeInclusion.IncludeUpperBound) ? _comparer.Compare(id, max) <= 0 : _comparer.Compare(id, max) < 0; + } + return isInRange; + }); + } + return Task.FromResult(count); } - public IAsyncEnumerable> FetchAsync(T lastId, int count, Ordering order = Ordering.Ascending, CancellationToken cancellationToken = default) + public async IAsyncEnumerable> FetchAsync(T lastId, int count, Ordering order = Ordering.Ascending, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + var messages = order == Ordering.Ascending ? _messages.OrderBy(item => item.Id) : _messages.OrderByDescending(item => item.Id); + var fetchedCount = 0; + foreach (var message in messages) + { + if (fetchedCount >= count) + { + yield break; + } + if (_comparer.Compare(message.Id, lastId) > 0) + { + yield return message; + fetchedCount++; + } + } } public Task?> GroupInfoAsync(string groupName, CancellationToken cancellationToken = default) @@ -122,11 +151,14 @@ public Task>> GroupsAsync(CancellationTok public Task> InfoAsync(CancellationToken cancellationToken = default) { + var minMessage = _messages.MinBy(item => item.Id); + var maxMessage = _messages.MaxBy(item => item.Id); var streamInfo = new StreamInfo { - // TODO: update min/max from messages - MinId = default, - MaxId = default, + MinId = minMessage?.Id ?? default, + MinTimestamp = minMessage?.Timestamp ?? default, + MaxId = maxMessage?.Id ?? default, + MaxTimestamp = maxMessage?.Timestamp ?? default, Count = _messages.Count }; return streamInfo.WrapTask();