[go: nahoru, domu]

Skip to content

Commit

Permalink
Implement Guardian.
Browse files Browse the repository at this point in the history
  • Loading branch information
potterdai committed Dec 28, 2017
1 parent b26cae5 commit 89496f5
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 9 deletions.
82 changes: 82 additions & 0 deletions actor/guardian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package actor

import (
"errors"
"sync"

"github.com/AsynkronIT/protoactor-go/log"
)

type guardiansValue struct {
guardians *sync.Map
}

var guardians = &guardiansValue{&sync.Map{}}

func (gs *guardiansValue) getGuardianPid(s SupervisorStrategy) *PID {
if g, ok := gs.guardians.Load(s); ok {
return g.(*guardianProcess).pid
}
g := gs.newGuardian(s)
gs.guardians.Store(s, g)
return g.pid
}

// newGuardian creates and returns a new actor.guardianProcess with a timeout of duration d
func (gs *guardiansValue) newGuardian(s SupervisorStrategy) *guardianProcess {
ref := &guardianProcess{strategy: s}
id := ProcessRegistry.NextId()

pid, ok := ProcessRegistry.Add(ref, "guardian"+id)
if !ok {
plog.Error("failed to register guardian process", log.Stringer("pid", pid))
}

ref.pid = pid
return ref
}

type guardianProcess struct {
pid *PID
strategy SupervisorStrategy
}

func (g *guardianProcess) SendUserMessage(pid *PID, message interface{}) {
panic(errors.New("Guardian actor cannot receive any user messages"))
}

func (g *guardianProcess) SendSystemMessage(pid *PID, message interface{}) {
if msg, ok := message.(*Failure); ok {
g.strategy.HandleFailure(g, msg.Who, msg.RestartStats, msg.Reason, msg.Message)
}
}

func (g *guardianProcess) Stop(pid *PID) {
//Ignore
}

func (g *guardianProcess) Children() []*PID {
panic(errors.New("Guardian does not hold its children PIDs"))
}

func (*guardianProcess) EscalateFailure(reason interface{}, message interface{}) {
panic(errors.New("Guardian cannot escalate failure"))
}

func (*guardianProcess) RestartChildren(pids ...*PID) {
for _, pid := range pids {
pid.sendSystemMessage(restartMessage)
}
}

func (*guardianProcess) StopChildren(pids ...*PID) {
for _, pid := range pids {
pid.sendSystemMessage(stopMessage)
}
}

func (*guardianProcess) ResumeChildren(pids ...*PID) {
for _, pid := range pids {
pid.sendSystemMessage(resumeMailboxMessage)
}
}
5 changes: 5 additions & 0 deletions actor/local_context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"errors"
"time"

"github.com/AsynkronIT/protoactor-go/log"
Expand Down Expand Up @@ -425,6 +426,10 @@ func (ctx *localContext) SpawnPrefix(props *Props, prefix string) *PID {
}

