使用GoLang和WebSockets实时流式传输日志文件:模拟“tail -f”


最近,我遇到了一个编程任务,需要在网络界面上显示一个日志文件的最后'n'行,类似'tail -[n] -f'的功能。

tail -f -30 /var/log/nginx/error.log命令,我们可能在终端上做过数百万次。

为了不断地将数据从后端输送到前端,我们必须冒险超越传统的REST APIs。我探索了服务器端事件和文件流API的可能性,但它们的服务器端实现似乎相当复杂。因此,我选择了WebSockets,在服务器和客户端之间提供了一个更精简的通信渠道。

因为我已经有了一个用GoLang编写的现有网络API服务器,它提供了几个REST端点作为起点。

让我们添加托管WebSocket服务所需的包。

go get github.com/gorilla/websocket

我们需要为WebSocket升级一个HTTP端点。以后我们将需要这个方法来把普通的HTTP端点升级为WebSocket端点。

var upgrader = websocket.Upgrader{
 ReadBufferSize:  1024,
 WriteBufferSize: 1024,
}

接下来,我们将准备两类结构:一类是客户对象,另一类是广播者对象。客户端对象代表一个单一的客户端。它持有一个套接字连接引用和发送通道。广播者对象负责管理多个客户端连接。

type Client struct {
 socket *websocket.Conn
 send   chan []byte
}

type Broadcaster struct {
 clients    map[*Client]bool
 broadcast  chan string
 register   chan *Client
 unregister chan *Client
}

让我们创建一个broadcaster生产者函数来创建broadcaster实例对象。广播器记录了WebSocket连接的情况。它还保留了一个接收消息的通道,以便向所有连接广播。

ster() *Broadcaster {
 return &Broadcaster{
  broadcast:  make(chan string),
  register:   make(chan *Client),
  unregister: make(chan *Client),
  clients:    make(map[*Client]bool),
 }
}

func (b *Broadcaster) run() {
 for {
  select {
  case client := <-b.register:
   b.clients[client] = true
  case client := <-b.unregister:
   if _, ok := b.clients[client]; ok {
    delete(b.clients, client)
    close(client.send)
   }
  case message := <-b.broadcast:
   for client := range b.clients {
    client.send <- []byte(message)
   }
  }
 }
}

我们可以使用这个Go 包对文件进行“拖尾”读取。所以,我们可以在我们的代码中包含这个包。

它将把这几行文字发送到广播器broadcaster,广播器broadcaster将它们发送到send通道--一个我们将定义的Go例程。

func (b *Broadcaster) tailFile(filepath string) {
 t, err := tail.TailFile(
  filepath,
  tail.Config{Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}},
 )
 if err != nil {
  log.Fatalf("tail file err: %v", err)
 }

 for line := range t.Lines {
  if line.Text != "" {
   b.broadcast <- line.Text
  }
 }
}

设置好广播员逻辑后,我们可以在main()例程中把它挂起来。

我们添加一个静态文件路由,以方便用JavaScript测试这个功能。

import (
 "encoding/json"
 "log"
 "net/http"

 "github.com/gorilla/mux"
 "github.com/gorilla/websocket"
 "github.com/hpcloud/tail"
)

// ...

func main() {
   broadcaster := newBroadcaster()
   go broadcaster.run()
   go broadcaster.tailFile("./sample_data/sample_log.log") // file path, last n number of lines
   
   staticServer := http.FileServer(http.Dir("./public_html"))
   router := mux.NewRouter()
   // or just http.HandleFunc(...) for not using mux.
   router.HandleFunc("/ws", handleWebSocketConnection(broadcaster))
   router.Handle("/", staticServer) // we can also add a static file server to test it using JavaScript
   // ... 
   log.Fatal(http.ListenAndServe(":8000", router))
   // Original map
}

在/public-html下放置一些index.html内容。

<html>
  <head>
      <title>WebSocket Tester</title>
  </head>
  <body>
      <script>
        var ws = new WebSocket('ws://localhost:8000/ws');
        ws.onopen = function(event) {
                console.log('Connection is open ...');
              };

        ws.onerror = function(err) {
                console.log('err: ', err, err.toString());
              };

        // Event handler for receiving text from the server
        ws.onmessage = function(event) {
                console.log('Received: ' + event.data);
              };

        ws.onclose = function() {
                console.log('Connection is closed...');
              };
        </script>
  </body>
