Handling One Million Requests per Minute with Go

This is a summary of how the Chief Architect at Malwarebytes used the Go language to handle one million requests per minute.

His main responsibility is to strengthen the systems infrastructure that supports millions of people every day. He has worked in the anti-virus and anti-malware industry for 12 years, and he knows first-hand that the complexity of these systems ultimately comes from the huge amount of data they have to process every day.

Most of his backend experience over the past nine years has been with Ruby on Rails. While he believes RoR is an amazing environment, once you have been thinking and designing systems the Ruby way for a while, you tend to forget how effective and simple a software architecture could be if you could leverage multi-threading, parallelization, fast execution, and a small memory footprint. He was also a C/C++, Delphi, and C# developer for many years, and he has come to realize that using the right tool for the job keeps unnecessary complexity out of a system.

As a Chief Architect, he is not attached to any particular language or framework, the two things the Internet always argues about. He believes that efficiency, productivity, and code maintainability depend mostly on how simple your architectural solution is.

The Problem
The goal was to handle POST requests from millions of endpoints. The web handler receives a JSON document containing a collection of many payloads, which need to be written to Amazon S3 so that a map-reduce system can analyze the data later.
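Going by the struct tags defined later in the article (version, token, data), a request body presumably looks something like the following; the contents of each payload object were redacted in the original, so they are left as placeholders here:

{
    "version": "10.0",
    "token": "hypothetical-client-token",
    "data": [
        { ... },
        { ... }
    ]
}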

The traditional approach would be to build a worker-tier architecture, using something like:

1. Sidekiq
2. Resque
3. DelayedJob
4. Elasticbeanstalk Worker Tier
5. RabbitMQ

and then set up two different clusters, one for the web front end and another for the workers, so that the worker back end could be scaled out to keep up with the growing volume of requests.

Since they knew this would be a high-traffic system, they decided it should be done in Go. He had already been using Go for about two years and had built a few systems with it, but no one was sure that Go could handle this kind of load.

They defined the data structure that would be received via POST requests, along with a method that uploads it to S3:


type PayloadCollection struct {
    WindowsVersion string    `json:"version"`
    Token          string    `json:"token"`
    Payloads       []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collisions in
    // case we get the same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(p) // encode the payload itself (the original snippet referenced an undefined `payload` variable)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
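The snippet assumes a package-level S3Bucket handle and an S3 client whose PutReader call matches a goamz-style API; the article never shows how that handle is created. A minimal, hypothetical sketch of how it might be initialized, assuming the github.com/mitchellh/goamz packages and AWS credentials in environment variables:

package main

import (
    "log"

    "github.com/mitchellh/goamz/aws"
    "github.com/mitchellh/goamz/s3"
)

// S3Bucket is the bucket handle used by Payload.UploadToS3.
var S3Bucket *s3.Bucket

func init() {
    // EnvAuth reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
    auth, err := aws.EnvAuth()
    if err != nil {
        log.Fatalf("cannot load AWS credentials: %v", err)
    }

    client := s3.New(auth, aws.USEast)
    S3Bucket = client.Bucket("payload-archive") // hypothetical bucket name
}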

Initially they tried to parallelize the work of accepting a request and then uploading it by simply pushing each upload into its own goroutine:


func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3() // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

Under moderate, steady load this approach works well enough, but at large scale it does not. Each request spawns its own goroutine, and S3 uploads take a relatively long time over the network, so nothing bounds how many goroutines are running at once; when traffic reached one million POST requests per minute, the goroutines piled up and this code fell apart.

They then introduced a buffered channel to queue the work:


var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

On the other end of the queue, a consumer dequeues the work and performs the upload to S3:


func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            // job is a Payload taken straight off the queue
            job.UploadToS3() // <-- STILL NOT GOOD
        }
    }
}

The problem is that requests arrive far faster than a single consumer can upload to S3, so the buffered channel quickly reaches its capacity and blocks the request handler from queuing anything more. The buffer only postpones the collapse instead of preventing it, and latency keeps climbing at a steady rate (the system gets slower and slower).

The root cause is that uploading to S3 is slow, because it goes over the network. So they borrowed the thread-pool idea familiar from Java and C# and implemented a queue-plus-worker-pool pattern with channels: a channel of jobs, plus a channel of worker channels.



var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// A pool of workers that are instantiated to perform the work
var WorkerPool chan chan Job

// Worker represents the worker that executes the job
type Worker struct {
    ID         int
    JobChannel chan Job
    WorkerPool chan chan Job
    QuitChan   chan bool
}

func NewWorker(id int, workerPool chan chan Job) Worker {
    worker := Worker{
        ID:         id,
        JobChannel: make(chan Job), // the original snippet set a nonexistent `Work` field here
        WorkerPool: workerPool,
        QuitChan:   make(chan bool)}

    return worker
}

// Start method starts the run loop for the worker, listening for a quit channel
// in case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.QuitChan:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.QuitChan <- true
    }()
}

On the producer side of the queue, the request handler is changed as follows:


func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

StartDispatcher creates the worker pool and then starts listening for jobs on the JobQueue:


func StartDispatcher(maxWorkers int) {

    WorkerPool = make(chan chan Job, maxWorkers)

    // starting n number of workers
    for i := 0; i < maxWorkers; i++ {
        worker := NewWorker(i+1, WorkerPool)
        worker.Start()
    }

    go func() {
        for {
            select {
            case job := <-JobQueue:
                // a job request has been received
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-WorkerPool

                    // dispatch the job to the worker's job channel
                    jobChannel <- job
                }(job)
            }
        }
    }()
}
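The snippets leave out how everything is tied together: JobQueue still has to be allocated, the MAX_WORKERS / MAX_QUEUE environment variables (which os.Getenv returns as strings) have to be parsed into integers, and the handler has to be registered. A minimal sketch of a main function under those assumptions; the route, port, and default values are illustrative and not specified in the article:

package main

import (
    "log"
    "net/http"
    "os"
    "strconv"
)

func main() {
    // os.Getenv returns strings, so convert them; fall back to defaults if unset.
    maxWorkers, err := strconv.Atoi(os.Getenv("MAX_WORKERS"))
    if err != nil {
        maxWorkers = 100 // hypothetical default
    }
    maxQueue, err := strconv.Atoi(os.Getenv("MAX_QUEUE"))
    if err != nil {
        maxQueue = 10000 // hypothetical default
    }

    // Allocate the buffered job queue before any requests arrive.
    JobQueue = make(chan Job, maxQueue)

    // Spin up the worker pool and the dispatcher loop.
    StartDispatcher(maxWorkers)

    // Route and port are assumptions; the article does not specify them.
    http.HandleFunc("/payload", payloadHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}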

In the end, the system reached one million requests handled per minute. More benchmark results can be found in the original article:

Handling 1 Million Requests per Minute with Go

A system that can't survive the Spring Festival travel rush or Singles' Day isn't a good system...

Would his setup hold up to that...?

Can Go really be that powerful?

Just look at Qiniu and you'll see that golang is pretty solid.