func (ctx *localContext) SpawnNamed(props *Props, name string) (*PID, error) {
if props.guardianStrategy != nil {
panic(errors.New("Props used to spawn child cannot have GuardianStrategy"))
}

pid, err := props.spawn(ctx.self.Id+"/"+name, ctx.self)
if err != nil {
return pid, err
Expand Down
7 changes: 7 additions & 0 deletions actor/props.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type OutboundMiddleware func(next SenderFunc) SenderFunc
type Props struct {
actorProducer Producer
mailboxProducer mailbox.Producer
guardianStrategy SupervisorStrategy
supervisionStrategy SupervisorStrategy
inboundMiddleware []InboundMiddleware
outboundMiddleware []OutboundMiddleware
Expand Down Expand Up @@ -61,6 +62,12 @@ func (props *Props) WithMailbox(mailbox mailbox.Producer) *Props {
return props
}

//WithGuardian assigns a guardian strategy to the props
func (props *Props) WithGuardian(guardian SupervisorStrategy) *Props {
props.guardianStrategy = guardian
return props
}

//WithSupervisor assigns a supervision strategy to the props
func (props *Props) WithSupervisor(supervisor SupervisorStrategy) *Props {
props.supervisionStrategy = supervisor
Expand Down
10 changes: 7 additions & 3 deletions actor/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@ var DefaultSpawner SpawnFunc = spawn

// Spawn starts a new actor based on props and named with a unique id
func Spawn(props *Props) *PID {
pid, _ := props.spawn(ProcessRegistry.NextId(), nil)
pid, _ := SpawnNamed(props, ProcessRegistry.NextId())
return pid
}

// SpawnPrefix starts a new actor based on props and named using a prefix followed by a unique id
func SpawnPrefix(props *Props, prefix string) (*PID, error) {
return props.spawn(prefix+ProcessRegistry.NextId(), nil)
return SpawnNamed(props, prefix+ProcessRegistry.NextId())
}

// SpawnNamed starts a new actor based on props and named using the specified name
//
// If name exists, error will be ErrNameExists
func SpawnNamed(props *Props, name string) (*PID, error) {
return props.spawn(name, nil)
var parent *PID
if props.guardianStrategy != nil {
parent = guardians.getGuardianPid(props.guardianStrategy)
}
return props.spawn(name, parent)
}

func spawn(id string, props *Props, parent *PID) (*PID, error) {
Expand Down
2 changes: 1 addition & 1 deletion actor/strategy_all_for_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (strategy *allForOneStrategy) requestRestartPermission(rs *RestartStatistic

rs.Fail()

if strategy.withinDuration == 0 || rs.IsWithinDuration(strategy.withinDuration) {
if strategy.maxNrOfRetries > 0 && (strategy.withinDuration == 0 || rs.IsWithinDuration(strategy.withinDuration)) {
return rs.FailureCount <= strategy.maxNrOfRetries
}

Expand Down
2 changes: 1 addition & 1 deletion actor/strategy_one_for_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (strategy *oneForOne) requestRestartPermission(rs *RestartStatistics) bool

rs.Fail()

if strategy.withinDuration == 0 || rs.IsWithinDuration(strategy.withinDuration) {
if strategy.maxNrOfRetries > 0 && (strategy.withinDuration == 0 || rs.IsWithinDuration(strategy.withinDuration)) {
return rs.FailureCount <= strategy.maxNrOfRetries
}

Expand Down
3 changes: 2 additions & 1 deletion cluster/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type partitionActor struct {
}

func spawnPartitionActor(kind string) *actor.PID {
partitionPid, _ := actor.SpawnNamed(actor.FromProducer(newPartitionActor(kind)), "partition-"+kind)
props := actor.FromProducer(newPartitionActor(kind)).WithGuardian(actor.RestartingSupervisorStrategy())
partitionPid, _ := actor.SpawnNamed(props, "partition-"+kind)
return partitionPid
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/pid_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func setupPidCache() {
reverseCache: cmap.New(),
}

props := actor.FromProducer(newPidCacheWatcher())
props := actor.FromProducer(newPidCacheWatcher()).WithGuardian(actor.RestartingSupervisorStrategy())
pidCache.watcher, _ = actor.SpawnNamed(props, "PidCacheWatcher")

pidCache.memberStatusSub = eventstream.Subscribe(pidCache.onMemberStatusEvent).
Expand Down
3 changes: 2 additions & 1 deletion remote/activator_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ var (
)

func spawnActivatorActor() {
activatorPid, _ = actor.SpawnNamed(actor.FromProducer(newActivatorActor()), "activator")
props := actor.FromProducer(newActivatorActor()).WithGuardian(actor.RestartingSupervisorStrategy())
activatorPid, _ = actor.SpawnNamed(props, "activator")
}

func stopActivatorActor() {
Expand Down
3 changes: 2 additions & 1 deletion remote/endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func startEndpointManager(config *remoteConfig) {
plog.Debug("Started EndpointManager")

props := actor.FromProducer(newEndpointSupervisor).
WithGuardian(actor.RestartingSupervisorStrategy()).
WithSupervisor(actor.RestartingSupervisorStrategy()).
WithDispatcher(mailbox.NewSynchronizedDispatcher(300))
endpointSupervisor := actor.Spawn(props)
endpointSupervisor, _ := actor.SpawnNamed(props, "EndpointSupervisor")

endpointManager = &endpointManagerValue{
connections: &sync.Map{},
Expand Down

0 comments on commit 89496f5

Please sign in to comment.