conc:更简单编写Golang并发编程


对于 Go,并发性已经是一等公民,因此将并发代码添加到 Go 项目通常是一个很低的门槛。但是正确地做到这一点仍然很困难,而且我在并发 Go 代码中看到(并犯下)了很多错误,例如:

  • 没有正确清理资源。
  • 造成死锁。
  • 由于单个 goroutine 中的 panic 而导致整个程序崩溃。

特别是,很难编写出在出现 panic 时仍能合理运行的并发代码。
我们不希望在派生的 goroutine 中发生panic时整个进程崩溃,我们希望避免死锁或泄漏的 goroutines 等可能由panic触发的其他问题。
Go 没有提供一种简单的方法来本地执行此操作。

所以构建conc这个库,它使编写并发代码更加优雅并减少了样板代码的数量。conc下面的代码显示了使用Go 标准库代替时可以减少多少样板文件。

标准:

type propagatedPanic struct {
    val   any
    stack []byte
}

func main() {
    done := make(chan *propagatedPanic)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &propagatedPanic{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    if val := <-done; val != nil {
        panic(val)
    }
}

使用conc:

func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}

使用 conc 进行并行流处理
在 Sourcegraph,我们对有序流进行了大量并行处理。在搜索大量代码时,我们通常会得到要进行后处理的结果流。流中的每个结果都可能需要网络请求,例如,查找存储库的权限或获取搜索结果的完整文件内容。
为此,我们始终希望:

  • 并行执行网络请求。
  • 尽快将结果显示给用户。
  • 保持流的顺序(因为我们已经对结果进行了排序)。

很难同时获得所有这三个权利,因此我在编写concStream 包时的目标之一是尽可能多地抽象出该工作流的复杂性。
现在我可以使用类似于下面示例的代码一次获取多个文件的内容。这样可以高效安全地获取每个文件的内容,同时仍然保持流的原始顺序。

func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) {
    s := stream.New()
    for fileName := range fileNames {
        fileName := fileName
        s.Go(func() stream.Callback {
            contents := fetchFileContents(ctx, fileName)
            return func() { fileContents <- contents }
        })
    }
    s.Wait()
}

conc 的目标
conc希望用更好的方法来处理panic,避免泄漏 Goroutines,或者只是拥有更易读的并发代码。