Skip to content

Commit

Permalink
WIP: scala cloud service side of things, first iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 5, 2023
1 parent 9cddb36 commit 04e87ec
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 12 deletions.
12 changes: 12 additions & 0 deletions samples/grpc/restaurant-drone-deliveries-service-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,15 @@ Register a delivery for the second restaurant
```shell
grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
```

Set up a replicated charging station

```shell
grpcurl -d '{"charging_station_id":"station1","location_id": "sweden/stockholm/kungsholmen", "charging_slots": 4}' -plaintext localhost:8101 charging.ChargingStationService.CreateChargingStation
```

Inspect the state of the charging station

```shell
grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "charging.proto";

import "google/protobuf/timestamp.proto";

package charging;

service ChargingStationService {
rpc CreateChargingStation(CreateChargingStationRequest) returns (CreateChargingStationResponse) {}
rpc GetChargingStationState(GetChargingStationStateRequest) returns (GetChargingStationStateResponse) {}
}

message CreateChargingStationRequest {
// unique identifier for the charging station
string charging_station_id = 1;
// location of the station
string location_id = 2;
// number of parallel charging slots for drones
uint32 charging_slots = 3;
}

message CreateChargingStationResponse {
}

message GetChargingStationStateRequest {
string charging_station_id = 1;
}

message GetChargingStationStateResponse {
// location of the station
string location_id = 1;
// number of parallel charging slots for drones
uint32 charging_slots = 2;
// drones currently at the station charging
repeated ChargingDrone currently_charging_drones = 3;
}

message ChargingDrone {
string drone_id = 1;
// timestamp when charging is estimated to complete
google.protobuf.Timestamp charging_complete = 2;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
include "cluster"
include "grpc"
include "persistence"
include "replication"

akka {
loglevel = DEBUG
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Allow replication of the charging station entities between the cloud service and the edge systems
akka.projection.grpc.replication.accept-edge-replication = on

# Replication configuration for the ShoppingCart. Note that config `charging-station`
# is the same as the ChargingStation.EntityType.

charging-station {
self-replica-id = cloud1
self-replica-id = ${?SELF_REPLICA_ID}
entity-event-replication-timeout = 10s
parallel-updates = 8
# all other replicas are edge replicas and not known up front
replicas: [ ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import central.deliveries.proto.RestaurantDeliveriesService
import central.deliveries.proto.RestaurantDeliveriesServiceHandler
import central.drones.proto.DroneOverviewService
import central.drones.proto.DroneOverviewServiceHandler
import charging.proto.ChargingStationService
import charging.proto.ChargingStationServiceHandler

import scala.concurrent.Future
import scala.util.Failure
Expand All @@ -28,17 +30,23 @@ object DroneDeliveriesServer {
Future[HttpResponse]],
pushedDroneEventsHandler: PartialFunction[
HttpRequest,
Future[HttpResponse]])(implicit system: ActorSystem[_]): Unit = {
Future[HttpResponse]],
chargingStationService: charging.proto.ChargingStationService)(
implicit system: ActorSystem[_]): Unit = {
import system.executionContext

// #composeAndBind
val service = ServiceHandler.concatOrNotFound(
DroneOverviewServiceHandler.partial(droneOverviewService),
RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService),
ChargingStationServiceHandler.partial(chargingStationService),
deliveryEventsProducerService,
pushedDroneEventsHandler,
ServerReflection.partial(
List(DroneOverviewService, RestaurantDeliveriesService)))
List(
DroneOverviewService,
RestaurantDeliveriesService,
ChargingStationService)))

val bound = Http(system).newServerAt(interface, port).bind(service)
// #composeAndBind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.SpawnProtocol
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import akka.projection.grpc.producer.scaladsl.EventProducer
import central.deliveries.DeliveryEvents
import central.deliveries.RestaurantDeliveries
import central.deliveries.RestaurantDeliveriesServiceImpl
import central.drones.{ Drone, DroneOverviewServiceImpl, LocalDroneEvents }
import central.drones.{Drone, DroneOverviewServiceImpl, LocalDroneEvents}
import charging.ChargingStation
import charging.ChargingStationServiceImpl
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal
Expand Down Expand Up @@ -36,6 +39,7 @@ object Main {
Drone.init(system)
LocalDroneEvents.initPushedEventsConsumer(system)
RestaurantDeliveries.init(system)
val chargingStationReplication = ChargingStation.init(system)

val interface = system.settings.config
.getString("restaurant-drone-deliveries-service.grpc.interface")
Expand All @@ -44,19 +48,30 @@ object Main {

val pushedDroneEventsHandler =
LocalDroneEvents.pushedEventsGrpcHandler(system)
val deliveryEventsProducerService =
DeliveryEvents.eventProducerService(system)
val deliveryEventsProducerSource =
DeliveryEvents.eventProducerSource(system)
val droneOverviewService = new DroneOverviewServiceImpl(system, settings)
val restaurantDeliveriesService =
new RestaurantDeliveriesServiceImpl(system, settings)

val chargingStationService = new ChargingStationServiceImpl(chargingStationReplication.entityRefFactory)

// delivery events and charging station replication both are Akka Projection gRPC push destinations
// and needs to be combined into a single gRPC service handling both:
// FIXME shouldn't this rather combine with pushedDroneEvents handler? Hmmmm.
val eventProducerService = EventProducer.grpcServiceHandler(
Set(
deliveryEventsProducerSource,
chargingStationReplication.eventProducerService))

DroneDeliveriesServer.start(
interface,
port,
droneOverviewService,
restaurantDeliveriesService,
deliveryEventsProducerService,
pushedDroneEventsHandler)
eventProducerService,
pushedDroneEventsHandler,
chargingStationService)

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package central.deliveries

import akka.actor.typed.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
Expand All @@ -12,8 +10,8 @@ import scala.concurrent.Future

object DeliveryEvents {

def eventProducerService(system: ActorSystem[_])
: PartialFunction[HttpRequest, Future[HttpResponse]] = {
def eventProducerSource(
system: ActorSystem[_]): EventProducer.EventProducerSource = {
val transformation = Transformation.empty
.registerAsyncEnvelopeMapper[
RestaurantDeliveries.DeliveryRegistered,
Expand All @@ -29,7 +27,7 @@ object DeliveryEvents {
transformation,
EventProducerSettings(system))

EventProducer.grpcServiceHandler(eventProducerSource)(system)
eventProducerSource
}

private def transformDeliveryRegistration(
Expand Down
Loading

0 comments on commit 04e87ec

Please sign in to comment.