高性能聊天系统

  作者:板桥banq

上页

1.3.3  客户端实现

为了使客户端的I/O读写能够保持流畅,客户端的Socket读写也使用非堵塞I/O实现,在前面章节中已经讨论过,由于非堵塞I/O本身类似一个独立自主的Reactor模式,而客户端界面输入也是一个事件监视模式,因此,要实现这两个独立模式之间的数据通信,需要使用队列Queue模式。

首先建立TCP客户端非堵塞I/OTCPClient如下(程序1-6):

程序1-6

public class TCPClient extends Thread {

  private final static String module = TCPClient.class.getName();

  //这是一个消息队列,用于和前台客户端界面输入实现通信

  private final static MessageList messageList = MessageList.getInstance();

 

  private InetSocketAddress socketAddress;

  private Selector selector;

  private SocketHelper socketHelper;                                         //Socket读写帮助

 

  public TCPClient(String url, int port) {

    try {

      socketAddress = new InetSocketAddress(url, port);

      selector = Selector.open();

      openSocketChannel();                                                       //开启一个SocketChannel

      socketHelper = new SocketHelper();

    } catch (Exception e) {

      Debug.logError("init error:" + e, module);

    }

  }

  //直接开启一个SocketChannel

 private SocketChannel openSocketChannel() {

    SocketChannel channel = null;

    try {

      channel = SocketChannel.open();

      channel.configureBlocking(false);

      channel.connect(this.socketAddress);                               //绑定SocketAddress

      channel.register(selector, SelectionKey.OP_CONNECT); //注册OP_CONNECT事件

    } catch (Exception e) {

      Debug.logError(e, module);

    }

    return channel;

  }

  public void run() {                                                                   //线程运行方法

    try {

      while (!Thread.interrupted()) {

        if (selector.select(30) > 0) {                                          //为防止底层堵塞,设置TimeOutt事件

         doSelector(selector);

        }

      }

    } catch (Exception e) {

      Debug.logError("run error:" + e, module);

    }

  }

  //分别获取触发的事件对象SelectionKey

  private void doSelector(Selector selector) throws Exception {

    Set readyKeys = selector.selectedKeys();

    Iterator readyItor = readyKeys.iterator();

    while (readyItor.hasNext()) {

      SelectionKey key = (SelectionKey) readyItor.next();

      readyItor.remove();

      doKey(key);

      readyKeys.clear();

    }

  }

  private void doKey(SelectionKey key) throws Exception {

    SocketChannel keyChannel = null;

    try {

      keyChannel = (SocketChannel) key.channel();

      if (key.isConnectable()) {                                     //如果连接成功

        if (keyChannel.isConnectionPending()) {

          keyChannel.finishConnect();

        }

        Debug.logVerbose(" connected the server", module);

        sendRequest(keyChannel);                               //首先发送数据

        key.interestOps(SelectionKey.OP_READ);       //注册为可写

      } else if (key.isReadable()) {                                //如果可以从服务器读取response数据

        readResponse(keyChannel);

        key.interestOps(SelectionKey.OP_WRITE);

      } else if (key.isWritable()) {                                 //如果可以向服务器发送request数据

        sendRequest(keyChannel);

        key.interestOps(SelectionKey.OP_READ);

      }

    } catch (Exception e) {

      Debug.logError("run error:" + e, module);

      socketHelper.close(keyChannel);

      throw new Exception(e);

    }

  }

  //向服务器发送信息

  private void sendRequest(SocketChannel keyChannel) throws Exception {

    try {

      Message request = messageList.removeReqFirst(); //获取队列中的数据

      String strs = (String)request.getObject();

      Debug.logVerbose(" send the request to the server =" + strs, module);

      //写入Socket

      socketHelper.writeSocket(strs.getBytes("UTF-8"), keyChannel);

    } catch (Exception e) {

      Debug.logError(e, module);

      throw new Exception(e);

    }

  }

  //从服务器读取信息

  private void readResponse(SocketChannel keyChannel) throws Exception {

    try {

      byte[] bytes = socketHelper.readSocket(keyChannel); //Socket读取数组字节

      Debug.logVerbose(" read the response from the server:" +

new String(bytes), module);

      //实现其他处理,如在客户端屏幕显示服务器的反应信息

    } catch (Exception e) {

      Debug.logError(e, module);

      throw new Exception(e);

    }

  }

  public static void main(String[] args) {

    Debug.logVerbose("Starting client...", module);

    try {

      String url = "220.112.110.61";

      int port = 81;

      TCPClient nonBlockingSocket = new TCPClient(url, port);

      nonBlockingSocket.start();

      Debug.logVerbose("create a request ...", module);

      //连续发送100条信息到客户端

      for(int i=0; i<100; i++){

        Message request = new ObjectMessage(i + "hello I am Peng" + i);

        messageList.pushRequest(request);

      }

    } catch (Exception e) {

      Debug.logError("Client start error:" + e);

    }

    Debug.logVerbose("Client started ...", module);

  }

}

TCPClient代码与服务器端的代码结构有所不同,其实可以一样的实现。这里只是说明一下非堵塞I/O API的多种调用写法。