</html>


我们还需要为WebSocket创建连接处理程序。请注意,我们在这里使用了两个Go Routines。它们处于一个for循环之下,这使得它们一直在运行,直到它们中断。第一个Go例程用于在我们收到来自套接字的错误时清理WebSocket连接。另一个用于写入我们从send通道收到的消息,该通道由我们之前定义的广播器broadcaster传递。

func handleWebSocketConnection(b *Broadcaster, filePath string, n int) http.HandlerFunc {
 return func(w http.ResponseWriter, r *http.Request) {
  ws, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
   log.Println(err)
   return
  }
  client := &Client{socket: ws, send: make(chan []byte)}
  b.register <- client

  go func() {
   defer func() {
    b.unregister <- client
    ws.Close()
   }()

   for {
    _, _, err := ws.ReadMessage()
    if err != nil {
     b.unregister <- client
     ws.Close()
     break
    }
   }
  }()

  go func() {
   defer ws.Close()
   for {
    message, ok := <-client.send
    if !ok {
     ws.WriteMessage(websocket.CloseMessage, []byte{})
     return
    }
    ws.WriteMessage(websocket.TextMessage, message)
   }
  }()
 }
}

现在是时候运行服务器并检查是否有任何错误。

go run main.go

当我们更新目标日志文件sample_data/sample_log.log时,我们可以看到JavaScript控制台记录了浏览器的输出。酷,看起来它成功了。现在,让我们再往前走一步。比方说,在每个客户端连接时,我们希望服务器立即将日志文件的最后'n'行发送给客户端。而不是一开始就显示空白行。

我们怎样才能归档这个?嗯,有很多方法可以做到这一点。为了简单起见。我们可以使用Go中普通的缓冲区io.Scan()函数来写一个readLastNlines()函数,如下所示:

func readLastNLines(fileName string, n int) ([]string, error) {
 if err != nil {
  return nil, err
 }
 defer file.Close()

 scanner := bufio.NewScanner(file)
 lines := make([]string, 0)

 for scanner.Scan() {
  lines = append(lines, scanner.Text())
  if len(lines) > n {
   lines = lines[1:]
  }
 }

 if scanner.Err() != nil {
  return nil, scanner.Err()
 }

 return lines, nil
}

在handleWebSocketConnection()函数中创建另一个go例程来调用initialRead(),该函数在内部调用readLastNLines()来获取最后的n行,以便向下发送。但不要忘记在调用initialRead()函数前的go关键字。


func (b *Broadcaster) initialRead(client *Client, filePath string, n int) {
 // Send last n lines from file to the client
 lines, err := readLastNLines(filePath, n)
 if err != nil {
  log.Println(err)
  return
 }

 for _, line := range lines {
  b.broadcast <- line
 }
}
func handleWebSocketConnection(b *Broadcaster, filePath string, n int) http.HandlerFunc {
 return func(w http.ResponseWriter, r *http.Request) {
  ws, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
   log.Println(err)
   return
  }
  client := &Client{socket: ws, send: make(chan []byte)}
  b.register <- client

  go b.initialRead(client, filePath, n) // <-- this is added

  go func() {
   defer func() {
    b.unregister <- client
    ws.Close()
   }()

   for {
    _, _, err := ws.ReadMessage()
    if err != nil {
     b.unregister <- client
     ws.Close()
     break
    }
   }
  }()

  go func() {
   defer ws.Close()
   for {
    message, ok := <-client.send
    if !ok {
     ws.WriteMessage(websocket.CloseMessage, []byte{})
     return
    }
    ws.WriteMessage(websocket.TextMessage, message)
   }
  }()
 }
}

最后,我们需要更新main()函数的 handleWebSocketConnection()方法签名。

func main() {
 targetFile := "./sample_data/sample_log.log"
 lastNLines := 20
 // ...
 router.HandleFunc("/ws", handleWebSocketConnection(broadcaster, targetFile, lastNLines))
 // ...
}

在此GitHub 存储库中阅读完整的源代码