Go 1.5的并发特性与案例

Go语言最有用的特性是将并发作为第一支持的语言,使用协程goroutine, 非常容易实现代码的并发,这使得Go成为网络类应用的重要选择,本文以银行转账为例,阐述了Go 1.5新版本中如何使用协程实现并发。该文还指出了在Go 1.5版本之间所有协程只是运行在单个进程,并不支持多核CPU并行计算,1.5以后提升到支持多核。

Golang Security and Concurrency


下面代码是一段协程的实现:


func hello() {
println("Hello!")
}

// ---

func main() {

testchan := make(chan string)

go hello()

go func(chan string) {
println(<-testchan)
}(testchan)

testchan <-
"world"
}

协程是使用"go"这个关键词,可以将其作为独立函数或匿名函数看待,这个函数是非堵塞的,因为协程会灵活进行调度。我们也会使用Channel通道,允许协程彼此传输变量,类似队列管道,这就轻松解决了协程之间通讯的问题,当有东西发送到通道中,channel会堵塞住一直等到读操作发生,因此,这种方式是不会有丢失消息的风险。

在Go 1.5以前版本,所有协程默认都是运行在单进程(类似node.js),这意味着只是并发但不是并行,因为一次只有一个协程在运行,内部调度器对它们进行调度以确保所有协程都能够运行。

下面的代码模拟了单进程方式:


testchan := make(chan int)

finite_func := func() {
testchan <- 1
}

infinite_func := func() {
for {}
testchan <- 1
}

go finite_func()
go infinite_func()

println(<-testchan)


第一个协程会立即返回通道中的1数值,而第二个无限循环,因为单线程原因会导致程序一直挂住等待两个协程先后完成,而如果使用两个进程,这个程序会在第一个协程返回结果时立即就退出了。

上面演示了协程的基本知识,下面我们看看竞争条件,使用简单在线银行转账案例,每次发一个请求会导致从A账户转账钞票到B账户,银行需要转移现金并输出新的账户余额:


type User struct {
Cash int
}

func (u *User) sendCash(to *User, amount int) bool {
if u.Cash < amount {
return false
}

/* Delay to demonstrate the race condition */
time.Sleep(500 * time.Millisecond)

u.Cash = u.Cash - amount
to.Cash = to.Cash + amount
return true
}

func main() {
me := User{Cash: 500}
you := User{Cash: 500}

http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
me.sendCash(&you, 50)
fmt.Fprintf(w,
"I have $%d\n", me.Cash)
fmt.Fprintf(w,
"You have $%d\n", you.Cash)
fmt.Fprintf(w,
"Total transferred: $%d\n", (you.Cash - 500))
})

http.ListenAndServe(
":8080", nil)
}

这是一个通用的Go Web应用,定义User数据结构,sendCash是在两个User之间转账的服务,这里使用的是net/http 包,我们创建了一个简单的Http服务器,然后将请求路由到转账50元的sendCash方法,在正常操作下,代码会如我们预料一样运行,每次转移50美金,一旦一个用户的账户余额达到0美金,就不能再进行转出钞票了,因为没有钱了,但是,如果我们很快地发送很多请求,这个程序会继续转出很多钱,导致账户余额为负数。

这是课本上经常谈到的竞争情况race condition,在这个代码中,账户余额的检查是与从账户中取钱操作分离的,我们假想一下,如果一个请求刚刚完成账户余额检查,但是还没有取钱,也就是没有减少账户余额数值;而另外一个请求线程同时也检查账户余额,发现账户余额还没有剩为零(结果两个请求都一起取钱,导致账户余额为负数),这是典型的"check-then-act"竞争情况。这是很普遍存在的并发bug。

那么我们如何解决呢?我们肯定不能移除检查操作,而是确保检查和取钱两个动作之间没有任何其他操作发生,其他语言是使用锁,当账户进行更新时,锁住禁止同时有其他线程操作,确保一次只有一个进程操作,也就是排斥锁Mutex。

使用Go语言也能实现锁操作,如下:


type User struct {
Cash int
}

var transferLock *sync.Mutex

func (u *User) sendCash(to *User, amount int) bool {
transferLock.Lock()

/* Defer runs this function whenever sendCash exits */
defer transferLock.Unlock()

if u.Cash < amount {
return false
}

/* Delay to demonstrate the race condition */
time.Sleep(500 * time.Millisecond)

u.Cash = u.Cash - amount
to.Cash = to.Cash + amount
return true
}

func main() {
transferLock = &sync.Mutex{}

me := User{Cash: 500}
you := User{Cash: 500}

http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
me.sendCash(&you, 50)
fmt.Fprintf(w,
"I have $%d\n", me.Cash)
fmt.Fprintf(w,
"You have $%d\n", you.Cash)
fmt.Fprintf(w,
"Total transferred: $%d\n", (you.Cash - 500))
})

http.ListenAndServe(
":8080", nil)
}

但是缩的问题很显然降低了并发性能,是并发设计的最大敌人,在Go中推荐使用通道Channel,我们能够使用事件循环event loop机制更灵活地实现并发,我们委托一个后台协程监听通道,当通道中有数据时,立即进行转账操作,因为协程是顺序地读取通道中的数据,也就是巧妙地回避了竞争情况,没有必要使用任何状态变量防止并发竞争了。


