高性能聊天系统

  作者:板桥banq

上页

1.3  Socket核心设计和实现

通过前面的系统设计,基本解决了本系统实现的主要技术问题,下面将就具体的实现细节展开讨论。

整个系统的核心底层是非堵塞I/O技术,通过使用这一新技术,可以实现底层网络I/O的无堵塞、流畅地读写,为整个系统的高性能运行奠定了坚实的基础。

非堵塞的Socket I/O有两大部分:服务器和客户端。在两端都将采取这一新技术,根据TCP/UDP不同,又分别有两套Socket详细实现。

非堵塞的Socket I/O实现和以往堵塞I/O的实现在编程上有所不同,以前堵塞I/OSocket读写是一种被动行为,即new Socket这些语句可以根据自己系统的应用要求放置在任何位置,可以由程序员自己任意安排的,而非堵塞I/OSocket读写则不一样,它类似一个主动的、有自己“意志”行为的独立线程(因为使用了Reactor模式),所以,什么时候读取数据,什么时候写入数据不是由程序员自己能掌控的,是由Selector决定的。

因此这两种I/O模式的不同使用,决定了不同的编程模式和思维习惯,从堵塞I/O转到非堵塞I/O上,对程序员有些考验。

下面将先讨论服务器的Socket非堵塞I/O实现。

1.3.1  TCPReactor模式

Reactor模式是属于一种自我触发、自我激活的模式非堵塞I/OSelector实现了Reactor模式主要部分将连接事件自我触发SelectionKey事件形式爆发出来。

因此,只要建立一个线程类,反复检查Selector中是否有触发的SelectionKey,如果有,就再次触发对这些事件进行相应的处理,建立一个名为Reactor的线程类。在这个Reactor类中,将使用Selector的事件触发机制,触发本应用系统的事件处理机制。

首先以基于TCP连接的Socket实现为例,在Doug Lea 的《Scalable IO in Java》这本电子文档中,使用Reactor模式很好地实现了事件的自触发机制,如图1-8所示。

nio

1-8  服务器处理连接事件

在图1-8ReactorSelector注册了一个关于TCP的连接事件OP_ACCEPT是否有可接受的连接事件)。当客户端第一次开始连接服务器时OP_ACCEPT事件将激活Reactor将检测到这个激活事件对象SelectionKeySelectionKeyattachment中获取Acceptor线程对象直接运行Acceptor线程。

Acceptor将完成两件事情:

1)向Selector注册了一个新的连接事件OP_READ(是否可以读取数据)。这是假设客户端一旦连接上服务器后,将首先向服务器发送数据,一旦TCP连接握手成功,服务器首先要处于准备读取数据的状态。

2)更改SelectionKey中的attachment,修改为Handler线程对象,这是一个处理读取或写入数据的线程类。

当客户端发送数据到服务器时,可读取事件OP_READ发生了,Reactor又检测到这个事件对象SelectionKey,从SelectionKeyattachment中获取Handler线程对象,立即运行这个线程。

Handler线程从SelectionKey中提取SocketChannel,再从这个Channel中读取数据,然后向Selector注册一个新的连接事件OP_WRITE,以便服务器在处理完成读取的数据后,再写入发送到客户端。

OP_WRITER事件发生时,Handler线程又开始运行,这次是向SocketChannel写入数据,写入完成后,向Selector再注册新的连接事件OP_READ,这样一个请求/响应模式的数据处理基本完成,准备进入下一个循环。

创建Reactor类如下(程序1-1):

程序1-1

public class TCPReactor implements Runnable {

  private final static String module = TCPReactor.class.getName();

  private final Selector selector;               //Selector 实例

  private final ServerSocketChannel sc;       //SeletableCannel一个实现

 

  public TCPReactor (int port) throws IOException {

    selector = Selector.open();                      //创建Selector实例

    sc = ServerSocketChannel.open();         //创建ServerSocketChannel实例

    InetSocketAddress address =

        new InetSocketAddress(InetAddress.getLocalHost(), port);

    sc.socket().bind(address);                      //绑定ServerSocketChannel

    Debug.logVerbose("-->Start host:"+ InetAddress.getLocalHost()+" port=" + port);

    sc.configureBlocking(false);                  //设置为非堵塞

    //selector注册该channel感兴趣的事件为OP_ACCEPT

    SelectionKey sk = sc.register(selector, SelectionKey.OP_ACCEPT);

    //利用skattache功能绑定Acceptor 如果有事件触发Acceptor

    sk.attach(new Acceptor(selector, sc));

    Debug.logVerbose("-->attach(new Acceptor()!");

  }

 

  public void run() {

    try {

      while (!Thread.interrupted()) {  //反复运行,检查是否有触发的key

        selector.select();

        Set selected = selector.selectedKeys();

        Iterator it = selected.iterator();

        //Selector如果发现channel有事件发生,进行key的遍历

        while (it.hasNext())

            //来一个事件 第一次触发一个accepter线程

            //以后触发SocketReadHandler

            dispatch( (SelectionKey) (it.next()));

        selected.clear();

       }

    } catch (IOException ex) {

      Debug.logError("reactor stop!" + ex, module);

    }

  }

   //运行AcceptorSocketReadHandler

  private void dispatch(SelectionKey k) {

    Runnable r = (Runnable) (k.attachment());

    if (r != null) {

         Debug.logVerbose("-->dispatch running");

         r.run();

    }

  }

}

