高性能聊天系统
作者:板桥banq
1.5 应用接口设计和实现
前面章节着重讨论的是接口层与Socket部分的代码重整。通过重整,使得接口层与Socket底层的联系变得松散而富于变化,为将来的数据类型和协议的动态扩展留下了发展的余地。
在本节中,将进一步优化接口层与应用层部分的代码,使得应用层对接口层的调用变得更加方便和直接。
原来在应用层的类ChatClient中对接口层调用语法如下:
QueueWorker worker =
new QueueAddWorker(queueFactory.getQueue(QueueFactory.TCP_QUEUE));
StringType st = new StringType(worker.REQUEST);
st.setContent(msg);
st.accpet(worker);
这是将msg字符串作为Request发送到服务器端。虽然这些语句已经显得精简和灵活了,但是因为TCP连接的开启和关闭控制还不能在这里实现控制,所以有必要重整或重新设计这部分代码,使得应用层调用底层Socket能类似调用HTTPUrlConnection那样方便。
在应用层中,每次需要发送和接受数据时,都习惯以创建一个连接为开始,然后向这个连接写入数据,写完以后就关闭这个连接。遵循这样对网络连接的操作习惯,在这里设计一个专门创建连接的工厂ConnectionFactory。连接工厂创建的产品是连接Connection。
图1-9 应用层调用底层结构图
图1-9中,应用层分别从ConnectionFactory获得相应的Connection,然后通过Connection向Socket底层实现网络调用,整个Socket核心底层对于客户端和服务器端来说,犹如一个整体的黑匣子,客户端和服务器要实现数据交换,只要使用这个黑匣子作为中介者桥梁,方便地实现数据交换,根本无需考虑黑匣子中是如何具体实现的。
1.5.1 Connection API
下面进行具体设计和实现。Connection有两种具体产品类型:TCPConnection和UDPConnection。每个连接都有建立时的初始化方法和关闭方法,设计接口Connection如下(程序1-11):
程序1-11
package com.jdon.jserver.application;
import com.jdon.jserver.connector.queue.MessageQueue;
import com.jdon.jserver.connector.data.*;
import com.jdon.jserver.application.connection.ConnectionFactory;
import com.jdon.util.Debug;
/**
* 连接接口,需要发送和接受数据时,使用本类。
* 从ConnectionFactory获得本类实例
* <p>Copyright: Jdon.com Copyright (c) 2003</p>
* <p>Company: 上海极道计算机技术有限公司</p>
* @author banq
* @version 1.0
*/
public abstract class Connection {
private final static String module = Connection.class.getName();
protected MessageQueue queue = null;
protected int CSType; //连接类型,是客户端还是服务器端
//发出Object
public void writeObject(Object obj) throws Exception {
if (!isConnect()) throw new Exception("not connected");
try {
QueueWorker worker = new QueueAddWorker(queue);
ObjectType ot = new ObjectType(getWriteMsgType());
ot.setContent(obj);
ot.accpet(worker);
} catch (Exception ex) {
Debug.logError("TcpConnection writeObject error:" + ex, module);
throw new Exception(ex);
}
}
//读取Object
public Object readObject() throws Exception {
if (!isConnect()) throw new Exception("not connected");
try {
QueueWorker worker = new QueueTakeWorker(queue);
ObjectType ot = new ObjectType(getReadMsgType());
ot.accpet(worker);
return ot.getContent();
} catch (Exception ex) {
Debug.logError("TcpConnection writeObject error:" + ex, module);
throw new Exception(ex);
}
}
//发出字符串
public void writeString(String msg) throws Exception {
if (!isConnect()) throw new Exception("not connected");
try {
QueueWorker worker = new QueueAddWorker(queue);
StringType st = new StringType(getWriteMsgType());
st.setContent(msg);
st.accpet(worker);
} catch (Exception ex) {
Debug.logError("TcpConnection writeString error:" + ex, module);
throw new Exception(ex);
}
}
//读取字符串
public String readString() throws Exception {
if (!isConnect())
throw new Exception("not connected");
try {
QueueWorker worker = new QueueTakeWorker(queue);
StringType st = new StringType(getReadMsgType());
st.accpet(worker);
return st.getContent();
} catch (Exception ex) {
Debug.logError("TcpConnection writeObject error:" + ex, module);
throw new Exception(ex);
}
}
/**
* 根据是服务器应用还是客户端应用,设置写入Queue中的信息类型
* 是request 还是response
*/
public int getWriteMsgType() {
if (CSType == ConnectionFactory.CLIENT)
return QueueWorker.REQUEST;
else
return QueueWorker.RESPONSE;
}
/**
* 根据是服务器应用还是客户端应用,设置从Queue中读取的信息类型
* @return
*/
public int getReadMsgType() {
if (CSType == ConnectionFactory.CLIENT)
return QueueWorker.RESPONSE;
else
return QueueWorker.REQUEST;
}
//以下是需要实现的行为方法
// 打开连接
public abstract void open(String url, int port) throws Exception;
//关闭连接
public abstract void close() throws Exception;
//是否连接
public abstract boolean isConnect() throws Exception;
}
这个Connection是面向应用层最终用户的API,通过获得Connection实例,客户端或服务器端应用可以收取或发送数据了,其中提供了基本的行为方法:打开连接、关闭连接、读数据以及写数据。Connection类的API总结见表1-1。
表1-1 Connection API方法总结
方 法
|
说 明
|
abstract void |
close()
关闭连接 |
int |
getReadMsgType() |
int |
getWriteMsgType() |
abstract boolean |
isConnect() |
abstract
void
|
open(java.lang.String url, int port) |
java.lang.Object
|
readObject() |
java.lang.String
|
readString() |
void
|
writeObject(java.lang.Object obj) |
void
|
writeString(java.lang.String msg) |
1.5.2 ConnectionFactory API
这个Connection实例的获得是通过ConnectionFactory产生的,ConnectionFactory是一个工厂方法模式的实现,代码如下:
package com.jdon.jserver.application.connection;
import com.jdon.jserver.application.Connection;
import com.jdon.util.Debug;
import com.jdon.jserver.connector.tcp.*;
import com.jdon.jserver.connector.udp.*;
public class ConnectionFactory {
private final static String module = ConnectionFactory.class.getName();
//以下是线程实例
private TCPClient cclient = null;
private UDPClient uclient = null;
private TCPReactor tserver = null;
private UDPReactor userver = null;
private int fType; //是服务器端的还是CLient端,其值是下列3个之一
public final static int CLIENT = 1;
public final static int TCPSERVER = 2;
public final static int UDPSERVER = 3;
//单态模式,初始化工厂类
private static ConnectionFactory factory;
public synchronized static ConnectionFactory getInstance(int fType) {
if (factory == null)
factory = new ConnectionFactory(fType);
return factory;
}
private ConnectionFactory(int fType) {
this.fType = fType;
}
/**
* 获得一个TcpConnection实例
* @return Connection
*/
public Connection getTcpConnection() {
if (fType == CLIENT) {
startTcpClientSocket(); //启动客户端底层Socket
return new TcpConnection(cclient);
} else {
startTcpServerSocket(); //启动服务器端底层Socket
return new TcpConnection();
}
}
/**
* 获得一个UdpConnection实例
* @return Connection
*/
public Connection getUdpConnection() {
if (fType == CLIENT) {
startUdpClientSocket();
return new UdpConnection(uclient);
} else {
startUdpServerSocket();
return new UdpConnection();
}
}
/**
* 开启客户端Tcp Socket线程
*/
private void startTcpClientSocket() {
if (cclient != null)
return;
try {
cclient = new TCPClient();
Thread thread = new Thread(cclient);
thread.setDaemon(true);
thread.start();
Debug.logVerbose("--> started Tcp Socket thread ..", module);
} catch (Exception ex) {
Debug.logError("started Tcp error:" + ex, module);
}
}
/**
* 开启客户端Udp Socket线程
*/
private void startUdpClientSocket() {
if (uclient != null)
return;
try {
uclient = new UDPClient();
Thread thread = new Thread(uclient);
thread.setDaemon(true);
thread.start();
Debug.logVerbose("--> started Udp Socket thread ..", module);
} catch (Exception ex) {
Debug.logError("started Udp error:" + ex, module);
}
}
/**
* 开启服务器端TCP Socket线程
*/
private void startTcpServerSocket() {
if (tserver != null)
return;
try {
ServerCfg cfg = new ServerCfg();
tserver = new TCPReactor(cfg.getTcpPort());
Thread thread = new Thread(tserver);
thread.setDaemon(true);
thread.start();
Debug.logVerbose("--> started Tcp Socket thread ..", module);
} catch (Exception ex) {
Debug.logError("started Tcp error:" + ex, module);
}
}
/**
* 开启服务器端UDP Socket线程
*/
private void startUdpServerSocket() {
if (userver != null)
return;
try {
ServerCfg cfg = new ServerCfg();
userver = new UDPReactor(cfg.getUdpPort());
Thread thread = new Thread(userver);
thread.setDaemon(true);
thread.start();
Debug.logVerbose("--> started Udp Socket thread ..", module);
} catch (Exception ex) {
Debug.logError("started Udp error:" + ex, module);
}
}
}
ConnectionFactory是专门用于生产Connection的工厂类,主要是提供两种方法:getTcpConnection和getUdpConnection。因为客户端和服务器端都要使用Connection,所以通过参数fType来区分是客户端还是服务器端。在产生新的连接之前,检查Socket非堵塞I/O线程是否已经启动,如果没有,首先启动它。实际上,启动的是一个Selector检查线程。在以后的open方法中,只要向这个Selector注册一个Channel,使用writeXXXX或readXXX方法就可以发出或收取数据了。
1.5.3 TcpConnection API
向Selector注册Channel是在Connection子类中实现的,TcpConnection是一个基于TCP Socket的实现,代码如下:
package com.jdon.jserver.application.connection;
import java.nio.channels.SocketChannel;
import com.jdon.jserver.application.Connection;
import com.jdon.jserver.connector.tcp.TCPClient;
import com.jdon.jserver.connector.queue.QueueFactory;
import com.jdon.jserver.connector.data.*;
import com.jdon.util.Debug;
public class TcpConnection extends Connection {
private final static String module = TcpConnection.class.getName();
private final static QueueFactory queueFactory = QueueFactory.getInstance();
private TCPClient client = null;
private SocketChannel sc = null;
private boolean isConnect = false;
/**
* 客户端连接初始化
* @param client
*/
public TcpConnection(TCPClient client){
this();
this.client = client;
this.CSType = ConnectionFactory.CLIENT; //设置为客户端模式
}
/**
* 服务器端连接初始化
*/
public TcpConnection(){
queue = queueFactory.getQueue(QueueFactory.TCP_QUEUE);
this.CSType = ConnectionFactory.TCPSERVER; //设置为服务器端模式
isConnect = true; //服务器模式下 连接一直Open,默认采取长连接
}
//Connection抽象类的具体实现
public void open(String url, int port) throws Exception {
try {
sc = client.openSocketChannel(url, port); //打开并注册一个新的SocketChannel
isConnect = true;
} catch (Exception ex) {
Debug.logError("TcpConnection open error:" + ex, module);
throw new Exception(ex);
}
}
public boolean isConnect() throws Exception{
return isConnect;
}
//关闭SocketChannel
public void close() throws Exception {
if (!isConnect) return;
try {
sc.close();
isConnect = false;;
} catch (Exception ex) {
Debug.logError("TcpConnection close error:" + ex, module);
throw new Exception(ex);
}
}
}
1.5.4 UdpConnection API
基于UDP的Connection实现子类比TCP要简单一点,因为UDP没有建立连接的概念,因此isConnect()方法基本无实际意义。UdpConnection代码如下:
package com.jdon.jserver.application.connection;
import java.nio.channels.DatagramChannel;
import com.jdon.jserver.application.Connection;
import com.jdon.jserver.connector.udp.UDPClient;
import com.jdon.jserver.connector.queue.QueueFactory;
import com.jdon.jserver.connector.data.*;
import com.jdon.util.Debug;
public class UdpConnection extends Connection {
private final static String module = UdpConnection.class.getName();
private final static QueueFactory queueFactory = QueueFactory.getInstance();
private UDPClient client = null;
private DatagramChannel channel;
/**
* 客户端连接初始化
* @param client
*/
public UdpConnection(UDPClient client){
this();
this.client = client;
this.CSType = ConnectionFactory.CLIENT;
}
/**
* 服务器端连接初始化
*/
public UdpConnection(){
queue = queueFactory.getQueue(QueueFactory.UDP_QUEUE);
this.CSType = ConnectionFactory.UDPSERVER;
}
//默认一直连接
public boolean isConnect() throws Exception{
return true;
}
//打开一个DatagramChannel
public void open(String url, int port) throws Exception {
try {
channel = client.openDatagramChannel(url, port);
} catch (Exception ex) {
Debug.logError("TcpConnection open error:" + ex, module);
throw new Exception(ex);
}
}
//关闭DatagramChannel
public void close() throws Exception {
try {
channel.disconnect();
channel.close();
} catch (Exception ex) {
Debug.logError("TcpConnection close error:" + ex, module);
throw new Exception(ex);
}
}
}
这样,整个连接Connection结构基本完成,应用层主要是通过Connection实现客户端和服务器之间的数据交换。