高性能聊天系统
作者:板桥banq
1.3.3 客户端实现
为了使客户端的I/O读写能够保持流畅,客户端的Socket读写也使用非堵塞I/O实现,在前面章节中已经讨论过,由于非堵塞I/O本身类似一个独立自主的Reactor模式,而客户端界面输入也是一个事件监视模式,因此,要实现这两个独立模式之间的数据通信,需要使用队列Queue模式。
首先建立TCP客户端非堵塞I/O类TCPClient如下(程序1-6):
程序1-6
public class TCPClient extends Thread {
private final static String module = TCPClient.class.getName();
//这是一个消息队列,用于和前台客户端界面输入实现通信
private final static MessageList messageList = MessageList.getInstance();
private InetSocketAddress socketAddress;
private Selector selector;
private SocketHelper socketHelper; //Socket读写帮助
public TCPClient(String url, int port) {
try {
socketAddress = new InetSocketAddress(url, port);
selector = Selector.open();
openSocketChannel(); //开启一个SocketChannel
socketHelper = new SocketHelper();
} catch (Exception e) {
Debug.logError("init error:" + e, module);
}
}
//直接开启一个SocketChannel
private SocketChannel openSocketChannel() {
SocketChannel channel = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(this.socketAddress); //绑定SocketAddress
channel.register(selector, SelectionKey.OP_CONNECT); //注册OP_CONNECT事件
} catch (Exception e) {
Debug.logError(e, module);
}
return channel;
}
public void run() { //线程运行方法
try {
while (!Thread.interrupted()) {
if (selector.select(30) > 0) { //为防止底层堵塞,设置TimeOutt事件
doSelector(selector);
}
}
} catch (Exception e) {
Debug.logError("run error:" + e, module);
}
}
//分别获取触发的事件对象SelectionKey
private void doSelector(Selector selector) throws Exception {
Set readyKeys = selector.selectedKeys();
Iterator readyItor = readyKeys.iterator();
while (readyItor.hasNext()) {
SelectionKey key = (SelectionKey) readyItor.next();
readyItor.remove();
doKey(key);
readyKeys.clear();
}
}
private void doKey(SelectionKey key) throws Exception {
SocketChannel keyChannel = null;
try {
keyChannel = (SocketChannel) key.channel();
if (key.isConnectable()) { //如果连接成功
if (keyChannel.isConnectionPending()) {
keyChannel.finishConnect();
}
Debug.logVerbose(" connected the server", module);
sendRequest(keyChannel); //首先发送数据
key.interestOps(SelectionKey.OP_READ); //注册为可写
} else if (key.isReadable()) { //如果可以从服务器读取response数据
readResponse(keyChannel);
key.interestOps(SelectionKey.OP_WRITE);
} else if (key.isWritable()) { //如果可以向服务器发送request数据
sendRequest(keyChannel);
key.interestOps(SelectionKey.OP_READ);
}
} catch (Exception e) {
Debug.logError("run error:" + e, module);
socketHelper.close(keyChannel);
throw new Exception(e);
}
}
//向服务器发送信息
private void sendRequest(SocketChannel keyChannel) throws Exception {
try {
Message request = messageList.removeReqFirst(); //获取队列中的数据
String strs = (String)request.getObject();
Debug.logVerbose(" send the request to the server =" + strs, module);
//写入Socket
socketHelper.writeSocket(strs.getBytes("UTF-8"), keyChannel);
} catch (Exception e) {
Debug.logError(e, module);
throw new Exception(e);
}
}
//从服务器读取信息
private void readResponse(SocketChannel keyChannel) throws Exception {
try {
byte[] bytes = socketHelper.readSocket(keyChannel); //从Socket读取数组字节
Debug.logVerbose(" read the response from the server:" +
new String(bytes), module);
//实现其他处理,如在客户端屏幕显示服务器的反应信息
} catch (Exception e) {
Debug.logError(e, module);
throw new Exception(e);
}
}
public static void main(String[] args) {
Debug.logVerbose("Starting client...", module);
try {
String url = "220.112.110.61";
int port = 81;
TCPClient nonBlockingSocket = new TCPClient(url, port);
nonBlockingSocket.start();
Debug.logVerbose("create a request ...", module);
//连续发送100条信息到客户端
for(int i=0; i<100; i++){
Message request = new ObjectMessage(i + "hello I am Peng" + i);
messageList.pushRequest(request);
}
} catch (Exception e) {
Debug.logError("Client start error:" + e);
}
Debug.logVerbose("Client started ...", module);
}
}
该TCPClient代码与服务器端的代码结构有所不同,其实可以一样的实现。这里只是说明一下非堵塞I/O API的多种调用写法。
与服务器端不同的是,这里的Reactor模式是在一个线程类中实现,这对于小数量的客户端I/O来说是可以允许的。但是在服务器端,由于有很多连接,如果像客户端这样,将Socket的读写操作和Socket侦听合并在一个线程中完成,会降低服务器的处理性能。因此在服务器端专门设立了线程类Handler来处理Socket的读写操作,将读写操作委托给Handler线程后,Reactor自己可以有更多精力做好侦听工作。当然,对于繁忙的服务器,也可以设立多个Reactor同时侦听,这样服务器的灵敏度就更高。
相比而下,客户端的I/O灵敏度无需如此复杂,只要能保持流畅读写就可以,因此整个Socket侦听和读写都集中在一个类中实现。
为了对TCP的非堵塞I/O进行测试,实现了下列语句:
for(int i=0; i<100; i++){
Message request = new ObjectMessage(i + "hello I am Peng" + i);
messageList.pushRequest(request);
}
这是向服务器端连续发送了100个信息请求。实现这样的功能,在堵塞I/O中只要直接调用Socket向里面写数据就可以了,但是在非堵塞I/O中,什么时候能读、什么时候能写不能在代码编写时决定,只能在运行时,根据事件触发来实现。因此就使用了一个队列Queue,只要把需要发送的信息数据放在这个Queue中,然后由Reactor根据自己的情况从Queue中读取发送出去。
messageList.pushRequest()方法就是向这个Queue中放入数据。为了防止Reactor在Queue中没有数据时还在不断地读取,这里使用了线程的触发机制,当Queue中为空时,读取Queue的线程处于等待暂停状态;一旦有数据放入,就触发读取线程开始读取。这样也是为了防止读取线程发生堵塞,完全独霸CPU,导致其他线程不能正常运行。
MessageList代码如下(程序1-7):
程序1-7
/**
* <p>Copyright: Jdon.com Copyright (c) 2003</p>
* <p>Company: 上海极道计算机技术有限公司</p>
* @author banq
* @version 1.0
*/
public class MessageList {
private final static String module = MessageList.class.getName();
//Request信号的Queue
private LinkedList requestList = new LinkedList();
//Response信号的Queue
private LinkedList responseList = new LinkedList();
//使用单态模式保证当前JVM中只有一个MessageList实例
private static MessageList messageList = new MessageList();
public static MessageList getInstance(){
return messageList;
}
//加入数据
public void pushRequest(Message requestMsg) {
synchronized (requestList) {
requestList.add(requestMsg);
requestList.notifyAll(); //提醒锁在requestList的其他线程
}
}
//取出Queue中第一数据
public synchronized Message removeReqFirst() {
synchronized (requestList) {
// 如果没有数据,就锁定在这里
while (requestList.isEmpty()) {
try {
requestList.wait(); //等待解锁 等待加入数据后的提醒
} catch (InterruptedException ie) {}
}
return (Message) requestList.removeFirst();
}
}
}
在这个MessageList中有两个LinkeList,分别是请求信号Queue和响应信号Queue,为了确保MessageList是全局惟一的实例,这里使用了单态模式。
单态模式就是一种保证一个类只有一个实例的模式,单态模式在Java中经常使用,该模式将在以后章节详细介绍。
在removeReqFirst()方法中,如果当前Queue中为空,就实现线程锁等待,这样节省了CPU占用时间,实现了高效率运行。当pushRequest方法被调用时,通过requestList.notifyAll()通知所有锁住requestList等线程将可以继续运行。虽然MessageList本身不是一个线程,但是它的方法是提供线程调用的。
MessageList在本系统设计中非常重要,这将在以后章节进一步讨论。
实现完成客户端测试程序后,可以进行连接测试,先启动服务器端Socket,然后直接启动TCPCLient,客户端屏幕显示结果如下:
03-8-15 11:49:17 connected the server
03-8-15 11:49:17 send the request to the server =0hello I am Peng0
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 Client started ...
03-8-15 11:49:17 send the request to the server =1hello I am Peng1
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 send the request to the server =2hello I am Peng2
03-8-15 11:49:17 read the response from the server: com back
03-8-15 11:49:17 send the request to the server =3hello I am Peng3
…
服务器端屏幕结果输出如下:
03-8-15 14:28:40 [com.jdon.jserver.Server] begin to read config file
03-8-15 14:28:40 [com.jdon.jserver.Server] Server Port=81
03-8-15 14:28:40 [Debug:Verbose] -->Start host:peng-althon/220.112.110.61 port=81
03-8-15 14:28:40 [Debug:Verbose] -->Start serverSocket.register!
03-8-15 14:28:40 [Debug:Verbose] -->attach(new Acceptor()!
03-8-15 14:28:40 [Debug:com.jdon.jserver.Server:Verbose] Server started ...
ge result is :0hello I am Peng0
ge result is :1hello I am Peng1
ge result is :2hello I am Peng2
该测试试验结果表明,以Reactor模式建立的Socket底层网络通信已经正常运行,可以在其基础上进行应用层的深入开发。当然首先要做好Socket底层和应用层的接口工作,一个具有良好的拓展性和伸缩性的接口系统可以保证Socket底层代码的最大重用性,从而为平台服务器软件的开发奠定坚实的基础。