diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
index 862beda2e..4f300eda1 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
@@ -2,11 +2,35 @@
 // Copyright (c) Microsoft Corporation.  All rights reserved.
 // Licensed under the MIT License
 // *********************************************************************
+using System.Collections.Generic;
 using System.ComponentModel;
 using System.Runtime.Serialization;
 
 namespace Microsoft.StreamProcessing.Internal
 {
+    /// <summary>
+    /// The value representation with its timestamp.
+    /// </summary>
+    /// <typeparam name="T">The type of the underlying elements being aggregated.</typeparam>
+    [DataContract]
+    [EditorBrowsable(EditorBrowsableState.Never)]
+    public struct ValueAndTimestamp<T>
+    {
+        /// <summary>
+        /// The timestamp of the payload.
+        /// </summary>
+        [DataMember]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public long timestamp;
+
+        /// <summary>
+        /// Payload of the event
+        /// </summary>
+        [DataMember]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public T value;
+    }
+
     /// <summary>
     /// The state object used in minimum and maximum aggregates.
     /// </summary>
@@ -22,6 +46,13 @@ public struct MinMaxState<T>
         [EditorBrowsable(EditorBrowsableState.Never)]
         public SortedMultiSet<T> savedValues;
 
+        /// <summary>
+        /// List of values and its timestamp
+        /// </summary>
+        [DataMember]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public ElasticCircularBuffer<ValueAndTimestamp<T>> values;
+
         /// <summary>
         /// The current value if the aggregate were to be computed immediately.
         /// </summary>
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
deleted file mode 100644
index fe31f2630..000000000
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-// *********************************************************************
-// Copyright (c) Microsoft Corporation.  All rights reserved.
-// Licensed under the MIT License
-// *********************************************************************
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.Contracts;
-using System.Linq.Expressions;
-using Microsoft.StreamProcessing.Internal;
-
-namespace Microsoft.StreamProcessing.Aggregates
-{
-    internal sealed class SlidingMaxAggregate<T> : IAggregate<T, MinMaxState<T>, T>
-    {
-        private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
-        private readonly Comparison<T> comparer;
-
-        public SlidingMaxAggregate(QueryContainer container) : this(ComparerExpression<T>.Default, container) { }
-
-        public SlidingMaxAggregate(IComparerExpression<T> comparer, QueryContainer container)
-        {
-            Contract.Requires(comparer != null);
-            this.comparer = comparer.GetCompareExpr().Compile();
-
-            var generator = comparer.CreateSortedDictionaryGenerator<T, long>(container);
-            Expression<Func<Func<SortedDictionary<T, long>>, MinMaxState<T>>> template
-                = (g) => new MinMaxState<T> { savedValues = new SortedMultiSet<T>(g), currentValue = default, currentTimestamp = InvalidSyncTime };
-            var replaced = template.ReplaceParametersInBody(generator);
-            this.initialState = Expression.Lambda<Func<MinMaxState<T>>>(replaced);
-        }
-
-        private readonly Expression<Func<MinMaxState<T>>> initialState;
-        public Expression<Func<MinMaxState<T>>> InitialState() => initialState;
-
-        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate()
-            => (state, timestamp, input) => Accumulate(state, timestamp, input);
-
-        private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input)
-        {
-            if (timestamp == state.currentTimestamp)
-            {
-                if (this.comparer(input, state.currentValue) > 0)
-                    state.currentValue = input;
-            }
-            else
-            {
-                if (state.currentTimestamp != InvalidSyncTime)
-                    state.savedValues.Add(state.currentValue);
-                state.currentTimestamp = timestamp;
-                state.currentValue = input;
-            }
-            return state;
-        }
-
-        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate()
-            => (state, timestamp, input) => state; // never invoked, hence not implemented
-
-        public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference()
-            => (leftSet, rightSet) => Difference(leftSet, rightSet);
-
-        private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet)
-        {
-            if (leftSet.currentTimestamp != InvalidSyncTime)
-            {
-                leftSet.currentTimestamp = InvalidSyncTime;
-                leftSet.savedValues.Add(leftSet.currentValue);
-            }
-            if (rightSet.currentTimestamp != InvalidSyncTime)
-            {
-                rightSet.currentTimestamp = InvalidSyncTime;
-                rightSet.savedValues.Add(rightSet.currentValue);
-            }
-
-            leftSet.savedValues.RemoveAll(rightSet.savedValues);
-            return leftSet;
-        }
-
-        public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state);
-
-        private T ComputeResult(MinMaxState<T> state)
-        {
-            if (state.savedValues.IsEmpty)
-                return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
-            else
-            {
-                if (state.currentTimestamp == InvalidSyncTime)
-                    return state.savedValues.Last();
-                else
-                {
-                    var last = state.savedValues.Last();
-                    return this.comparer(last, state.currentValue) > 0 ? last : state.currentValue;
-                }
-            }
-        }
-    }
-}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
deleted file mode 100644
index 9ad193bec..000000000
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-// *********************************************************************
-// Copyright (c) Microsoft Corporation.  All rights reserved.
-// Licensed under the MIT License
-// *********************************************************************
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.Contracts;
-using System.Linq.Expressions;
-using Microsoft.StreamProcessing.Internal;
-
-namespace Microsoft.StreamProcessing.Aggregates
-{
-    internal sealed class SlidingMinAggregate<T> : IAggregate<T, MinMaxState<T>, T>
-    {
-        private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
-        private readonly Comparison<T> comparer;
-
-        public SlidingMinAggregate(QueryContainer container) : this(ComparerExpression<T>.Default, container) { }
-
-        public SlidingMinAggregate(IComparerExpression<T> comparer, QueryContainer container)
-        {
-            Contract.Requires(comparer != null);
-            this.comparer = comparer.GetCompareExpr().Compile();
-
-            var generator = comparer.CreateSortedDictionaryGenerator<T, long>(container);
-            Expression<Func<Func<SortedDictionary<T, long>>, MinMaxState<T>>> template
-                = (g) => new MinMaxState<T> { savedValues = new SortedMultiSet<T>(g), currentValue = default, currentTimestamp = InvalidSyncTime };
-            var replaced = template.ReplaceParametersInBody(generator);
-            this.initialState = Expression.Lambda<Func<MinMaxState<T>>>(replaced);
-        }
-
-        private readonly Expression<Func<MinMaxState<T>>> initialState;
-        public Expression<Func<MinMaxState<T>>> InitialState() => initialState;
-
-        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate()
-            => (state, timestamp, input) => Accumulate(state, timestamp, input);
-
-        private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input)
-        {
-            if (timestamp == state.currentTimestamp)
-            {
-                if (this.comparer(input, state.currentValue) < 0)
-                    state.currentValue = input;
-            }
-            else
-            {
-                if (state.currentTimestamp != InvalidSyncTime)
-                    state.savedValues.Add(state.currentValue);
-                state.currentTimestamp = timestamp;
-                state.currentValue = input;
-            }
-            return state;
-        }
-
-        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate()
-            => (state, timestamp, input) => state; // never invoked, hence not implemented
-
-        public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference()
-            => (leftSet, rightSet) => Difference(leftSet, rightSet);
-
-        private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet)
-        {
-            if (leftSet.currentTimestamp != InvalidSyncTime)
-            {
-                leftSet.currentTimestamp = InvalidSyncTime;
-                leftSet.savedValues.Add(leftSet.currentValue);
-            }
-            if (rightSet.currentTimestamp != InvalidSyncTime)
-            {
-                rightSet.currentTimestamp = InvalidSyncTime;
-                rightSet.savedValues.Add(rightSet.currentValue);
-            }
-
-            leftSet.savedValues.RemoveAll(rightSet.savedValues);
-            return leftSet;
-        }
-
-        public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state);
-
-        private T ComputeResult(MinMaxState<T> state)
-        {
-            if (state.savedValues.IsEmpty)
-                return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
-            else
-            {
-                if (state.currentTimestamp == InvalidSyncTime)
-                    return state.savedValues.First();
-                else
-                {
-                    var first = state.savedValues.First();
-                    return this.comparer(first, state.currentValue) < 0 ? first : state.currentValue;
-                }
-            }
-        }
-    }
-}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs
new file mode 100644
index 000000000..895162d57
--- /dev/null
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs
@@ -0,0 +1,162 @@
+// *********************************************************************
+// Copyright (c) Microsoft Corporation.  All rights reserved.
+// Licensed under the MIT License
+// *********************************************************************
+using System;
+using System.Diagnostics.Contracts;
+using System.Linq.Expressions;
+using Microsoft.StreamProcessing.Internal;
+
+namespace Microsoft.StreamProcessing.Aggregates
+{
+    /* The following invariant holds for the state:
+     * 1. The 'values' list is always in decreasing order of payload - that happens as we append only smaller values than last.
+     * 2. The 'values.timestamp' is always in increasing order - as the new values are always appended.
+     * 3. The 'currentTimestamp' is higher than all timestamps of the 'state.values' list
+     * 4. The 'currentValue' payload need not be the lowest (relative to 'state.values')
+     * -
+     * Further for usage from HoppingPipe:
+     * 5. The 'ecq.state.values' list will always be empty, optimizing difference operation
+     * 6. The difference calculation happens by timestamp, and uses one bulk remove (RemoveRange/Clear) for values.
+    */
+    internal abstract class SlidingMinMaxAggregate<T> : IAggregate<T, MinMaxState<T>, T>
+    {
+        private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
+
+        // The comparer is used as if we are working for Max operation. For Min, we reverse the expression
+        private readonly Comparison<T> comparer;
+
+        public SlidingMinMaxAggregate(IComparerExpression<T> comparer)
+        {
+            Contract.Requires(comparer != null);
+            this.comparer = comparer.GetCompareExpr().Compile();
+        }
+
+        public Expression<Func<MinMaxState<T>>> InitialState()
+            => () => new MinMaxState<T> { values = new ElasticCircularBuffer<ValueAndTimestamp<T>>(), currentValue = default, currentTimestamp = InvalidSyncTime };
+
+        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Accumulate()
+            => (state, timestamp, input) => Accumulate(state, timestamp, input);
+
+        private MinMaxState<T> Accumulate(MinMaxState<T> state, long timestamp, T input)
+        {
+            if (timestamp == state.currentTimestamp)
+            {
+                if (this.comparer(input, state.currentValue) > 0)
+                    state.currentValue = input;
+            }
+            else
+            {
+                // Save only if new input is smaller,
+                // If the current input is larger (or equal), we never require older value as they expire before current
+                if (state.currentTimestamp != InvalidSyncTime &&
+                    this.comparer(input, state.currentValue) < 0)
+                {
+                    PushToCollection(state);
+                }
+                state.currentTimestamp = timestamp;
+                state.currentValue = input;
+            }
+            return state;
+        }
+
+        private void PushToCollection(MinMaxState<T> state)
+        {
+            Contract.Assert(state.currentTimestamp != InvalidSyncTime);
+            Contract.Assert(state.values.Count == 0 || state.values.PeekLast().timestamp <= state.currentTimestamp); // Ensure new timestamp is higher
+
+            while (state.values.Count != 0 &&
+                   this.comparer(state.values.PeekLast().value, state.currentValue) <= 0)
+            {
+                state.values.PopLast();
+            }
+
+            var newValue = new ValueAndTimestamp<T> { timestamp = state.currentTimestamp, value = state.currentValue };
+            state.values.Enqueue(ref newValue);
+        }
+
+        public Expression<Func<MinMaxState<T>, long, T, MinMaxState<T>>> Deaccumulate()
+            => (state, timestamp, input) => Deaccumulate(state, timestamp, input);
+
+        private static MinMaxState<T> Deaccumulate(MinMaxState<T> state, long timestamp, T input)
+        {
+            throw new NotImplementedException($"{nameof(SlidingMinMaxAggregate<T>)} does not implement Deaccumulate()");
+        }
+
+        public Expression<Func<MinMaxState<T>, MinMaxState<T>, MinMaxState<T>>> Difference()
+            => (leftSet, rightSet) => Difference(leftSet, rightSet);
+
+        private static MinMaxState<T> Difference(MinMaxState<T> leftSet, MinMaxState<T> rightSet)
+        {
+            long maxRightTimestamp;
+
+            if (rightSet.currentTimestamp != InvalidSyncTime)
+            {
+                maxRightTimestamp = rightSet.currentTimestamp;
+            }
+            else
+            {
+                if (rightSet.values.Count == 0)
+                    return leftSet;
+
+                // The right set will never contain values for HoppingPipe, adding below for completeness
+                maxRightTimestamp = rightSet.values.PeekLast().timestamp;
+            }
+
+            if (leftSet.currentTimestamp != InvalidSyncTime &&
+                leftSet.currentTimestamp <= maxRightTimestamp)
+            {
+                leftSet.currentTimestamp = InvalidSyncTime; // Discard all values if rightSet's maxTime covers it.
+                leftSet.values.Clear();
+                return leftSet;
+            }
+            else
+            {
+                while (leftSet.values.Count != 0 &&
+                       leftSet.values.PeekFirst().timestamp <= maxRightTimestamp)
+                {
+                    leftSet.values.Dequeue();
+                }
+            }
+
+            return leftSet;
+        }
+
+        public Expression<Func<MinMaxState<T>, T>> ComputeResult() => state => ComputeResult(state);
+
+        private T ComputeResult(MinMaxState<T> state)
+        {
+            if (state.values.Count == 0)
+                return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
+            else
+            {
+                var first = state.values.PeekFirst().value;
+
+                if (state.currentTimestamp == InvalidSyncTime)
+                    return first;
+                else
+                {
+                    return this.comparer(first, state.currentValue) > 0 ? first : state.currentValue;
+                }
+            }
+        }
+    }
+
+    internal sealed class SlidingMaxAggregate<T> : SlidingMinMaxAggregate<T>
+    {
+        public SlidingMaxAggregate()
+            : this(ComparerExpression<T>.Default) { }
+
+        public SlidingMaxAggregate(IComparerExpression<T> comparer)
+            : base(comparer) { }
+    }
+
+    internal sealed class SlidingMinAggregate<T> : SlidingMinMaxAggregate<T>
+    {
+        public SlidingMinAggregate()
+            : this(ComparerExpression<T>.Default) { }
+
+        public SlidingMinAggregate(IComparerExpression<T> comparer)
+            : base(ComparerExpression<T>.Reverse(comparer)) { }
+    }
+}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
index 15ebdacff..cb6ed60b4 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
@@ -20,24 +20,15 @@ public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer
             : this(k, rankComparer, ComparerExpression<T>.Default, container) { }
 
         public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container)
