Rill(名词:小流)是一个用于流式传输、并行处理和管道构建的综合 Go 工具包。它旨在减少样板文件并简化使用,使开发人员能够专注于核心逻辑,而不会因并发的复杂性而陷入困境。
通过通道转换、类型安全、批处理和错误处理实现并发。
设计理念
rill 的核心在于一个简单而强大的概念:在由Try结构封装的包装值通道上进行操作。
- 此类通道可以手动创建,
- 也可以通过FromSlice或FromChan等实用程序创建,
- 然后通过Map、Filter、FlatMap等操作进行转换。
- 最后,当所有处理阶段完成后,可以通过ForEach、ToSlice或通过迭代结果通道来手动消耗数据 。
背景上下文
需要执行任何特殊操作即可使用 Go 并发性。大多数人编写 HTTP 服务,每个请求都已经在单独的 goroutine 中处理。您只需要确保您使用的所有其他库都是线程安全的,并且在大多数情况下,它们都是线程安全的。
当需要实现批处理以减少数据库负载时,高级并发问题就发生了:
- 批量 DB 插入:多个执行程序处理 DB 插入。将记录收集到一个通道,然后处理该通道,分批插入。
- 批处理数据库更新:与插入类似,但用于更新,如 UPDATE users SET last_active_at=NOW() WHERE id IN(?,?,?,?..)
- 批量处理队列信息:收集一批消息,提取 ID,并执行一次数据库查询 WHERE id IN (...) ,然后将消息标记为已处理。
还有更多情况,不仅与 DB 有关。我很快意识到,我不想一遍又一遍地重复同样的代码,所以我做了一个泛型批处理函数。那时候 Go 还没有泛型,所以我的泛型函数实际上是基于反射的。
- 编写基本的并行循环让我了解了 WaitGroups,
- 而与错误处理相关的一些 bug 和 goroutine 泄露则让我了解了 ErrGroup。
有一次,我需要一个可以容纳多达 N 个唯一键的映射。尝试插入 N+1 个键时会阻塞,直到另一个键被删除。为了实现这个目标,我学习了 sync.Condition。
我还需要下载许多巨大的 CSV 文件,每个文件都包含特定日期的交易列表。之后,我需要解析和比较连续几天的 CSV。为了加快速度,我需要同时下载它们。这就是我的 Rill 模块中 Ordered* 函数的诞生过程。
同步包并没有错,在某些情况下,同步/原子是解决问题的最佳方法。
Rill主要特征
- 轻量级:快速且模块化,可以轻松集成到现有项目中
- 易于使用:管理 goroutine、等待组和错误处理的复杂性被抽象化了
- Concurrent:控制所有操作的并发级别
- 批处理:提供了一种简单的方式来批量组织和处理数据
- 错误处理:提供一种结构化的方法来处理并发应用程序中的错误
- 流:以最小的内存占用处理实时数据流或大型数据集
- 顺序保留:提供保留数据原始顺序的功能,同时仍然允许并发处理
- 高效的资源使用:goroutines 的数量和分配不取决于数据大小
- 通用:所有操作都是类型安全的,可以与任何数据类型一起使用
- 函数式编程:基于函数式编程概念,使map、filter、flatMap等操作可用于基于通道的工作流程
用法示例
考虑一个从多个 URL 获取键、从键值数据库批量检索其值并打印它们的应用程序。此示例展示了该库在处理并发任务、错误传播、批处理和数据流方面的优势,同时保持简单性和效率。
func main() { urls := rill.FromSlice([]string{ "https://example.com/file1.txt", "https://example.com/file2.txt", "https://example.com/file3.txt", "https://example.com/file4.txt", }, nil)
// 从每个 URL 获取键值,并将其扁平化为单一流 keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] { return streamFileLines(url) })
// 从数据流中排除任何空键 keys = rill.Filter(keys, 3, func(key string) (bool, error) { return key != "", nil })
//将密钥key整理成易于管理的批次,每批 10 个,以便批量操作 keyBatches := rill.Batch(keys, 10, 1*time.Second)
// 从数据库中获取每批键的值 resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) { values, err := kvMultiGet(keys...) if err != nil { return nil, err }
results := make([]KV, len(keys)) for i, key := range keys { results[i] = KV{Key: key, Value: values[i]} }
return results, nil })
//将批次转换回单个项目进行最终处理 results := rill.Unbatch(resultBatches)
//从数据流中排除任何空值 results = rill.Filter(results, 3, func(kv KV) (bool, error) { return kv.Value != "<nil>", nil })
//遍历每个键值对并打印 cnt := 0 err := rill.ForEach(results, 1, func(kv KV) error { fmt.Println(kv.Key, "=>", kv.Value) cnt++ return nil }) if err != nil { fmt.Println("Error:", err) }
fmt.Println("Total keys:", cnt) }
// streamFileLines 可以从 URL 逐行流式传输文件、 func streamFileLines(url string) <-chan rill.Try[string] { // ... }
// kvMultiGet 从键值数据库中进行批量读取、 func kvMultiGet(keys ...string) ([]string, error) { // ... }
|
批处理
批处理是并发处理中的常见模式,尤其是在处理外部服务或数据库时。 Rill 提供了Batch函数,可将项目流组织成指定大小的批次。还可以指定一个超时,之后即使批次未满,也会发出该批次。当输入流缓慢或稀疏时,这对于保持应用程序的反应非常有用。
扇入和扇出
提供了扇入和扇出数据流的机制。扇入是通过合并功能完成的,该功能将多个数据流合并到一个统一的通道中。扇出是通过Split2函数完成的,该函数将单个输入流划分为两个不同的输出通道。这种划分基于鉴别器函数,允许基于数据特征的并行处理路径。
错误处理
错误是使用ForEach处理的,这对于大多数用例都有好处。 ForEach在出现第一个错误时停止处理并返回该错误。如果您需要在管道中间处理错误,和/或在发生错误后继续处理,可以使用Catch函数。
终止和资源泄漏
在 Go 并发应用程序中,如果一个通道没有读者,那么写者就会被卡住,从而导致潜在的程序和内存泄漏。
这个问题也会延伸到基于 Go 通道构建的 rill 管道;如果管道中的任何阶段缺少消费者,上游的整个生产者链都可能被阻塞。
因此,确保管道被完全消费至关重要,尤其是在错误导致提前终止的情况下。
下面的示例演示了一种情况,即最后处理阶段在第一次遇到错误时退出,从而导致管道处于阻塞状态。
func doWork(ctx context.Context) error { // 初始化管道的第一阶段 ids := streamIDs(ctx) // Define other pipeline stages... // 最后阶段处理 for value := range results { // Process value... if someCondition { return fmt.Errorf("some error") // Early exit on error } } return nil }
|
为防止出现此类问题,建议确保在出现错误时排空结果通道。一种直接的方法是使用 defer 来调用 DrainNB:
func doWork(ctx context.Context) error { // Initialize the first stage of the pipeline ids := streamIDs(ctx) // Define other pipeline stages... // 确保在发生故障时排空管道 defer rill.DrainNB(results) // Final stage processing for value := range results { // Process value... if someCondition { return fmt.Errorf("some error") // Early exit on error } } return nil }
|
利用 ForEach 或 ToSlice 等内置排水机制的函数,可以简化代码并提高可读性:
func doWork(ctx context.Context) error { // Initialize the first stage of the pipeline ids := streamIDs(ctx) // Define other pipeline stages...
// Final stage processing return rill.ForEach(results, 5, func(value string) error { // Process value... if someCondition { return fmt.Errorf("some error") // 出错时提前退出,自动排水 } return nil }) }
|
虽然这些措施能有效防止泄漏,但只要初始阶段产生数值,管道就可能继续在后台消耗数值。
最佳做法是使用上下文管理第一阶段(以及可能的其他阶段),从而实现受控关闭:
func doWork(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() // 确保功能退出时取消第一阶段
// Initialize the first stage of the pipeline ids := streamIDs(ctx)
// Define other pipeline stages...
// Final stage processing return rill.ForEach(results, 5, func(value string) error { // Process value if someCondition { return fmt.Errorf("some error") // Early exit on error, with automatic draining } return nil }) }
|
顺序保持
在并发环境中,由于并行执行的特性,保持已处理项目的原始顺序具有挑战性。当数值从输入通道读取、通过函数 f 处理并写入输出通道时,它们的顺序可能与输入顺序不一致。为了解决这个问题,rill 为其核心函数(如 OrderedMap、OrderedFilter 等)提供了有序版本。这些函数确保,如果输入通道中的值 x 在值 y 之前,那么输出中的 f(x) 就会在 f(y) 之前,从而保持原来的顺序。值得注意的是,与无序函数相比,这些有序函数会产生少量开销,这是因为需要额外的逻辑来保持顺序。
在数据顺序会影响结果的情况下,保持顺序至关重要。例如,一个应用程序需要检索特定时间段内的每日温度测量值,并计算一天到第二天的温度变化。虽然并行获取数据可以提高效率,但按照原始顺序处理数据对于准确计算温度变化至关重要。
type Measurement struct { Date time.Time Temp float64 }
func main() { city := "New York" endDate := time.Now() startDate := endDate.AddDate(0, 0, -30)
// 创建一个通道,发送开始日期和结束日期之间的所有天数 days := make(chan rill.Try[time.Time]) go func() { defer close(days) for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { days <- rill.Wrap(date, nil) } }()
// 同时下载每天的温度 measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { temp, err := getTemperature(city, date) return Measurement{Date: date, Temp: temp}, err })
// 迭代测量结果,计算并打印变化。使用单个 goroutine prev := Measurement{Temp: math.NaN()} err := rill.ForEach(measurements, 1, func(m Measurement) error { change := m.Temp - prev.Temp prev = m
fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) return nil }) if err != nil { fmt.Println("Error:", err) } }
// getTemperature 获取城市和日期的温度读数 func getTemperature(city string, date time.Time) (float64, error) { // ... }
|