如何高效批量读取Go Channel数据

近年来,Go语言因其高效的并发模型而受到广泛关注。Goroutines 和 Channels 是 Go 中构建并发程序的基础。通道可用于在 goroutine 之间安全地传递数据和信号。

很多场景下,我们需要批量读取通道中的数据进行处理。比如批量写入数据到数据库、批量发送消息到Kafka、或者批量写入网络等。这种批量处理方式可以显着提升程序性能。

但是Go通道不提供原生的批量读取方法。那么我们如何高效地从通道读取批量数据呢?本文将介绍一个成熟的解决方案。

使用 chanx 库简化批量读取
为了方便批量读取通道数据,chanx库实现了以下Batch方法:

func  Batch [ T  any ] (ctx context.Context, ch <- chan T, batchSize int , fn func ([]T) )

使用起来非常简单:

  • 传入context以进行取消/超时控制
  • 传入一个阅读通道ch
  • 设置批量大小batchSize
  • 传入一个处理函数fn来处理每一批数据

func TestBatch(t *testing.T) {
 ch := make(chan int, 10)
 for i := 0; i < 10; i++ {
  ch <- i
 }

 count := 0
 go Batch[int](context.Background(), ch, 5, func(batch []int) {
  if len(batch) != 5 {
   assert.Fail(t, "expected batch size 5, got %d", len(batch))
  }
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 10, count)
}

在上面的示例中,我们向通道写入 10 个 int 数据,然后调用 Batch 从通道批量读取数据。我们将批量大小设置为 5,即每次最多从通道中读取 5 个数据。读取的数据将传递给函数 fn 进行处理。

这样,我们就可以轻松地批量读取和处理数据。

FlatBatch 支持批量读取批次数据
chanx 库还提供 FlatBatch 方法,支持从通道批量读取批次数据:

func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T))

FlatBatch 的通道参数为 <-chan[]T,即通道中的每个数据都是一个子切片。它会根据设定的批量大小将读取的子片中的数据展开成一个大片段,然后将其传递给处理函数。

func TestFlatBatch(t *testing.T) {
 ch := make(chan []int, 10)
 for i := 0; i < 10; i++ {
  ch <- []int{i, i}
 }

 count := 0
 go FlatBatch[int](context.Background(), ch, 5, func(batch []int) {
  assert.NotEmpty(t, batch)
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 20, count)
}

执行
要实现高效的批量读取,有几个关键问题需要考虑:

  1. 不能无限读取通道,必须设置读取大小,否则会耗尽内存
  2. 如果通道暂时没有数据,则不能无限等待,这会阻塞读取
  3. 但也不能太频繁地检查通道,这会导致CPU旋转

初步的实现是使用读缓冲区batch来存储读取的数据。如果缓冲区已满,则调用处理程序;如果通道关闭,还处理缓冲区中的数据。
然而,如果通道长时间没有新数据,这会导致饥饿问题。

 var batch = make ([]T, 0 , batchSize) 
 for
  select { 
  case <-ctx.Done(): 
   if  len (batch) > 0 { 
    fn(batch) 
   } 
   return 
  case v, ok := <-ch: 
   if !ok { // 关闭
    fn(batch) 
    return
    } 

   batch = append (batch, v) 
   if  len (batch) == batchSize {
// 完整
    fn(batch) 
    batch = make ([]T, 0 , batchSize) / / 重置
   } 
  } } 
}

我们可以default在 select 中添加一个分支,用于在没有数据时处理缓冲数据:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize {
// full
                fn(batch)
                batch = make([]T, 0, batchSize)
// reset
            }
        default:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize)
// reset
            }
        }
    }
}

但这将会造成 CPU spinning打转的问题,因为在没有数据时,默认分支会被频繁调用。

一种改进方法是在默认分支中添加一个 time.After 定时器,以便每隔一段时间处理缓冲数据:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize {
// full
                fn(batch)
                batch = make([]T, 0, batchSize)
// reset
            }
        case <-time.After(100 * time.Millisecond):
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize)
// reset
            }
        }
    }
}

然而,这样做存在以下问题:

  • 内存泄漏,因为存在大量计时器
  • 定时器 "休眠 "期间未读取通道中的数据

有没有更好的实现方法?答案是肯定的。chanx 库采用了一种巧妙的实现方法:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize {
// full
    fn(batch)
    batch = make([]T, 0, batchSize)
// reset
   }
  default:
   if len(batch) > 0 {
// partial
    fn(batch)
    batch = make([]T, 0, batchSize)
// reset
   } else {
// empty
   
// wait for more
    select {
    case <-ctx.Done():
     if len(batch) > 0 {
      fn(batch)
     }
     return
    case v, ok := <-ch:
     if !ok {
      return
     }

     batch = append(batch, v)
    }

   }
  }
 }
}

默认分支的处理至关重要:
  • 如果批次不为空,立即处理批次数据
  • 如果批处理数据为空,则阻塞等待通道或 ctx 信号

这种实现方式既能防止饥饿,又能防止 CPU 旋转。它还最大限度地减少了无效等待,降低了读取延迟。

总之
通过精心设计的读取逻辑,我们可以高效、低延迟地批量读取通道数据。这种批处理模式可以提高许多 Go 程序的性能和弹性。