-            : base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
+            : base(ThenOrderBy(ComparerExpression<T>.Reverse(rankComparer), overallComparer), container)
         {
             Contract.Requires(rankComparer != null);
             Contract.Requires(overallComparer != null);
             Contract.Requires(k > 0);
-            this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
+            this.compiledRankComparer = ComparerExpression<T>.Reverse(rankComparer).GetCompareExpr().Compile();
             this.k = k;
         }
 
-        private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
-        {
-            Contract.Requires(comparer != null);
-            var expression = comparer.GetCompareExpr();
-            Expression<Comparison<T>> template = (left, right) => CallInliner.Call(expression, right, left);
-            var reversedExpression = template.InlineCalls();
-            return new ComparerExpression<T>(reversedExpression);
-        }
-
         private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> comparer1, IComparerExpression<T> comparer2)
         {
             Contract.Requires(comparer1 != null);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
index 3df34156c..e85838d8f 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
@@ -21,7 +21,7 @@ public sealed class CircularBuffer<T>
         [DataMember]
         private int capacityMask = 0xfff;
         [DataMember]
-        internal T[] Items;
+        internal T[] Items = null;
         [DataMember]
         internal int head = 0;
         [DataMember]
@@ -31,7 +31,7 @@ public sealed class CircularBuffer<T>
         /// Currently for internal use only - do not use directly.
         /// </summary>
         [EditorBrowsable(EditorBrowsableState.Never)]
-        public CircularBuffer() => this.Items = new T[this.capacityMask + 1];
+        public CircularBuffer() { }
 
         /// <summary>
         /// Currently for internal use only - do not use directly.
@@ -44,7 +44,6 @@ public CircularBuffer(int capacity)
             var temp = 8;
             while (temp <= capacity) temp <<= 1;
 
-            this.Items = new T[temp];
             this.capacityMask = temp - 1;
         }
 