type User struct {
Cash int
}

type Transfer struct {
Sender *User
Recipient *User
Amount int
}

func sendCashHandler (transferchan chan Transfer) {
var val Transfer
for {
val = <-transferchan
val.Sender.sendCash(val.Recipient, val.Amount)
}
}

/* sendCash is the same */

func main() {

me := User{Cash: 500}
you := User{Cash: 500}

transferchan := make(chan Transfer)
go sendCashHandler(transferchan)

http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
transfer := Transfer{Sender: &me, Recipient: &you, Amount: 50}
transferchan <- transfer
fmt.Fprintf(w,
"I have $%d\n", me.Cash)
fmt.Fprintf(w,
"You have $%d\n", you.Cash)
fmt.Fprintf(w,
"Total transferred: $%d\n", (you.Cash - 500))
})

http.ListenAndServe(
":8080", nil)
}

上面这段代码创建了比较可靠的系统从而避免了并发竞争,但是我们会带来另外一个安全问题:DoS(Denial of Service服务拒绝),如果我们的转账操作慢下来,那么不断进来的请求需要等待进行转账操作的那个协程从通道中读取新数据,但是这个线程忙于照顾转账操作,没有闲功夫读取通道中新数据,这个情况会导致系统容易遭受DoS攻击,外界只要发送大量请求就能让系统停止响应。

一些基础机制比如buffered channel可以处理这种情况,但是buffered channel是有内存上限的,不足够保存所有请求数据,优化解决方案是使用Go杰出的“select”语句:



http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
transfer := Transfer{Sender: &me, Recipient: &you, Amount: 50}

/* Attempt the transfer */
result := make(chan int)

go func(transferchan chan<- Transfer, transfer Transfer, result chan<- int) {
transferchan <- transfer
result <- 1
}(transferchan, transfer, result)

select {
case <-result:
fmt.Fprintf(w,
"I have $%d\n", me.Cash)
fmt.Fprintf(w,
"You have $%d\n", you.Cash)
fmt.Fprintf(w,
"Total transferred: $%d\n", (you.Cash - 500))
case <-time.After(time.Second * 10):
fmt.Fprintf(w,
"Your request has been received, but is processing slowly")
}
})

这里提升了事件循环,等待不能超过10秒,等待超过timeout时间,会返回一个消息给User告诉它们请求已经接受,可能会花点时间处理,请耐心等候即可,使用这种方法我们降低了DoS攻击可能,一个真正健壮的能够并发处理转账且没有使用任何锁的系统诞生了。

有个地方不是很理解,
transferchan <- transfer
之后马上执行
fmt.Fprintf(w, "I have $%d\n", me.Cash)
如何来确保sendCash已经结束呢?

在Go官网,做了代码试验,上述代码在没有完成sendCash之前就会发生fmt.Fprintf。
所有是不是需要加一个compHandler的协程来监听sendCash结束呢?
sendCash结束操作之后向compHandler发送完成信号。
[该贴被xujeon于2015-07-27 11:07修改过]

2015-07-22 15:28 "@xujeon"的内容
如何来确保sendCash已经结束呢 ...

Go语言级别Channel能够保证通道中数据消息不丢失(增大Buffer内存),总是能够被处理,因此,发送者可以认为自己已经提交了转账申请,转账最终肯定会完成,这里应该有最终一致性的概念在里面。


[该贴被banq于2015-07-28 08:39修改过]

恩,最终一致性可以确保该交易的完成。

但是,
fmt.Fprintf(w, "I have $%d\n", me.Cash)
fmt.Fprintf(w, "You have $%d\n", you.Cash)
fmt.Fprintf(w, "Total transferred: $%d\n", (you.Cash - 500))
这三行输入,并不能反映出当前交易后,帐户余额的变化。

帐户余额在之后的处理中才会被修改吧。

2015-07-28 16:49 "@xujeon"的内容
帐户余额在之后的处理中才会被修改吧 ...

是的,显示告诉你已经修改,与后台逻辑(真正修改并不一定同时实时进行,这种现象很常见,比如现实中两个银行跨行转账不一定实时到帐,一般是两个小时内到帐,越是实时,转账费用越高,

参考:最终一致性在现实世界中到处存在

在意大利银行之间转帐还是要花费一定的时间,你可以想象装满钞票和警察的运钞车来回忙碌,尽管很多是通过数字化网络进行,但是商业业务协议还是和以前一样,电汇当天结束,次日才真正办理。

银行转帐总是选择一个典型的数据库事务实现,如果两个账户在同一家银行也许是可能实现的,但这种情况并不普遍......

银行提供了各种日志以便钞票不会消失,但是你看到的是系统外部状态,你看到你的账户钱少了,但不知道电汇正在发生。

有时即时一致性实现需要一种创新:Paypal能够在全世界几秒内实现转帐,允许电子商务网站买卖家能够瞬间完成交易。

但是商家在接受Paypal支付时,经常告诉你:我们已经接受处理你的订单,请等待我们的处理结果,而同时他们也许正在他们仓库中寻找那个商品,在线服务并不总是和实际库存是一致的。
[该贴被banq于2015-07-29 10:57修改过]

谢谢答复!
互联网(大型分布式系统)中的确很难做到强一致性,最终一致性是大多数的选择。