Skip to content

Commit

Permalink
Bugfix: add metadata together with process initilization (#1818)
Browse files Browse the repository at this point in the history
* support adding metadata at the same time the CreateProcess/Initialize message is being processed instead of adding in another message processing step.

* chore: fix returning wrong failure reason when an interaction execution error happens

* chore: fix formatting and typo

* temporarily support old style of adding metadata (logic to be removed in the next release)
  • Loading branch information
bekiroguz authored Jan 9, 2025
1 parent 86707eb commit 49416a8
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 27 deletions.
1 change: 1 addition & 0 deletions core/akka-runtime/src/main/protobuf/process_index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ message Index {
message CreateProcess {
optional string recipeId = 1;
optional string recipeInstanceId = 2;
repeated AddRecipeInstanceMetaDataRecord metaData = 3;
}

message ProcessEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
* @return
*/
override def bake(recipeId: String, recipeInstanceId: String): Future[Unit] = {
processIndexActor.ask(CreateProcess(recipeId, recipeInstanceId))(config.timeouts.defaultBakeTimeout).javaTimeoutToBakerTimeout("bake").flatMap {
case _: Initialized =>
Future.successful(())
case ProcessDeleted =>
Future.failed(ProcessDeletedException(recipeInstanceId))
case ProcessAlreadyExists(_) =>
Future.failed(ProcessAlreadyExistsException(recipeInstanceId))
case RecipeManagerProtocol.NoRecipeFound(_) =>
Future.failed(NoSuchRecipeException(recipeId))
}
bake(recipeId, recipeInstanceId, Map.empty)
}

/**
Expand All @@ -237,7 +228,23 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
* @return
*/
override def bake(recipeId: String, recipeInstanceId: String, metadata: Map[String, String]): Future[Unit] = {
bake(recipeId, recipeInstanceId).map(_ -> addMetaData(recipeInstanceId, metadata))
val eventualBake = processIndexActor.ask(CreateProcess(recipeId, recipeInstanceId, metadata))(config.timeouts.defaultBakeTimeout).javaTimeoutToBakerTimeout("bake").flatMap {
case _: Initialized =>
Future.successful(())
case ProcessDeleted =>
Future.failed(ProcessDeletedException(recipeInstanceId))
case ProcessAlreadyExists(_) =>
Future.failed(ProcessAlreadyExistsException(recipeInstanceId))
case RecipeManagerProtocol.NoRecipeFound(_) =>
Future.failed(NoSuchRecipeException(recipeId))
}

// TODO This is a temporary backwards compatibility logic to support the old way of adding metadata during rollout, to be removed in the future release
if (metadata.nonEmpty) {
eventualBake.map(_ => addMetaData(recipeInstanceId, metadata))
} else {
eventualBake
}
}

override def fireEventAndResolveWhenReceived(recipeInstanceId: String, event: EventInstance, correlationId: Option[String]): Future[SensoryEventStatus] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.sensors.actor.PersistentActorMetrics
import cats.data.{EitherT, OptionT}
import cats.effect.IO
import cats.instances.future._
import com.ing.baker.il.petrinet.{InteractionTransition, Place, Transition}
import com.ing.baker.il.petrinet.{InteractionTransition, Transition}
import com.ing.baker.il.{CompiledRecipe, EventDescriptor}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka._
Expand All @@ -23,10 +23,11 @@ import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol.ExceptionStrategy.{BlockTransition, Continue, RetryWithDelay}
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceLogger._
import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstance, ProcessInstanceLogger, ProcessInstanceProtocol, ProcessInstanceRuntime}
import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstance, ProcessInstanceProtocol, ProcessInstanceRuntime}
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._
import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable
import com.ing.baker.runtime.akka.internal.RecipeRuntime
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName
import com.ing.baker.runtime.common.RecipeRecord
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.recipe_manager.RecipeManager
Expand All @@ -35,8 +36,6 @@ import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.Value
import com.typesafe.config.Config

import java.util.concurrent.TimeUnit
import scala.Console.println
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -365,7 +364,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
log.logWithMDC(Logging.WarningLevel, s"Received Terminated message for non indexed actor: $actorRef", mdc)
}

case CreateProcess(recipeId, recipeInstanceId) =>
case CreateProcess(recipeId, recipeInstanceId, recipeInstanceMetadata) =>
context.child(recipeInstanceId) match {
case None if !index.contains(recipeInstanceId) =>

Expand All @@ -379,7 +378,14 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
persistWithSnapshot(ActorCreated(recipeId, recipeInstanceId, createdTime)) { _ =>

// after that we actually create the ProcessInstance actor
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)
val processState = RecipeInstanceState(
recipeId = recipeId,
recipeInstanceId = recipeInstanceId,
ingredients =
if (recipeInstanceMetadata.isEmpty) Map.empty[String, Value]
else Map(RecipeInstanceMetadataName -> com.ing.baker.types.Converters.toValue(recipeInstanceMetadata)),
recipeInstanceMetadata = recipeInstanceMetadata,
events = List.empty)
val initializeCmd = Initialize(compiledRecipe.initialMarking, processState)

//TODO ensure the initialiseCMD is accepted before we add it ot the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,18 @@ object ProcessIndexProto {
val companion = protobuf.CreateProcess

def toProto(a: CreateProcess): protobuf.CreateProcess =
protobuf.CreateProcess(Some(a.recipeId), Some(a.recipeInstanceId))
protobuf.CreateProcess(
Some(a.recipeId),
Some(a.recipeInstanceId),
a.metaData.map { case (key, value) => protobuf.AddRecipeInstanceMetaDataRecord(Some(key), Some(value)) }.toSeq
)

def fromProto(message: protobuf.CreateProcess): Try[CreateProcess] =
for {
recipeId <- versioned(message.recipeId, "recipeId")
recipeInstanceId <- versioned(message.recipeInstanceId, "RecipeInstanceId")
} yield CreateProcess(recipeId, recipeInstanceId)
metadata = message.metaData.map(record => record.getKey -> record.getValue).toMap
} yield CreateProcess(recipeId, recipeInstanceId, metadata)
}