@@ -72,6 +71,7 @@ public void Enqueue(ref T value)
         {
             int next = (this.tail + 1) & this.capacityMask;
             if (next == this.head) throw new InvalidOperationException("The list is full!");
+            if (this.Items == null) this.Items = new T[this.capacityMask + 1];
             this.Items[this.tail] = value;
             this.tail = next;
         }
@@ -91,6 +91,21 @@ public T Dequeue()
             return ret;
         }
 
+        /// <summary>
+        /// Currently for internal use only - do not use directly.
+        /// </summary>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public T PopLast()
+        {
+            if (this.head == this.tail) throw new InvalidOperationException("The list is empty!");
+            int oldtail = this.tail;
+            this.tail = (this.tail - 1) & this.capacityMask;
+            var ret = this.Items[oldtail];
+            this.Items[oldtail] = default;
+            return ret;
+        }
+
         /// <summary>
         /// Currently for internal use only - do not use directly.
         /// </summary>
@@ -105,6 +120,13 @@ public T Dequeue()
         [EditorBrowsable(EditorBrowsableState.Never)]
         public bool IsEmpty() => this.head == this.tail;
 
+        /// <summary>
+        /// Removes alll elements from the list - do not use directly.
+        /// </summary>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public void Clear() => this.head = this.tail = 0;
+
         /// <summary>
         /// Currently for internal use only - do not use directly.
         /// </summary>
