高性能聊天系统

  作者:板桥banq

上页

1.5  应用接口设计和实现

前面章节着重讨论的是接口层与Socket部分的代码重整。通过重整,使得接口层与Socket底层的联系变得松散而富于变化,为将来的数据类型和协议的动态扩展留下了发展的余地。

在本节中,将进一步优化接口层与应用层部分的代码,使得应用层对接口层的调用变得更加方便和直接。

原来在应用层的类ChatClient中对接口层调用语法如下:

QueueWorker worker =

 new  QueueAddWorker(queueFactory.getQueue(QueueFactory.TCP_QUEUE));

StringType st = new StringType(worker.REQUEST);

st.setContent(msg);

st.accpet(worker);

这是将msg字符串作为Request发送到服务器端。虽然这些语句已经显得精简和灵活了,但是因为TCP连接的开启和关闭控制还不能在这里实现控制,所以有必要重整或重新设计这部分代码,使得应用层调用底层Socket能类似调用HTTPUrlConnection那样方便。

在应用层中,每次需要发送和接受数据时,都习惯以创建一个连接为开始,然后向这个连接写入数据,写完以后就关闭这个连接。遵循这样对网络连接的操作习惯,在这里设计一个专门创建连接的工厂ConnectionFactory。连接工厂创建的产品是连接Connection

nio

1-9  应用层调用底层结构图

1-9中,应用层分别从ConnectionFactory获得相应的Connection,然后通过ConnectionSocket底层实现网络调用,整个Socket核心底层对于客户端和服务器端来说,犹如一个整体的黑匣子,客户端和服务器要实现数据交换,只要使用这个黑匣子作为中介者桥梁,方便地实现数据交换,根本无需考虑黑匣子中是如何具体实现的。

1.5.1  Connection API

下面进行具体设计和实现。Connection有两种具体产品类型:TCPConnectionUDPConnection。每个连接都有建立时的初始化方法和关闭方法,设计接口Connection如下(程序1-11):

程序1-11

package com.jdon.jserver.application;

 

import com.jdon.jserver.connector.queue.MessageQueue;

import com.jdon.jserver.connector.data.*;

import com.jdon.jserver.application.connection.ConnectionFactory;

import com.jdon.util.Debug;

/**

 * 连接接口,需要发送和接受数据时,使用本类。

 * ConnectionFactory获得本类实例

 * <p>Copyright: Jdon.com Copyright (c) 2003</p>

 * <p>Company: 上海极道计算机技术有限公司</p>

 * @author banq

 * @version 1.0

 */

public abstract class Connection {

  private final static String module = Connection.class.getName();

  protected MessageQueue queue = null;  

  protected int CSType;    //连接类型,是客户端还是服务器端

  //发出Object

  public void writeObject(Object obj) throws Exception {

    if (!isConnect())  throw new Exception("not connected");

    try {

      QueueWorker worker = new QueueAddWorker(queue);

      ObjectType ot = new ObjectType(getWriteMsgType());

      ot.setContent(obj);

      ot.accpet(worker);

    } catch (Exception ex) {

      Debug.logError("TcpConnection writeObject error:" + ex, module);

      throw new Exception(ex);

    }

  }

  //读取Object

  public Object readObject() throws Exception {

    if (!isConnect())   throw new Exception("not connected");

    try {

      QueueWorker worker = new QueueTakeWorker(queue);

      ObjectType ot = new ObjectType(getReadMsgType());

      ot.accpet(worker);

      return ot.getContent();

    } catch (Exception ex) {

      Debug.logError("TcpConnection writeObject error:" + ex, module);

      throw new Exception(ex);

    }

  }

  //发出字符串

  public void writeString(String msg) throws Exception {

    if (!isConnect())    throw new Exception("not connected");

    try {

      QueueWorker worker = new QueueAddWorker(queue);

      StringType st = new StringType(getWriteMsgType());

      st.setContent(msg);

      st.accpet(worker);

    } catch (Exception ex) {

      Debug.logError("TcpConnection writeString error:" + ex, module);

      throw new Exception(ex);

 

    }

  }

  //读取字符串

  public String readString() throws Exception {

    if (!isConnect())

      throw new Exception("not connected");

    try {

      QueueWorker worker = new QueueTakeWorker(queue);

      StringType st = new StringType(getReadMsgType());

      st.accpet(worker);

      return st.getContent();

    } catch (Exception ex) {

      Debug.logError("TcpConnection writeObject error:" + ex, module);

      throw new Exception(ex);

    }

  }

 

  /**

   * 根据是服务器应用还是客户端应用,设置写入Queue中的信息类型

   * request 还是response

   */

  public int getWriteMsgType() {

    if (CSType == ConnectionFactory.CLIENT)

      return QueueWorker.REQUEST;

    else

      return QueueWorker.RESPONSE;

  }

 

