Skip to content

Commit

Permalink
wip: implement event-sourced (persistent) actor
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Nov 21, 2023
1 parent a3140b3 commit b7ae06e
Show file tree
Hide file tree
Showing 33 changed files with 903 additions and 552 deletions.
10 changes: 9 additions & 1 deletion actor/src/main/scala/com/evolutiongaming/akkaeffect/Act.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,15 @@ private[akkaeffect] object Act {
}
}

def receive(receive: Actor.Receive) = {
/**
* set thread local [[threadLocal]] to [[self]] and
* if message is of type [[Msg]] - apply internal function,
* otherwise delegate receive to [[receive]]
*
* @param receive [[Actor.Receive]] partial function
* @return [[Actor.Receive]] partial function: Any => Unit
*/
def receive(receive: Actor.Receive): Actor.Receive = {
val receiveMsg: Actor.Receive = { case Msg(f) => f() }
syncReceive(receiveMsg orElse receive)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ object ActorOf {

type State = Receive[F, Envelope[Any], Stop]

def onPreStart(actorCtx: ActorCtx[F])(implicit fail: Fail[F]) = {
def onPreStart(actorCtx: ActorCtx[F])(implicit fail: Fail[F]): Resource[F, Receive[F, Envelope[Any], Stop]] = {
receiveOf(actorCtx)
.handleErrorWith { (error: Throwable) =>
s"failed to allocate receive".fail[F, State](error).toResource
}
}

def onReceive(a: Any, sender: ActorRef)(implicit fail: Fail[F]) = {
// a - message
def onReceive(a: Any, sender: ActorRef)(implicit fail: Fail[F]): State => F[Directive[Releasable[F, State]]] = {
state: State =>
val stop = a match {
case ReceiveTimeout => state.timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private[akkaeffect] object ActorVar {

new ActorVar[F, A] {

// A - actor' state

def preStart(resource: Resource[F, A]) = {
update { _ =>
resource
Expand All @@ -81,7 +83,7 @@ private[akkaeffect] object ActorVar {
}
}

def receive(f: A => F[Directive[Releasable[F, A]]]) = {
def receive(f: A => F[Directive[Releasable[F, A]]]): Unit = {
update {
case Some(state) =>
f(state.value).flatMap {
Expand Down
28 changes: 20 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ lazy val root = (project in file(".")
`actor-tests`,
testkit,
persistence,
`persistence-api`,
eventsourcing,
`eventsourcing-persistence`,
cluster,
`cluster-sharding`))

Expand Down Expand Up @@ -65,20 +65,32 @@ lazy val testkit = (project in file("testkit")
Akka.testkit % Test,
scalatest % Test)))

lazy val `eventsourcing-persistence` = (project in file("eventsourcing-persistence")
settings (name := "akka-effect-eventsourcing-persistence")
lazy val `persistence-api` = (project in file("persistence-api")
settings (name := "akka-effect-persistence-api")
settings commonSettings
dependsOn(
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
settings (
libraryDependencies ++= Seq(sstream)))
libraryDependencies ++= Seq(
Cats.core,
CatsEffect.effect,
`cats-helper`,
sstream,
Akka.persistence, // temporal dependency
Akka.slf4j % Test,
Akka.testkit % Test,
scalatest % Test)))

lazy val persistence = (project in file("persistence")
settings (name := "akka-effect-persistence")
settings commonSettings
dependsOn(
`eventsourcing-persistence` % "test->test;compile->compile",
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
`persistence-api` % "test->test;compile->compile",
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
settings (
libraryDependencies ++= Seq(
Akka.actor,
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ object JournalKeeper {

def delete(criteria: SnapshotSelectionCriteria) = {

delete(Snapshotter.Criteria(criteria))

}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {

def selected(meta: SnapshotMetadata) = {
meta.seqNr <= criteria.maxSequenceNr && meta.timestamp.toEpochMilli <= criteria.maxSequenceNr
}
Expand All @@ -346,6 +352,7 @@ object JournalKeeper {
.map { _.joinWithNever }
}
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,13 @@ object JournalKeeperTest {
.add(Action.DeleteSnapshots(criteria))
.map { _.pure[F] }
}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {
actions
.add(Action.DeleteSnapshots(criteria.asAkka))
.map { _.pure[F] }
}

}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.evolutiongaming.akkaeffect.persistence

import cats.implicits._
import cats.{Applicative, FlatMap, Monad, ~>}
import com.evolutiongaming.akkaeffect.Fail
import com.evolutiongaming.catshelper.{Log, MeasureDuration, MonadThrowable}

trait Append[F[_], -A] {

/**
* @param events to be saved, inner Nel[A] will be persisted atomically, outer Nel[_] is for batching
* @return SeqNr of last event
*/
def apply(events: Events[A]): F[F[SeqNr]]
}

object Append {

def const[F[_], A](seqNr: F[F[SeqNr]]): Append[F, A] = {
class Const
new Const with Append[F, A] {
def apply(events: Events[A]) = seqNr
}
}

def empty[F[_]: Applicative, A]: Append[F, A] =
const(SeqNr.Min.pure[F].pure[F])

implicit class AppendOps[F[_], A](val self: Append[F, A]) extends AnyVal {

def mapK[G[_]: Applicative](f: F ~> G): Append[G, A] = { events =>
f(self(events)).map { a =>
f(a)
}
}

def convert[B](f: B => F[A])(implicit F: Monad[F]): Append[F, B] = {
events =>
{
for {
events <- events.traverse(f)
seqNr <- self(events)
} yield seqNr
}
}

def narrow[B <: A]: Append[F, B] = events => self(events)

def withLogging1(log: Log[F])(
implicit
F: FlatMap[F],
measureDuration: MeasureDuration[F]
): Append[F, A] = events => {
for {
d <- MeasureDuration[F].start
r <- self(events)
} yield
for {
r <- r
d <- d
_ <- log.debug(s"append ${events.size} events in ${d.toMillis}ms")
} yield r
}

def withFail(fail: Fail[F])(implicit F: MonadThrowable[F]): Append[F, A] = {
events =>
fail.adapt(s"failed to append $events") { self(events) }
}
}

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
package com.evolutiongaming.akkaeffect.persistence

import akka.persistence.DeleteEventsToInterop
import cats.effect.{Resource, Sync}
import cats.syntax.all._
import cats.{Applicative, FlatMap, ~>}
import com.evolutiongaming.akkaeffect.Fail
import com.evolutiongaming.catshelper.{FromFuture, Log, MeasureDuration, MonadThrowable}
import com.evolutiongaming.catshelper.{Log, MeasureDuration, MonadThrowable}

import scala.concurrent.duration.FiniteDuration

/**
* @see [[akka.persistence.Eventsourced.deleteMessages]]
*/
trait DeleteEventsTo[F[_]] {

/**
Expand All @@ -31,32 +24,25 @@ object DeleteEventsTo {
}
}


private[akkaeffect] def of[F[_]: Sync: FromFuture, A](
persistentActor: akka.persistence.PersistentActor,
timeout: FiniteDuration
): Resource[F, DeleteEventsTo[F]] = {
DeleteEventsToInterop(persistentActor, timeout)
}


private sealed abstract class WithLogging

private sealed abstract class WithFail

private sealed abstract class MapK


implicit class DeleteEventsToOps[F[_]](val self: DeleteEventsTo[F]) extends AnyVal {
implicit class DeleteEventsToOps[F[_]](val self: DeleteEventsTo[F])
extends AnyVal {

def mapK[G[_]: Applicative](f: F ~> G): DeleteEventsTo[G] = {
new MapK with DeleteEventsTo[G] {
def apply(seqNr: SeqNr) = f(self(seqNr)).map { a => f(a) }
def apply(seqNr: SeqNr) = f(self(seqNr)).map { a =>
f(a)
}
}
}

def withLogging1(
log: Log[F])(implicit
def withLogging1(log: Log[F])(
implicit
F: FlatMap[F],
measureDuration: MeasureDuration[F]
): DeleteEventsTo[F] = {
Expand All @@ -65,19 +51,18 @@ object DeleteEventsTo {
for {
d <- MeasureDuration[F].start
r <- self(seqNr)
} yield for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${ d.toMillis }ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${d.toMillis}ms")
} yield r
}
}
}

def withFail(
fail: Fail[F])(implicit
F: MonadThrowable[F]
): DeleteEventsTo[F] = {
def withFail(fail: Fail[F])(implicit
F: MonadThrowable[F]): DeleteEventsTo[F] = {

new WithFail with DeleteEventsTo[F] {
def apply(seqNr: SeqNr) = {
Expand All @@ -88,4 +73,5 @@ object DeleteEventsTo {
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.evolutiongaming.akkaeffect.persistence

trait Event[E] {

def event: E
def seqNr: SeqNr

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ final case class EventSourcedId(value: String) {

object EventSourcedId {

implicit val orderEventSourcedId: Order[EventSourcedId] = Order.by { a: EventSourcedId => a.value }
implicit val orderEventSourcedId: Order[EventSourcedId] = Order.by {
a: EventSourcedId =>
a.value
}

implicit val showEventSourcedId: Show[EventSourcedId] = Show.fromToString
}
}
Loading

0 comments on commit b7ae06e

Please sign in to comment.