@@ -150,6 +172,17 @@ public ElasticCircularBuffer()
             this.Count = 0;
         }
 
+        /// <summary>
+        /// Currently for internal use only - do not use directly.
+        /// </summary>
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public void Clear()
+        {
+            this.tail = this.head = this.buffers.First;
+            this.head.Value.Clear();
+            this.Count = 0;
+        }
+
         /// <summary>
         /// Currently for internal use only - do not use directly.
         /// </summary>
@@ -203,6 +236,27 @@ public T Dequeue()
             return this.head.Value.Dequeue();
         }
 
+        /// <summary>
+        /// Currently for internal use only - do not use directly.
+        /// </summary>
+        /// <returns></returns>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        [EditorBrowsable(EditorBrowsableState.Never)]
+        public T PopLast()
+        {
+            if (this.tail.Value.IsEmpty())
+            {
+                if (this.head == this.tail)
+                    throw new InvalidOperationException("The list is empty!");
+
+                this.tail = this.tail.Previous;
+                if (this.tail == null) this.tail = this.buffers.Last;
+            }
+
+            this.Count--;
+            return this.tail.Value.PopLast();
+        }
+
         /// <summary>
         /// Currently for internal use only - do not use directly.
         /// </summary>
diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
index f39ea9dbf..dc1fdf047 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
@@ -5,7 +5,7 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
-using System.Linq;
+using System.Diagnostics.Contracts;
 using System.Linq.Expressions;
 using System.Reflection;
 
