Go 并发 2.2:错误处理模式

使用Go并发帮助我们解决使用 goroutine 时的错误处理问题。

错误处理
错误处理需要与同步编程不同的模式。为了更好地理解这个问题,让我们看一个简单的程序:

package main

import (
 "fmt"
 "time"
)

func main() {
 // 1.
 done := make(chan interface{})
 inputStream := make(chan interface{})
 go func() {
  time.Sleep(time.Second * 6)
  close(done)
 }()
 go seedNumbers(done, inputStream)
 go modTwo(done, inputStream)
 <-done
}
func seedNumbers(done <-chan interface{}, inputStream chan<- interface{}){
 // 2.
 stream := []interface{}{"abc", 1, "2", 3, 4}
 go func() {
  for v := range stream{
   select {
   case <-done:
    return
   default:
    inputStream <- v
   }
  }
 }()
}
func modTwo(done, inputStream <-chan interface{}){
 go func() {
  for {
   select {
   case <-done:
    return
   case  v := <-inputStream:
    intV, ok := v.(int)
    // 3.
    if !ok {
     fmt.Printf("\n seeded value not of type int: %+v", v)
     continue
    }
    if intV == 0 {
     fmt.Println("seeded value is zero: cannot mod with zero")
     continue
    }
    // 4.
    if intV % 2 == 0{
     fmt.Printf("\n %d is divisible by two", v)
    }else{
     fmt.Printf("\n %d is not divisible by two", v)
    }
   }
   time.Sleep(time.Second)
  }
 }()
}

解释

  1. 我们使用在 Go Concurrency 1.3 — Sync Package | Channels & Select 中学习到的模式创建了一个 done 通道,用于向子 go 例程指示程序的终止。此外,在调用 close() 之前,我们在该通道上添加了 6 秒钟的睡眠时间,以指示子 go 例程的终止。
  2. 我们定义了 seedNumbers() 以向接口提供的通道播种数字。请注意我们是如何将两个值初始化的,如果调用 modTwo() 会导致错误。我们利用在 Go Concurrency 2.1 - Patterns and Idioms | Fundamentals中学习的模式来处理通道。
  3. 在函数 modTwo() 中,我们有两个 if 块检查错误并将错误打印到 stdout。
  4. 如果收到一个有效的可整除值,我们将继续检查并打印该值是否能被 2 整除。

够简单吗?在牢记这一计划的同时,也要问自己一些问题。

  • 如果我想捕获无效输入流并分别处理它们,会发生什么情况?
  • 主程序如何判断是否应该终止或处理错误?
  • 如何摆脱 done 和 close(done) 方法,并保证迭代次数与输入次数相等?

让我们看看下面的重构程序。

package main

import (
 "fmt"
 "time"
)
type Result struct {
 Input          interface{}
 DivisibleByTwo bool
 Error          error
}
func main() {
 // 1.
 outputStream := make(chan Result)
 inputStream := make(chan interface{})
 defer close(inputStream)
 defer close(outputStream)
 // 2.
 stream := []interface{}{"abc", 1, 0, 3, 4}
 go seedNumbers(stream, inputStream)
 go modTwo(inputStream, outputStream)
 for i:=0; i < len(stream); i++ {
  r := <- outputStream
  // 5. 
  if r.Error != nil{
   fmt.Printf("\n input : %+v, err - %s", r.Input, r.Error)
  }else{
   fmt.Printf("\n %d is divisible by two: %t", r.Input.(int), r.DivisibleByTwo)
  }
 }
}
func seedNumbers(rawStream []interface{}, inputStream chan<- interface{}) {
 go func() {
  for _, v := range rawStream {
   // 3.
   inputStream <- v
   time.Sleep(time.Second)
  }
 }()
}
func modTwo(inputStream <-chan interface{}, outputStream chan<- Result) {
 go func() {
  for v := range inputStream {
   // 4.
   r := Result{Input: v}
   intV, ok := v.(int)
   if !ok {
    r.Error = fmt.Errorf("seeded value not of type int: %+v", v)
    outputStream <- r
    continue
   }
   if intV == 0 {
    r.Error = fmt.Errorf("seeded value is zero: cannot mod with zero")
    outputStream <- r
    continue
   }
   if intV%2 == 0 {
    r.DivisibleByTwo = true
   }
   outputStream <- r
  }
 }()
}

解释:

  1. 我们创建了两个通道,一个用于通过 seedNumbers() 为输入值播种,另一个用于读写 modTwo() 的输出。此外,我们推迟 close(),以便 range 语句知道通道已关闭,不再等待进一步读取或写入。
  2. 我们在 modTwo() 函数中定义了可能导致错误的接口片段值。
  3. 我们对 rawStream 进行测距,并将这些值写入 inputStream,以允许 modTwo() 读取并验证其中的值。我们添加了等待时间,以观察程序在 goroutines 停止时的暂停情况。
  4. 我们将 outputStream 包装成一种类型,以帮助我们将所需的值转发给负责处理输出的程序,或提供处理输出的信息,即 main() 程序。
  5. 在 outputStream 的范围内,main() 程序将决定如何处理响应并打印相应的信息。

结论
在使用 goroutines 时,将潜在结果与潜在错误联系起来,有助于我们将错误处理与工作 goroutines 区分开来。这反过来又使我们的程序具有可组合性,并使程序员能够轻松调试潜在的问题。