This article summarizes how the Chief Architect at Malwarebytes used Go to build a system that handles 1 million requests per minute.
His main responsibility is to grow the systems infrastructure that supports millions of people every day. He has worked in the anti-virus and anti-malware industry for 12 years, and he knows firsthand that these systems end up complicated because of the massive amounts of data they process daily.
Most of his backend work over the past nine years was in Ruby on Rails. While he believes RoR is an amazing environment, after you have been thinking and designing systems the Ruby way for a while, you forget how efficient and simple an architecture could have been if you had considered multi-threading, parallelization, fast execution, and small memory overhead. He was also a C/C++, Delphi, and C# developer for many years, and he came to realize that using the right tool for the job keeps things less complex.
As a chief architect, he is not wedded to any particular language or framework — the two things the internet always fights about. He believes efficiency, productivity, and code maintainability depend mostly on how simple your architectural solution is.
The Problem
The goal was to handle POST requests from millions of endpoints. The web handler receives a JSON document that may contain a collection of many payloads, which need to be written to Amazon S3 so that a map-reduce system can analyze the data later.
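For illustration only, here is what such a request body might look like, expressed as a Go string constant. The field names follow the PayloadCollection struct shown further below; the version and token values are made up, and the payload contents were redacted in the original post:

```go
// exampleBody is a hypothetical request body matching the PayloadCollection
// struct defined later; the real payload fields were redacted in the original post.
const exampleBody = `{
    "version": "10.0",
    "token":   "example-client-token",
    "data":    [ {} ]
}`
```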
Traditionally, one would reach for a worker-tier architecture, using something like:
1. Sidekiq
2. Resque
3. DelayedJob
4. Elasticbeanstalk Worker Tier
5. RabbitMQ
and then set up two different clusters, one for the web front-end and one for the workers, so the number of backend machines can be scaled up to keep pace with the growing volume of requests.
Since they knew from the start that this would be a high-traffic system, they decided it should be done in Go. The author had been using Go for about two years and had built a few systems with it, but no one yet knew for certain whether Go could handle this kind of load.
They defined the data structures for the request body received via POST, along with a method that uploads a payload to S3:
```go
type PayloadCollection struct {
	WindowsVersion string    `json:"version"`
	Token          string    `json:"token"`
	Payloads       []Payload `json:"data"`
}

type Payload struct {
	// [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collisions in
	// case we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder(), time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(p) // encode the receiver, not an undefined `payload`
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```
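The snippet above references a package-level `S3Bucket`, and the `s3.Private` ACL, `s3.Options{}`, and `PutReader` call match the goamz S3 client. The original post does not show how the bucket is set up; a minimal sketch, assuming the github.com/goamz/goamz packages and a made-up bucket name, might be:

```go
import (
	"log"

	"github.com/goamz/goamz/aws"
	"github.com/goamz/goamz/s3"
)

// S3Bucket is the package-level bucket handle used by UploadToS3 above.
var S3Bucket *s3.Bucket

func init() {
	// EnvAuth reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
	auth, err := aws.EnvAuth()
	if err != nil {
		log.Fatalf("aws auth: %s", err)
	}
	// "payload-archive" is a hypothetical bucket name; the region is likewise assumed.
	S3Bucket = s3.New(auth, aws.USEast).Bucket("payload-archive")
}
```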
At first they tried to parallelize the processing naively, accepting the request and firing off each upload in its own goroutine:
```go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3() // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}
```
Under steady, moderate load this approach works well enough for most people, but at scale it falls apart: the number of goroutines grows without bound, and at 1 million POST requests per minute this code broke down. So they introduced a buffered channel to queue the work:
```go
var Queue chan Payload

func init() {
	Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	...
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		Queue <- payload
	}
	...
}
```
On the other end of the queue, a consumer dequeues the work and performs the upload to S3:

```go
func StartProcessor() {
	for {
		select {
		case job := <-Queue:
			job.UploadToS3() // <-- STILL NOT GOOD
		}
	}
}
```
The problem is that the queue quickly fills to its limit and blocks the request handlers, so nothing more can be enqueued. A bigger buffer only postpones the inevitable — it merely starts a countdown to the system's death — and indeed latency climbed at a constant rate after this version was deployed. The root cause is that uploading to S3 goes over the network and is slow, so a single consumer can never keep up with the producers. The fix was to borrow the thread-pool concept familiar from Java and C#, implementing the Queue + Worker pattern on top of Go channels.
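As an aside that is not in the original post: if the only goal were to stop the handler from blocking when the queue is full, Go's `select` with a `default` case gives a non-blocking send, at the cost of shedding load. A minimal sketch, reusing the `Queue` channel defined above:

```go
// tryEnqueue is a hypothetical helper (not in the original code). It attempts
// a non-blocking send on the buffered Queue and reports whether the payload
// was accepted, so the caller can respond with e.g. 503 instead of stalling.
func tryEnqueue(p Payload) bool {
	select {
	case Queue <- p:
		return true // enqueued successfully
	default:
		return false // queue full: shed load rather than block the handler
	}
}
```

The author instead chose to bound concurrency without dropping requests, using the two-level channel setup below: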
```go
var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// A pool of workers that are instantiated to perform the work
var WorkerPool chan chan Job

// Worker represents the worker that executes the job
type Worker struct {
	ID         int
	JobChannel chan Job
	WorkerPool chan chan Job
	QuitChan   chan bool
}

func NewWorker(id int, workerPool chan chan Job) Worker {
	return Worker{
		ID:         id,
		JobChannel: make(chan Job),
		WorkerPool: workerPool,
		QuitChan:   make(chan bool),
	}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker's job channel into the worker pool.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.QuitChan:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.QuitChan <- true
	}()
}
```
On the producer side of the queue, the handler now wraps each payload in a Job and enqueues it:
```go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {

		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}
```
StartDispatcher creates the pool of workers and then starts listening for jobs arriving on JobQueue:
```go
func StartDispatcher(maxWorkers int) {
	WorkerPool = make(chan chan Job, maxWorkers)

	// starting n number of workers
	for i := 0; i < maxWorkers; i++ {
		worker := NewWorker(i+1, WorkerPool)
		worker.Start()
	}

	go func() {
		for {
			select {
			case job := <-JobQueue:
				// a job request has been received
				go func(job Job) {
					// try to obtain a worker's job channel that is available.
					// this will block until a worker is idle
					jobChannel := <-WorkerPool

					// dispatch the job to the worker's job channel
					jobChannel <- job
				}(job)
			}
		}
	}()
}
```
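To make the moving parts concrete, here is a minimal sketch, not from the original post, of how everything might be wired together. The fallback values, the queue size, and the `/payload/` route and port are assumptions:

```go
package main

import (
	"log"
	"net/http"
	"os"
	"strconv"
)

func main() {
	// Parse pool sizes from the same environment variables used above;
	// the fallback values are hypothetical.
	maxWorkers, err := strconv.Atoi(os.Getenv("MAX_WORKERS"))
	if err != nil {
		maxWorkers = 100
	}
	maxQueue, err := strconv.Atoi(os.Getenv("MAX_QUEUE"))
	if err != nil {
		maxQueue = 10000
	}

	// Create the buffered job queue and start the dispatcher + worker pool.
	JobQueue = make(chan Job, maxQueue)
	StartDispatcher(maxWorkers)

	// Register the payload handler and serve; the route and port are assumptions.
	http.HandleFunc("/payload/", payloadHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```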
With this in place, the system finally reached 1 million requests per minute. For the full benchmark results, see the original post:
Handling 1 Million Requests per Minute with Go