Actor模型是一种这样的编程结构,它对大量独立作业进行建模,以任何顺序进行处理,无需锁同步。如Java中Play!框架。
在本文中,我将描述如何在 golang 中实现一个原始的 Actor 模型。我们将利用 golang 提供的工具进行并发处理——goroutine、通道和等待组。
Actor 有一个任务队列和一个监听任务队列并执行任务的 goroutine。
这里 A 是一个阻塞任务队列并继续执行队列中的任务的 goroutine。Actor 的界面如下所示:
type Actor interface { AddTask(task Task) Start() Stop() }
|
task在actor中执行。它是具有Execute 方法的给定接口的实现。任何可以通过调用 Execute 来执行的东西。
task是我们需要做的工作的业务实现。
actor系统接口:
type ActorSystem interface { Run() SubmitTask(task Task) Shutdown(shutdownWG *sync.WaitGroup) }
|
task使用SubmitTask方法提交给ActorSystem。一个任务分配器将每个任务task分配给一个行为体。每个行动者也有一个小队列,在其中缓冲任务task并逐一执行。
ActorSystem
type ActorSystem struct { name string assigner entities.Actor wg *sync.WaitGroup tracker *tracker.Tracker }
func (system *ActorSystem) Run() { log.Debug("actor system %s started \n", system.name) // start the assigner in seprate go routine go system.assigner.Start() }
func (system *ActorSystem) SubmitTask(task entities.Task) error { // adding submitted task to assigner return system.assigner.AddTask(task) }
func (system *ActorSystem) Shutdown(wg *sync.WaitGroup) { defer wg.Done() system.assigner.Stop() system.wg.Wait() system.tracker.Shutdown() log.Debug("actor system: %s shutdown completed ", system.name) }
|
当ActorSystem启动时,它启动一个taskAssigner actor。通过在这个actor上调用 AddTask 方法,将每个传入Task系统的数据添加到 taskAssigner actor。
task使用SubmitTask方法提交给ActorSystem。我们通过调用AddTask方法将每个收到的任务放到taskAssigner中。
在Shutdown 时,它关闭任务通道,阻止任何新进入的任务,等待所有收到的任务被分配给行为者。然后,它对每个actor调用 "停止",并等待它们完成。
Task Assigner
我们把每个传入的任务放在一个通道tasks中,taskAssigner和Task在一个Actor的内部队列中。
type AssignerActor struct { name string closeSig chan bool tasks chan entities.Task assignerIndex int tracker *tracker.Tracker scalar *autoScalar *TaskActorPool *Config }
func (assigner *AssignerActor) AddTask(task entities.Task) error{ if len(assigner.tasks) >= assignerQueueSize { assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Rejected) return errors.New("task queue is full") } // task getting added to assigner actor channel assigner.tasks <- task assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Submitted) return nil }
|
taskAssigner内部处理任务通道,并通过对其调用AddTask,将任务路由到池中的一个任务执行者。
func (assigner *AssignerActor) Start() { poolStarted := make(chan bool) assigner.scalar = GetAutoScaler(assigner, poolStarted) <- poolStarted // will loop forever till tasks channel is closed for task := range assigner.tasks { for { assigner.poolLock.Lock() assigner.assignerIndex = assigner.assignerIndex % len(assigner.pool) actor := assigner.pool[assigner.assignerIndex] assigner.assignerIndex += 1 assigner.poolLock.Unlock() // assigning task to a task actor from pool err := actor.AddTask(task) if err == nil { break } } } assigner.closeSig <- true }
|
autoScalar持续关注任务中的项目数量,并增加或减少任务actor池的大小。
// auto scalar is part of task assigner actor // It scales task actor pool based on queue len size type autoScalar struct { *AssignerActor lastActorId int closingSig chan bool closedSig chan bool }
func(scalar *autoScalar) run(poolStarted chan bool) { log.Debug("running auto scalar with min actor") // provision starting actors scalar.provisionActors(scalar.Config.MinActor) // waiting for scalar to start task actors poolStarted <- true completed := false // loops till it gets a closing signal from task assigner for !completed { select { case <- scalar.closingSig: completed = true case <-time.After(100 * time.Millisecond): if scalar.QueueSize() > scalar.UpscaleQueueSize && len(scalar.pool) < scalar.MaxActor { scalar.provisionActors(1)
} else if scalar.QueueSize() < scalar.DownscaleQueueSize && len(scalar.pool) > scalar.MinActor { scalar.deprovisionActors(1) } } } // when it comes out, it closes all task actors in pool scalar.deprovisionActors(len(scalar.pool)) log.Debug("scalar exited") scalar.closedSig <- true }
|
Task Actor
它也是一个Actor,它的工作是执行任务,这些任务被添加到它的通道任务中,类似于分配者assigner的Actor。
type TaskActor struct { id int closeSig chan bool wg *sync.WaitGroup tasks chan entities.Task tracker *tracker.Tracker } // add task only if channel has space func (a *TaskActor) AddTask(task entities.Task) error { if len(a.tasks) >= taskQueueSize { return errors.New("filled queue") } // task added to channel a.tasks <- task return nil } func (a *TaskActor) Start() { defer a.wg.Done() a.wg.Add(1) log.Debug("starting actor :%d", a.id) // forever loop on tasks channel till channel is closed for task := range a.tasks{ task.Execute() a.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Completed) } log.Debug("stopped actor :%d", a.id) a.closeSig <- true } func (a *TaskActor) Stop() { // closing task channel close (a.tasks) <- a.closeSig }
|
源码:Github