Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with time #85

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy val commonSettings = Seq(
"-Yrangepos",
),
// don't release subprojects
javaOptions in test += "-Djdk.attach.allowAttachSelf",
githubRelease := null,
skip in publish := true,
maxErrors := 5,
Expand Down
138 changes: 107 additions & 31 deletions core/src/main/scala/ru/itclover/tsp/core/AndThenPattern.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package ru.itclover.tsp.core
import cats.syntax.flatMap._
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.{Foldable, Functor, Monad}
import ru.itclover.tsp.core.Pattern.{Idx, QI}
import cats.{Apply, Foldable, Functor, Monad}
import ru.itclover.tsp.core.AndThenPattern.TimeMap
import ru.itclover.tsp.core.Pattern.IdxExtractor._
import ru.itclover.tsp.core.Pattern.{Idx, IdxExtractor, QI}
import ru.itclover.tsp.core.aggregators.{TimerPattern, WaitPattern}
import ru.itclover.tsp.core.io.TimeExtractor
import ru.itclover.tsp.core.io.TimeExtractor._

import scala.annotation.tailrec
import scala.collection.{mutable => m}
import scala.language.higherKinds

/** AndThen */
//We lose T1 and T2 in output for performance reason only. If needed outputs of first and second stages can be returned as well
case class AndThenPattern[Event, T1, T2, S1, S2](first: Pattern[Event, S1, T1], second: Pattern[Event, S2, T2])
extends Pattern[Event, AndThenPState[T1, T2, S1, S2], (Idx, Idx)] {
case class AndThenPattern[Event: IdxExtractor: TimeExtractor, T1, T2, S1, S2](
first: Pattern[Event, S1, T1],
second: Pattern[Event, S2, T2]
) extends Pattern[Event, AndThenPState[T1, T2, S1, S2], (Idx, Idx)] {

def apply[F[_]: Monad, Cont[_]: Foldable: Functor](
oldState: AndThenPState[T1, T2, S1, S2],
Expand All @@ -21,82 +30,149 @@ case class AndThenPattern[Event, T1, T2, S1, S2](first: Pattern[Event, S1, T1],
val firstF = first.apply[F, Cont](oldState.firstState, oldState.firstQueue, event)
val secondF = second.apply[F, Cont](oldState.secondState, oldState.secondQueue, event)

// populate TimeMap with idx->time tuples for new events
val idxTimeMapWithNewEvents =
event.foldLeft(oldState.indexTimeMap) { case (a, b) => a += (b.index -> b.time) }

for (newFirstOutput <- firstF;
newSecondOutput <- secondF)
yield {
// process queues
val (updatedFirstQueue, updatedSecondQueue, finalQueue) =
process(newFirstOutput._2, newSecondOutput._2, oldQueue)
val (updatedFirstQueue, updatedSecondQueue, finalQueue, updatedTimeMap) =
process(newFirstOutput._2, newSecondOutput._2, oldQueue, idxTimeMapWithNewEvents)

AndThenPState(
newFirstOutput._1,
updatedFirstQueue,
newSecondOutput._1,
updatedSecondQueue
updatedSecondQueue,
updatedTimeMap
) -> finalQueue
}
}

override def initialState(): AndThenPState[T1, T2, S1, S2] =
AndThenPState(first.initialState(), PQueue.empty, second.initialState(), PQueue.empty)
AndThenPState(first.initialState(), PQueue.empty, second.initialState(), PQueue.empty, m.TreeMap.empty)

private def process(
firstQ: QI[T1],
secondQ: QI[T2],
totalQ: QI[(Idx, Idx)],
timeMapQ: TimeMap
): (QI[T1], QI[T2], QI[(Idx, Idx)], TimeMap) = {
import Time._
import cats.instances.option._
import Ordered._

private def process(firstQ: QI[T1], secondQ: QI[T2], totalQ: QI[(Idx, Idx)]): (QI[T1], QI[T2], QI[(Idx, Idx)]) = {
// This function cleans up timeMap unwinding all elements for idx < Math.min(idx1, idx2)
// dropWhile very expensive as it copies whole TreeMap every time.
def cleanTimeMap(timeMap: TimeMap, idx1: Idx, idx2: Idx): TimeMap =
timeMap.rangeImpl(Some(Math.min(idx1 - 1, idx2 - 1)), None)

@tailrec
def inner(first: QI[T1], second: QI[T2], total: QI[(Idx, Idx)]): (QI[T1], QI[T2], QI[(Idx, Idx)]) = {
def inner(
first: QI[T1],
second: QI[T2],
total: QI[(Idx, Idx)],
timeMap: TimeMap
): (QI[T1], QI[T2], QI[(Idx, Idx)], TimeMap) = {

def default: (QI[T1], QI[T2], QI[(Idx, Idx)]) = (first, second, total)
def default: (QI[T1], QI[T2], QI[(Idx, Idx)], TimeMap) = (first, second, total, timeMap)

(first.headOption, second.headOption) match {
case (None, _) => default
case (_, None) => default
case (None, _) => default
case (_, None) => default
case (Some(IdxValue(start1, end1, value1)), Some(IdxValue(start2, end2, value2))) =>
// actually the second result starts after some offset.
val firstStart = timeMap(start1) + offset
// we add +1 to the end of the first result to handle queries like 'a = 0 andThen a = 1',
// there actually intervals of 1st and 2nd successful parts don't intersect, but conceptually this query
// should return true on margins, where 'a' changes from 0 to 1.
val firstEnd = if (end1 >= timeMap.last._1) timeMap.last._2 else timeMap(end1 + 1) + offset

val secondStart = timeMap(start2)
val secondEnd = timeMap(end2)

if (value1.isFail) {
val rewindSecondTo = QueueUtils.find(timeMap, firstStart).getOrElse(timeMap.last._1)
val endOfResult = Math.max(QueueUtils.find(timeMap, firstEnd).getOrElse(timeMap.last._1) - 1, start1)
inner(
first.behead(),
PQueue.unwindWhile(second)(_.end <= start1),
total.enqueue(IdxValue(start1, end1, Result.fail))
second.rewindTo(rewindSecondTo),
total.enqueue(IdxValue(start1, endOfResult, Fail)),
cleanTimeMap(timeMap, start1, start2)
)
} else if (value2.isFail) {
// Do not return Fail for the first part yet, unless it is the end of the queue
first.size match {
case 1 => inner(first.rewindTo(end2 + 1), second.behead(), total.enqueue(IdxValue(start1, end2, Fail)))
case _ => inner(first, second.behead(), total)}
inner(
first.rewindTo(QueueUtils.find(timeMap, secondEnd - offset).getOrElse(timeMap.head._1)),
second.behead(),
total.enqueue(IdxValue(start1, end2, Fail)),
cleanTimeMap(timeMap, start1, start2)
)
} else { // at this moment both first and second results are not Fail.

// late event from second, just skip it and fail this part only.
// first |-------|
// second |------|
if (start1 > end2) {
inner(first, second.behead(), total.enqueue(IdxValue(start2, end2, Fail)))
if (firstStart > secondEnd) {
inner(
first,
second.behead(),
total.enqueue(IdxValue(start2, end2, Fail)),
timeMap.rangeImpl(Some(end2), None)
)
}
// Gap between first and second. Just behead first and fail this part only.
// first |-------|
// second |------|
else if (end1 + 1 < start2) {
inner(first.behead(), second, total.enqueue(IdxValue(start1, end1, Fail)))
else if (firstEnd < secondStart) {
inner(
first.behead(),
second,
total.enqueue(IdxValue(start1, end1, Fail)),
timeMap.rangeImpl(Some(end1), None)
)
}
// First and second intersect
// first |-------|
// second |-------|
// result |------------| (take union, not intersection)
// result |--| (take intersection)
else {
val end = Math.max(end1 + 1, end2)
val start = Math.min(start1, start2)
val newResult = IdxValue(start, end, Succ((start, end))) // todo nobody uses the output of AndThen pattern. Let's drop it later.
inner(first.rewindTo(end + 1), second.rewindTo(end + 1), total.enqueue(newResult))
val start = Math.max(QueueUtils.find(timeMap, firstStart).getOrElse(timeMap.head._1), start2)
val end = Math.min(QueueUtils.find(timeMap, firstEnd).getOrElse(timeMap.last._1), end2)
// todo nobody uses the output of AndThen pattern. Let's drop it later.
val newResult = IdxValue(start, end, Succ((start, end)))
inner(
//todo should it be changed to QueueUtils.find(timeMap, end - offset).getOrElse(timeMap.head._1)) ???
first.rewindTo(end + 1),
second.rewindTo(end + 1),
total.enqueue(newResult),
timeMap.rangeImpl(Some(end), None)
)
}
}
}

}

inner(firstQ, secondQ, totalQ)
// We use TreeMap.rangeImpl everywhere inside @inner function, which creates new view of the same RB-tree.
// To avoid memory leaks we need to clone all remaining elements from timeMap to the new one calling .clone method.
val toReturn = inner(firstQ, secondQ, totalQ, timeMapQ)
toReturn.copy(_4 = toReturn._4.clone())
}

private val offset: Window = PatternTag.computeOffset(second)

override val patternTag: PatternTag = AndThenPatternTag
}

case class AndThenPState[T1, T2, State1, State2](
firstState: State1,
firstQueue: PQueue[T1],
secondState: State2,
secondQueue: PQueue[T2]
secondQueue: PQueue[T2],
indexTimeMap: TimeMap
)

object AndThenPattern {
type TimeMap = m.TreeMap[Idx, Time]
}
2 changes: 2 additions & 0 deletions core/src/main/scala/ru/itclover/tsp/core/CouplePattern.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ case class CouplePattern[Event: IdxExtractor, State1, State2, T1, T2, T3](

override def initialState(): CouplePState[State1, State2, T1, T2] =
CouplePState(left.initialState(), PQueue.empty, right.initialState(), PQueue.empty)

override val patternTag: PatternTag = CouplePatternTag
}

case class CouplePState[State1, State2, T1, T2](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ class ExtractingPattern[Event: IdxExtractor, EKey, EItem, T, S](key: EKey)(
Result.succ(r)
}) {
override def toString: String = s"ExtractingPattern($key)"

override val patternTag: PatternTag = ExtractingTag
}
2 changes: 2 additions & 0 deletions core/src/main/scala/ru/itclover/tsp/core/MapPattern.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ case class MapPattern[Event, T1, T2, InnerState](inner: Pattern[Event, InnerStat
}

override def initialState(): InnerState = inner.initialState()

override val patternTag: PatternTag = MapPatternTag
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package ru.itclover.tsp.mappers
package ru.itclover.tsp.core

import cats.syntax.foldable._
import cats.syntax.functor._
import cats.{Foldable, Functor, Monad}
import ru.itclover.tsp.core.PQueue.MapPQueue
import ru.itclover.tsp.core.Result._
import ru.itclover.tsp.core.{PQueue, Pattern, Result}

import scala.language.higherKinds

Expand Down Expand Up @@ -39,4 +38,6 @@ case class MapWithContextPattern[Event, InnerState, T1, T2](inner: Pattern[Event
}

override def initialState(): InnerState = inner.initialState()

override val patternTag: PatternTag = MapWithContextPatternTag
}
9 changes: 6 additions & 3 deletions core/src/main/scala/ru/itclover/tsp/core/Pattern.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ object Pat {

def unapply[E, _, T](arg: Pat[E, T]): Option[Pattern[E, _, T]] = arg match {
case x: Pattern[E, _, T] => Some(x)
case _ =>
throw new Exception(s"$arg is not a patterne")
case _ =>
throw new Exception(s"$arg is not a pattern")
}
}

Expand Down Expand Up @@ -42,6 +42,9 @@ trait Pattern[Event, S, T] extends Pat[Event, T] with Serializable {
queue: PQueue[T],
events: Cont[Event]
): F[(S, PQueue[T])]

/** Assigned tag. Can be used to semi-compile-time validation pattern matching fullness */
val patternTag: PatternTag
}

case class IdxValue[+T](start: Idx, end: Idx, value: Result[T]) {
Expand All @@ -57,7 +60,7 @@ object Pattern {

type QI[T] = PQueue[T]

trait IdxExtractor[Event] extends Serializable with Order[Idx] {
trait IdxExtractor[-Event] extends Serializable with Order[Idx] {
def apply(e: Event): Idx

def comap[A](f: A => Event): IdxExtractor[A] = IdxExtractor.of(f.andThen(apply))
Expand Down
99 changes: 99 additions & 0 deletions core/src/main/scala/ru/itclover/tsp/core/PatternTag.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package ru.itclover.tsp.core

import ru.itclover.tsp.core.aggregators._

sealed trait PatternTag extends Serializable

sealed trait WithoutInnerPatternTag extends PatternTag
case object ExtractingTag extends WithoutInnerPatternTag
case object SimplePatternTag extends WithoutInnerPatternTag
case object ConstPatternTag extends WithoutInnerPatternTag

sealed trait WithInnersPatternsTag extends PatternTag {

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
def getInnerPatterns[E](pattern: Pattern[E, _, _]): Seq[Pattern[E, _, _]] = {
assume(pattern.patternTag eq this)
this match {
case ReducePatternTag => pattern.asInstanceOf[ReducePattern[E, _, _, _]].patterns
case tag: WithOneInnerPatternTag => Seq(tag.getInnerPattern(pattern))
case tag: WithTwoInnersPatternTag =>
val (x, y) = tag.getTwoInnerPatterns(pattern)
Seq(x, y)
}
}
}

case object ReducePatternTag extends WithInnersPatternsTag

sealed trait WithOneInnerPatternTag extends WithInnersPatternsTag {

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
def getInnerPattern[E](pattern: Pattern[E, _, _]): Pattern[E, _, _] = {
assume(pattern.patternTag eq this)
this match {
case GroupPatternTag => pattern.asInstanceOf[GroupPattern[E, _, _]].inner
case PreviousValueTag => pattern.asInstanceOf[PreviousValue[E, _, _]].inner
case TimerPatternTag => pattern.asInstanceOf[TimerPattern[E, _, _]].inner
case TimestampAdderPatternTag => pattern.asInstanceOf[TimestampsAdderPattern[E, _, _]].inner
case WaitPatternTag => pattern.asInstanceOf[WaitPattern[E, _, _]].inner
case WindowStatisticPatternTag => pattern.asInstanceOf[WindowStatistic[E, _, _]].inner
case MapPatternTag => pattern.asInstanceOf[MapPattern[E, _, _, _]].inner
case MapWithContextPatternTag => pattern.asInstanceOf[MapWithContextPattern[E, _, _, _]].inner
case SegmentizerPatternTag => pattern.asInstanceOf[SegmentizerPattern[E, _, _]].inner
}
}
}

case object GroupPatternTag extends WithOneInnerPatternTag
case object PreviousValueTag extends WithOneInnerPatternTag
case object TimerPatternTag extends WithOneInnerPatternTag
case object TimestampAdderPatternTag extends WithOneInnerPatternTag
case object WaitPatternTag extends WithOneInnerPatternTag
case object WindowStatisticPatternTag extends WithOneInnerPatternTag
case object MapPatternTag extends WithOneInnerPatternTag
case object MapWithContextPatternTag extends WithOneInnerPatternTag
case object SegmentizerPatternTag extends WithOneInnerPatternTag

sealed trait WithTwoInnersPatternTag extends WithInnersPatternsTag {

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
def getTwoInnerPatterns[E](pattern: Pattern[E, _, _]): (Pattern[E, _, _], Pattern[E, _, _]) = {
assume(pattern.patternTag eq this)
this match {
case AndThenPatternTag =>
val t = pattern.asInstanceOf[AndThenPattern[E, _, _, _, _]]
(t.first, t.second)
case CouplePatternTag =>
val t = pattern.asInstanceOf[CouplePattern[E, _, _, _, _, _]]
(t.left, t.right)
}
}

}

case object AndThenPatternTag extends WithTwoInnersPatternTag
case object CouplePatternTag extends WithTwoInnersPatternTag


object PatternTag {

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
def computeOffset(pattern: Pattern[_, _, _]): Window = {
pattern.patternTag match {
// handle special cases
case AndThenPatternTag =>
val (first, second) = AndThenPatternTag.getTwoInnerPatterns(pattern)
Window(computeOffset(first).toMillis + computeOffset(second).toMillis)
case TimerPatternTag => pattern.asInstanceOf[TimerPattern[_, _, _]].window
case WaitPatternTag => Window(0) // pattern.asInstanceOf[WaitPattern[_, _, _]].window

// rest patterns are below
case _: WithoutInnerPatternTag => Window(0L)
case tag: WithInnersPatternsTag =>
//take the biggest offset for patterns with process patterns
Window(tag.getInnerPatterns(pattern).map((computeOffset _).andThen(_.toMillis)).fold(0L)(Math.max))
}

}
}
Loading