很多场景下,我们需要批量读取通道中的数据进行处理。比如批量写入数据到数据库、批量发送消息到Kafka、或者批量写入网络等。这种批量处理方式可以显着提升程序性能。
但是Go通道不提供原生的批量读取方法。那么我们如何高效地从通道读取批量数据呢?本文将介绍一个成熟的解决方案。
使用 chanx 库简化批量读取
为了方便批量读取通道数据,chanx库实现了以下Batch方法:
func Batch [ T any ] (ctx context.Context, ch <- chan T, batchSize int , fn func ([]T) ) |
使用起来非常简单:
- 传入context以进行取消/超时控制
- 传入一个阅读通道ch
- 设置批量大小batchSize
- 传入一个处理函数fn来处理每一批数据
func TestBatch(t *testing.T) { |
在上面的示例中,我们向通道写入 10 个 int 数据,然后调用 Batch 从通道批量读取数据。我们将批量大小设置为 5,即每次最多从通道中读取 5 个数据。读取的数据将传递给函数 fn 进行处理。
这样,我们就可以轻松地批量读取和处理数据。
FlatBatch 支持批量读取批次数据
chanx 库还提供 FlatBatch 方法,支持从通道批量读取批次数据:
code]func FlatBatch[T any)[/code]
FlatBatch 的通道参数为 <-chan[]T,即通道中的每个数据都是一个子切片。它会根据设定的批量大小将读取的子片中的数据展开成一个大片段,然后将其传递给处理函数。
func TestFlatBatch(t *testing.T) { |
执行
要实现高效的批量读取,有几个关键问题需要考虑:
- 不能无限读取通道,必须设置读取大小,否则会耗尽内存
- 如果通道暂时没有数据,则不能无限等待,这会阻塞读取
- 但也不能太频繁地检查通道,这会导致CPU旋转
初步的实现是使用读缓冲区batch来存储读取的数据。如果缓冲区已满,则调用处理程序;如果通道关闭,还处理缓冲区中的数据。
然而,如果通道长时间没有新数据,这会导致饥饿问题。
var batch = make ([]T, 0 , batchSize) |
我们可以default在 select 中添加一个分支,用于在没有数据时处理缓冲数据:
code]func Batch[T any) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}[/code]
但这将会造成 CPU spinning打转的问题,因为在没有数据时,默认分支会被频繁调用。
一种改进方法是在默认分支中添加一个 time.After 定时器,以便每隔一段时间处理缓冲数据:
code]func Batch[T any) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
case <-time.After(100 * time.Millisecond):
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}[/code]
然而,这样做存在以下问题:
- 内存泄漏,因为存在大量计时器
- 定时器 "休眠 "期间未读取通道中的数据
code]func Batch[T any) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 { // partial
fn(batch)
batch = make([]T, 0, batchSize) // reset
} else { // empty
// wait for more
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok {
return
}
batch = append(batch, v)
}
}
}
}
}[/code]
默认分支的处理至关重要:
- 如果批次不为空,立即处理批次数据
- 如果批处理数据为空,则阻塞等待通道或 ctx 信号
总之
通过精心设计的读取逻辑,我们可以高效、低延迟地批量读取通道数据。这种批处理模式可以提高许多 Go 程序的性能和弹性。