在Golang中实现Actor模型的源码 - Gaurav

22-02-25 banq

Actor模型是一种这样的编程结构,它对大量独立作业进行建模,以任何顺序进行处理,无需锁同步。如Java中Play!框架。
在本文中,我将描述如何在 golang 中实现一个原始的 Actor 模型。我们将利用 golang 提供的工具进行并发处理——goroutine、通道和等待组。
Actor 有一个任务队列和一个监听任务队列并执行任务的 goroutine。
这里 A 是一个阻塞任务队列并继续执行队列中的任务的 goroutine。Actor 的界面如下所示:

type Actor interface {
   AddTask(task Task)
   Start()
   Stop()
}


task在actor中执行。它是具有Execute 方法的给定接口的实现。任何可以通过调用 Execute 来执行的东西。
task是我们需要做的工作的业务实现。
 
actor系统接口:

type ActorSystem interface {
   Run()
   SubmitTask(task Task) 
   Shutdown(shutdownWG *sync.WaitGroup)
}

task使用SubmitTask方法提交给ActorSystem。一个任务分配器将每个任务task分配给一个行为体。每个行动者也有一个小队列,在其中缓冲任务task并逐一执行。
 

ActorSystem

type ActorSystem struct {
    name           string
    assigner        entities.Actor
    wg             *sync.WaitGroup
    tracker        *tracker.Tracker
}

func (system *ActorSystem) Run() {
    log.Debug("actor system %s started \n", system.name)
  // start the assigner in seprate go routine
    go system.assigner.Start()
}

func (system *ActorSystem) SubmitTask(task entities.Task) error {
  // adding submitted task to assigner
    return system.assigner.AddTask(task)
}

func (system *ActorSystem) Shutdown(wg *sync.WaitGroup) {
    defer wg.Done()
    system.assigner.Stop()
    system.wg.Wait()
    system.tracker.Shutdown()
    log.Debug("actor system: %s shutdown completed ", system.name)
}


当ActorSystem启动时,它启动一个taskAssigner actor。通过在这个actor上调用 AddTask 方法,将每个传入Task系统的数据添加到 taskAssigner actor。
task使用SubmitTask方法提交给ActorSystem。我们通过调用AddTask方法将每个收到的任务放到taskAssigner中。
在Shutdown 时,它关闭任务通道,阻止任何新进入的任务,等待所有收到的任务被分配给行为者。然后,它对每个actor调用 "停止",并等待它们完成。
 

Task Assigner
我们把每个传入的任务放在一个通道tasks中,taskAssigner和Task在一个Actor的内部队列中。

type AssignerActor struct {
    name     string
    closeSig chan bool
    tasks    chan entities.Task
    assignerIndex int
    tracker *tracker.Tracker
    scalar  *autoScalar
    *TaskActorPool
    *Config
}

func (assigner *AssignerActor) AddTask(task entities.Task) error{
    if len(assigner.tasks) >= assignerQueueSize {
        assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Rejected)
        return errors.New("task queue is full")
    }
    // task getting added to assigner actor channel 
    assigner.tasks <- task
    assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Submitted)
    return nil
}


taskAssigner内部处理任务通道,并通过对其调用AddTask,将任务路由到池中的一个任务执行者。

func (assigner *AssignerActor) Start() {
    poolStarted := make(chan bool)
    assigner.scalar = GetAutoScaler(assigner, poolStarted)
    <- poolStarted
        // will loop forever till tasks channel is closed
    for task := range assigner.tasks {
        for {
            assigner.poolLock.Lock()
            assigner.assignerIndex = assigner.assignerIndex % len(assigner.pool)
            actor := assigner.pool[assigner.assignerIndex]
            assigner.assignerIndex += 1
            assigner.poolLock.Unlock()
            // assigning task to a task actor from pool
            err := actor.AddTask(task)
            if err == nil {
                break
            }
        }
    }
    assigner.closeSig <- true
}


autoScalar持续关注任务中的项目数量,并增加或减少任务actor池的大小。

// auto scalar is part of task assigner actor
// It scales task actor pool based on queue len size
type autoScalar struct {
    *AssignerActor
    lastActorId int
    closingSig  chan bool
    closedSig   chan bool
}

func(scalar *autoScalar) run(poolStarted chan bool) {
    log.Debug("running auto scalar with min actor")
        // provision starting actors
    scalar.provisionActors(scalar.Config.MinActor)
    // waiting for scalar to start task actors
    poolStarted <- true
    completed := false
        // loops till it gets a closing signal from task assigner
    for !completed {
        select {
        case <- scalar.closingSig:
            completed = true
        case <-time.After(100 * time.Millisecond):
            if scalar.QueueSize() > scalar.UpscaleQueueSize && len(scalar.pool) < scalar.MaxActor {
                scalar.provisionActors(1)

            } else if scalar.QueueSize() < scalar.DownscaleQueueSize && len(scalar.pool) > scalar.MinActor {
                scalar.deprovisionActors(1)
            }
        }
    }
  // when it comes out, it closes all task actors in pool
    scalar.deprovisionActors(len(scalar.pool))
    log.Debug("scalar exited")
    scalar.closedSig <- true
}


 

Task Actor
它也是一个Actor,它的工作是执行任务,这些任务被添加到它的通道任务中,类似于分配者assigner的Actor。

type TaskActor struct {
id int
closeSig chan bool
wg *sync.WaitGroup
tasks chan entities.Task
tracker *tracker.Tracker
}
// add task only if channel has space
func (a *TaskActor) AddTask(task entities.Task) error {
if len(a.tasks) >= taskQueueSize {
return errors.New("filled queue")
}
// task added to channel
a.tasks <- task
return nil
}
func (a *TaskActor) Start() {
defer a.wg.Done()
a.wg.Add(1)
log.Debug("starting actor :%d", a.id)
// forever loop on tasks channel till channel is closed
for task := range a.tasks{
task.Execute()
a.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Completed)
}
log.Debug("stopped actor :%d", a.id)
a.closeSig <- true
}
func (a *TaskActor) Stop() {
// closing task channel
close (a.tasks)
<- a.closeSig
}
 
 


源码:Github

 

猜你喜欢