  /**

   * 根据是服务器应用还是客户端应用,设置从Queue中读取的信息类型

   * @return

   */

  public int getReadMsgType() {

    if (CSType == ConnectionFactory.CLIENT)

      return QueueWorker.RESPONSE;

    else

      return QueueWorker.REQUEST;

  }

  //以下是需要实现的行为方法

  // 打开连接

  public abstract void open(String url, int port) throws Exception;

//关闭连接

  public abstract void close() throws Exception;

//是否连接

  public abstract boolean isConnect() throws Exception;

}

这个Connection是面向应用层最终用户的API,通过获得Connection实例,客户端或服务器端应用可以收取或发送数据了,其中提供了基本的行为方法:打开连接、关闭连接、读数据以及写数据。Connection类的API总结见表1-1

1-1  Connection API方法总结

   

   

abstract  void

close()

关闭连接

int

getReadMsgType()
根据是服务器应用还是客户端应用,设置从Queue中读取的信息类型是request 还是response

int

getWriteMsgType()
根据是服务器应用还是客户端应用,设置写入Queue中的信息类型 request 还是response

abstract  boolean

isConnect()
是否连接

abstract  void

open(java.lang.String url, int port)
打开连接

java.lang.Object

readObject()
读取Object

java.lang.String

readString()
读取字符串

void

writeObject(java.lang.Object obj)
写入Object

void

writeString(java.lang.String msg)
写入字符串

1.5.2  ConnectionFactory API

这个Connection实例的获得是通过ConnectionFactory产生的ConnectionFactory是一个工厂方法模式的实现代码如下

package com.jdon.jserver.application.connection;

 

import com.jdon.jserver.application.Connection;

import com.jdon.util.Debug;

import com.jdon.jserver.connector.tcp.*;

import com.jdon.jserver.connector.udp.*;

 

public class ConnectionFactory {

  private final static String module = ConnectionFactory.class.getName();

    //以下是线程实例

  private TCPClient cclient = null;

  private UDPClient uclient = null;

  private TCPReactor tserver = null;

  private UDPReactor userver = null;

 

  private int fType; //是服务器端的还是CLient端,其值是下列3个之一

public final static int CLIENT = 1;

  public final static int TCPSERVER = 2;

  public final static int UDPSERVER = 3;

  //单态模式,初始化工厂类

  private static ConnectionFactory factory;

  public synchronized static ConnectionFactory getInstance(int fType) {

    if (factory == null)

      factory = new ConnectionFactory(fType);

    return factory;

  }

  private ConnectionFactory(int fType) {

    this.fType = fType;

  }

 

  /**

   * 获得一个TcpConnection实例

   * @return Connection

   */

  public Connection getTcpConnection() {

    if (fType == CLIENT) {

      startTcpClientSocket();  //启动客户端底层Socket

      return new TcpConnection(cclient);

    } else {

      startTcpServerSocket();  //启动服务器端底层Socket

      return new TcpConnection();

    }

  }

 

  /**

   * 获得一个UdpConnection实例

   * @return Connection

   */

  public Connection getUdpConnection() {

    if (fType == CLIENT) {

      startUdpClientSocket();

      return new UdpConnection(uclient);

    } else {

      startUdpServerSocket();

      return new UdpConnection();

    }

  }

  /**

   * 开启客户端Tcp Socket线程

   */

  private void startTcpClientSocket() {

    if (cclient != null)

      return;

    try {

      cclient = new TCPClient();

      Thread thread = new Thread(cclient);

      thread.setDaemon(true);

      thread.start();

      Debug.logVerbose("-->  started Tcp Socket thread ..", module);

    } catch (Exception ex) {

      Debug.logError("started Tcp error:" + ex, module);

    }

  }

  /**

   * 开启客户端Udp Socket线程

   */

  private void startUdpClientSocket() {

    if (uclient != null)

      return;

    try {

      uclient = new UDPClient();

      Thread thread = new Thread(uclient);

      thread.setDaemon(true);

      thread.start();

      Debug.logVerbose("-->  started Udp Socket thread ..", module);

    } catch (Exception ex) {

      Debug.logError("started Udp error:" + ex, module);

    }

  }

  /**

   * 开启服务器端TCP Socket线程

   */

  private void startTcpServerSocket() {

    if (tserver != null)

      return;

    try {

      ServerCfg cfg = new ServerCfg();

      tserver = new TCPReactor(cfg.getTcpPort());

      Thread thread = new Thread(tserver);

      thread.setDaemon(true);

      thread.start();

      Debug.logVerbose("-->  started Tcp Socket thread ..", module);

    } catch (Exception ex) {

      Debug.logError("started Tcp error:" + ex, module);

    }

  }

  /**

   * 开启服务器端UDP Socket线程

   */

