高性能聊天系统
作者:板桥banq
1.3.2 UDP实现
1.3.1节是实现了TCP的Socket实现,在本系统中,最经常使用的还是UDP。UDP的Socket实现相比TCP要简单一点,只有两个连接事件:OP_READ和OP_WRITE。因为没有连接,所以没有OP_ACCEPT或OP_CONNECT这样建立连接的事件发生。
整个流程顺序类似TCP的实现,创建UDPReactor类如下(程序1-4):
程序1-4
public class UDPReactor implements Runnable {
private final static String module = UDPReactor.class.getName();
private final Selector selector;
public UDPReactor(int port) throws IOException {
selector = Selector.open();
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), port);
DatagramChannel channelRec = openDatagramChannel();
channelRec.socket().bind(address); //绑定socketAddress
//向selector注册该channel
SelectionKey key = channelRec.register(selector, SelectionKey.OP_READ);
key.attach(new UDPHandler(key, channelRec));
}
//生成一个DatagramChannel实例
private DatagramChannel openDatagramChannel() {
DatagramChannel channel = null;
try {
channel = DatagramChannel.open();
channel.configureBlocking(false);
} catch (Exception e) {
Debug.logError(e, module);
}
return channel;
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch( (SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
Debug.logError("reactor stop!" + ex, module);
}
}
//运行Acceptor或SocketReadHandler
private void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
Debug.logVerbose("-->dispatch running");
r.run();
}
}
}
与TCP不同的就是使用DatagramChannel来代替了TCP中的ServerSocketChannel和SocketChannel,DatagramChannel是专门用于UDP的SelectableChannel。
另外一个不同点是,UDP中没有了Acceptor类,不用专门处理建立事件的相关事件,直接读取或写入数据,建立Handler如下(程序1-5):
程序1-5
public class UDPHandler implements Runnable {
private final static String module = UDPHandler.class.getName();
private final DatagramChannel datagramChannel;
private final SelectionKey key;
private static final int READING = 0, SENDING = 1;
private int state = READING;
private SocketAddress address = null;
public UDPHandler(SelectionKey key, DatagramChannel datagramChannel) throws
IOException {
this.datagramChannel = datagramChannel;
this.key = key;
Debug.logVerbose(" UDPHandler prepare ...", module);
}
public void run() { //线程运行方法
Debug.logVerbose(" UDPHandler running ...", module);
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
close(datagramChannel);
key.cancel();
}
}
//从datagramChannel读取数据
private void read() throws Exception {
try {
//这里应该是聊天服务器处理聊天信息的核心功能
//下面只是测试用代码,直接从客户端读取字符串并打印出来
byte[] array = new byte[512];
ByteBuffer buffer = ByteBuffer.wrap(array);
address = datagramChannel.receive(buffer);
String str = new String(array);
System.out.println("handlePendingReads() :" + str);
state = SENDING;
key.interestOps(SelectionKey.OP_WRITE); //注册为可写事件
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
//向datagramChannel写入数据
private void send() throws Exception {
try {
//以下是测试发送数据
String request = "come back";
ByteBuffer buffer1 = ByteBuffer.wrap(request.getBytes());
datagramChannel.send(buffer1, address);
System.out.println("send :" + request);
state = READING;
key.interestOps(SelectionKey.OP_READ);
} catch (Exception ex) {
Debug.logError("readRequest .. error:" + ex, module);
}
}
…
}
在这里,Handler中实现了从datagramChannel读取数据和写入数据功能。
至此,整个服务器底层核心功能基本完成。这部分功能的性能很大程度上影响了整个系统的运行性能,因此需要对之实行单独性能测试。为了方便测试,先要开发出客户端测试程序,通过多台客户端多线程地不断对服务器进行连接访问和数据传送,从而可以对Java的这些最新技术提供的性能参数有一个彻底实际的了解。