与服务器端不同的是,这里的Reactor模式是在一个线程类中实现,这对于小数量的客户端I/O来说是可以允许的。但是在服务器端,由于有很多连接,如果像客户端这样,将Socket的读写操作和Socket侦听合并在一个线程中完成,会降低服务器的处理性能。因此在服务器端专门设立了线程类Handler来处理Socket的读写操作,将读写操作委托给Handler线程后,Reactor自己可以有更多精力做好侦听工作。当然,对于繁忙的服务器,也可以设立多个Reactor同时侦听,这样服务器的灵敏度就更高。

相比而下,客户端的I/O灵敏度无需如此复杂,只要能保持流畅读写就可以,因此整个Socket侦听和读写都集中在一个类中实现。

为了对TCP的非堵塞I/O进行测试,实现了下列语句:

for(int i=0; i<100; i++){

        Message request = new ObjectMessage(i + "hello I am Peng" + i);

        messageList.pushRequest(request);

}

这是向服务器端连续发送了100个信息请求。实现这样的功能,在堵塞I/O中只要直接调用Socket向里面写数据就可以了,但是在非堵塞I/O中,什么时候能读、什么时候能写不能在代码编写时决定,只能在运行时,根据事件触发来实现。因此就使用了一个队列Queue,只要把需要发送的信息数据放在这个Queue中,然后由Reactor根据自己的情况从Queue中读取发送出去。

messageList.pushRequest()方法就是向这个Queue中放入数据。为了防止ReactorQueue中没有数据时还在不断地读取,这里使用了线程的触发机制,当Queue中为空时,读取Queue的线程处于等待暂停状态;一旦有数据放入,就触发读取线程开始读取。这样也是为了防止读取线程发生堵塞,完全独霸CPU,导致其他线程不能正常运行。

MessageList代码如下(程序1-7):

程序1-7

/**

 * <p>Copyright: Jdon.com Copyright (c) 2003</p>

 * <p>Company: 上海极道计算机技术有限公司</p>

 * @author banq

 * @version 1.0

 */

public class MessageList {

  private final static String module = MessageList.class.getName();

  //Request信号的Queue

  private LinkedList requestList = new LinkedList();

  //Response信号的Queue

  private LinkedList responseList = new LinkedList();

  //使用单态模式保证当前JVM中只有一个MessageList实例

  private static MessageList messageList =  new MessageList();

  public static MessageList getInstance(){

     return messageList;

  }

  //加入数据

  public void pushRequest(Message requestMsg) {

    synchronized (requestList) {

      requestList.add(requestMsg);

      requestList.notifyAll();  //提醒锁在requestList的其他线程

    }

  }

  //取出Queue中第一数据

  public synchronized Message removeReqFirst() {

    synchronized (requestList) {

      // 如果没有数据,就锁定在这里

      while (requestList.isEmpty()) {

        try {

          requestList.wait(); //等待解锁 等待加入数据后的提醒

        } catch (InterruptedException ie) {}

      }

      return (Message) requestList.removeFirst();

    }

  }

}

在这个MessageList中有两个LinkeList,分别是请求信号Queue和响应信号Queue,为了确保MessageList是全局惟一的实例,这里使用了单态模式。

单态模式就是一种保证一个类只有一个实例的模式,单态模式在Java中经常使用,该模式将在以后章节详细介绍。

removeReqFirst()方法中,如果当前Queue中为空,就实现线程锁等待,这样节省了CPU占用时间,实现了高效率运行。当pushRequest方法被调用时,通过requestList.notifyAll()通知所有锁住requestList等线程将可以继续运行。虽然MessageList本身不是一个线程,但是它的方法是提供线程调用的。

MessageList在本系统设计中非常重要,这将在以后章节进一步讨论。

实现完成客户端测试程序后,可以进行连接测试,先启动服务器端Socket,然后直接启动TCPCLient,客户端屏幕显示结果如下:

03-8-15 11:49:17   connected the server

03-8-15 11:49:17   send the request to the server =0hello I am Peng0

03-8-15 11:49:17   read the response from the server: com back

03-8-15 11:49:17   Client started ...

03-8-15 11:49:17   send the request to the server =1hello I am Peng1

03-8-15 11:49:17   read the response from the server: com back

03-8-15 11:49:17   send the request to the server =2hello I am Peng2

03-8-15 11:49:17   read the response from the server: com back

03-8-15 11:49:17   send the request to the server =3hello I am Peng3

服务器端屏幕结果输出如下:

03-8-15 14:28:40 [com.jdon.jserver.Server] begin to read config file

03-8-15 14:28:40 [com.jdon.jserver.Server] Server Port=81

03-8-15 14:28:40 [Debug:Verbose] -->Start host:peng-althon/220.112.110.61 port=81

03-8-15 14:28:40 [Debug:Verbose] -->Start serverSocket.register!

03-8-15 14:28:40 [Debug:Verbose] -->attach(new Acceptor()!

03-8-15 14:28:40 [Debug:com.jdon.jserver.Server:Verbose] Server started ...

ge result is :0hello I am Peng0

ge result is :1hello I am Peng1

ge result is :2hello I am Peng2

该测试试验结果表明,以Reactor模式建立的Socket底层网络通信已经正常运行,可以在其基础上进行应用层的深入开发。当然首先要做好Socket底层和应用层的接口工作,一个具有良好的拓展性和伸缩性的接口系统可以保证Socket底层代码的最大重用性,从而为平台服务器软件的开发奠定坚实的基础。

下页