Go 的并发模型改变了游戏规则,但管理复杂的并发操作可能很棘手。这就是上下文传播和取消的作用所在。这些强大的工具让我们能够构建跨多个 goroutine 甚至网络边界的强大、可取消的操作。
让我们从基础开始。context 包提供了一种跨 API 边界和进程传递截止时间、取消信号和请求范围值的方法。它是控制长时间运行的操作和正常关闭服务的秘诀。
以下是使用上下文进行取消的简单示例:
func longRunningOperation(ctx context.Context) error { for { select { case <-ctx.Done(): return ctx.Err() default: // Do some work } } }
func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel()
if err := longRunningOperation(ctx); err != nil { log.Printf("Operation cancelled: %v", err) } }
|
在此示例中,我们创建了一个超时时间为 5 秒的上下文。如果操作未在该时间内完成,则会自动取消。
但上下文不仅仅用于超时。我们可以使用它在多个 goroutine 之间传播取消信号。这对于管理复杂的工作流程非常有用。
考虑一下我们正在构建分布式事务系统的场景。我们可能在单个事务中涉及多个微服务,并且我们需要确保如果任何部分失败,整个事务都会回滚。
以下是我们如何使用上下文来构造它:
func performTransaction(ctx context.Context) error { // Start the transaction tx, err := db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // Will be no-op if tx.Commit() is called
// Perform multiple operations if err := operation1(ctx); err != nil { return err } if err := operation2(ctx); err != nil { return err } if err := operation3(ctx); err != nil { return err }
// If we've made it this far, commit the transaction return tx.Commit() }
func operation1(ctx context.Context) error { // Make an HTTP request to another service req, err := http.NewRequestWithContext(ctx, "GET", "http://service1.example.com", nil) if err != nil { return err } resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close()
// Process the response... return nil }
|
在此示例中,我们使用上下文在数据库操作和 HTTP 请求之间传播取消。如果上下文在任何时候被取消(由于超时或显式取消),则所有操作都将终止,资源将被清理。
但是,如果我们需要对取消进行更细粒度的控制怎么办?这时,自定义上下文类型就派上用场了。我们可以创建自己的上下文类型,携带特定领域的取消信号。
以下是带有“优先级”值的自定义上下文的示例:
type priorityKey struct{}
func WithPriority(ctx context.Context, priority int) context.Context { return context.WithValue(ctx, priorityKey{}, priority) }
func GetPriority(ctx context.Context) (int, bool) { priority, ok := ctx.Value(priorityKey{}).(int) return priority, ok }
func priorityAwareOperation(ctx context.Context) error { priority, ok := GetPriority(ctx) if !ok { priority = 0 // Default priority }
// Use the priority to make decisions... switch priority { case 1: // High priority operation case 2: // Medium priority operation default: // Low priority operation }
return nil }
|
这种自定义上下文允许我们传播优先级信息以及取消信号,从而让我们更好地控制并发操作。
现在,我们来谈谈优雅关闭。当我们构建长期运行的服务时,正确处理关闭信号至关重要,以确保我们不会留下任何未完成的操作或未清理的资源。
以下是我们如何使用上下文实现正常关机:
func main() { // Create a context that's cancelled when we receive an interrupt signal ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel()
// Start our main service loop errChan := make(chan error, 1) go func() { errChan <- runService(ctx) }()
// Wait for either the service to exit or a cancellation signal select { case err := <-errChan: if err != nil { log.Printf("Service exited with error: %v", err) } case <-ctx.Done(): log.Println("Received shutdown signal. Gracefully shutting down...") // Perform any necessary cleanup // Wait for ongoing operations to complete (with a timeout) cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := performCleanup(cleanupCtx); err != nil { log.Printf("Cleanup error: %v", err) } } }
func runService(ctx context.Context) error { // Run your service here, respecting the context for cancellation for { select { case <-ctx.Done(): return ctx.Err() default: // Do some work } } }
func performCleanup(ctx context.Context) error { // Perform any necessary cleanup operations // This could include closing database connections, flushing buffers, etc. return nil }
|
此设置确保我们的服务在收到中断信号时可以正常关闭,从而有时间清理资源并完成任何正在进行的操作。
Go 上下文系统最强大的一个方面是它能够跨网络边界传播取消操作。这在构建操作可能跨多个服务的分布式系统时特别有用。
让我们看一个如何在微服务架构中实现这一点的例子:
func handleRequest(w http.ResponseWriter, r *http.Request) { // Extract the timeout from the request, defaulting to 10 seconds timeout, _ := time.ParseDuration(r.URL.Query().Get("timeout")) if timeout == 0 { timeout = 10 * time.Second }
// Create a context with the specified timeout ctx, cancel := context.WithTimeout(r.Context(), timeout) defer cancel()
// Make requests to other services, propagating the context results, err := gatherResults(ctx) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }
// Send the results back to the client json.NewEncoder(w).Encode(results) }
func gatherResults(ctx context.Context) ([]string, error) { var results []string var mu sync.Mutex var wg sync.WaitGroup
for _, url := range []string{"http://service1", "http://service2", "http://service3"} { wg.Add(1) go func(url string) { defer wg.Done() result, err := makeRequest(ctx, url) if err != nil { log.Printf("Error from %s: %v", url, err) return } mu.Lock() results = append(results, result) mu.Unlock() }(url) }
// Wait for all requests to complete or the context to be cancelled done := make(chan struct{}) go func() { wg.Wait() close(done) }()
select { case <-done: return results, nil case <-ctx.Done(): return nil, ctx.Err() } }
func makeRequest(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close()
// Read and return the response body body, err := ioutil.ReadAll(resp.Body) if err != nil { return "", err } return string(body), nil }
|
在此示例中,我们根据查询参数创建了一个具有超时限制的上下文。此上下文随后会传播到所有后续 API 调用。如果达到超时限制,则所有正在进行的操作都将被取消,并且我们会向客户端返回错误。
这种模式确保我们不会在客户端放弃等待响应后继续执行任何“失控”操作。这是构建响应迅速、资源高效的分布式系统的关键部分。
并发系统中的错误处理可能很棘手,但上下文也可以提供帮助。通过使用上下文,我们可以确保错误正确传播,并且即使发生错误,资源也会被清理。
下面是一个我们如何在并发操作中处理错误的例子:
func concurrentOperation(ctx context.Context) error { errChan := make(chan error, 1)
go func() { defer close(errChan) if err := riskyOperation(ctx); err != nil { errChan <- err } }()
select { case err := <-errChan: if err != nil { return fmt.Errorf("risky operation failed: %w", err) } case <-ctx.Done(): return ctx.Err() }
return nil }
func riskyOperation(ctx context.Context) error { // Simulate a long-running operation select { case <-time.After(5 * time.Second): // Operation completed successfully return nil case <-ctx.Done(): // The operation was cancelled return ctx.Err() } }
|
在此示例中,我们使用通道将 goroutine 中的错误传回主函数。我们还检查上下文以查看是否取消。这确保我们既能处理操作本身的错误,也能处理上下文中的取消。
上下文的一个经常被忽视的方面是它能够承载请求范围的值。这对于跨 API 边界传播请求 ID、身份验证令牌或其他元数据等内容非常有用。
以下是我们如何使用它的一个例子:
type key int
const ( requestIDKey key = iota userIDKey )
func Middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Generate a request ID requestID := generateRequestID()
// Add it to the context ctx := context.WithValue(r.Context(), requestIDKey, requestID)
// Call the next handler with the updated context next.ServeHTTP(w, r.WithContext(ctx)) }) }
func Handler(w http.ResponseWriter, r *http.Request) { // Retrieve the request ID from the context requestID, ok := r.Context().Value(requestIDKey).(string) if !ok { requestID = "unknown" }
// Use the request ID in logging log.Printf("[%s] Handling request", requestID)
// ... handle the request ... }
func generateRequestID() string { // Generate a unique request ID return uuid.New().String() }
|
在此示例中,我们使用中间件将请求 ID 添加到上下文中。然后可以检索此请求 ID,并在接收此上下文的任何后续处理程序或函数中使用。
总结一下,值得注意的是,尽管上下文是一个强大的工具,但它并不是万能的。过度使用上下文会导致代码难以理解和维护。明智地使用上下文并谨慎设计 API 非常重要。
请记住,上下文的主要用途应该是跨 API 边界传输截止时间、取消信号和请求范围的值。它并不是向函数传递可选参数的通用机制。
总之,掌握 Go 的并发模型(包括上下文传播和取消)是构建强大、高效且可扩展的应用程序的关键。通过利用这些工具,我们可以创建能够优雅地处理复杂工作流、有效管理资源并智能地响应不断变化的条件的系统。随着我们继续突破并发编程的极限,这些技术将在我们的工具箱中变得更加重要。