  private void startUdpServerSocket() {

    if (userver != null)

      return;

    try {

      ServerCfg cfg = new ServerCfg();

      userver = new UDPReactor(cfg.getUdpPort());

      Thread thread = new Thread(userver);

      thread.setDaemon(true);

      thread.start();

      Debug.logVerbose("-->  started Udp Socket thread ..", module);

    } catch (Exception ex) {

      Debug.logError("started Udp error:" + ex, module);

    }

  }

}

ConnectionFactory是专门用于生产Connection的工厂类,主要是提供两种方法:getTcpConnectiongetUdpConnection。因为客户端和服务器端都要使用Connection,所以通过参数fType来区分是客户端还是服务器端。在产生新的连接之前,检查Socket非堵塞I/O线程是否已经启动,如果没有,首先启动它。实际上,启动的是一个Selector检查线程。在以后的open方法中,只要向这个Selector注册一个Channel,使用writeXXXXreadXXX方法就可以发出或收取数据了。

1.5.3  TcpConnection API

Selector注册Channel是在Connection子类中实现的,TcpConnection是一个基于TCP Socket的实现,代码如下:

package com.jdon.jserver.application.connection;

 

import java.nio.channels.SocketChannel;

import com.jdon.jserver.application.Connection;

import com.jdon.jserver.connector.tcp.TCPClient;

import com.jdon.jserver.connector.queue.QueueFactory;

import com.jdon.jserver.connector.data.*;

import com.jdon.util.Debug;

 

public class TcpConnection extends Connection {

  private final static String module = TcpConnection.class.getName();

  private final static QueueFactory queueFactory = QueueFactory.getInstance();

 

  private TCPClient client = null;

  private SocketChannel sc = null;

  private boolean isConnect = false;

 

  /**

   * 客户端连接初始化

   * @param client

   */

  public TcpConnection(TCPClient client){

    this();

    this.client = client;

    this.CSType = ConnectionFactory.CLIENT;  //设置为客户端模式

  }

 

  /**

   * 服务器端连接初始化

   */

  public TcpConnection(){

     queue = queueFactory.getQueue(QueueFactory.TCP_QUEUE);

     this.CSType = ConnectionFactory.TCPSERVER;  //设置为服务器端模式

     isConnect = true;  //服务器模式下 连接一直Open,默认采取长连接

  }

  //Connection抽象类的具体实现

  public void open(String url, int port) throws Exception {

    try {

      sc = client.openSocketChannel(url, port); //打开并注册一个新的SocketChannel

      isConnect = true;

    } catch (Exception ex) {

      Debug.logError("TcpConnection open error:" + ex, module);

      throw new Exception(ex);

    }

  }

  public boolean isConnect() throws Exception{

    return isConnect;

  }

  //关闭SocketChannel

  public void close() throws Exception {

    if (!isConnect) return;

    try {

      sc.close();

      isConnect = false;;

    } catch (Exception ex) {

      Debug.logError("TcpConnection close error:" + ex, module);

      throw new Exception(ex);

    }

  }

}

1.5.4  UdpConnection API

基于UDPConnection实现子类比TCP要简单一点,因为UDP没有建立连接的概念,因此isConnect()方法基本无实际意义。UdpConnection代码如下:

package com.jdon.jserver.application.connection;

 

import java.nio.channels.DatagramChannel;

import com.jdon.jserver.application.Connection;

import com.jdon.jserver.connector.udp.UDPClient;

import com.jdon.jserver.connector.queue.QueueFactory;

import com.jdon.jserver.connector.data.*;

import com.jdon.util.Debug;

 

public class UdpConnection extends Connection {

  private final static String module = UdpConnection.class.getName();

  private final static QueueFactory queueFactory = QueueFactory.getInstance();

  private UDPClient client = null;

  private DatagramChannel channel;

  /**

   * 客户端连接初始化

   * @param client

   */

  public UdpConnection(UDPClient client){

    this();

    this.client = client;

    this.CSType = ConnectionFactory.CLIENT;

  }

  /**

   * 服务器端连接初始化

   */

  public UdpConnection(){

    queue = queueFactory.getQueue(QueueFactory.UDP_QUEUE);

    this.CSType = ConnectionFactory.UDPSERVER;

  }

  //默认一直连接

  public boolean isConnect() throws Exception{

    return true;

  }

  //打开一个DatagramChannel

  public void open(String url, int port) throws Exception {

    try {

      channel = client.openDatagramChannel(url, port);

    } catch (Exception ex) {

      Debug.logError("TcpConnection open error:" + ex, module);

      throw new Exception(ex);

    }

  }

  //关闭DatagramChannel

  public void close() throws Exception {

    try {

     channel.disconnect();

     channel.close();

    } catch (Exception ex) {

      Debug.logError("TcpConnection close error:" + ex, module);

      throw new Exception(ex);

    }

  }

}

这样,整个连接Connection结构基本完成,应用层主要是通过Connection实现客户端和服务器之间的数据交换。

下页