Netty中自定义事件处理程序和监听器

在本教程中,我们将使用Netty 创建一个聊天室应用程序。在网络编程中,Netty 作为一个强大的框架而脱颖而出,它简化了异步 I/O 操作的复杂性。我们将探讨如何构建一个基本的聊天服务器,多个客户端可以在其中连接并进行实时对话。

在 Netty 中,通信是通过通道完成的,通道抽象了任何协议上的异步 I/O 操作。这使我们能够专注于应用程序逻辑而不是网络代码。我们的应用程序将通过命令行运行。

 我们将编写一个服务器和一个客户端应用程序。

 对于通道之间的通信,我们将实现SimpleChannelInboundHandler<String>,它是ChannelInboundHandlerAdapter的通用实现。该适配器使我们能够专注于仅实现我们关心的事件。在本例中,它是channelRead0(),当从服务器接收到消息时调用它。我们将使用它来简化我们的用例,因为我们只交换字符串消息。


1. 客户端事件处理程序
让我们从客户端消息的处理程序开始,它将把服务器接收到的任何内容打印到控制台,无需修改:

public class ClientEventHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println(msg);
    }
}

稍后,我们将通过直接写入通道来处理消息发送。

2. 消息对象
在我们继续讨论服务器事件之前,让我们编写一个POJO来表示发送到服务器的每条消息。我们将注册发送的日期以及用户名和消息:

public class Message {
    private final Instant time;
    private final String user;
    private final String message;
    public Message(String user, String message) {
        this.time = Instant.now();
        this.user = user;
        this.message = message;
    }
    // standard getters...
}

然后,我们将包括一些帮助程序,首先是服务器发送消息时消息如何显示在控制台上:

@Override
public String toString() {
    return time + " - " + user + ": " + message;
}

然后,为了解析客户端收到的消息,我们将使用 CSV 格式。当我们创建客户端应用程序时,我们将看到客户端如何以这种格式发送消息:

public static Message parse(String string) {
    String arr = string.split(";", 2);
    return new Message(arr[0], arr[1]);
}

将分割限制为 2 很重要,因为消息部分可能包含分号。

3. 服务器事件处理程序
在我们的服务器事件处理程序中,我们将首先为我们将覆盖的其他事件创建一个辅助方法。此外,我们还需要一个已连接客户端的映射和一个队列来最多保留MAX_HISTORY元素:

public class ServerEventHandler extends SimpleChannelInboundHandler<String> {
    static final Map<String, Channel> clients = new HashMap<>();
    static final Queue<String> history = new LinkedList<>();
    static final int MAX_HISTORY = 5;
    private void handleBroadcast(Message message, ChannelHandlerContext context) {
        String channelId = context.channel()
          .id()
          .asShortText();
        
        clients.forEach((id, channel) -> {
            if (!id.equals(channelId))
                channel.writeAndFlush(message.toString());
        });
        // history-control code...
    }
   
// ...
}


首先,我们获取通道 ID 作为地图的键。然后,对于广播,对于每个连接的客户端(不包括发送者),我们中继他们的消息。
值得注意的是writeAndFlush()接收一个Object。而且,由于我们的处理程序只能处理字符串,因此必须调用toString()以便客户端可以正确接收它。

最后,我们进行历史控制。每次添加新消息时,如果列表超过MAX_HISTORY项,我们就会删除最旧的消息:

history.add(message.toString());
if (history.size() > MAX_HISTORY)
    history.poll();

 现在,我们可以重写channelRead0()并解析从客户端收到的消息:

@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
    handleBroadcast(Message.parse(msg), context);
}

然后,对于每个上线的客户端,我们将其添加到我们的客户端 列表中,中继旧消息以获取上下文,并发送一条系统消息宣布新客户端message为上下文,并发送系统消息宣布新客户端:

@Override
public void channelActive(final ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.put(channel.id().asShortText(), channel);
    history.forEach(channel::writeAndFlush);
    handleBroadcast(new Message("system", "client online"), context);
}

 最后,我们重写channelInactive(),在客户端离线时调用。这次,我们只需要从列表中删除客户端并发送系统消息:

@Override
public void channelInactive(ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.remove(channel.id().asShortText());
    handleBroadcast(new Message("system", "client offline"), context);
}

 4、服务器引导应用程序
我们的处理程序不独立执行任何操作,因此我们需要一个应用程序来引导并运行它,这是一个通用模板。
在ChannelPipeline中注册自定义组件

为了准备引导程序,我们选择一个通道实现并实现一个子处理程序,该处理程序为通道的请求提供服务:

bootstrap.group(serverGroup, clientGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel channel) {
          channel.pipeline()
            .addFirst(
              new StringDecoder(),
              new ServerEventHandler()
              new StringEncoder());
      }
  });

