Go 并发大师:上下文传播和取消的秘密揭晓

Go 的并发模型改变了游戏规则,但管理复杂的并发操作可能很棘手。这就是上下文传播和取消的作用所在。这些强大的工具让我们能够构建跨多个 goroutine 甚至网络边界的强大、可取消的操作。

让我们从基础开始。context 包提供了一种跨 API 边界和进程传递截止时间、取消信号和请求范围值的方法。它是控制长时间运行的操作和正常关闭服务的秘诀。

以下是使用上下文进行取消的简单示例:

func longRunningOperation(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Do some work
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := longRunningOperation(ctx); err != nil {
        log.Printf("Operation cancelled: %v", err)
    }
}

在此示例中,我们创建了一个超时时间为 5 秒的上下文。如果操作未在该时间内完成,则会自动取消。

但上下文不仅仅用于超时。我们可以使用它在多个 goroutine 之间传播取消信号。这对于管理复杂的工作流程非常有用。

考虑一下我们正在构建分布式事务系统的场景。我们可能在单个事务中涉及多个微服务,并且我们需要确保如果任何部分失败,整个事务都会回滚。

以下是我们如何使用上下文来构造它:

func performTransaction(ctx context.Context) error {
    // Start the transaction
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // Will be no-op if tx.Commit() is called

    // Perform multiple operations
    if err := operation1(ctx); err != nil {
        return err
    }
    if err := operation2(ctx); err != nil {
        return err
    }
    if err := operation3(ctx); err != nil {
        return err
    }

    // If we've made it this far, commit the transaction
    return tx.Commit()
}

func operation1(ctx context.Context) error {
    // Make an HTTP request to another service
    req, err := http.NewRequestWithContext(ctx, "GET", "http://service1.example.com", nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // Process the response...
    return nil
}

在此示例中,我们使用上下文在数据库操作和 HTTP 请求之间传播取消。如果上下文在任何时候被取消(由于超时或显式取消),则所有操作都将终止,资源将被清理。

但是,如果我们需要对取消进行更细粒度的控制怎么办?这时,自定义上下文类型就派上用场了。我们可以创建自己的上下文类型,携带特定领域的取消信号。

以下是带有“优先级”值的自定义上下文的示例:

type priorityKey struct{}

func WithPriority(ctx context.Context, priority int) context.Context {
    return context.WithValue(ctx, priorityKey{}, priority)
}

func GetPriority(ctx context.Context) (int, bool) {
    priority, ok := ctx.Value(priorityKey{}).(int)
    return priority, ok
}

func priorityAwareOperation(ctx context.Context) error {
    priority, ok := GetPriority(ctx)
    if !ok {
        priority = 0 // Default priority
    }

    // Use the priority to make decisions...
    switch priority {
    case 1:
        // High priority operation
    case 2:
        // Medium priority operation
    default:
        // Low priority operation
    }

    return nil
}

这种自定义上下文允许我们传播优先级信息以及取消信号,从而让我们更好地控制并发操作。

现在,我们来谈谈优雅关闭。当我们构建长期运行的服务时,正确处理关闭信号至关重要,以确保我们不会留下任何未完成的操作或未清理的资源。

以下是我们如何使用上下文实现正常关机:

func main() {
    // Create a context that's cancelled when we receive an interrupt signal
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    // Start our main service loop
    errChan := make(chan error, 1)
    go func() {
        errChan <- runService(ctx)
    }()

    // Wait for either the service to exit or a cancellation signal
    select {
    case err := <-errChan:
        if err != nil {
            log.Printf("Service exited with error: %v", err)
        }
    case <-ctx.Done():
        log.Println("Received shutdown signal. Gracefully shutting down...")
        // Perform any necessary cleanup
        // Wait for ongoing operations to complete (with a timeout)
        cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        if err := performCleanup(cleanupCtx); err != nil {
            log.Printf("Cleanup error: %v", err)
        }
    }
}

func runService(ctx context.Context) error {
    // Run your service here, respecting the context for cancellation
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Do some work
        }
    }
}

func performCleanup(ctx context.Context) error {
    // Perform any necessary cleanup operations
    // This could include closing database connections, flushing buffers, etc.
    return nil
}

此设置确保我们的服务在收到中断信号时可以正常关闭,从而有时间清理资源并完成任何正在进行的操作。

Go 上下文系统最强大的一个方面是它能够跨网络边界传播取消操作。这在构建操作可能跨多个服务的分布式系统时特别有用。

