高性能聊天系统
作者:板桥banq
1.3 Socket核心设计和实现
通过前面的系统设计,基本解决了本系统实现的主要技术问题,下面将就具体的实现细节展开讨论。
整个系统的核心底层是非堵塞I/O技术,通过使用这一新技术,可以实现底层网络I/O的无堵塞、流畅地读写,为整个系统的高性能运行奠定了坚实的基础。
非堵塞的Socket I/O有两大部分:服务器和客户端。在两端都将采取这一新技术,根据TCP/UDP不同,又分别有两套Socket详细实现。
非堵塞的Socket I/O实现和以往堵塞I/O的实现在编程上有所不同,以前堵塞I/O的Socket读写是一种被动行为,即new Socket这些语句可以根据自己系统的应用要求放置在任何位置,可以由程序员自己任意安排的,而非堵塞I/O的Socket读写则不一样,它类似一个主动的、有自己“意志”行为的独立线程(因为使用了Reactor模式),所以,什么时候读取数据,什么时候写入数据不是由程序员自己能掌控的,是由Selector决定的。
因此这两种I/O模式的不同使用,决定了不同的编程模式和思维习惯,从堵塞I/O转到非堵塞I/O上,对程序员有些考验。
下面将先讨论服务器的Socket非堵塞I/O实现。
1.3.1 TCP和Reactor模式
Reactor模式是属于一种自我触发、自我激活的模式,非堵塞I/O的Selector实现了Reactor模式主要部分,将连接事件自我触发,以SelectionKey事件形式爆发出来。
因此,只要建立一个线程类,反复检查Selector中是否有触发的SelectionKey,如果有,就再次触发对这些事件进行相应的处理,建立一个名为Reactor的线程类。在这个Reactor类中,将使用Selector的事件触发机制,触发本应用系统的事件处理机制。
首先以基于TCP连接的Socket实现为例,在Doug Lea 的《Scalable IO in Java》这本电子文档中,使用Reactor模式很好地实现了事件的自触发机制,如图1-8所示。
图1-8 服务器处理连接事件
在图1-8中,Reactor向Selector注册了一个关于TCP的连接事件OP_ACCEPT(是否有可接受的连接事件)。当客户端第一次开始连接服务器时,OP_ACCEPT事件将激活,Reactor将检测到这个激活事件对象SelectionKey,从SelectionKey的attachment中获取Acceptor线程对象,直接运行Acceptor线程。
Acceptor将完成两件事情:
(1)向Selector注册了一个新的连接事件OP_READ(是否可以读取数据)。这是假设客户端一旦连接上服务器后,将首先向服务器发送数据,一旦TCP连接握手成功,服务器首先要处于准备读取数据的状态。
(2)更改SelectionKey中的attachment,修改为Handler线程对象,这是一个处理读取或写入数据的线程类。
当客户端发送数据到服务器时,可读取事件OP_READ发生了,Reactor又检测到这个事件对象SelectionKey,从SelectionKey的attachment中获取Handler线程对象,立即运行这个线程。
Handler线程从SelectionKey中提取SocketChannel,再从这个Channel中读取数据,然后向Selector注册一个新的连接事件OP_WRITE,以便服务器在处理完成读取的数据后,再写入发送到客户端。
当OP_WRITER事件发生时,Handler线程又开始运行,这次是向SocketChannel写入数据,写入完成后,向Selector再注册新的连接事件OP_READ,这样一个请求/响应模式的数据处理基本完成,准备进入下一个循环。
创建Reactor类如下(程序1-1):
程序1-1
public class TCPReactor implements Runnable {
private final static String module = TCPReactor.class.getName();
private final Selector selector; //Selector 实例
private final ServerSocketChannel sc; //SeletableCannel一个实现
public TCPReactor (int port) throws IOException {
selector = Selector.open(); //创建Selector实例
sc = ServerSocketChannel.open(); //创建ServerSocketChannel实例
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), port);
sc.socket().bind(address); //绑定ServerSocketChannel
Debug.logVerbose("-->Start host:"+ InetAddress.getLocalHost()+" port=" + port);
sc.configureBlocking(false); //设置为非堵塞
//向selector注册该channel感兴趣的事件为OP_ACCEPT
SelectionKey sk = sc.register(selector, SelectionKey.OP_ACCEPT);
//利用sk的attache功能绑定Acceptor 如果有事件触发Acceptor
sk.attach(new Acceptor(selector, sc));
Debug.logVerbose("-->attach(new Acceptor()!");
}
public void run() {
try {
while (!Thread.interrupted()) { //反复运行,检查是否有触发的key
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果发现channel有事件发生,进行key的遍历
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch( (SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
Debug.logError("reactor stop!" + ex, module);
}
}
//运行Acceptor或SocketReadHandler
private void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
Debug.logVerbose("-->dispatch running");
r.run();
}
}
}
在线程的run()方法中,通过while (!Thread.interrupted())语句不断地对Selector进行事件检查,一旦有事先注册的关注的事件发生,运行dispatch(SelectionKey k)方法进行分配处理,在dispatch方法中,从SelectionKey的attachment中获得的一个线程,然后启动这个线程,这样,获取发生事件后,同时也驱动了对事件的进一步处理。
那么,这个线程是如何被赋予SelectionKey呢?原来在前面有一句:
sk.attach(new Acceptor(selector, sc));
SlectionKey有两种处理附件attachment的方法:
public abstract class SelectionKey
{
…
public final Object attach (Object ob) //类似setAttachment
public final Object attachment( ) //类似getAttachment
…
}
首先通过attach将一个对象和SelectionKey发生联系,然后再通过attachment( )获得这个对象,这个对象可以是任何业务对象、处理器或另外一个Channel。attach只是保存对象的引用,在使用完成这个功能后,要使用attach(null)来清除附件对象的引用,以便垃圾回收机制能够回收这个附件对象。
SelectionKey注册自己的特定对象用如下语句:
SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);
等同于下列语句:
SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
key.attach (myObject);
本系统中是采取后者做法:
sk.attach(new Acceptor(selector, sc));
那么被attach的线程对象Accpetor是对事件实行进一步处理的,注意一下事先注册的事件是SelectionKey.OP_ACCEPT,即系统运行开始时,第一个关注的事件总是OP_ACCEPT:是否有可接受的网络连接,这是服务器运行后一直应该关注的头等事件。
如果有这样的事件发生,那么就会激活Accpetor线程对象,从而启动一个Acceptor线程,在这个线程中,将准备下一步工作,就是再向Selector注册其他事件,例如这个连接是否可以读出或是否可以写入数据等。代码如下(程序1-2):
程序1-2
public class Acceptor implements Runnable {
private final Selector selector;
private final ServerSocketChannel ssc;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.selector = selector;
this.ssc = ssc;
}
public void run() {
try {
Debug.logVerbose("-->ready for accept!");
SocketChannel sc = ssc.accept();
if (sc != null) {
sc.configureBlocking(false); //设定为非堵塞
SelectionKey sk = sc.register(selector, 0); //注册这个SocketChannel
//同时将SelectionKey标记为可读,以便读取
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup(); //因为interestOps,防止Selector死锁
sk.attach(new Handler(sk, sc)); //携带Handler对象
}
} catch (Exception ex) {
Debug.logVerbose("accept stop!" + ex);
}
}
}
在Accpetor中,从ServerSocketChannel获得SocketChannel实例,这两个Channel可注册的事件是不一样的,后者可以注册是否可读或可写等事件。Accpetor代码中注册了是否可以读SelectionKey.OP_READ的事件,然后attach了Handler线程对象。
这样,Selector将一直关注OP_READ事件,一旦有这类事件发生,将激活attachment为Handler线程的运行。Handler在可读事件发生后启动,就是从SocketChannel中读取客户端传送的数据了,Handler代码如下(程序1-3):
程序1-3
public class TCPHandler implements Runnable {
private final static String module = TCPHandler.class.getName();
private final SocketChannel sc;
private final SelectionKey sk;
private SocketHelper socketHelper; //Socket读写帮助类
public TCPHandler (SelectionKey sk, SocketChannel sc) throws IOException {
this.sc = sc;
this.sk = sk;
socketHelper = new SocketHelper();
Debug.logVerbose(" SocketReadHandler prepare ...", module);
}
public void run() { //线程run方法
Debug.logVerbose("Handler running ...", module);
try {
if (state == READING) read(); //读取数据
else if (state == SENDING) send(); //写入数据
} catch (Exception ex) {
Debug.logError("readRequest error:" + ex, module);
socketHelper.close(sc);
sk.cancel();
}
}
//从SocketChannel中读取数据
private void read() throws Exception{
try {
//从Socket中读取byte[]数组
byte[] bytes = socketHelper.readSocket(sc);
if (bytes.length == 0) throw new Exception();
//实现服务器聊天核心处理功能,这里暂时打印出来,方便测试
System.out.println(" ge result is :" + new String(bytes));
state=SENDING;
sk.interestOps(SelectionKey.OP_WRITE); //注册新的事件
} catch (Exception ex) {
throw new Exception(ex);
}
}
//向SocketChannel写入数据
private void send()throws Exception{
try {
//写入测试数据
String request1 = "come back";
System.out.println(" send result is :" + request1);
socketHelper.writeSocket(request1.getBytes(),sc);
state=READING;
sk.interestOps(SelectionKey.OP_READ);
} catch (Exception ex) {
throw new Exception(ex);
}
}
}
在Handler的read方法中,简单地从SocketChannel中读取Message一个实例,然后打印出来,下一步可以在这里启动新的线程,进行聊天具体处理。如获取这个聊天信息的接受方用户ID,然后以用户ID寻找出它的SocketChannel,从而向对方用户发出该信息。
为了实现一个分布式的服务器环境,可以使用JMS这样的消息处理系统,通过查询该服务器内的用户名单,如果对方用户ID不是登录本服务器,那么通过JMS将消息发送给它。
下页