在子处理程序中,我们定义处理管道。由于我们只关心字符串消息,因此我们将使用内置的字符串编码器和解码器,这样就不必自己对交换的字节缓冲区进行编码/解码,从而节省了一些时间。

最后,由于顺序很重要,我们添加解码器、ServerEventHandler和编码器。这是因为事件通过管道从入站流向出站。

我们将服务器绑定到主机/端口来完成我们的应用程序,该应用程序返回一个ChannelFuture。我们将使用它来等待异步套接字通过sync()关闭:

ChannelFuture future = bootstrap.bind(HOST, PORT).sync();
System.out.println("server started. accepting clients.");
future.channel().closeFuture().sync();

 5、客户端引导应用程序
最后,我们的客户端应用程序遵循通用客户端模板进行引导。最重要的是,当调用handler()时,我们将使用ClientEventHandler来代替:

channel.pipeline().addFirst(
  new StringDecoder(), 
  new ClientEventHandler(), 
  new StringEncoder());

 处理消息输入
最后,为了处理用户输入,连接到服务器后,我们将使用扫描仪循环,直到收到用户名,然后直到消息等于“退出”。最重要的是,我们必须使用writeAndFlush()来发送消息。我们以Message.parse()期望的格式发送消息:

private static void messageLoop(Scanner scanner, Channel channel) {
    while (user.isEmpty()) {
        System.out.print("your name: ");
        user = scanner.nextLine();
    }
    while (scanner.hasNext()) {
        System.out.print(
"> ");
        String message = scanner.nextLine();
        if (message.equals(
"exit"))
            break;
        channel.writeAndFlush(user +
";" + message);
    }
}

 6、创建自定义事件监听器
在 Netty 中,事件监听器在通道整个生命周期中处理异步事件方面发挥着至关重要的作用。事件监听器本质上是一种回调机制,我们可以使用它对返回ChannelFuture的任何操作的完成做出反应。

我们在完成时实现ChannelFutureListener接口以实现自定义行为。ChannelFuture 表示异步操作的结果,例如连接尝试或 I/O 操作。

ChannelFutureListener很有用,因为它定义了默认实现,例如CLOSE_ON_FAILURE或FIRE_EXCEPTION_ON_FAILURE。但是,由于我们不会使用这些,因此让我们实现一个用于操作确认的GenericFutureListener 。

我们将保留上下文的自定义事件名称,并且我们将检查未来是否成功完成。否则,我们将在记录之前将状态标记为“FAILED”:
 

public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {
    private final String event;
    public ChannelInfoListener(String event) {
        this.event = event;
    }
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        String status = "OK";
        if (!future.isSuccess()) {
            status =
"FAILED";
            future.cause().printStackTrace();
        }
        System.out.printf(
         
"%s - channel#%s %s: %s%n", Instant.now(), channel.id().asShortText(), status, event);
    }
}

  事件接收
让我们回到代码的某些部分来包含侦听器。首先,对于客户端,我们添加一个“连接到服务器”确认:

future.addListener(new ChannelInfoListener("connected to server"));


然后,让我们在消息循环中包含“消息已发送”确认:

ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));


这使我们能够确保在发送消息时仍然连接到服务器。最后,对于服务器处理程序,让我们在广播期间发送“消息中继”确认:

clients.forEach((id, channel) -> {
    if (!id.equals(channelId)) {
        ChannelFuture relay = channel.writeAndFlush(message.toString());
        relay.addListener(new ChannelInfoListener("message relayed to " + id));
    }
});

 
7、运行
Netty 允许我们使用EmbeddedChannel测试管道,但对于客户端/服务器交互,让我们看看从终端运行时它是什么样子。让我们启动服务器(为了便于阅读,我们将省略包名称):

$ mvn exec:java -Dexec.mainClass=ChatServerMain
chat server started. ready to accept clients.


然后,让我们启动第一个客户端,输入名称,然后发送两条消息:

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:47:02 - channel#03c40ad4 OK: connected to server
your name: Bob
> Hello
2024-01-12 3:47:02 - channel#03c40ad4 OK: message sent
> Anyone there?!
2024-01-12 3:47:03 - channel#03c40ad4 OK: message sent


当我们与第二个客户端连接时,我们将在输入名称之前获取消息历史记录:

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:49:33 - channeldaa64476 OK: connected to server
2024-01-12 3:46:55 - system: client online: 03c40ad4
2024-01-12 3:47:03 - Bob: Hello
2024-01-12 3:48:40 - Bob: Anyone there?!


当然,在选择名称并发送消息后:

your name: Alice
> Hi, Bob!
2024-01-12 3:51:05 - channeldaa64476 OK: message sent
第一个客户端将收到它:

2024-01-12 3:49:33 - system: client online: daa64476
2024-01-12 3:51:05 - Alice: Hi, Bob!