近年来,Go语言因其高效的并发模型而受到广泛关注。Goroutines 和 Channels 是 Go 中构建并发程序的基础。通道可用于在 goroutine 之间安全地传递数据和信号。
很多场景下,我们需要批量读取通道中的数据进行处理。比如批量写入数据到数据库、批量发送消息到Kafka、或者批量写入网络等。这种批量处理方式可以显着提升程序性能。
但是Go通道不提供原生的批量读取方法。那么我们如何高效地从通道读取批量数据呢?本文将介绍一个成熟的解决方案。
使用 chanx 库简化批量读取
为了方便批量读取通道数据,chanx库实现了以下Batch方法:
func Batch [ T any ] (ctx context.Context, ch <- chan T, batchSize int , fn func ([]T) )
|
使用起来非常简单:
- 传入context以进行取消/超时控制
- 传入一个阅读通道ch
- 设置批量大小batchSize
- 传入一个处理函数fn来处理每一批数据
func TestBatch(t *testing.T) { ch := make(chan int, 10) for i := 0; i < 10; i++ { ch <- i }
count := 0 go Batch[int](context.Background(), ch, 5, func(batch []int) { if len(batch) != 5 { assert.Fail(t, "expected batch size 5, got %d", len(batch)) } count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 10, count) }
|
在上面的示例中,我们向通道写入 10 个 int 数据,然后调用 Batch 从通道批量读取数据。我们将批量大小设置为 5,即每次最多从通道中读取 5 个数据。读取的数据将传递给函数 fn 进行处理。
这样,我们就可以轻松地批量读取和处理数据。
FlatBatch 支持批量读取批次数据
chanx 库还提供 FlatBatch 方法,支持从通道批量读取批次数据:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T))
|
FlatBatch 的通道参数为 <-chan[]T,即通道中的每个数据都是一个子切片。它会根据设定的批量大小将读取的子片中的数据展开成一个大片段,然后将其传递给处理函数。
func TestFlatBatch(t *testing.T) { ch := make(chan []int, 10) for i := 0; i < 10; i++ { ch <- []int{i, i} }
count := 0 go FlatBatch[int](context.Background(), ch, 5, func(batch []int) { assert.NotEmpty(t, batch) count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 20, count) }
|
执行
要实现高效的批量读取,有几个关键问题需要考虑:
- 不能无限读取通道,必须设置读取大小,否则会耗尽内存
- 如果通道暂时没有数据,则不能无限等待,这会阻塞读取
- 但也不能太频繁地检查通道,这会导致CPU旋转
初步的实现是使用读缓冲区batch来存储读取的数据。如果缓冲区已满,则调用处理程序;如果通道关闭,还处理缓冲区中的数据。
然而,如果通道长时间没有新数据,这会导致饥饿问题。
var batch = make ([]T, 0 , batchSize) for { select { case <-ctx.Done(): if len (batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // 关闭 fn(batch) return }
batch = append (batch, v) if len (batch) == batchSize { // 完整 fn(batch) batch = make ([]T, 0 , batchSize) / / 重置 } } } }
|
我们可以default在 select 中添加一个分支,用于在没有数据时处理缓冲数据:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return }
batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
|
但这将会造成 CPU spinning打转的问题,因为在没有数据时,默认分支会被频繁调用。
一种改进方法是在默认分支中添加一个 time.After 定时器,以便每隔一段时间处理缓冲数据:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return }
batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } case <-time.After(100 * time.Millisecond): if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
|
然而,这样做存在以下问题:
- 内存泄漏,因为存在大量计时器
- 定时器 "休眠 "期间未读取通道中的数据
有没有更好的实现方法?答案是肯定的。chanx 库采用了一种巧妙的实现方法:func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return }
batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { // partial fn(batch) batch = make([]T, 0, batchSize) // reset } else { // empty // wait for more select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { return }
batch = append(batch, v) }
} } } }
|
默认分支的处理至关重要:
- 如果批次不为空,立即处理批次数据
- 如果批处理数据为空,则阻塞等待通道或 ctx 信号
这种实现方式既能防止饥饿,又能防止 CPU 旋转。它还最大限度地减少了无效等待,降低了读取延迟。总之
通过精心设计的读取逻辑,我们可以高效、低延迟地批量读取通道数据。这种批处理模式可以提高许多 Go 程序的性能和弹性。