让我们看一个如何在微服务架构中实现这一点的例子:

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // Extract the timeout from the request, defaulting to 10 seconds
    timeout, _ := time.ParseDuration(r.URL.Query().Get("timeout"))
    if timeout == 0 {
        timeout = 10 * time.Second
    }

    // Create a context with the specified timeout
    ctx, cancel := context.WithTimeout(r.Context(), timeout)
    defer cancel()

    // Make requests to other services, propagating the context
    results, err := gatherResults(ctx)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // Send the results back to the client
    json.NewEncoder(w).Encode(results)
}

func gatherResults(ctx context.Context) ([]string, error) {
    var results []string
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, url := range []string{"http://service1", "http://service2", "http://service3"} {
        wg.Add(1)
        go func(url string) {
            defer wg.Done()
            result, err := makeRequest(ctx, url)
            if err != nil {
                log.Printf("Error from %s: %v", url, err)
                return
            }
            mu.Lock()
            results = append(results, result)
            mu.Unlock()
        }(url)
    }

    // Wait for all requests to complete or the context to be cancelled
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return results, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func makeRequest(ctx context.Context, url string) (string, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return "", err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    // Read and return the response body
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    return string(body), nil
}

在此示例中,我们根据查询参数创建了一个具有超时限制的上下文。此上下文随后会传播到所有后续 API 调用。如果达到超时限制,则所有正在进行的操作都将被取消,并且我们会向客户端返回错误。

这种模式确保我们不会在客户端放弃等待响应后继续执行任何“失控”操作。这是构建响应迅速、资源高效的分布式系统的关键部分。

并发系统中的错误处理可能很棘手,但上下文也可以提供帮助。通过使用上下文,我们可以确保错误正确传播,并且即使发生错误,资源也会被清理。

下面是一个我们如何在并发操作中处理错误的例子:

func concurrentOperation(ctx context.Context) error {
    errChan := make(chan error, 1)

    go func() {
        defer close(errChan)
        if err := riskyOperation(ctx); err != nil {
            errChan <- err
        }
    }()

    select {
    case err := <-errChan:
        if err != nil {
            return fmt.Errorf("risky operation failed: %w", err)
        }
    case <-ctx.Done():
        return ctx.Err()
    }

    return nil
}

func riskyOperation(ctx context.Context) error {
    // Simulate a long-running operation
    select {
    case <-time.After(5 * time.Second):
        // Operation completed successfully
        return nil
    case <-ctx.Done():
        // The operation was cancelled
        return ctx.Err()
    }
}

在此示例中,我们使用通道将 goroutine 中的错误传回主函数。我们还检查上下文以查看是否取消。这确保我们既能处理操作本身的错误,也能处理上下文中的取消。

上下文的一个经常被忽视的方面是它能够承载请求范围的值。这对于跨 API 边界传播请求 ID、身份验证令牌或其他元数据等内容非常有用。

以下是我们如何使用它的一个例子:

type key int

const (
    requestIDKey key = iota
    userIDKey
)

func Middleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Generate a request ID
        requestID := generateRequestID()

        // Add it to the context
        ctx := context.WithValue(r.Context(), requestIDKey, requestID)

        // Call the next handler with the updated context
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

func Handler(w http.ResponseWriter, r *http.Request) {
    // Retrieve the request ID from the context
    requestID, ok := r.Context().Value(requestIDKey).(string)
    if !ok {
        requestID = "unknown"
    }

    // Use the request ID in logging
    log.Printf("[%s] Handling request", requestID)

    // ... handle the request ...
}

func generateRequestID() string {
    // Generate a unique request ID
    return uuid.New().String()
}

在此示例中,我们使用中间件将请求 ID 添加到上下文中。然后可以检索此请求 ID,并在接收此上下文的任何后续处理程序或函数中使用。

总结一下,值得注意的是,尽管上下文是一个强大的工具,但它并不是万能的。过度使用上下文会导致代码难以理解和维护。明智地使用上下文并谨慎设计 API 非常重要。

请记住,上下文的主要用途应该是跨 API 边界传输截止时间、取消信号和请求范围的值。它并不是向函数传递可选参数的通用机制。

总之,掌握 Go 的并发模型(包括上下文传播和取消)是构建强大、高效且可扩展的应用程序的关键。通过利用这些工具,我们可以创建能够优雅地处理复杂工作流、有效管理资源并智能地响应不断变化的条件的系统。随着我们继续突破并发编程的极限,这些技术将在我们的工具箱中变得更加重要。