@@ -244,6 +244,16 @@ private static ConditionalExpression MakeComparisonExpression(ParameterExpressio
 
         internal static bool IsSimpleDefault(IComparerExpression<T> input)
             => input == Default && input is PrimitiveComparerExpression<T>;
+
+        public static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
+        {
+            Contract.Requires(comparer != null);
+
+            var expression = comparer.GetCompareExpr();
+            Expression<Comparison<T>> template = (left, right) => CallInliner.Call(expression, right, left);
+            var reversedExpression = template.InlineCalls();
+            return new ComparerExpression<T>(reversedExpression);
+        }
     }
 
     internal class PrimitiveComparerExpression<T> : ComparerExpression<T>
diff --git a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
index 198521763..749cce699 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
@@ -90,7 +90,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Min<TValue>(Expression<F
             var aggregate = this.Properties.IsTumbling
                 ? new TumblingMinAggregate<TValue>()
                 : this.Properties.IsConstantDuration
-                    ? new SlidingMinAggregate<TValue>(this.Properties.QueryContainer)
+                    ? new SlidingMinAggregate<TValue>()
                     : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MinAggregate<TValue>(this.Properties.QueryContainer);
 
             return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -108,7 +108,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Min<TValue>(
             var aggregate = this.Properties.IsTumbling
                 ? new TumblingMinAggregate<TValue>(comparer)
                 : this.Properties.IsConstantDuration
-                    ? new SlidingMinAggregate<TValue>(comparer, this.Properties.QueryContainer)
+                    ? new SlidingMinAggregate<TValue>(comparer)
                     : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MinAggregate<TValue>(comparer, this.Properties.QueryContainer);
 
             return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -130,7 +130,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Max<TValue>(Expression<F
             var aggregate = this.Properties.IsTumbling
                 ? new TumblingMaxAggregate<TValue>()
                 : this.Properties.IsConstantDuration
-                    ? new SlidingMaxAggregate<TValue>(this.Properties.QueryContainer)
+                    ? new SlidingMaxAggregate<TValue>()
                     : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MaxAggregate<TValue>(this.Properties.QueryContainer);
 
             return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -147,7 +147,7 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Max<TValue>(Expression<F
             var aggregate = this.Properties.IsTumbling
                 ? new TumblingMaxAggregate<TValue>(comparer)
                 : this.Properties.IsConstantDuration
-                    ? new SlidingMaxAggregate<TValue>(comparer, this.Properties.QueryContainer)
+                    ? new SlidingMaxAggregate<TValue>(comparer)
                     : (IAggregate<TValue, MinMaxState<TValue>, TValue>)new MaxAggregate<TValue>(comparer, this.Properties.QueryContainer);
 
             return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
diff --git a/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs
new file mode 100644
index 000000000..2625ead81
--- /dev/null
+++ b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs
@@ -0,0 +1,229 @@
+// *********************************************************************
+// Copyright (c) Microsoft Corporation.  All rights reserved.
+// Licensed under the MIT License
+// *********************************************************************
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.StreamProcessing;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace SimpleTesting
+{
+    [TestClass]
+    public class HoppingWindowMinMaxAggregate : TestWithConfigSettingsAndMemoryLeakDetection
+    {
+        private const long HopSize = 10;
+        private const long WindowSize = 4 * HopSize;
+
+        private StreamEvent<int> EndEvent = StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime);
+        private Random random = new Random(Seed: (int)DateTime.UtcNow.Ticks);
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateSimple()
+        {
+            var input = new[]
+            {
+                StreamEvent.CreateStart(1, 10),
+                StreamEvent.CreateStart(3, 20),
+                StreamEvent.CreateStart(6, 10),
+                StreamEvent.CreateStart(8, 30),
+                StreamEvent.CreateStart(12, 20),
+                StreamEvent.CreateStart(18, 10),
+                EndEvent
+            };
+
+            var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Max();
+
+            var correctValues = new[] { 20, 30, 30, 20, 10 };
+            var correctEvents = new List<StreamEvent<int>>();
+            for (int i = 0; i < correctValues.Length; i++)
+            {
+                correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i]));
+                correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i]));
+            }
+            correctEvents.Add(EndEvent);
+
+            CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray());
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinAggregateSimple()
+        {
+            var input = new[]
+            {
+                StreamEvent.CreateStart(1, 30),
+                StreamEvent.CreateStart(3, 20),
+                StreamEvent.CreateStart(6, 30),
+                StreamEvent.CreateStart(8, 10),
+                StreamEvent.CreateStart(12, 20),
+                StreamEvent.CreateStart(18, 30),
+                EndEvent
+            };
+
+            var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Min();
+
+            var correctValues = new[] { 20, 10, 10, 20, 30 };
+            var correctEvents = new List<StreamEvent<int>>();
+            for (int i = 0; i < correctValues.Length; i++)
+            {
+                correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i]));
+                correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i]));
+            }
+            correctEvents.Add(EndEvent);
+
+            CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray());
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateRandomDistribution()
+        {
+            // Distribution: [1,2,3,4,5] hops : 10% each, Closely-Spaced: 50%
+            GenerateDataAndTestInput(
+                numValues: 1000,
+                valueGenerator: v => random.Next(100, 110),
+                distanceGenerator: () =>
+                {
+                    var hopType = random.Next(1, 11);
+                    return (hopType < 6) ? (hopType * HopSize) : (hopType - 5);
+                });
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateUnaligned()
+        {
+            GenerateDataAndTestInput(
+                numValues: 1000,
+                valueGenerator: v => random.Next(100, 110),
+                distanceGenerator: () => 10,
+                windowSize: 27);
+
+            GenerateDataAndTestInput(
+                numValues: 1000,
+                valueGenerator: v => random.Next(100, 110),
+                distanceGenerator: () => 10,
+                windowSize: 17);
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateAllIncreasing()
+        {
+            GenerateDataAndTestInput(
+                numValues: 1000,
+                valueGenerator: v => v + random.Next(1, 11),
+                distanceGenerator: () => HopSize);
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateAllDecreasing()
+        {
+            GenerateDataAndTestInput(
+                numValues: 1000,
+                valueGenerator: v => (v == 0) ? 10000 : v - random.Next(1, 11),
+                distanceGenerator: () => HopSize);
+        }
+
+        [TestMethod, TestCategory("Gated")]
+        public void TestHoppingWindowMinMaxAggregateCyclingValues()
+        {
+            // Test values 1 -> 2 -> 3 -> 1, and run for 1/2, 1, 2, 4, 8 intervals of hops
+            for (long distance = HopSize / 2; distance <= HopSize * 8; distance *= 2)
+            {
+                GenerateDataAndTestInput(
+                    numValues: 10,
+                    valueGenerator: v => (v % 3) + 1,
+                    distanceGenerator: () => distance);
+            }
+        }
+
+        private void GenerateDataAndTestInput(
+            int numValues,
+            Func<int, int> valueGenerator,
+            Func<long> distanceGenerator,
+            long windowSize = WindowSize)
+        {
+            var input = new List<StreamEvent<int>>();
+            long maxStartTime = 0;
+
+            long startTime = 0;
+            int value = 100;
+            for (int i = 0; i < numValues; i++)
+            {
+                startTime += distanceGenerator();
+                value = valueGenerator(value);
+                input.Add(StreamEvent.CreateStart(startTime, value));
+                maxStartTime = Math.Max(maxStartTime, startTime);
+            }
+            input.Add(EndEvent);
+
+            TestHoppingWindowMaxAggregateInternal(input, maxStartTime, windowSize);
+
+            TestHoppingWindowMinAggregateInternal(input, maxStartTime, windowSize);
+        }
+
+        private void TestHoppingWindowMinAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, long maxStartTime, long windowSize)
+        {
+            TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: true, maxStartTime, windowSize);
+        }
+
+        private void TestHoppingWindowMaxAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, long maxStartTime, long windowSize)
+        {
+            TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: false, maxStartTime, windowSize);
+        }
+
+        private void TestHoppingWindowMinMaxAggregateInternal(IEnumerable<StreamEvent<int>> streamEvents, bool isMax, long maxStartTime, long windowSize)
+        {
+            var output = streamEvents.ToStreamable().HoppingWindowLifetime(windowSize, HopSize).Max();
+
+            var correct = new List<StreamEvent<int>>();
+
+            maxStartTime += windowSize;
+            for (long startTime = 0; startTime < maxStartTime; startTime += HopSize)
+            {
+                var eventsInWindow = streamEvents.Where(x => x.StartTime > (startTime - windowSize) && x.StartTime <= startTime);
+
+                if (!eventsInWindow.Any())
+                    continue;
+
+                var fun = isMax ? new Func<int, int, bool>((x, y) => x > y) : ((x, y) => x < y);
+                var max = eventsInWindow.Aggregate((state, input) => state.Payload > input.Payload ? state : input);
+
+                correct.Add(StreamEvent.CreateStart(startTime, max.Payload));
+                correct.Add(StreamEvent.CreateEnd(startTime + HopSize, startTime, max.Payload));
+            }
+
+            var actual = NormalizeToInterval(output.ToStreamEventArray<int>());
+
+            Assert.IsTrue(Enumerable.SequenceEqual(NormalizeToInterval(correct), actual));
+        }
+
+        private IEnumerable<StreamEvent<int>> NormalizeToInterval(IEnumerable<StreamEvent<int>> streamEvents)
+        {
+            var result = new List<StreamEvent<int>>();
+
+            var endEvents = streamEvents.Where(se => se.Kind == StreamEventKind.End || se.Kind == StreamEventKind.Interval);
+
+            if (!endEvents.Any())
+                return result;
+
+            var firstEvent = endEvents.First();
+            StreamEvent<int> curInterval = StreamEvent.CreateInterval(firstEvent.StartTime, firstEvent.EndTime, firstEvent.Payload);
+
+            foreach (var streamEvent in endEvents.Skip(1))
+            {
+                if ((streamEvent.StartTime == curInterval.EndTime || streamEvent.Kind == StreamEventKind.Interval) && // Merge into current interval if payload is same
+                    streamEvent.Payload == curInterval.Payload)
+                {
+                    curInterval.OtherTime = streamEvent.EndTime;
+                }
+                else
+                {
+                    result.Add(curInterval);
+                    curInterval = StreamEvent.CreateInterval(streamEvent.StartTime, streamEvent.EndTime, streamEvent.Payload); ;
+                }
+            }
+            result.Add(curInterval);
+            return result;
+        }
+    }
+}
diff --git a/Sources/Test/SimpleTesting/Program.cs b/Sources/Test/SimpleTesting/Program.cs
index e1b97e1e9..80f4397f2 100644
--- a/Sources/Test/SimpleTesting/Program.cs
+++ b/Sources/Test/SimpleTesting/Program.cs
@@ -49,7 +49,7 @@ public static IStreamable<Empty, TPayload> ToCleanStreamable<TPayload>(this Stre
             return input.OrderBy(v => v.SyncTime).ToArray().ToStreamable();
         }
 
-        public static IStreamable<Empty, TPayload> ToStreamable<TPayload>(this StreamEvent<TPayload>[] input)
+        public static IStreamable<Empty, TPayload> ToStreamable<TPayload>(this IEnumerable<StreamEvent<TPayload>> input)
         {
             Invariant.IsNotNull(input, "input");
 
diff --git a/Sources/Test/SimpleTesting/SimpleTesting.csproj b/Sources/Test/SimpleTesting/SimpleTesting.csproj
index a024e50d4..55fa71817 100644
--- a/Sources/Test/SimpleTesting/SimpleTesting.csproj
+++ b/Sources/Test/SimpleTesting/SimpleTesting.csproj
@@ -63,6 +63,7 @@
     <Compile Include="Aggregates\CompoundAggregate.cs" />
     <Compile Include="Aggregates\CountAggregate.cs" />
     <Compile Include="Aggregates\GroupAggregate.cs" />
+    <Compile Include="Aggregates\HoppingWindowMinMaxAggregate.cs" />
     <Compile Include="Aggregates\MaxAggregate.cs" />
     <Compile Include="Aggregates\MinAggregate.cs" />
     <Compile Include="Aggregates\PercentileAggregates.cs" />