Handling 1 Million Requests per Minute with Go

2015-07-07 banq

This article is from the chief architect at Malwarebytes, describing how he went about handling one million requests per minute with Go.

His main responsibility is strengthening the systems infrastructure to support millions of daily users. He has worked in the anti-virus and anti-malware industry for twelve years, and he knows well that the complexity of these systems ultimately comes from the huge volumes of data they handle every day.

Most of his past nine years of backend experience was with Ruby on Rails. Although he believes RoR is an amazing environment, once you have been thinking and designing systems the Ruby way for a while, you forget how effective and simple a system architecture can be when you need multithreading, parallelism, fast execution and a small memory footprint. For many years he also wrote C/C++, Delphi and C#, and he came to realize that using the right tool for the job reduces unnecessary complexity.

As a chief architect he is not attached to any particular language or framework, the two perennial topics of online flame wars; he believes that efficiency, productivity and code maintainability depend mostly on how simple your architectural solution is.

The Problem

The goal was to handle POST requests from millions of endpoints. The web handler receives a JSON document containing a collection of many payloads, which need to be written to Amazon S3 so that a map-reduce system can later process and analyze the data.
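As a rough illustration (not shown in the original post), the request body might look something like the following, with the field names taken from the PayloadCollection struct defined below and the values purely hypothetical:

{
    "version": "6.0.1",
    "token": "hypothetical-token",
    "data": [
        { ... },
        { ... }
    ]
}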

The traditional approach would be to create a worker-tier architecture, using something like:

1. Sidekiq

2. Resque

3. DelayedJob

4. Elasticbeanstalk Worker Tier

5. RabbitMQ

and then set up two different clusters, one for the web front-end and another for the workers, so the number of backend machines could be scaled up to absorb the growing request volume.

Since they knew from the start that this would be a high-traffic system, they realized it should be done in Go. He had personally been using Go for about two years and had built several systems with it, but no one was sure Go could handle such a large load.

They defined the data structures for the request received via POST, together with a method that uploads each payload to S3:

type PayloadCollection struct {
	WindowsVersion string    `json:"version"`
	Token          string    `json:"token"`
	Payloads       []Payload `json:"data"`
}

type Payload struct {
	// [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collisions in
	// case we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder(), time.Now().UnixNano())

	bucket := S3Bucket

	// encode this payload as JSON into an in-memory buffer
	b := new(bytes.Buffer)
	if err := json.NewEncoder(b).Encode(p); err != nil {
		return err
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

At first they tried to parallelize the handling by dropping each upload into a simple goroutine:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3() // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

Under moderate load this approach works well enough for most people, but at scale it fell apart: nothing bounded how many goroutines could be spawned, so when traffic reached one million POST requests per minute, the code collapsed.
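To make the failure mode concrete, here is a toy program (not from the original post) that mimics the handler above: goroutines are created faster than the simulated uploads can finish, so the number of live goroutines simply keeps growing:

package main

import (
	"fmt"
	"runtime"
	"time"
)

// slowUpload stands in for Payload.UploadToS3: a network call
// that takes a while to complete.
func slowUpload() {
	time.Sleep(200 * time.Millisecond)
}

func main() {
	// Mimic `go payload.UploadToS3()` under heavy traffic: spawn
	// work with no bound on concurrency.
	for i := 0; i < 100000; i++ {
		go slowUpload()
	}
	fmt.Println("goroutines alive:", runtime.NumGoroutine())
}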

So they introduced a buffered channel to queue the work:

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

On the other end of the queue, a consumer dequeues the work and performs the upload to S3:

func StartProcessor() {
	for {
		select {
		case job := <-Queue:
			job.UploadToS3() // <-- STILL NOT GOOD
		}
	}
}

But this brought a new problem: the queue quickly filled to its limit and blocked the request handlers from enqueuing anything more. They had merely delayed the problem and started a countdown to the system's death; after deploying this flawed version, latency kept climbing at a constant rate (the system kept slowing down).
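One common mitigation, though not the fix the team chose, is to make the send non-blocking, so that a full queue sheds load instead of stalling every handler. A minimal sketch, reusing the handler's loop:

// inside payloadHandler, replacing the blocking `Queue <- payload` send
for _, payload := range content.Payloads {
	select {
	case Queue <- payload:
		// enqueued successfully
	default:
		// the queue is full: fail fast rather than block the handler
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}
}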

The root cause was that uploading to S3 takes a long time, since it goes over the network. So they borrowed the thread-pool concept familiar from Java and C#, and used channels to implement a queue of jobs plus a pool of workers:

var (
	MaxWorker = os.Getenv("MAX_WORKERS") // note: os.Getenv returns a string, so convert with strconv.Atoi before use
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// A pool of workers that are instantiated to perform the work
var WorkerPool chan chan Job

// Worker represents the worker that executes the job
type Worker struct {
	ID          int
	JobChannel  chan Job
	WorkerPool  chan chan Job
	QuitChan    chan bool
}

func NewWorker(id int, workerPool chan chan Job) Worker {
	worker := Worker{
		ID:         id,
		JobChannel: make(chan Job),
		WorkerPool: workerPool,
		QuitChan:   make(chan bool)}

	return worker
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
				log.Printf("Error uploading to S3: %s", err.Error())
				}

			case <-w.QuitChan:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.QuitChan <- true
	}()
}

The enqueuing side of the request handler then becomes:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {

		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}

StartDispatcher creates the pool of workers and then starts listening for jobs arriving on the JobQueue. Each idle worker registers its own job channel into the shared WorkerPool, so the dispatcher can hand the next job to whichever worker becomes available:

func StartDispatcher(maxWorkers int) {

	WorkerPool = make(chan chan Job, maxWorkers)

	// starting n number of workers
	for i := 0; i < maxWorkers; i++ {
		worker := NewWorker(i+1, WorkerPool)
		worker.Start()
	}

	go func() {
		for {
			select {
			case job := <-JobQueue:
				// a job request has been received
				go func(job Job) {
					// try to obtain a worker job channel that is available.
					// this will block until a worker is idle
					jobChannel := <-WorkerPool

					// dispatch the job to the worker's job channel
					jobChannel <- job
				}(job)
			}
		}
	}()
}
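For completeness, here is a minimal sketch (not part of the original post) of how these pieces might be wired together; the /payload route, the port and the fallback values are assumptions:

package main

import (
	"log"
	"net/http"
	"os"
	"strconv"
)

func main() {
	// MAX_WORKERS and MAX_QUEUE come out of the environment as strings,
	// so they must be converted before use
	maxWorkers, err := strconv.Atoi(os.Getenv("MAX_WORKERS"))
	if err != nil {
		maxWorkers = 100
	}
	maxQueue, err := strconv.Atoi(os.Getenv("MAX_QUEUE"))
	if err != nil {
		maxQueue = 10000
	}

	// create the job queue and start the worker pool before serving traffic
	JobQueue = make(chan Job, maxQueue)
	StartDispatcher(maxWorkers)

	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}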

With this in place, they finally reached one million requests per minute. More benchmark results can be found in the original article:

Handling 1 Million Requests per Minute with Go

         

lostalien
2015-07-16 09:58

A system that can't stand up to the Spring Festival travel rush and Singles' Day isn't a good system....

Would this one hold up...?

tecentID7752D
2015-09-29 10:28

Can Go really be that powerful?

wilsonp
2015-10-20 10:46

Just look at Qiniu and you can tell golang is pretty good.
