Golang中编写I/O数据读写流

GitLab 使用 Golang 抽象的 I/O 实现:每小时流式传输 TB 级的 Git 数据。本文帮助你了解如何在 Golang 应用程序中编写读写器Readers 和 Writers。

每小时,GitLab 都要在服务器和客户端之间传输数 TB 的 Git 数据。除非以流式方式高效传输,否则很难甚至不可能处理如此大的流量。Git 数据由

  1. Gitaly(Git 服务器)、
  2. GitLab Shell(通过 SSH 传输 Git)
  3. 和 Workhorse(通过 HTTP(S) 传输 Git)提供。

这些服务都是用 Go 语言实现的,Go 语言提供了高效处理 I/O 操作的抽象。

io 软件包
Golang 的 io 软件包提供了 Reader 和 Writer 接口,可将 I/O 实现的功能抽象为公共接口。

Reader 是封装基本 Read 方法的接口:

type Reader interface {
    Read(p []byte) (n int, err error)
}

Writer 是封装基本 Write 方法的接口。

type Writer interface {
    Write(p []byte) (n int, err error)
}

例如,os 包提供了读取文件的实现。文件类型通过定义基本的 "读 "和 "写 "函数实现了 "读 "和 "写 "接口。


读写案例
首先,让我们读取文件并将其内容写入 os.Stdout。

func main() {
    file, err := os.Open("data.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    p := make([]byte, 32 * 1024)
    for {
        n, err := file.Read(p)

        _, errW := os.Stdout.Write(p[:n])
        if errW != nil {
            log.Fatal(errW)
        }

        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }

            log.Fatal(err)
        }
    }
}

每次调用 "读取 "函数都会将文件内容填充到缓冲区 p 中,也就是说,文件是分块(最多 32KB)消耗的,而不是完全加载到内存中。

为了简化这种广泛使用的模式,io 软件包方便地提供了 Copy 函数,允许将内容从任意读取器传递到任意写入器,还能处理其他边缘情况。

func main() {
    file, err := os.Open("data.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    if _, err := io.Copy(os.Stdout, file); err != nil {
        log.Fatal(err)
    }
}

读者(Reader)和写者(Writer)接口被广泛应用于整个 Golang 生态系统,因为它们有助于以流式方式读写内容。

因此,将读取器Readers 和写入器Writers 与期望这些接口作为参数的函数粘合在一起是一个经常要解决的问题。

有时,只需将内容从一个读取器传递到一个写入器即可,但有时写入写入器的内容必须表示为一个读取器,或者来自一个阅读器的内容必须发送到多个写入器。

让我们仔细看看 GitLab 代码库中解决这类问题的不同用例和示例。

Reader -> Writer
我们需要将内容从读取器Readers 传递到写作器Writers 。

使用io.Copy.代码可以解决这个问题:

func Copy(dst Writer, src Reader) (written int64, err error)

例如:InfoRefs* Gitaly RPC 返回一个阅读器,我们希望通过 HTTP 响应将其内容流式传输给用户:

func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
        ...
        infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
        ...
        if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
            return err
        }
        ...
}


Reader -> 多个Writers
我们需要将内容从一个阅读器传递到多个写作器。

io软件包提供了io.MultiWriter函数,可将多个Writers转换为单个Writers。调用其 Write 函数时,内容会被复制到所有 Writers(实现implementation)。

func MultiWriter(writers ...Writer) Writer

假设我们要从同一内容创建 md5、sha1、sha256 和 sha512 哈希值。哈希类型是写入器。我们使用 io.MultiWriter 定义 multiHash Writer。内容写入 multiHash 后,我们将一次性计算所有这些函数的哈希值。

简化版示例如下

package main

import (
    "crypto/sha1"
    
"crypto/sha256"
    
"fmt"
    
"io"
    
"log"
)