implicit def processEventProto(implicit actorRefProvider: ActorRefProvider): ProtoMap[ProcessEvent, protobuf.ProcessEvent] =
Expand Down Expand Up @@ -369,13 +374,13 @@ object ProcessIndexProto {
def toProto(a: AddRecipeInstanceMetaData): protobuf.AddRecipeInstanceMetaData =
protobuf.AddRecipeInstanceMetaData(
Some(a.recipeInstanceId),
a.metaData.map(record => {
protobuf.AddRecipeInstanceMetaDataRecord(Some(record._1), Some(record._2))}).toSeq)
a.metaData.map { case (key, value) => protobuf.AddRecipeInstanceMetaDataRecord(Some(key), Some(value)) }.toSeq
)

def fromProto(message: protobuf.AddRecipeInstanceMetaData): Try[AddRecipeInstanceMetaData] =
for {
recipeInstanceId <- versioned(message.recipeInstanceId, "RecipeInstanceId")
metaData = message.metaData.map(record => (record.getKey -> record.getValue)).toMap
metaData = message.metaData.map(record => record.getKey -> record.getValue).toMap
} yield AddRecipeInstanceMetaData(recipeInstanceId, metaData)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ object ProcessIndexProtocol {
*
* @param recipeId Id of the recipe to use, must exist on the Recipe Manager
* @param recipeInstanceId To be used for the process, must not be previously used
* @param metaData Additional metadata to be stored with the process
*/
case class CreateProcess(recipeId: String, recipeInstanceId: String) extends ProcessIndexMessage
case class CreateProcess(recipeId: String, recipeInstanceId: String, metaData: Map[String, String] = Map.empty) extends ProcessIndexMessage

/**
* Returned if there was already another process using the predefined process id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object ProcessInstanceProtocol {
sealed trait Response extends BakerSerializable

/**
* A response send in case any other command then 'Initialize' is sent to the actor in unitialized state.
* A response send in case any other command then 'Initialize' is sent to the actor in initialized state.
*
* @param recipeInstanceId The identifier of the uninitialized actor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import com.ing.baker.petrinet.api._
import scala.util.Random

object Instance {
def uninitialized[S](process: PetriNet): Instance[S] = Instance[S](process, 0, Marking.empty, Map.empty, null.asInstanceOf[S], Map.empty, Set.empty)
def uninitialized[S](process: PetriNet): Instance[S] =
Instance[S](
petriNet = process,
sequenceNr = 0,
marking = Marking.empty,
delayedTransitionIds = Map.empty,
state = null.asInstanceOf[S],
jobs = Map.empty,
receivedCorrelationIds = Set.empty
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._
import com.ing.baker.runtime.akka.internal.CachingInteractionManager
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName
import com.ing.baker.runtime.common.RecipeRecord
import com.ing.baker.runtime.recipe_manager.RecipeManager
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceState}
Expand All @@ -33,7 +34,6 @@ import org.scalatestplus.mockito.MockitoSugar
import scalax.collection.immutable.Graph

import java.util.UUID
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

Expand Down Expand Up @@ -105,6 +105,22 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn
petriNetActorProbe.expectMsg(initializeMsg)
}

"create the PetriNetInstance actor with given metadata when Initialize message is received" in {
val recipeInstanceId = UUID.randomUUID().toString
val metadata = Map("someKey" -> "someValue")
val initializeMsg =
Initialize(Marking.empty[Place], RecipeInstanceState(
recipeId = recipeId,
recipeInstanceId = recipeInstanceId,
ingredients = Map(RecipeInstanceMetadataName -> com.ing.baker.types.Converters.toValue(metadata)),
recipeInstanceMetadata = metadata,
events = List.empty))
val petriNetActorProbe = TestProbe()
val actorIndex = createActorIndex(petriNetActorProbe.ref, recipeManager)
actorIndex ! CreateProcess(recipeId, recipeInstanceId, metaData = metadata)
petriNetActorProbe.expectMsg(initializeMsg)
}

"passivation" in {
val duration = 5.seconds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object InteractionExecution {
}
case class Failure(reason: FailureReason) extends Result {
def toBakerInteractionExecutionFailure: InteractionExecutionResult.Failure = reason match {
case InteractionError(interactionName, message) => InteractionExecutionResult.Failure(InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, Some(interactionName), Some(message))
case InteractionError(interactionName, message) => InteractionExecutionResult.Failure(InteractionExecutionFailureReason.INTERACTION_EXECUTION_ERROR, Some(interactionName), Some(message))
case NoInstanceFound => InteractionExecutionResult.Failure(InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None)
case Timeout => InteractionExecutionResult.Failure(InteractionExecutionFailureReason.TIMEOUT, None, None)
case BadIngredients => InteractionExecutionResult.Failure(InteractionExecutionFailureReason.BAD_INGREDIENTS, None, None)
Expand Down

0 comments on commit 49416a8

Please sign in to comment.