diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md index 01d439ea8..d741857c9 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md @@ -79,3 +79,15 @@ Register a delivery for the second restaurant ```shell grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery ``` + +Set up a replicated charging station + +```shell +grpcurl -d '{"charging_station_id":"station1","location_id": "sweden/stockholm/kungsholmen", "charging_slots": 4}' -plaintext localhost:8101 charging.ChargingStationService.CreateChargingStation +``` + +Inspect the state of the charging station + +```shell +grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState +``` \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/charging/charging_station_api.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/charging/charging_station_api.proto new file mode 100644 index 000000000..6e890940c --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/charging/charging_station_api.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "charging.proto"; + +import "google/protobuf/timestamp.proto"; + +package charging; + +service ChargingStationService { + rpc CreateChargingStation(CreateChargingStationRequest) returns (CreateChargingStationResponse) {} + rpc GetChargingStationState(GetChargingStationStateRequest) returns (GetChargingStationStateResponse) {} +} + +message CreateChargingStationRequest { + // unique identifier for the charging station + string charging_station_id = 1; + // location of the station + string location_id = 2; + // number of parallel charging slots for drones + uint32 charging_slots = 3; +} + +message CreateChargingStationResponse { +} + +message GetChargingStationStateRequest { + string charging_station_id = 1; +} + +message GetChargingStationStateResponse { + // location of the station + string location_id = 1; + // number of parallel charging slots for drones + uint32 charging_slots = 2; + // drones currently at the station charging + repeated ChargingDrone currently_charging_drones = 3; +} + +message ChargingDrone { + string drone_id = 1; + // timestamp when charging is estimated to complete + google.protobuf.Timestamp charging_complete = 2; +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf index 620c4a199..6661651eb 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf @@ -1,6 +1,7 @@ include "cluster" include "grpc" include "persistence" +include "replication" akka { loglevel = DEBUG diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf new file mode 100644 index 000000000..8f8b95248 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf @@ -0,0 +1,14 @@ +# Allow replication of the charging station entities between the cloud service and the edge systems +akka.projection.grpc.replication.accept-edge-replication = on + +# Replication configuration for the ShoppingCart. Note that config `charging-station` +# is the same as the ChargingStation.EntityType. + +charging-station { + self-replica-id = cloud1 + self-replica-id = ${?SELF_REPLICA_ID} + entity-event-replication-timeout = 10s + parallel-updates = 8 + # all other replicas are edge replicas and not known up front + replicas: [ ] +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala index 83fbba32a..e94aed645 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala @@ -11,6 +11,8 @@ import central.deliveries.proto.RestaurantDeliveriesService import central.deliveries.proto.RestaurantDeliveriesServiceHandler import central.drones.proto.DroneOverviewService import central.drones.proto.DroneOverviewServiceHandler +import charging.proto.ChargingStationService +import charging.proto.ChargingStationServiceHandler import scala.concurrent.Future import scala.util.Failure @@ -28,17 +30,23 @@ object DroneDeliveriesServer { Future[HttpResponse]], pushedDroneEventsHandler: PartialFunction[ HttpRequest, - Future[HttpResponse]])(implicit system: ActorSystem[_]): Unit = { + Future[HttpResponse]], + chargingStationService: charging.proto.ChargingStationService)( + implicit system: ActorSystem[_]): Unit = { import system.executionContext // #composeAndBind val service = ServiceHandler.concatOrNotFound( DroneOverviewServiceHandler.partial(droneOverviewService), RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), + ChargingStationServiceHandler.partial(chargingStationService), deliveryEventsProducerService, pushedDroneEventsHandler, ServerReflection.partial( - List(DroneOverviewService, RestaurantDeliveriesService))) + List( + DroneOverviewService, + RestaurantDeliveriesService, + ChargingStationService))) val bound = Http(system).newServerAt(interface, port).bind(service) // #composeAndBind diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala index 2c9dd362b..5cde387b7 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala @@ -4,10 +4,13 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.SpawnProtocol import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement +import akka.projection.grpc.producer.scaladsl.EventProducer import central.deliveries.DeliveryEvents import central.deliveries.RestaurantDeliveries import central.deliveries.RestaurantDeliveriesServiceImpl -import central.drones.{ Drone, DroneOverviewServiceImpl, LocalDroneEvents } +import central.drones.{Drone, DroneOverviewServiceImpl, LocalDroneEvents} +import charging.ChargingStation +import charging.ChargingStationServiceImpl import org.slf4j.LoggerFactory import scala.util.control.NonFatal @@ -36,6 +39,7 @@ object Main { Drone.init(system) LocalDroneEvents.initPushedEventsConsumer(system) RestaurantDeliveries.init(system) + val chargingStationReplication = ChargingStation.init(system) val interface = system.settings.config .getString("restaurant-drone-deliveries-service.grpc.interface") @@ -44,19 +48,30 @@ object Main { val pushedDroneEventsHandler = LocalDroneEvents.pushedEventsGrpcHandler(system) - val deliveryEventsProducerService = - DeliveryEvents.eventProducerService(system) + val deliveryEventsProducerSource = + DeliveryEvents.eventProducerSource(system) val droneOverviewService = new DroneOverviewServiceImpl(system, settings) val restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings) + val chargingStationService = new ChargingStationServiceImpl(chargingStationReplication.entityRefFactory) + + // delivery events and charging station replication both are Akka Projection gRPC push destinations + // and needs to be combined into a single gRPC service handling both: + // FIXME shouldn't this rather combine with pushedDroneEvents handler? Hmmmm. + val eventProducerService = EventProducer.grpcServiceHandler( + Set( + deliveryEventsProducerSource, + chargingStationReplication.eventProducerService)) + DroneDeliveriesServer.start( interface, port, droneOverviewService, restaurantDeliveriesService, - deliveryEventsProducerService, - pushedDroneEventsHandler) + eventProducerService, + pushedDroneEventsHandler, + chargingStationService) } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala index 560acf447..f03b76e3f 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala @@ -1,8 +1,6 @@ package central.deliveries import akka.actor.typed.ActorSystem -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.HttpResponse import akka.persistence.query.typed.EventEnvelope import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducer @@ -12,8 +10,8 @@ import scala.concurrent.Future object DeliveryEvents { - def eventProducerService(system: ActorSystem[_]) - : PartialFunction[HttpRequest, Future[HttpResponse]] = { + def eventProducerSource( + system: ActorSystem[_]): EventProducer.EventProducerSource = { val transformation = Transformation.empty .registerAsyncEnvelopeMapper[ RestaurantDeliveries.DeliveryRegistered, @@ -29,7 +27,7 @@ object DeliveryEvents { transformation, EventProducerSettings(system)) - EventProducer.grpcServiceHandler(eventProducerSource)(system) + eventProducerSource } private def transformDeliveryRegistration( diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStation.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStation.scala new file mode 100644 index 000000000..b7f44be1f --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStation.scala @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ +package charging + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.TimerScheduler +import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicationContext +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.serialization.jackson.CborSerializable + +import java.time.Instant +import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.FiniteDuration +import scala.math.Ordered.orderingToOrdered + +object ChargingStation { + + // commands and replies + sealed trait Command extends CborSerializable {} + case class Create( + locationId: String, + chargingSlots: Int, + replyTo: ActorRef[Done]) + extends Command + case class StartCharging( + droneId: String, + replyTo: ActorRef[StartChargingResponse]) + extends Command + sealed trait StartChargingResponse + case class AllSlotsBusy(firstSlotFreeAt: Instant) + extends StartChargingResponse + + case class GetState(replyTo: ActorRef[ChargingStation.State]) extends Command + + private case class CompleteCharging(droneId: String) extends Command + + // events + sealed trait Event extends CborSerializable {} + case class Created(locationId: String, chargingSlots: Int) extends Event + case class ChargingStarted(droneId: String, chargeComplete: Instant) + extends Event + with StartChargingResponse + + case class ChargingCompleted(droneId: String) extends Event + + case class ChargingDrone( + droneId: String, + chargingDone: Instant, + replicaId: String) + case class State( + chargingSlots: Int, + dronesCharging: Set[ChargingDrone], + stationLocationId: String) + extends CborSerializable + + val EntityType = "charging-station" + + private val FullChargeTime = 5.minutes + + def init(implicit system: ActorSystem[_]): Replication[Command] = { + // FIXME replication filter so only charging stations in a given edge replica is replicated there + val replicationSettings = + ReplicationSettings[Command](EntityType, R2dbcReplication()) + // FIXME remove once release out with flag in config (1.5.1-M2/GA) + .withEdgeReplication(true) + Replication.grpcReplication(replicationSettings)(ChargingStation.apply) + } + + def apply( + replicatedBehaviors: ReplicatedBehaviors[Command, Event, Option[State]]) + : Behavior[Command] = { + Behaviors.setup[Command] { context => + Behaviors.withTimers { timers => + replicatedBehaviors.setup { replicationContext => + new ChargingStation(context, replicationContext, timers).behavior() + } + } + } + } + + private def durationUntil(instant: Instant): FiniteDuration = + (instant.getEpochSecond - Instant.now().getEpochSecond).seconds + +} + +class ChargingStation( + context: ActorContext[ChargingStation.Command], + replicationContext: ReplicationContext, + timers: TimerScheduler[ChargingStation.Command]) { + + import ChargingStation._ + + def behavior(): EventSourcedBehavior[Command, Event, Option[State]] = + EventSourcedBehavior( + replicationContext.persistenceId, + None, + handleCommand, + handleEvent) + .receiveSignal { case (Some(state: State), RecoveryCompleted) => + handleRecoveryCompleted(state) + } + // tag with location id so we can replicate only to the right edge node + .withTaggerForState { + case (None, _) => Set.empty + case (Some(state), _) => Set(state.stationLocationId) + } + + private def handleCommand( + state: Option[State], + command: Command): Effect[Event, Option[State]] = + state match { + case None => handleCommandNoState(command) + case Some(state) => handleCommandInitialized(state, command) + } + + private def handleCommandNoState( + command: Command): Effect[Event, Option[State]] = + command match { + case Create(locationId, chargingSlots, replyTo) => + Effect + .persist(Created(locationId, chargingSlots)) + .thenReply(replyTo)(_ => Done) + case unexpected => + context.log.warn( + "Got an unexpected command {} but charging station with id {} not initialized", + unexpected.getClass, + replicationContext.entityId) + Effect.none + } + + private def handleCommandInitialized( + state: ChargingStation.State, + command: ChargingStation.Command): Effect[Event, Option[State]] = { + command match { + case Create(_, _, _) => + context.log.warn( + "Got a create command, but station id {} was already created, ignoring", + replicationContext.entityId) + Effect.none + + case StartCharging(droneId, replyTo) => + if (state.dronesCharging.exists(_.droneId == droneId)) { + context.log.warn( + "Drone {} requested charging but is already charging. Ignoring.", + droneId) + Effect.none + } else if (state.dronesCharging.size >= state.chargingSlots) { + val earliestFreeSlot = state.dronesCharging.map(_.chargingDone).min + context.log.info( + "Drone {} requested charging but all stations busy, earliest free slot {}", + droneId, + earliestFreeSlot) + Effect.reply(replyTo)(AllSlotsBusy(earliestFreeSlot)) + } else { + // charge + val chargeCompletedBy = + Instant.now().plusSeconds(FullChargeTime.toSeconds) + context.log.info( + "Drone {} requested charging, will complete charging at {}", + droneId, + chargeCompletedBy) + val event = ChargingStarted(droneId, chargeCompletedBy) + Effect.persist(event).thenRun { (_: Option[State]) => + timers.startSingleTimer( + CompleteCharging(droneId), + durationUntil(chargeCompletedBy)) + // Note: The event is also the reply + replyTo ! event + } + } + + case CompleteCharging(droneId) => + Effect.persist(ChargingCompleted(droneId)) + + case GetState(replyTo) => + Effect.reply(replyTo)(state) + + } + } + + private def handleEvent(state: Option[State], event: Event): Option[State] = { + state match { + case None => + event match { + case Created(locationId, chargingSlots) => + Some(State(chargingSlots, Set.empty, locationId)) + case unexpected => + throw new IllegalArgumentException( + s"Got unexpected event ${unexpected} for uninitialized state") + } + + case Some(state) => + event match { + case Created(_, _) => + context.log.warn("Saw a second created event, ignoring") + Some(state) + case ChargingStarted(droneId, chargeComplete) => + Some( + state.copy(dronesCharging = state.dronesCharging + ChargingDrone( + droneId, + chargeComplete, + replicationContext.origin.id))) + case ChargingCompleted(droneId) => + Some( + state.copy(dronesCharging = + state.dronesCharging.filterNot(_.droneId == droneId))) + } + + } + } + + private def handleRecoveryCompleted(state: State): Unit = { + // Complete or set up timers for completion for drones charging, + // but only if the charging was initiated in this replica + val now = Instant.now() + state.dronesCharging + .filter(_.replicaId == replicationContext.replicaId.id) + .foreach { chargingDrone => + if (chargingDrone.chargingDone < now) + context.self ! CompleteCharging(chargingDrone.droneId) + else + timers.startSingleTimer( + CompleteCharging(chargingDrone.droneId), + durationUntil(chargingDrone.chargingDone)) + } + } + +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStationServiceImpl.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStationServiceImpl.scala new file mode 100644 index 000000000..54c80bfab --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStationServiceImpl.scala @@ -0,0 +1,53 @@ +package charging + +import akka.actor.typed.ActorSystem +import akka.cluster.sharding.typed.scaladsl.EntityRef +import akka.util.Timeout +import charging.proto +import com.google.protobuf.timestamp.Timestamp +import org.slf4j.LoggerFactory + +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +class ChargingStationServiceImpl( + entityRefFactory: String => EntityRef[ChargingStation.Command])( + implicit val system: ActorSystem[_]) + extends proto.ChargingStationService { + private final val log = LoggerFactory.getLogger(getClass) + import system.executionContext + implicit val askTimeout: Timeout = 3.seconds + + override def createChargingStation(in: proto.CreateChargingStationRequest) + : Future[proto.CreateChargingStationResponse] = { + log.info( + "Creating charging station {} with {} charging slots, in location {}", + in.chargingStationId, + in.chargingSlots, + in.locationId) + + val entityRef = entityRefFactory(in.chargingStationId) + entityRef + .ask(ChargingStation.Create(in.locationId, in.chargingSlots, _)) + .map(_ => proto.CreateChargingStationResponse.defaultInstance) + + } + + override def getChargingStationState(in: proto.GetChargingStationStateRequest) + : Future[proto.GetChargingStationStateResponse] = { + log.info("Get charging station {} state", in.chargingStationId) + val entityRef = entityRefFactory(in.chargingStationId) + entityRef + .ask(ChargingStation.GetState.apply) + .map(state => + proto.GetChargingStationStateResponse( + locationId = state.stationLocationId, + chargingSlots = state.chargingSlots, + currentlyChargingDrones = state.dronesCharging + .map(d => + proto.ChargingDrone( + droneId = d.droneId, + chargingComplete = Some(Timestamp(d.chargingDone)))) + .toSeq)) + } +}