在线程的run()方法中,通过while (!Thread.interrupted())语句不断地对Selector进行事件检查,一旦有事先注册的关注的事件发生,运行dispatch(SelectionKey k)方法进行分配处理,在dispatch方法中,从SelectionKeyattachment中获得的一个线程,然后启动这个线程,这样,获取发生事件后,同时也驱动了对事件的进一步处理。

那么,这个线程是如何被赋予SelectionKey呢?原来在前面有一句:

sk.attach(new Acceptor(selector, sc));

SlectionKey有两种处理附件attachment的方法:

public abstract class SelectionKey

{

 

public final Object attach (Object ob)   //类似setAttachment

public final Object attachment( )       //类似getAttachment

}

首先通过attach将一个对象和SelectionKey发生联系,然后再通过attachment( )获得这个对象,这个对象可以是任何业务对象、处理器或另外一个Channelattach只是保存对象的引用,在使用完成这个功能后,要使用attach(null)来清除附件对象的引用,以便垃圾回收机制能够回收这个附件对象。

SelectionKey注册自己的特定对象用如下语句:

SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);

等同于下列语句:

SelectionKey key = channel.register (selector, SelectionKey.OP_READ);

key.attach (myObject);

本系统中是采取后者做法:

sk.attach(new Acceptor(selector, sc));

那么被attach的线程对象Accpetor是对事件实行进一步处理的,注意一下事先注册的事件是SelectionKey.OP_ACCEPT,即系统运行开始时,第一个关注的事件总是OP_ACCEPT:是否有可接受的网络连接,这是服务器运行后一直应该关注的头等事件。

如果有这样的事件发生,那么就会激活Accpetor线程对象,从而启动一个Acceptor线程,在这个线程中,将准备下一步工作,就是再向Selector注册其他事件,例如这个连接是否可以读出或是否可以写入数据等。代码如下(程序1-2):

程序1-2

public class Acceptor implements Runnable {

  private final Selector selector;

  private final ServerSocketChannel ssc;

  public Acceptor(Selector selector, ServerSocketChannel ssc) {

    this.selector = selector;

    this.ssc = ssc;

  }

 

  public void run() {

    try {

      Debug.logVerbose("-->ready for accept!");

      SocketChannel sc = ssc.accept();

      if (sc != null) {

        sc.configureBlocking(false);                             //设定为非堵塞

        SelectionKey sk = sc.register(selector, 0);        //注册这个SocketChannel

 

        //同时将SelectionKey标记为可读,以便读取

        sk.interestOps(SelectionKey.OP_READ);

        selector.wakeup();                        //因为interestOps,防止Selector死锁

        sk.attach(new Handler(sk, sc));                        //携带Handler对象

      }

    } catch (Exception ex) {

      Debug.logVerbose("accept stop!" + ex);

    }

  }

}

Accpetor中,从ServerSocketChannel获得SocketChannel实例,这两个Channel可注册的事件是不一样的,后者可以注册是否可读或可写等事件。Accpetor代码中注册了是否可以读SelectionKey.OP_READ的事件,然后attachHandler线程对象。

这样,Selector将一直关注OP_READ事件,一旦有这类事件发生,将激活attachmentHandler线程的运行。Handler在可读事件发生后启动,就是从SocketChannel中读取客户端传送的数据了,Handler代码如下(程序1-3):

程序1-3

public class TCPHandler implements Runnable {

  private final static String module = TCPHandler.class.getName();

  private final SocketChannel sc;

  private final SelectionKey sk;

  private SocketHelper socketHelper;                  //Socket读写帮助类

 

  public TCPHandler (SelectionKey  sk, SocketChannel sc) throws IOException {

    this.sc = sc;

    this.sk = sk;

    socketHelper = new SocketHelper();

    Debug.logVerbose(" SocketReadHandler prepare ...", module);

  }

 

  public void run() {                                             //线程run方法

    Debug.logVerbose("Handler running ...", module);

    try {

      if (state == READING) read();                  //读取数据

      else if (state == SENDING) send();             //写入数据

    } catch (Exception ex) {

      Debug.logError("readRequest error:" + ex, module);

      socketHelper.close(sc);

      sk.cancel();

    }

  }

  //SocketChannel中读取数据

  private void read() throws Exception{

    try {

      //Socket中读取byte[]数组

    byte[] bytes = socketHelper.readSocket(sc);

      if (bytes.length == 0) throw new Exception();

    //实现服务器聊天核心处理功能,这里暂时打印出来,方便测试

    System.out.println(" ge result is :" + new String(bytes));

 

    state=SENDING;

      sk.interestOps(SelectionKey.OP_WRITE); //注册新的事件

  } catch (Exception ex) {

      throw new Exception(ex);

  }

  }

  //SocketChannel写入数据

  private void send()throws Exception{

    try {

      //写入测试数据

      String request1 = "come back";

      System.out.println(" send result is :" + request1);

    socketHelper.writeSocket(request1.getBytes(),sc);

    state=READING;

    sk.interestOps(SelectionKey.OP_READ);

  } catch (Exception ex) {

       throw new Exception(ex);

  }

  }

}

Handlerread方法中,简单地从SocketChannel中读取Message一个实例,然后打印出来,下一步可以在这里启动新的线程,进行聊天具体处理。如获取这个聊天信息的接受方用户ID,然后以用户ID寻找出它的SocketChannel,从而向对方用户发出该信息。

为了实现一个分布式的服务器环境,可以使用JMS这样的消息处理系统,通过查询该服务器内的用户名单,如果对方用户ID不是登录本服务器,那么通过JMS将消息发送给它。

下页