高性能聊天室中问题: 用户池

05-12-05 weiminghui
在高性能聊天室(实用系统一书中的代码)里我希望添加用户池,以达到用户可以和指定的其他用户发信息。

在用户池实现上,

我的理解是

1) 应用层获得一个连接的数据后,

例如 数据字符串为 from:1 ,to:(2,3) ,info: hello

通过应用层解析 获得需要发送给用户ID 为 2和3的用户。

2) 在用户连接池中查找ID 号为 2和3 的连接

在一个连接到来时: 通过 Connection conn = connFactory.getTcpConnection(true)

获得应用层次上的连接对象,并添加 到用户池中 connpool.save(userid,conn);

例如:

现有三个连接分别为 用户1,2,3 的连接用户池中

1 要求发送信息 给2、3,在用户池中找到对应的 2,3对应的conn对象。

3) 通过2 和3 的连接 发给用户(2,3)

这一步中,因为request、responde 消息都使用消息队列的方式,就将发给2和3的请求写到发送队列中。

在2和3对应的 Channel 可写时,将信息从队列中取出,并从socket 发送出去。

不知道这样处理是不是正确的?

但我看书中UserPool类里 存的Connector对象中直接存的IPaddress 和 SocketChannel

是不是想 在TCPHandler 这个类中直接找到用户对应的 socketchannel.write(buffer1); 写到要发送的客户中去。

但这样的话,有要将用户 这部分写到 TCPHandler 里了吗?有要破坏对socket操作的封装?

请指教!!

banq
2005-12-05 17:41
当初我写这套底层框架已经考虑用户的相关应用,用户这部分应用不必修改原来的代码,是基于其之上的层次。

weiminghui
2005-12-05 21:56
谢谢banq 大哥。

不过我还是太明白。

你的意思是 UserPool里直接 存用户对应的SocketChannl? 这样的话,不修改TcpHandler 似乎不好处理。

你说的“用户这部分应用不必修改原来的代码,是基于其之上的层次。”是指的应用层次上的吗??

看在我认真研读你代码份上,说明白点吧。 我真的是想了很久没想明白,才问的!!先谢谢了!

weiminghui
2005-12-05 22:09
谢谢 banq 大哥的回答,不过我还是大明白;

你说的:“用户这部分应用不必修改原来的代码,是基于其之上的层次”

是指的应用层次上的吗?服务器把返回的写到 responde 队列里吗??

banq
2005-12-06 08:42
呵呵,因为时间长了,具体细节我以及忘记,你可以参考Tomcat的源码看看,我记得是在User中保存一个socketChannel的,拷贝一段我N年前做的代码供你参考:

public class SocketManager {

    public static Logger logger = Logger.getLogger(SocketManager.class);

    //this maintain persistent relation (userID --> socketChannel)
    private Hashtable user2socket = new Hashtable();

    ///////////////////////////////////////////////////////////////////////
    //this record the user operate time
    private Hashtable user2time = new Hashtable();

    /**
     * 检查是否有userId
     * @param userId
     * @return
     */
    public boolean isExsit(String userId) {
        if (user2socket.containsKey(userId))
            return true;
        else
            return false;
    }

    /**
     * 根据userId得到Socketchannel
     * @param userId
     * @return
     */
    public SocketChannel getSocket(String userId) {
        return (SocketChannel) user2socket.get(userId);
    }

    /**
     * 保存userId 和Socketchannel的关系
     * @param userId
     * @param sc
     */
    public void putSocket(String userId, SocketChannel sc) {
        user2socket.put(userId, sc);
    }

    public int getUser2socketSize() {
        return user2socket.size();
    }

    public boolean isOnlySocket(SocketChannel sc){
        if (user2socket.containsValue(sc))
           return false;
        else
            return true;
    }

    /**
     * 返回user2time 主要用于遍历user2time
     * @return
     */
    public Hashtable getUser2time() {
        return user2time;
    }

    /**
     * 得到userId的最新更新时间
     * @param userId
     * @return
     */
    public Long getUpdateTime(String userId) {
        return (Long) user2time.get(userId);
    }

    /**
     * 保存userId的最新更新时间
     * @param userId
     */
    public void putUpdateTime(String userId) {
        logger.debug("-->> update the user2time");
        user2time.put(userId, new Long(System.currentTimeMillis()));
    }


    public String getUserId(SocketChannel sc) {

        try {
            if (!user2socket.containsValue(sc)) {
                logger.debug("-->> there is no this socket:" + sc);
                return null;
            }
            Enumeration e = user2socket.keys();
            while (e.hasMoreElements()) {
                String sid = (String) e.nextElement();
                if (sc == (SocketChannel) user2socket.get(sid)) {
                    return sid;
                }
            }
        } catch (Exception ex) {
            logger.error(ex);
        }

        return null;

    }


    /**
     *
     * 关闭Socketchannel
     * @param incomingChannel
     */
    public synchronized void channelClose(SocketChannel incomingChannel) {

        logger.debug("-->> close socket");

        try {
            if (incomingChannel != null) {
                incomingChannel.socket().close();
                incomingChannel.close();
                incomingChannel = null;
            }
        } catch (Exception ex) {
            logger.error(ex);
        }
    }

    /**
     *
     * 关闭Socketchannel
     * 同时清除用户相关数据
     * @param userID
     */
    public synchronized void channelClose(String userID) {

        logger.debug("-->> delete socket and session by userID" + userID);

        try {
            SocketChannel sc = (SocketChannel) user2socket.get(userID);
            sc.socket().close();
            sc.close();
            sc = null;
        } catch (Exception ex) {
            logger.error(ex);
        }

    }


    /**
     * remove the userid's socketchannel record
     * @param userid
     */
    public void removeUserState(String userid) {

        logger.debug("-->> delete user2socket  by userid=" + userid);

        user2socket.remove(userid);

    }

    /**
     * remove the userid's socketchannel record
     * @param userid
     */
    public void removeUserMonitor(String userid) {
        logger.debug("-->> delete monitor session by userid=" + userid);

        user2time.remove(userid);

    }



    public boolean testSocket(String userId, byte[] bt) {

        ByteBuffer bf = ByteBuffer.wrap(bt);
        SocketChannel sc = getSocket(userId);
        try {
            sc.write(bf);
        } catch (Exception ex) {
            return false;
        }
        return true;
    }

}
<p>

weiminghui
2005-12-06 11:24
非常感谢 banq 大哥的热心回答。

那在userpool 里循环得到需要的 socketchannel

socketchannel.write(buffer1); 直接写消息?

猜你喜欢