func main() {
    s1 := sha1.New()
    s256 := sha256.New()

    w := io.MultiWriter(s1, s256)
    if _, err := w.Write([]byte(
"content")); err != nil {
        log.Fatal(err)
    }

    fmt.Println(s1.Sum(nil))
    fmt.Println(s256.Sum(nil))
}

多个Readers -> Reader
我们有多个读取器,需要按顺序读取。

io 软件包提供的 io.MultiReader 函数可将多个读取器转换为一个读取器。读取器按照传递的顺序读取。
func MultiReader(readers ...Reader) Reader

这样,该阅读器就可以在任何接受阅读器作为参数的函数中使用。

Workhorse 通过读取图像的前 N 个字节来检测其是否为 PNG 文件,并通过从多个读取器中创建一个读取器来放回图像:

func NewReader(r io.Reader) (io.Reader, error) {
    magicBytes, err := readMagic(r)
    if err != nil {
        return nil, err
    }

    if string(magicBytes) != pngMagic {
        debug("Not a PNG - read file unchanged")
        return io.MultiReader(bytes.NewReader(magicBytes), r), nil
    }

    return io.MultiReader(bytes.NewReader(magicBytes), &Reader{underlying: r}), nil
}

多个读取者 -> 多个写入器
我们需要将多个读取器中的内容传递到多个写作器中。

上述解决方案可以推广到多对多使用案例中。

_, err := io.Copy(io.MultiWriter(w1, w2, w3), io.MultiReader(r1, r2, r3))


Reader -> Reader + Writer
我们需要从阅读器中读取内容,或将阅读器传递给函数,同时将内容写入写入器。

io软件包提供了io.TeeReader函数,该函数接受一个要读取的Reader和一个要写入的Writer,并返回一个可进一步处理的Reader。

func TeeReader(r Reader, w Writer) Reader

该功能的实现非常简单。传递的读取器和写入器存储在一个本身就是读取器的结构中:

func TeeReader(r Reader, w Writer) Reader {
    return &teeReader{r, w}
}

type teeReader struct {
    r Reader
    w Writer
}

为结构体执行的读取函数将读取委托给传递的读取器,同时也将写入委托给传递的写入器:

func (t *teeReader) Read(p []byte) (n int, err error) {
    n, err = t.r.Read(p)
    if n > 0 {
        if n, err := t.w.Write(p[:n]); err != nil {
            return n, err
        }
    }
    return
}

示例 1
我们在 "多个写入器 -> 写入器 "部分已经提到过散列主题,io.TeeReader 用于提供一个写入器,从内容中创建散列。返回的读取器可进一步用于将内容上传到对象存储空间。

示例 2

Workhorse 使用 io.TeeReader 来实现依赖代理功能。依赖关系代理会将请求的上游图像缓存到对象存储中。尚未缓存的用例具有以下行为:

  • 用户执行 HTTP 请求。
  • 上游图片使用 net/http 抓取,http.Response 通过 Body 字段提供其内容,该字段是 io.ReadCloser(基本上是 io.Reader)。
  • 我们需要将这些内容写入 http.ResponseWriter(基本上是一个 io.Writer),然后发送回用户。
  • 同时,我们需要通过执行 http.Request(一个接受 io.Reader 的函数)将内容上传到对象存储空间。

因此,io.TeeReader 可用于将这些基元粘合在一起:

func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
    // 通过 HTTP 获取上游数据
    dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
    ...
    
//创建一个 Tee 读取器。每次读取都将从 dependencyResponse.Body 中读取数据,同时
       
// perform a Write to w writer
    teeReader := io.TeeReader(dependencyResponse.Body, w)
    
// 将 Tee 阅读器作为 HTTP 请求的正文传递,以将其上传到对象存储空间
    saveFileRequest, err := http.NewRequestWithContext(r.Context(),
"POST", r.URL.String()+"/upload", teeReader)
    ...
    nrw := &nullResponseWriter{header: make(http.Header)}
    p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
    ...


