使用Go并发帮助我们解决使用 goroutine 时的错误处理问题。
错误处理
错误处理需要与同步编程不同的模式。为了更好地理解这个问题,让我们看一个简单的程序:
package main
import ( "fmt" "time" )
func main() { // 1. done := make(chan interface{}) inputStream := make(chan interface{}) go func() { time.Sleep(time.Second * 6) close(done) }() go seedNumbers(done, inputStream) go modTwo(done, inputStream) <-done } func seedNumbers(done <-chan interface{}, inputStream chan<- interface{}){ // 2. stream := []interface{}{"abc", 1, "2", 3, 4} go func() { for v := range stream{ select { case <-done: return default: inputStream <- v } } }() } func modTwo(done, inputStream <-chan interface{}){ go func() { for { select { case <-done: return case v := <-inputStream: intV, ok := v.(int) // 3. if !ok { fmt.Printf("\n seeded value not of type int: %+v", v) continue } if intV == 0 { fmt.Println("seeded value is zero: cannot mod with zero") continue } // 4. if intV % 2 == 0{ fmt.Printf("\n %d is divisible by two", v) }else{ fmt.Printf("\n %d is not divisible by two", v) } } time.Sleep(time.Second) } }() }
|
解释
- 我们使用在 Go Concurrency 1.3 — Sync Package | Channels & Select 中学习到的模式创建了一个 done 通道,用于向子 go 例程指示程序的终止。此外,在调用 close() 之前,我们在该通道上添加了 6 秒钟的睡眠时间,以指示子 go 例程的终止。
- 我们定义了 seedNumbers() 以向接口提供的通道播种数字。请注意我们是如何将两个值初始化的,如果调用 modTwo() 会导致错误。我们利用在 Go Concurrency 2.1 - Patterns and Idioms | Fundamentals中学习的模式来处理通道。
- 在函数 modTwo() 中,我们有两个 if 块检查错误并将错误打印到 stdout。
- 如果收到一个有效的可整除值,我们将继续检查并打印该值是否能被 2 整除。
够简单吗?在牢记这一计划的同时,也要问自己一些问题。
- 如果我想捕获无效输入流并分别处理它们,会发生什么情况?
- 主程序如何判断是否应该终止或处理错误?
- 如何摆脱 done 和 close(done) 方法,并保证迭代次数与输入次数相等?
让我们看看下面的重构程序。
package main
import ( "fmt" "time" ) type Result struct { Input interface{} DivisibleByTwo bool Error error } func main() { // 1. outputStream := make(chan Result) inputStream := make(chan interface{}) defer close(inputStream) defer close(outputStream) // 2. stream := []interface{}{"abc", 1, 0, 3, 4} go seedNumbers(stream, inputStream) go modTwo(inputStream, outputStream) for i:=0; i < len(stream); i++ { r := <- outputStream // 5. if r.Error != nil{ fmt.Printf("\n input : %+v, err - %s", r.Input, r.Error) }else{ fmt.Printf("\n %d is divisible by two: %t", r.Input.(int), r.DivisibleByTwo) } } } func seedNumbers(rawStream []interface{}, inputStream chan<- interface{}) { go func() { for _, v := range rawStream { // 3. inputStream <- v time.Sleep(time.Second) } }() } func modTwo(inputStream <-chan interface{}, outputStream chan<- Result) { go func() { for v := range inputStream { // 4. r := Result{Input: v} intV, ok := v.(int) if !ok { r.Error = fmt.Errorf("seeded value not of type int: %+v", v) outputStream <- r continue } if intV == 0 { r.Error = fmt.Errorf("seeded value is zero: cannot mod with zero") outputStream <- r continue } if intV%2 == 0 { r.DivisibleByTwo = true } outputStream <- r } }() }
|
解释:
- 我们创建了两个通道,一个用于通过 seedNumbers() 为输入值播种,另一个用于读写 modTwo() 的输出。此外,我们推迟 close(),以便 range 语句知道通道已关闭,不再等待进一步读取或写入。
- 我们在 modTwo() 函数中定义了可能导致错误的接口片段值。
- 我们对 rawStream 进行测距,并将这些值写入 inputStream,以允许 modTwo() 读取并验证其中的值。我们添加了等待时间,以观察程序在 goroutines 停止时的暂停情况。
- 我们将 outputStream 包装成一种类型,以帮助我们将所需的值转发给负责处理输出的程序,或提供处理输出的信息,即 main() 程序。
- 在 outputStream 的范围内,main() 程序将决定如何处理响应并打印相应的信息。
结论
在使用 goroutines 时,将潜在结果与潜在错误联系起来,有助于我们将错误处理与工作 goroutines 区分开来。这反过来又使我们的程序具有可组合性,并使程序员能够轻松调试潜在的问题。