在Netty聊天室应用程序中自定义事件处理程序和侦听器

Netty是一个用 Java 构建高性能、可扩展的网络应用程序的框架。它的关键特性之一是事件驱动的架构,它使我们能够有效地处理网络事件。在本文中,我们将深入探讨如何在 Netty 聊天室应用程序中自定义事件处理程序和侦听器。

Netty的事件模型
Netty 是围绕通道和处理程序的概念构建的。当网络事件发生时(例如接收数据或新连接),Netty会触发由特定事件处理程序处理的事件。可以自定义这些处理程序以根据事件类型执行各种任务。

什么是事件处理程序
Netty 中的自定义事件处理程序是该ChannelHandler接口的实现。该接口定义了处理各种通道事件的方法,例如通道激活、数据读取、通道关闭和异常。通过扩展SimpleChannelInboundHandler,我们可以有选择地重写与我们想要处理的事件相对应的方法。

Netty中的监听器可以使用该接口来实现ChannelFutureListener。这个接口允许我们指定当a上的某个操作ChannelFuture完成时应该执行的任务。例如,我们可以定义一个侦听器来处理写入操作完成后的操作。

使用 Netty 构建聊天室应用程序
让我们使用 Netty 创建一个简单的聊天室应用程序。我们将使用自定义事件处理程序来管理传入消息并向所有连接的客户端广播。下面详细介绍了如何利用自定义事件处理程序和侦听器来创建聊天室应用程序:

  • 聊天服务器处理程序:此处理程序应扩展SimpleChannelInboundHandler<String>.它应该实现channelRead0处理传入聊天消息的方法。收到消息后,它应该将其广播给聊天室中所有连接的用户。
  • 聊天客户端处理程序:此处理程序还应该扩展SimpleChannelInboundHandler<String>.它应该处理从服务器收到的传入聊天消息并将其显示给用户。此外,它应该实现处理用户输入并向服务器发送消息的方法。
  • 用户管理:服务器需要维护已连接用户的列表。这可以通过使用List<Channel>存储已连接客户端通道的列表来实现。

设置依赖关系
首先,我们确保已设置 Maven 来管理项目依赖项。在您的pom.xml(对于 Maven)中包含以下 Netty 依赖项:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.105.Final</version>
</dependency>

实现聊天服务器
首先,我们将创建一个聊天服务器ChatServer.java,将消息回显给所有连接的客户端。

聊天服务器
这是负责引导服务器通道并启动服务器的服务器代码。此代码将服务器绑定到指定端口,它将在其中主动侦听传入连接。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.List;
import java.util.ArrayList;

public class ChatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            System.out.println("Chat server started successfully and is ready to accept clients.");
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

上面的代码块设置并初始化一个基于 Netty 的聊天服务器,定义它如何处理传入连接、处理 I/O 操作和管理客户端通信。

首先,我们创建两个EventLoopGroup实例:bossGroup用于处理传入连接和workerGroup用于处理 I/O 工作。

我们初始化一个ServerBootstrap实例来设置和配置服务器,并将 和 与 分别关联bossGroup以workerGroup处理ServerBootstrap传入连接和 I/O 操作。接下来,我们指定NioServerSocketChannel为通道类型,表示使用基于 NIO 的服务器套接字来接受传入连接。

接下来,我们设置一个来配置服务器创建的ChannelInitializer新实例,并为每个实例定义一个管道,其中包括一个用于解码传入消息的管道和一个用于编码传出消息的管道。我们还向管道添加了一个自定义,负责处理传入消息和管理客户端连接。SocketChannelSocketChannelStringDecoderStringEncoderChatServerHandler

8888接下来,我们使用绑定服务器到端口serverBootstrap.bind(8888).sync(),启动服务器的监听进程,并使用 ( channelFuture.channel().closeFuture().sync()) 保持服务器运行直到终止。

聊天服务器处理程序
这是聊天服务器使用的处理程序。

class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    // List of connected client channels.
    static final List<Channel> connectedUsers = new ArrayList<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(
"Client connected: " + ctx.channel().remoteAddress());
        connectedUsers.add(ctx.channel());
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String receivedMessage) throws Exception {

        System.out.println(
"Server Received message: " + receivedMessage);

       
// Broadcast the received message to all connected clients
        for (Channel channel : connectedUsers) {
            channel.writeAndFlush(
" " + receivedMessage + '\n');
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

实现聊天客户端
接下来,让我们实现一个连接到我们的服务器的简单聊天客户端。

聊天客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;

public class ChatClient {

    static String clientName;

    public static void main(String[] args) throws Exception {

        // Get name of the user for this chat session.       
        Scanner scanner = new Scanner(System.in);
        System.out.println(
"Please enter your name: ");
        if (scanner.hasNext()) {
            clientName = scanner.nextLine();
            System.out.println(
"Welcome " + clientName);
        }

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });

            ChannelFuture cf = bootstrap.connect(
"localhost", 8888).sync();

            while (scanner.hasNext()) {
                String input = scanner.nextLine();
                Channel channel = cf.sync().channel();
                channel.writeAndFlush(
"[" + clientName + "]: " + input);
                channel.flush();
            }

            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

上面的代码配置了一个Bootstrap用于设置客户端的实例。在此类中,我们指定NioSocketChannel用于网络通信的 a 并设置 aChannelInitializer来定义通道的初始化方式。StringDecoder并StringEncoder添加到管道中以处理入站和出站消息。我们还ChatClientHandler向管道添加了一个自定义来处理传入消息。

接下来,我们使用配置localhost的端口连接到运行的聊天服务器,并使用( ) 进行异步操作并连接到服务器。8888BootstrapChannelFuturecf

发送消息
该main方法通过要求用户使用 输入姓名来开始执行聊天客户端Scanner。为了发送消息,程序进入一个循环,使用 连续读取用户输入,从( )Scanner检索通道,并以 格式将用户输入发送到服务器。ChannelFuturecf"[clientName]: message"

聊天客户端处理程序类
下面是为客户端打印聊天消息的通道处理程序类。

class ChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String receivedMessage) throws Exception {
        System.out.println("Received message: " + receivedMessage);
    }

}

运行聊天应用程序
以下是执行此示例的步骤。首先,使用以下命令在单独的终端窗口中编译并运行服务器类 ( ChatServer.java),并等待它启动并准备好接受客户端。留意日志中是否有任何消息,如图 1.0 所示


mvn exec:java -Dexec.mainClass=com.jcg.chatserver.ChatServer

服务器启动并运行后,在单独的终端窗口中启动两个或多个客户端实例,并ChatClient.java)使用以下命令运行 (:

mvn exec:java -Dexec.mainClass=com.jcg.chatclient.ChatClient

在控制台中提示时输入用户名,然后通过控制台输入交换聊天消息并观察消息传递到其他客户端。