Skip to content

Commit

Permalink
test selectMemberFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 13, 2024
1 parent 506e877 commit b190872
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 88 deletions.
67 changes: 67 additions & 0 deletions cluster/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cluster

import (
fmt "fmt"
"math"
"math/rand"
)

// ActivationConfig...
type ActivationConfig struct {
id string
region string
selectMember SelectMemberFunc
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
selectMember: SelectRandomMember,
}
}

// WithSelectMemberFunc set's the fuction that will be invoked during
// the activation process.
// It will select the member where the actor will be activated/spawned on.
func (config ActivationConfig) WithSelectMemberFunc(fun SelectMemberFunc) ActivationConfig {
config.selectMember = fun
return config
}

// WithID set's the id of the actor that will be activated on the cluster.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor should be spawned.
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// SelectMemberFunc will be invoked during the activation process.
// Given the ActivationDetails the actor will be spawned on the returned member.
type SelectMemberFunc func(ActivationDetails) *Member

// ActivationDetails holds detailed information about an activation.
type ActivationDetails struct {
// Region where the actor should be activated on
Region string
// A slice of members that are pre-filtered by the kind of the actor
// that need to be activated
Members []*Member
// The kind of the actor
Kind string
}

// SelectRandomMember selects a random member of the cluster.
func SelectRandomMember(details ActivationDetails) *Member {
return details.Members[rand.Intn(len(details.Members))]
}
27 changes: 0 additions & 27 deletions cluster/activator.go

This file was deleted.

26 changes: 15 additions & 11 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

type (
activate struct {
details ActivationDetails
kind string
config ActivationConfig
}
getMembers struct{}
getKinds struct{}
Expand Down Expand Up @@ -58,7 +59,7 @@ func (a *Agent) Receive(c *actor.Context) {
case *Activation:
a.handleActivation(msg)
case activate:
pid := a.activate(msg.details)
pid := a.activate(msg.kind, msg.config)
c.Respond(pid)
case deactivate:
a.bcast(&Deactivation{PID: msg.pid})
Expand Down Expand Up @@ -115,27 +116,30 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
return resp
}

func (a *Agent) activate(details ActivationDetails) *actor.PID {
members := a.members.FilterByKind(details.Kind)
func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID {
members := a.members.FilterByKind(kind)
if len(members) == 0 {
slog.Warn("could not find any members with kind", "kind", details.Kind)
slog.Warn("could not find any members with kind", "kind", kind)
return nil
}
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{
if config.selectMember == nil {
config.selectMember = SelectRandomMember
}
memberPID := config.selectMember(ActivationDetails{
Members: members,
Region: region,
Region: config.region,
Kind: kind,
})
if owner == nil {
if memberPID == nil {
slog.Warn("activator did not found a member to activate on")
return nil
}
req := &ActivationRequest{Kind: kind, ID: id}
activatorPID := actor.NewPID(owner.Host, "cluster/"+owner.ID)
req := &ActivationRequest{Kind: kind, ID: config.id}
activatorPID := actor.NewPID(memberPID.Host, "cluster/"+memberPID.ID)

var activationResp *ActivationResponse
// Local activation
if owner.Host == a.cluster.engine.Address() {
if memberPID.Host == a.cluster.engine.Address() {
activationResp = a.handleActivationRequest(req)
} else {
// Remote activation
Expand Down
33 changes: 1 addition & 32 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,36 +150,6 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act
return pid
}

// ActivationConfig...
type ActivationConfig struct {
id string
region string
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
}
}

// WithID set's the id of the actor that will be activated on the cluster.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor should be spawned.
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// Activate actives the registered kind in the cluster based on the given config.
// The actor does not need to be registered locally on the member if at least one
// member has that kind registered.
Expand All @@ -188,8 +158,7 @@ func (config ActivationConfig) WithRegion(region string) ActivationConfig {
func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID {
msg := activate{
kind: kind,
id: config.id,
region: config.region,
config: config,
}
resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result()
if err != nil {
Expand Down
53 changes: 50 additions & 3 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package cluster

import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"testing"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/remote"
Expand All @@ -29,11 +31,56 @@ func NewInventory() actor.Receiver {

func (i Inventory) Receive(c *actor.Context) {}

func TestClusterActivationOnMemberFunc(t *testing.T) {
c, err := New(NewConfig())
func TestClusterSelectMemberFunc(t *testing.T) {
c1, err := New(NewConfig().WithID("A"))
require.Nil(t, err)
c2, err := New(NewConfig().WithID("B"))
require.Nil(t, err)
c3, err := New(NewConfig().WithID("C"))
require.Nil(t, err)

c.RegisterKind("player", NewPlayer, NewKindConfig())
c1.RegisterKind("player", NewPlayer, NewKindConfig())
c2.RegisterKind("player", NewPlayer, NewKindConfig())
c3.RegisterKind("player", NewPlayer, NewKindConfig())

c1.Start()
c2.Start()
c3.Start()

selectMember := func(details ActivationDetails) *Member {
for _, member := range details.Members {
if member.ID == "C" {
return member
}
}
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
eventPID := c1.Engine().SpawnFunc(func(c *actor.Context) {
switch msg := c.Message().(type) {
case ActivationEvent:
// test that we spawned on member C
require.Equal(t, c3.Address(), msg.PID.Address)
cancel()
case MemberJoinEvent:
if msg.Member.ID == "C" {
// Wait till member C is online before activating
// Activate the actor from member A
// Which should spawn the actor on member C
config := NewActivationConfig().WithSelectMemberFunc(selectMember)
c1.Activate("cancel_receiver", config)
}
}
}, "event")
c1.Engine().Subscribe(eventPID)
defer c1.Engine().Unsubscribe(eventPID)

<-ctx.Done()
require.Equal(t, context.DeadlineExceeded, ctx.Err())
c1.Stop().Wait()
c2.Stop().Wait()
c3.Stop().Wait()
}

func TestClusterShouldWorkWithDefaultValues(t *testing.T) {
Expand Down
15 changes: 2 additions & 13 deletions cluster/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,11 @@ package cluster
import "github.com/anthdm/hollywood/actor"

// KindConfig holds configuration for a registered kind.
type KindConfig struct {
activateOnMember ActivateOnMemberFunc
}
type KindConfig struct{}

// NewKindConfig returns a default kind configuration.
func NewKindConfig() KindConfig {
return KindConfig{
activateOnMember: ActivateOnRandomMember,
}
}

// WithActivateOnMemberFunc set the function that will be used to select the member
// where this kind will be spawned/activated on.
func (config KindConfig) WithActivateOnMemberFunc(fun ActivateOnMemberFunc) KindConfig {
config.activateOnMember = fun
return config
return KindConfig{}
}

// A kind is a type of actor that can be activated from any member of the cluster.
Expand Down
2 changes: 1 addition & 1 deletion examples/cluster/member_1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
c.RegisterKind("playerSession", shared.NewPlayer, nil)
c.RegisterKind("playerSession", shared.NewPlayer, cluster.NewKindConfig())

eventPID := c.Engine().SpawnFunc(func(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
Expand Down
2 changes: 1 addition & 1 deletion examples/cluster/member_2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
}, "event")

c.Engine().Subscribe(eventPID)
c.RegisterKind("playerSession", shared.NewPlayer, nil)
c.RegisterKind("playerSession", shared.NewPlayer, cluster.NewKindConfig())
c.Start()
select {}
}

0 comments on commit b190872

Please sign in to comment.