Skip to content

Commit

Permalink
GEOMESA-3156 Fix Kafka event time expiry (locationtech#2819)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Dec 1, 2021
1 parent e1846f5 commit fa8b31c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

package org.locationtech.geomesa.kafka.index

import java.io.Closeable
import java.util.Date
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

import com.github.benmanes.caffeine.cache.Ticker
import com.typesafe.scalalogging.LazyLogging
import org.locationtech.geomesa.filter.factory.FastFilterFactory
Expand All @@ -25,6 +21,9 @@ import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.filter.Filter
import org.opengis.filter.expression.Expression

import java.io.Closeable
import java.util.Date
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -54,7 +53,7 @@ object FeatureStateFactory extends LazyLogging {
// remove tasks when canceled, otherwise they will only be removed from the task queue
// when they would be executed. we expect frequent cancellations due to feature updates
es.setRemoveOnCancelPolicy(true)
(es, Ticker.systemTicker())
(es, CurrentTimeTicker)
}

expiry match {
Expand Down Expand Up @@ -315,4 +314,8 @@ object FeatureStateFactory extends LazyLogging {

override def close(): Unit = CloseWithLogging(delegates.map(_._2))
}

object CurrentTimeTicker extends Ticker {
override def read(): Long = System.currentTimeMillis() * 1000000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ class EventTimeFeatureCacheTest extends Specification with Mockito {
}
}

"expire by event time with ordering (no mocking)" in {
val ev = EventTimeConfig(Duration("100ms"), "dtg", ordered = true)
val config = IndexConfig(ev, res, Seq.empty, Seq.empty, lazyDeserialization = true, None)

WithClose(KafkaFeatureCache(sft, config)) { cache =>
val sf1 = ScalaSimpleFeature.create(sft, "1", "first", new Date(), "POINT (-78.0 35.0)")
cache.put(sf1)
cache.query("1") must beSome(sf1.asInstanceOf[SimpleFeature])
cache.query(ECQL.toFilter("bbox(geom,-79.0,34.0,-77.0,36.0)")).toSeq mustEqual Seq(sf1)

eventually(cache.query("1") must beNone)
cache.query(ECQL.toFilter("bbox(geom,-79.0,34.0,-77.0,36.0)")).toSeq must beEmpty
}
}

"expire by event time without ordering" in {
val ex = mock[ScheduledExecutorService]
val ticker = new MockTicker()
Expand Down

0 comments on commit fa8b31c

Please sign in to comment.