diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs index 862beda2e..4f300eda1 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs @@ -2,11 +2,35 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License // ********************************************************************* +using System.Collections.Generic; using System.ComponentModel; using System.Runtime.Serialization; namespace Microsoft.StreamProcessing.Internal { + /// <summary> + /// The value representation with its timestamp. + /// </summary> + /// <typeparam name="T">The type of the underlying elements being aggregated.</typeparam> + [DataContract] + [EditorBrowsable(EditorBrowsableState.Never)] + public struct ValueAndTimestamp<T> + { + /// <summary> + /// The timestamp of the payload. + /// </summary> + [DataMember] + [EditorBrowsable(EditorBrowsableState.Never)] + public long timestamp; + + /// <summary> + /// Payload of the event + /// </summary> + [DataMember] + [EditorBrowsable(EditorBrowsableState.Never)] + public T value; + } + /// <summary> /// The state object used in minimum and maximum aggregates. /// </summary> @@ -22,6 +46,13 @@ public struct MinMaxState<T> [EditorBrowsable(EditorBrowsableState.Never)] public SortedMultiSet<T> savedValues; + /// <summary> + /// List of values and its timestamp + /// </summary> + [DataMember] + [EditorBrowsable(EditorBrowsableState.Never)] + public ElasticCircularBuffer<ValueAndTimestamp<T>> values; + /// <summary> /// The current value if the aggregate were to be computed immediately. /// </summary> diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs deleted file mode 100644 index fe31f2630..000000000 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs +++ /dev/null @@ -1,96 +0,0 @@ -// ********************************************************************* -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License -// ********************************************************************* -using System; -using System.Collections.Generic; -using System.Diagnostics.Contracts; -using System.Linq.Expressions; -using Microsoft.StreamProcessing.Internal; - -namespace Microsoft.StreamProcessing.Aggregates -{ - internal sealed class SlidingMaxAggregate<T> : IAggregate<T, MinMaxState<T>, T> - { - private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1; - private readonly Comparison<T> comparer; - - public SlidingMaxAggregate(QueryContainer container) : this(ComparerExpression<T>.Default, container) { } - - public SlidingMaxAggregate(IComparerExpression<T> comparer, QueryContainer container) - { - Contract.Requires(comparer != null); - this.comparer = comparer.GetCompareExpr().Compile(); - - var generator = comparer.CreateSortedDictionaryGenerator<T, long>(container); - Expression<Func<Func<SortedDictionary<T, long>>, MinMaxState<T>>> template - = (g) => new MinMaxState<T> { savedValues = new SortedMultiSet<T>(g), currentValue = default, currentTimestamp = InvalidSyncTime }; - var replaced = template.ReplaceParametersInBody(generator); - this.initialState = Expression.Lambda<Func<MinMaxState<T>>>(replaced); - } - - private readonly Expression<Func<MinMaxState<T>>> initialState; - public Expression<Func<MinMaxState<T>>> InitialState() => initialState; - - public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate() - => (state, timestamp, input) => Accumulate(state, timestamp, input); - - private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input) - { - if (timestamp == state.currentTimestamp) - { - if (this.comparer(input, state.currentValue) > 0) - state.currentValue = input; - } - else - { - if (state.currentTimestamp != InvalidSyncTime) - state.savedValues.Add(state.currentValue); - state.currentTimestamp = timestamp; - state.currentValue = input; - } - return state; - } - - public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate() - => (state, timestamp, input) => state; // never invoked, hence not implemented - - public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference() - => (leftSet, rightSet) => Difference(leftSet, rightSet); - - private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet) - { - if (leftSet.currentTimestamp != InvalidSyncTime) - { - leftSet.currentTimestamp = InvalidSyncTime; - leftSet.savedValues.Add(leftSet.currentValue); - } - if (rightSet.currentTimestamp != InvalidSyncTime) - { - rightSet.currentTimestamp = InvalidSyncTime; - rightSet.savedValues.Add(rightSet.currentValue); - } - - leftSet.savedValues.RemoveAll(rightSet.savedValues); - return leftSet; - } - - public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state); - - private T ComputeResult(MinMaxState<T> state) - { - if (state.savedValues.IsEmpty) - return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue; - else - { - if (state.currentTimestamp == InvalidSyncTime) - return state.savedValues.Last(); - else - { - var last = state.savedValues.Last(); - return this.comparer(last, state.currentValue) > 0 ? last : state.currentValue; - } - } - } - } -} diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs deleted file mode 100644 index 9ad193bec..000000000 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs +++ /dev/null @@ -1,96 +0,0 @@ -// ********************************************************************* -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License -// ********************************************************************* -using System; -using System.Collections.Generic; -using System.Diagnostics.Contracts; -using System.Linq.Expressions; -using Microsoft.StreamProcessing.Internal; - -namespace Microsoft.StreamProcessing.Aggregates -{ - internal sealed class SlidingMinAggregate<T> : IAggregate<T, MinMaxState<T>, T> - { - private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1; - private readonly Comparison<T> comparer; - - public SlidingMinAggregate(QueryContainer container) : this(ComparerExpression<T>.Default, container) { } - - public SlidingMinAggregate(IComparerExpression<T> comparer, QueryContainer container) - { - Contract.Requires(comparer != null); - this.comparer = comparer.GetCompareExpr().Compile(); - - var generator = comparer.CreateSortedDictionaryGenerator<T, long>(container); - Expression<Func<Func<SortedDictionary<T, long>>, MinMaxState<T>>> template - = (g) => new MinMaxState<T> { savedValues = new SortedMultiSet<T>(g), currentValue = default, currentTimestamp = InvalidSyncTime }; - var replaced = template.ReplaceParametersInBody(generator); - this.initialState = Expression.Lambda<Func<MinMaxState<T>>>(replaced); - } - - private readonly Expression<Func<MinMaxState<T>>> initialState; - public Expression<Func<MinMaxState<T>>> InitialState() => initialState; - - public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate() - => (state, timestamp, input) => Accumulate(state, timestamp, input); - - private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input) - { - if (timestamp == state.currentTimestamp) - { - if (this.comparer(input, state.currentValue) < 0) - state.currentValue = input; - } - else - { - if (state.currentTimestamp != InvalidSyncTime) - state.savedValues.Add(state.currentValue); - state.currentTimestamp = timestamp; - state.currentValue = input; - } - return state; - } - - public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate() - => (state, timestamp, input) => state; // never invoked, hence not implemented - - public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference() - => (leftSet, rightSet) => Difference(leftSet, rightSet); - - private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet) - { - if (leftSet.currentTimestamp != InvalidSyncTime) - { - leftSet.currentTimestamp = InvalidSyncTime; - leftSet.savedValues.Add(leftSet.currentValue); - } - if (rightSet.currentTimestamp != InvalidSyncTime) - { - rightSet.currentTimestamp = InvalidSyncTime; - rightSet.savedValues.Add(rightSet.currentValue); - } - - leftSet.savedValues.RemoveAll(rightSet.savedValues); - return leftSet; - } - - public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state); - - private T ComputeResult(MinMaxState<T> state) - { - if (state.savedValues.IsEmpty) - return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue; - else - { - if (state.currentTimestamp == InvalidSyncTime) - return state.savedValues.First(); - else - { - var first = state.savedValues.First(); - return this.comparer(first, state.currentValue) < 0 ? first : state.currentValue; - } - } - } - } -} diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs new file mode 100644 index 000000000..895162d57 --- /dev/null +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs @@ -0,0 +1,162 @@ +// ********************************************************************* +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License +// ********************************************************************* +using System; +using System.Diagnostics.Contracts; +using System.Linq.Expressions; +using Microsoft.StreamProcessing.Internal; + +namespace Microsoft.StreamProcessing.Aggregates +{ + /* The following invariant holds for the state: + * 1. The 'values' list is always in decreasing order of payload - that happens as we append only smaller values than last. + * 2. The 'values.timestamp' is always in increasing order - as the new values are always appended. + * 3. The 'currentTimestamp' is higher than all timestamps of the 'state.values' list + * 4. The 'currentValue' payload need not be the lowest (relative to 'state.values') + * - + * Further for usage from HoppingPipe: + * 5. The 'ecq.state.values' list will always be empty, optimizing difference operation + * 6. The difference calculation happens by timestamp, and uses one bulk remove (RemoveRange/Clear) for values. + */ + internal abstract class SlidingMinMaxAggregate<T> : IAggregate<T, MinMaxState<T>, T> + { + private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1; + + // The comparer is used as if we are working for Max operation. For Min, we reverse the expression + private readonly Comparison<T> comparer; + + public SlidingMinMaxAggregate(IComparerExpression<T> comparer) + { + Contract.Requires(comparer != null); + this.comparer = comparer.GetCompareExpr().Compile(); + } + + public Expression<Func<MinMaxState<T>>> InitialState() + => () => new MinMaxState<T> { values = new ElasticCircularBuffer<ValueAndTimestamp<T>>(), currentValue = default, currentTimestamp = InvalidSyncTime }; + + public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate() + => (state, timestamp, input) => Accumulate(state, timestamp, input); + + private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input) + { + if (timestamp == state.currentTimestamp) + { + if (this.comparer(input, state.currentValue) > 0) + state.currentValue = input; + } + else + { + // Save only if new input is smaller, + // If the current input is larger (or equal), we never require older value as they expire before current + if (state.currentTimestamp != InvalidSyncTime && + this.comparer(input, state.currentValue) < 0) + { + PushToCollection(state); + } + state.currentTimestamp = timestamp; + state.currentValue = input; + } + return state; + } + + private void PushToCollection(MinMaxState<T> state) + { + Contract.Assert(state.currentTimestamp != InvalidSyncTime); + Contract.Assert(state.values.Count == 0 || state.values.PeekLast().timestamp <= state.currentTimestamp); // Ensure new timestamp is higher + + while (state.values.Count != 0 && + this.comparer(state.values.PeekLast().value, state.currentValue) <= 0) + { + state.values.PopLast(); + } + + var newValue = new ValueAndTimestamp<T> { timestamp = state.currentTimestamp, value = state.currentValue }; + state.values.Enqueue(ref newValue); + } + + public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate() + => (state, timestamp, input) => Deaccumulate(state, timestamp, input); + + private static MinMaxState<T> Deaccumulate(MinMaxState<T> state, long timestamp, T input) + { + throw new NotImplementedException($"{nameof(SlidingMinMaxAggregate<T>)} does not implement Deaccumulate()"); + } + + public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference() + => (leftSet, rightSet) => Difference(leftSet, rightSet); + + private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet) + { + long maxRightTimestamp; + + if (rightSet.currentTimestamp != InvalidSyncTime) + { + maxRightTimestamp = rightSet.currentTimestamp; + } + else + { + if (rightSet.values.Count == 0) + return leftSet; + + // The right set will never contain values for HoppingPipe, adding below for completeness + maxRightTimestamp = rightSet.values.PeekLast().timestamp; + } + + if (leftSet.currentTimestamp != InvalidSyncTime && + leftSet.currentTimestamp <= maxRightTimestamp) + { + leftSet.currentTimestamp = InvalidSyncTime; // Discard all values if rightSet's maxTime covers it. + leftSet.values.Clear(); + return leftSet; + } + else + { + while (leftSet.values.Count != 0 && + leftSet.values.PeekFirst().timestamp <= maxRightTimestamp) + { + leftSet.values.Dequeue(); + } + } + + return leftSet; + } + + public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state); + + private T ComputeResult(MinMaxState<T> state) + { + if (state.values.Count == 0) + return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue; + else + { + var first = state.values.PeekFirst().value; + + if (state.currentTimestamp == InvalidSyncTime) + return first; + else + { + return this.comparer(first, state.currentValue) > 0 ? first : state.currentValue; + } + } + } + } + + internal sealed class SlidingMaxAggregate<T> : SlidingMinMaxAggregate<T> + { + public SlidingMaxAggregate() + : this(ComparerExpression<T>.Default) { } + + public SlidingMaxAggregate(IComparerExpression<T> comparer) + : base(comparer) { } + } + + internal sealed class SlidingMinAggregate<T> : SlidingMinMaxAggregate<T> + { + public SlidingMinAggregate() + : this(ComparerExpression<T>.Default) { } + + public SlidingMinAggregate(IComparerExpression<T> comparer) + : base(ComparerExpression<T>.Reverse(comparer)) { } + } +} diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs index 15ebdacff..cb6ed60b4 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs @@ -20,24 +20,15 @@ public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer : this(k, rankComparer, ComparerExpression<T>.Default, container) { } public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container) - : base(ThenOrderBy(Reverse(rankComparer), overallComparer), container) + : base(ThenOrderBy(ComparerExpression<T>.Reverse(rankComparer), overallComparer), container) { Contract.Requires(rankComparer != null); Contract.Requires(overallComparer != null); Contract.Requires(k > 0); - this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile(); + this.compiledRankComparer = ComparerExpression<T>.Reverse(rankComparer).GetCompareExpr().Compile(); this.k = k; } - private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer) - { - Contract.Requires(comparer != null); - var expression = comparer.GetCompareExpr(); - Expression<Comparison<T>> template = (left, right) => CallInliner.Call(expression, right, left); - var reversedExpression = template.InlineCalls(); - return new ComparerExpression<T>(reversedExpression); - } - private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> comparer1, IComparerExpression<T> comparer2) { Contract.Requires(comparer1 != null); diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs index 3df34156c..e85838d8f 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs @@ -21,7 +21,7 @@ public sealed class CircularBuffer<T> [DataMember] private int capacityMask = 0xfff; [DataMember] - internal T[] Items; + internal T[] Items = null; [DataMember] internal int head = 0; [DataMember] @@ -31,7 +31,7 @@ public sealed class CircularBuffer<T> /// Currently for internal use only - do not use directly. /// </summary> [EditorBrowsable(EditorBrowsableState.Never)] - public CircularBuffer() => this.Items = new T[this.capacityMask + 1]; + public CircularBuffer() { } /// <summary> /// Currently for internal use only - do not use directly. @@ -44,7 +44,6 @@ public CircularBuffer(int capacity) var temp = 8; while (temp <= capacity) temp <<= 1; - this.Items = new T[temp]; this.capacityMask = temp - 1; } @@ -72,6 +71,7 @@ public void Enqueue(ref T value) { int next = (this.tail + 1) & this.capacityMask; if (next == this.head) throw new InvalidOperationException("The list is full!"); + if (this.Items == null) this.Items = new T[this.capacityMask + 1]; this.Items[this.tail] = value; this.tail = next; } @@ -91,6 +91,21 @@ public T Dequeue() return ret; } + /// <summary> + /// Currently for internal use only - do not use directly. + /// </summary> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public T PopLast() + { + if (this.head == this.tail) throw new InvalidOperationException("The list is empty!"); + int oldtail = this.tail; + this.tail = (this.tail - 1) & this.capacityMask; + var ret = this.Items[oldtail]; + this.Items[oldtail] = default; + return ret; + } + /// <summary> /// Currently for internal use only - do not use directly. /// </summary> @@ -105,6 +120,13 @@ public T Dequeue() [EditorBrowsable(EditorBrowsableState.Never)] public bool IsEmpty() => this.head == this.tail; + /// <summary> + /// Removes alll elements from the list - do not use directly. + /// </summary> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public void Clear() => this.head = this.tail = 0; + /// <summary> /// Currently for internal use only - do not use directly. /// </summary> @@ -150,6 +172,17 @@ public ElasticCircularBuffer() this.Count = 0; } + /// <summary> + /// Currently for internal use only - do not use directly. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public void Clear() + { + this.tail = this.head = this.buffers.First; + this.head.Value.Clear(); + this.Count = 0; + } + /// <summary> /// Currently for internal use only - do not use directly. /// </summary> @@ -203,6 +236,27 @@ public T Dequeue() return this.head.Value.Dequeue(); } + /// <summary> + /// Currently for internal use only - do not use directly. + /// </summary> + /// <returns></returns> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public T PopLast() + { + if (this.tail.Value.IsEmpty()) + { + if (this.head == this.tail) + throw new InvalidOperationException("The list is empty!"); + + this.tail = this.tail.Previous; + if (this.tail == null) this.tail = this.buffers.Last; + } + + this.Count--; + return this.tail.Value.PopLast(); + } + /// <summary> /// Currently for internal use only - do not use directly. /// </summary> diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs index f39ea9dbf..dc1fdf047 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs @@ -5,7 +5,7 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Linq; +using System.Diagnostics.Contracts; using System.Linq.Expressions; using System.Reflection; @@ -244,6 +244,16 @@ private static ConditionalExpression MakeComparisonExpression(ParameterExpressio internal static bool IsSimpleDefault(IComparerExpression<T> input) => input == Default && input is PrimitiveComparerExpression<T>; + + public static IComparerExpression<T> Reverse(IComparerExpression<T> comparer) + { + Contract.Requires(comparer != null); + + var expression = comparer.GetCompareExpr(); + Expression<Comparison<T>> template = (left, right) => CallInliner.Call(expression, right, left); + var reversedExpression = template.InlineCalls(); + return new ComparerExpression<T>(reversedExpression); + } } internal class PrimitiveComparerExpression<T> : ComparerExpression<T> diff --git a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs index 198521763..749cce699 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs @@ -90,7 +90,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Min<TValue>(Expression<F var aggregate = this.Properties.IsTumbling ? new TumblingMinAggregate<TValue>() : this.Properties.IsConstantDuration - ? new SlidingMinAggregate<TValue>(this.Properties.QueryContainer) + ? new SlidingMinAggregate<TValue>() : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MinAggregate<TValue>(this.Properties.QueryContainer); return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter); @@ -108,7 +108,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Min<TValue>( var aggregate = this.Properties.IsTumbling ? new TumblingMinAggregate<TValue>(comparer) : this.Properties.IsConstantDuration - ? new SlidingMinAggregate<TValue>(comparer, this.Properties.QueryContainer) + ? new SlidingMinAggregate<TValue>(comparer) : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MinAggregate<TValue>(comparer, this.Properties.QueryContainer); return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter); @@ -130,7 +130,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Max<TValue>(Expression<F var aggregate = this.Properties.IsTumbling ? new TumblingMaxAggregate<TValue>() : this.Properties.IsConstantDuration - ? new SlidingMaxAggregate<TValue>(this.Properties.QueryContainer) + ? new SlidingMaxAggregate<TValue>() : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MaxAggregate<TValue>(this.Properties.QueryContainer); return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter); @@ -147,7 +147,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Max<TValue>(Expression<F var aggregate = this.Properties.IsTumbling ? new TumblingMaxAggregate<TValue>(comparer) : this.Properties.IsConstantDuration - ? new SlidingMaxAggregate<TValue>(comparer, this.Properties.QueryContainer) + ? new SlidingMaxAggregate<TValue>(comparer) : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MaxAggregate<TValue>(comparer, this.Properties.QueryContainer); return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter); diff --git a/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs new file mode 100644 index 000000000..2625ead81 --- /dev/null +++ b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs @@ -0,0 +1,229 @@ +// ********************************************************************* +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License +// ********************************************************************* +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.StreamProcessing; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace SimpleTesting +{ + [TestClass] + public class HoppingWindowMinMaxAggregate : TestWithConfigSettingsAndMemoryLeakDetection + { + private const long HopSize = 10; + private const long WindowSize = 4 * HopSize; + + private StreamEvent<int> EndEvent = StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime); + private Random random = new Random(Seed: (int)DateTime.UtcNow.Ticks); + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateSimple() + { + var input = new[] + { + StreamEvent.CreateStart(1, 10), + StreamEvent.CreateStart(3, 20), + StreamEvent.CreateStart(6, 10), + StreamEvent.CreateStart(8, 30), + StreamEvent.CreateStart(12, 20), + StreamEvent.CreateStart(18, 10), + EndEvent + }; + + var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Max(); + + var correctValues = new[] { 20, 30, 30, 20, 10 }; + var correctEvents = new List<StreamEvent<int>>(); + for (int i = 0; i < correctValues.Length; i++) + { + correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i])); + correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i])); + } + correctEvents.Add(EndEvent); + + CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray()); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinAggregateSimple() + { + var input = new[] + { + StreamEvent.CreateStart(1, 30), + StreamEvent.CreateStart(3, 20), + StreamEvent.CreateStart(6, 30), + StreamEvent.CreateStart(8, 10), + StreamEvent.CreateStart(12, 20), + StreamEvent.CreateStart(18, 30), + EndEvent + }; + + var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Min(); + + var correctValues = new[] { 20, 10, 10, 20, 30 }; + var correctEvents = new List<StreamEvent<int>>(); + for (int i = 0; i < correctValues.Length; i++) + { + correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i])); + correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i])); + } + correctEvents.Add(EndEvent); + + CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray()); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateRandomDistribution() + { + // Distribution: [1,2,3,4,5] hops : 10% each, Closely-Spaced: 50% + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => + { + var hopType = random.Next(1, 11); + return (hopType < 6) ? (hopType * HopSize) : (hopType - 5); + }); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateUnaligned() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => 10, + windowSize: 27); + + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => 10, + windowSize: 17); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateAllIncreasing() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => v + random.Next(1, 11), + distanceGenerator: () => HopSize); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateAllDecreasing() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => (v == 0) ? 10000 : v - random.Next(1, 11), + distanceGenerator: () => HopSize); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowMinMaxAggregateCyclingValues() + { + // Test values 1 -> 2 -> 3 -> 1, and run for 1/2, 1, 2, 4, 8 intervals of hops + for (long distance = HopSize / 2; distance <= HopSize * 8; distance *= 2) + { + GenerateDataAndTestInput( + numValues: 10, + valueGenerator: v => (v % 3) + 1, + distanceGenerator: () => distance); + } + } + + private void GenerateDataAndTestInput( + int numValues, + Func<int, int> valueGenerator, + Func<long> distanceGenerator, + long windowSize = WindowSize) + { + var input = new List<StreamEvent<int>>(); + long maxStartTime = 0; + + long startTime = 0; + int value = 100; + for (int i = 0; i < numValues; i++) + { + startTime += distanceGenerator(); + value = valueGenerator(value); + input.Add(StreamEvent.CreateStart(startTime, value)); + maxStartTime = Math.Max(maxStartTime, startTime); + } + input.Add(EndEvent); + + TestHoppingWindowMaxAggregateInternal(input, maxStartTime, windowSize); + + TestHoppingWindowMinAggregateInternal(input, maxStartTime, windowSize); + } + + private void TestHoppingWindowMinAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, long maxStartTime, long windowSize) + { + TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: true, maxStartTime, windowSize); + } + + private void TestHoppingWindowMaxAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, long maxStartTime, long windowSize) + { + TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: false, maxStartTime, windowSize); + } + + private void TestHoppingWindowMinMaxAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, bool isMax, long maxStartTime, long windowSize) + { + var output = streamEvents.ToStreamable().HoppingWindowLifetime(windowSize, HopSize).Max(); + + var correct = new List<StreamEvent<int>>(); + + maxStartTime += windowSize; + for (long startTime = 0; startTime < maxStartTime; startTime += HopSize) + { + var eventsInWindow = streamEvents.Where(x => x.StartTime > (startTime - windowSize) && x.StartTime <= startTime); + + if (!eventsInWindow.Any()) + continue; + + var fun = isMax ? new Func<int, int, bool>((x, y) => x > y) : ((x, y) => x < y); + var max = eventsInWindow.Aggregate((state, input) => state.Payload > input.Payload ? state : input); + + correct.Add(StreamEvent.CreateStart(startTime, max.Payload)); + correct.Add(StreamEvent.CreateEnd(startTime + HopSize, startTime, max.Payload)); + } + + var actual = NormalizeToInterval(output.ToStreamEventArray<int>()); + + Assert.IsTrue(Enumerable.SequenceEqual(NormalizeToInterval(correct), actual)); + } + + private IEnumerable<StreamEvent<int>> NormalizeToInterval(IEnumerable<StreamEvent<int>> streamEvents) + { + var result = new List<StreamEvent<int>>(); + + var endEvents = streamEvents.Where(se => se.Kind == StreamEventKind.End || se.Kind == StreamEventKind.Interval); + + if (!endEvents.Any()) + return result; + + var firstEvent = endEvents.First(); + StreamEvent<int> curInterval = StreamEvent.CreateInterval(firstEvent.StartTime, firstEvent.EndTime, firstEvent.Payload); + + foreach (var streamEvent in endEvents.Skip(1)) + { + if ((streamEvent.StartTime == curInterval.EndTime || streamEvent.Kind == StreamEventKind.Interval) && // Merge into current interval if payload is same + streamEvent.Payload == curInterval.Payload) + { + curInterval.OtherTime = streamEvent.EndTime; + } + else + { + result.Add(curInterval); + curInterval = StreamEvent.CreateInterval(streamEvent.StartTime, streamEvent.EndTime, streamEvent.Payload); ; + } + } + result.Add(curInterval); + return result; + } + } +} diff --git a/Sources/Test/SimpleTesting/Program.cs b/Sources/Test/SimpleTesting/Program.cs index e1b97e1e9..80f4397f2 100644 --- a/Sources/Test/SimpleTesting/Program.cs +++ b/Sources/Test/SimpleTesting/Program.cs @@ -49,7 +49,7 @@ public static IStreamable<Empty, TPayload> ToCleanStreamable<TPayload>(this Stre return input.OrderBy(v => v.SyncTime).ToArray().ToStreamable(); } - public static IStreamable<Empty, TPayload> ToStreamable<TPayload>(this StreamEvent<TPayload>[] input) + public static IStreamable<Empty, TPayload> ToStreamable<TPayload>(this IEnumerable<StreamEvent<TPayload>> input) { Invariant.IsNotNull(input, "input"); diff --git a/Sources/Test/SimpleTesting/SimpleTesting.csproj b/Sources/Test/SimpleTesting/SimpleTesting.csproj index a024e50d4..55fa71817 100644 --- a/Sources/Test/SimpleTesting/SimpleTesting.csproj +++ b/Sources/Test/SimpleTesting/SimpleTesting.csproj @@ -63,6 +63,7 @@ <Compile Include="Aggregates\CompoundAggregate.cs" /> <Compile Include="Aggregates\CountAggregate.cs" /> <Compile Include="Aggregates\GroupAggregate.cs" /> + <Compile Include="Aggregates\HoppingWindowMinMaxAggregate.cs" /> <Compile Include="Aggregates\MaxAggregate.cs" /> <Compile Include="Aggregates\MinAggregate.cs" /> <Compile Include="Aggregates\PercentileAggregates.cs" />