高性能聊天系统

  作者:板桥banq

上页

1.3.2  UDP实现

1.3.1节是实现了TCPSocket实现,在本系统中,最经常使用的还是UDPUDPSocket实现相比TCP要简单一点,只有两个连接事件:OP_READOP_WRITE。因为没有连接,所以没有OP_ACCEPTOP_CONNECT这样建立连接的事件发生。

整个流程顺序类似TCP的实现,创建UDPReactor类如下(程序1-4):

程序1-4

public class UDPReactor implements Runnable {

  private final static String module = UDPReactor.class.getName();

  private final Selector selector;

  public UDPReactor(int port) throws IOException {

    selector = Selector.open();

    InetSocketAddress address =

        new InetSocketAddress(InetAddress.getLocalHost(), port);

    DatagramChannel channelRec = openDatagramChannel();

    channelRec.socket().bind(address); //绑定socketAddress

    //selector注册该channel

    SelectionKey key = channelRec.register(selector, SelectionKey.OP_READ);

    key.attach(new UDPHandler(key, channelRec));

  }

  //生成一个DatagramChannel实例

  private DatagramChannel openDatagramChannel() {

    DatagramChannel channel = null;

    try {

      channel = DatagramChannel.open();

      channel.configureBlocking(false);

    } catch (Exception e) {

      Debug.logError(e, module);

    }

    return channel;

  }

 

  public void run() { // normally in a new Thread

    try {

      while (!Thread.interrupted()) {

        selector.select();

        Set selected = selector.selectedKeys();

        Iterator it = selected.iterator();

        while (it.hasNext())

          //来一个事件 第一次触发一个accepter线程

          //以后触发SocketReadHandler

          dispatch( (SelectionKey) (it.next()));

        selected.clear();

      }

    } catch (IOException ex) {

      Debug.logError("reactor stop!" + ex, module);

    }

  }

  //运行AcceptorSocketReadHandler

  private void dispatch(SelectionKey k) {

    Runnable r = (Runnable) (k.attachment());

    if (r != null) {

      Debug.logVerbose("-->dispatch running");

      r.run();

    }

  }

}

TCP不同的就是使用DatagramChannel来代替了TCP中的ServerSocketChannelSocketChannelDatagramChannel是专门用于UDPSelectableChannel

另外一个不同点是,UDP中没有了Acceptor类,不用专门处理建立事件的相关事件,直接读取或写入数据,建立Handler如下(程序1-5):

程序1-5

public class UDPHandler implements Runnable {

  private final static String module = UDPHandler.class.getName();

  private final DatagramChannel datagramChannel;

  private final SelectionKey key;

 

  private static final int READING = 0, SENDING = 1;

  private int state = READING;

  private SocketAddress address = null;

 

  public UDPHandler(SelectionKey key, DatagramChannel datagramChannel) throws

      IOException {

    this.datagramChannel = datagramChannel;

    this.key = key;

    Debug.logVerbose(" UDPHandler prepare ...", module);

  }

 

  public void run() {  //线程运行方法

    Debug.logVerbose(" UDPHandler running ...", module);

    try {

      if (state == READING)      read();

      else if (state == SENDING)        send();

    } catch (Exception ex) {

      Debug.logError("readRequest .. error:" + ex, module);

      close(datagramChannel);

      key.cancel();

    }

  }

  //datagramChannel读取数据

  private void read() throws Exception {

    try {

      //这里应该是聊天服务器处理聊天信息的核心功能

      //下面只是测试用代码,直接从客户端读取字符串并打印出来

      byte[] array = new byte[512];

      ByteBuffer buffer = ByteBuffer.wrap(array);

      address = datagramChannel.receive(buffer);

      String str = new String(array);

      System.out.println("handlePendingReads() :" + str);

      state = SENDING;

      key.interestOps(SelectionKey.OP_WRITE);  //注册为可写事件

    } catch (Exception ex) {

      Debug.logError("readRequest .. error:" + ex, module);

    }

  }

  //datagramChannel写入数据

  private void send() throws Exception {

    try {

      //以下是测试发送数据

      String request = "come back";

      ByteBuffer buffer1 = ByteBuffer.wrap(request.getBytes());

      datagramChannel.send(buffer1, address);

      System.out.println("send :" + request);

      state = READING;

      key.interestOps(SelectionKey.OP_READ);

    } catch (Exception ex) {

      Debug.logError("readRequest .. error:" + ex, module);

    }

  }

 

}

在这里,Handler中实现了从datagramChannel读取数据和写入数据功能。

至此,整个服务器底层核心功能基本完成。这部分功能的性能很大程度上影响了整个系统的运行性能,因此需要对之实行单独性能测试。为了方便测试,先要开发出客户端测试程序,通过多台客户端多线程地不断对服务器进行连接访问和数据传送,从而可以对Java的这些最新技术提供的性能参数有一个彻底实际的了解。

 

下页