最近,我遇到了一个编程任务,需要在网络界面上显示一个日志文件的最后'n'行,类似'tail -[n] -f'的功能。
tail -f -30 /var/log/nginx/error.log命令,我们可能在终端上做过数百万次。
为了不断地将数据从后端输送到前端,我们必须冒险超越传统的REST APIs。我探索了服务器端事件和文件流API的可能性,但它们的服务器端实现似乎相当复杂。因此,我选择了WebSockets,在服务器和客户端之间提供了一个更精简的通信渠道。
因为我已经有了一个用GoLang编写的现有网络API服务器,它提供了几个REST端点作为起点。
让我们添加托管WebSocket服务所需的包。
go get github.com/gorilla/websocket
我们需要为WebSocket升级一个HTTP端点。以后我们将需要这个方法来把普通的HTTP端点升级为WebSocket端点。
var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, }
|
接下来,我们将准备两类结构:一类是客户对象,另一类是广播者对象。客户端对象代表一个单一的客户端。它持有一个套接字连接引用和发送通道。广播者对象负责管理多个客户端连接。
type Client struct { socket *websocket.Conn send chan []byte }
type Broadcaster struct { clients map[*Client]bool broadcast chan string register chan *Client unregister chan *Client }
|
让我们创建一个broadcaster生产者函数来创建broadcaster实例对象。广播器记录了WebSocket连接的情况。它还保留了一个接收消息的通道,以便向所有连接广播。
ster() *Broadcaster { return &Broadcaster{ broadcast: make(chan string), register: make(chan *Client), unregister: make(chan *Client), clients: make(map[*Client]bool), } }
func (b *Broadcaster) run() { for { select { case client := <-b.register: b.clients[client] = true case client := <-b.unregister: if _, ok := b.clients[client]; ok { delete(b.clients, client) close(client.send) } case message := <-b.broadcast: for client := range b.clients { client.send <- []byte(message) } } } }
|
我们可以使用这个Go 包对文件进行“拖尾”读取。所以,我们可以在我们的代码中包含这个包。它将把这几行文字发送到广播器broadcaster,广播器broadcaster将它们发送到send通道--一个我们将定义的Go例程。
func (b *Broadcaster) tailFile(filepath string) { t, err := tail.TailFile( filepath, tail.Config{Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}, ) if err != nil { log.Fatalf("tail file err: %v", err) }
for line := range t.Lines { if line.Text != "" { b.broadcast <- line.Text } } }
|
设置好广播员逻辑后,我们可以在main()例程中把它挂起来。
我们添加一个静态文件路由,以方便用JavaScript测试这个功能。
import ( "encoding/json" "log" "net/http"
"github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/hpcloud/tail" )
// ...
func main() { broadcaster := newBroadcaster() go broadcaster.run() go broadcaster.tailFile("./sample_data/sample_log.log") // file path, last n number of lines staticServer := http.FileServer(http.Dir("./public_html")) router := mux.NewRouter() // or just http.HandleFunc(...) for not using mux. router.HandleFunc("/ws", handleWebSocketConnection(broadcaster)) router.Handle("/", staticServer) // we can also add a static file server to test it using JavaScript // ... log.Fatal(http.ListenAndServe(":8000", router)) // Original map }
|
在/public-html下放置一些index.html内容。
<html> <head> <title>WebSocket Tester</title> </head> <body> <script> var ws = new WebSocket('ws://localhost:8000/ws'); ws.onopen = function(event) { console.log('Connection is open ...'); };
ws.onerror = function(err) { console.log('err: ', err, err.toString()); };
// Event handler for receiving text from the server ws.onmessage = function(event) { console.log('Received: ' + event.data); };
ws.onclose = function() { console.log('Connection is closed...'); }; </script> </body> </html>
|
我们还需要为WebSocket创建连接处理程序。请注意,我们在这里使用了两个Go Routines。它们处于一个for循环之下,这使得它们一直在运行,直到它们中断。第一个Go例程用于在我们收到来自套接字的错误时清理WebSocket连接。另一个用于写入我们从send通道收到的消息,该通道由我们之前定义的广播器broadcaster传递。
func handleWebSocketConnection(b *Broadcaster, filePath string, n int) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } client := &Client{socket: ws, send: make(chan []byte)} b.register <- client
go func() { defer func() { b.unregister <- client ws.Close() }()
for { _, _, err := ws.ReadMessage() if err != nil { b.unregister <- client ws.Close() break } } }()
go func() { defer ws.Close() for { message, ok := <-client.send if !ok { ws.WriteMessage(websocket.CloseMessage, []byte{}) return } ws.WriteMessage(websocket.TextMessage, message) } }() } }
|
现在是时候运行服务器并检查是否有任何错误。
go run main.go
当我们更新目标日志文件sample_data/sample_log.log时,我们可以看到JavaScript控制台记录了浏览器的输出。酷,看起来它成功了。现在,让我们再往前走一步。比方说,在每个客户端连接时,我们希望服务器立即将日志文件的最后'n'行发送给客户端。而不是一开始就显示空白行。
我们怎样才能归档这个?嗯,有很多方法可以做到这一点。为了简单起见。我们可以使用Go中普通的缓冲区io.Scan()函数来写一个readLastNlines()函数,如下所示:
func readLastNLines(fileName string, n int) ([]string, error) { if err != nil { return nil, err } defer file.Close()
scanner := bufio.NewScanner(file) lines := make([]string, 0)
for scanner.Scan() { lines = append(lines, scanner.Text()) if len(lines) > n { lines = lines[1:] } }
if scanner.Err() != nil { return nil, scanner.Err() }
return lines, nil }
|
在handleWebSocketConnection()函数中创建另一个go例程来调用initialRead(),该函数在内部调用readLastNLines()来获取最后的n行,以便向下发送。但不要忘记在调用initialRead()函数前的go关键字。
func (b *Broadcaster) initialRead(client *Client, filePath string, n int) { // Send last n lines from file to the client lines, err := readLastNLines(filePath, n) if err != nil { log.Println(err) return }
for _, line := range lines { b.broadcast <- line } } func handleWebSocketConnection(b *Broadcaster, filePath string, n int) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } client := &Client{socket: ws, send: make(chan []byte)} b.register <- client
go b.initialRead(client, filePath, n) // <-- this is added
go func() { defer func() { b.unregister <- client ws.Close() }()
for { _, _, err := ws.ReadMessage() if err != nil { b.unregister <- client ws.Close() break } } }()
go func() { defer ws.Close() for { message, ok := <-client.send if !ok { ws.WriteMessage(websocket.CloseMessage, []byte{}) return } ws.WriteMessage(websocket.TextMessage, message) } }() } }
|
最后,我们需要更新main()函数的 handleWebSocketConnection()方法签名。
func main() { targetFile := "./sample_data/sample_log.log" lastNLines := 20 // ... router.HandleFunc("/ws", handleWebSocketConnection(broadcaster, targetFile, lastNLines)) // ... }
|
在此GitHub 存储库中阅读完整的源代码