Writer -> Reader
我们有一个接受 Writer 的函数,我们对函数写入 Writer 的内容感兴趣。我们希望截取内容并将其表示为一个阅读器,以便以流式方式进一步处理。

io软件包提供的io.Pipe函数可返回一个Reader和一个Writer:

func Pipe() (*PipeReader, *PipeWriter)

Writer 可用于传递给接受 Writer 的函数。所有写入其中的内容都可以通过阅读器访问,即创建了一个同步内存管道,可用于连接期待使用 io.Reader 的代码和期待使用 io.Writer 的代码。

示例 1

为实现代码导航的 LSIF 的转换,我们需要

  • 读取 zip 文件的内容。
  • 转换内容并将其序列化为 zip.Writer。
  • 将新的压缩内容表示为一个阅读器,以便以流式方式进一步处理。

zip.NewWriter 函数接受一个写入压缩内容的 Writer。当我们需要向函数传递一个打开的文件描述符以将内容保存到文件中时,这个函数非常方便。不过,当我们需要通过 HTTP 请求传递压缩内容时,我们需要将数据表示为一个 Reader。

// The `io.Pipe()` creates a reader and a writer.
pr, pw := io.Pipe()

// 写入器将被传递给`parser.transform`函数,该函数将把经过转换的压缩内容写入其中
//
// 写入过程应在一个goroutine中异步进行,因为对`PipeWriter`的每一次`Write`都会阻塞,直到满足`PipeReader`的一次或多次`Read`为止。
go parser.transform(pw)

// 所有写入其中的内容现在都可以通过阅读器访问。
parser := &Parser{
    Docs: docs,
    pr:   pr,
}

// pr 是一个读取器,可用于读取写入 pw 写入器的所有数据
return parser, nil


示例 2

对于 Geo 设置, GitLab Shell 会将所有 git push 操作代理到辅助服务器,然后重定向到主服务器。

  • GitLab Shell 建立 SSH 连接,并定义 ReadWriter 结构,其中 In 字段为 io.Reader 类型,用于从用户读取数据;Out 字段为 io.Writer 类型,用于向用户发送响应。
  • GitLab Shell 向 /info/refs 执行 HTTP 请求,并使用 io.Copy 向用户发送 io.Reader 类型的 response.Body。
  • GitLab Shell 需要读取这些数据,将其转换为 Git HTTP 期望的请求,并将其作为 HTTP 请求发送到 /git-receive-pack。这就是 io.Pipe 的用武之地。

func (c *PushCommand) requestReceivePack(ctx context.Context, client *git.Client) error {
    // 定义 pipeReader 和 pipeWriter,并使用 pipeWriter 收集用户发送的所有数据
    
// 转换为 Git HTTP 期望的格式。
    pipeReader, pipeWriter := io.Pipe()
    
// 写入是异步进行的,因为它是一个阻塞操作
    go c.readFromStdin(pipeWriter)

    
// pipeReader 可以作为 io.Reader 传递,用于读取写入 pipeWriter 的所有数据
    response, err := client.ReceivePack(ctx, pipeReader)
    ...
    _, err = io.Copy(c.ReadWriter.Out, response.Body)
    ...
}

func (c *PushCommand) readFromStdin(pw *io.PipeWriter) {
    var needsPackData bool

    
// 扫描仪逐行读取用户输入的内容
    scanner := pktline.NewScanner(c.ReadWriter.In)
    for scanner.Scan() {
        line := scanner.Bytes()
        
// And writes it to the pipe writer
        pw.Write(line)
        ...
    }

    
// 必要时复制扫描仪未处理的数据
    if needsPackData {
        io.Copy(pw, c.ReadWriter.In)
    }

    
// 关闭管道写入器,表示管道读取器 EOF
    pw.Close()
}


总结
Golang 提供了优雅的模式,旨在以流式方式高效处理数据。这些模式可用于应对新的挑战或重构与高内存消耗相关的现有性能问题。