Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
roll 3
Browse files Browse the repository at this point in the history
* custom ShardingMessageExtractor can be removed
  • Loading branch information
patriknw committed Jun 19, 2019
1 parent c2bff79 commit 5294705
Showing 1 changed file with 5 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package sample.sharding
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.HashCodeMessageExtractor
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
Expand All @@ -20,47 +17,11 @@ object Device {

def init(system: ActorSystem[_]): Unit = {

val messageExtractor =
new ShardingMessageExtractor[Any, Device.RecordTemperature] {

// Note that `HashCodeMessageExtractor` is using
// `(math.abs(id.hashCode) % numberOfShards).toString`.
// If the old Untyped nodes were using a different hashing function
// this delegate HashCodeMessageExtractor can't be used and
// same hashing function as before must be implemented here.
// `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
// with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
val delegate = new HashCodeMessageExtractor[Device.RecordTemperature](
system.settings.config
.getInt("akka.cluster.sharding.number-of-shards")
)

override def entityId(message: Any): String = {
message match {
case Device.RecordTemperature(deviceId, _) =>
deviceId.toString
case env: ShardingEnvelope[Device.RecordTemperature] =>
delegate.entityId(env)
}
}

override def shardId(entityId: String): String = {
delegate.shardId(entityId)
}

override def unwrapMessage(message: Any): RecordTemperature = {
message match {
case m: Device.RecordTemperature => m
case env: ShardingEnvelope[Device.RecordTemperature] =>
delegate.unwrapMessage(env)
}
}
}

ClusterSharding(system).init(
Entity(TypeKey, _ => Device())
.withMessageExtractor(messageExtractor)
)
// If the original hashing function was using
// `(math.abs(id.hashCode) % numberOfShards).toString`
// the default HashCodeMessageExtractor in Typed can be used.
// That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
ClusterSharding(system).init(Entity(TypeKey, _ => Device()))
}

case class RecordTemperature(deviceId: Int, temperature: Double)
Expand Down

0 comments on commit 5294705

Please sign in to comment.