上一级 首页 下一级

4  JMS 邮件发送组件

J2EE中,JMSJava Message System)提供了一种异步处理机制的实现。JMS通过异步的、非阻塞的消息传递,将消息的生产者和使用者松散的联系在一起。对于使用者,它无所谓是谁产生了消息或者是在何时产生的。这就能够建立一种动态的、灵活的可靠的系统。所谓可靠,因为JMS将每个消息都保存起来,只有确保本消息处理后才会完全放弃。否则,将会反复提交处理。这种可靠的机制使得JMS能够成功的在证券、银行系统中得到广泛应用。

JMS中的消息类型有两种:TopicQueueTopic的操作是使用发布/订阅(publish/subscribe)的方式;Queue的操作是点对点(ponit to point)的方式。

·          publish/subscribe:发布者(publisher)发布消息到Topic,订阅者(subsribe)从Topic订阅消息,订阅者的数量是不受限制的。

·          ponit to point:点对点传输消息,建立在消息队列Queue的基础上,每个消息使用者只对应一个消息队列,不像publish/subscribe那样可以有很多消息使用者。

本项目中,可以由单个独立的程序来实现邮件发送,这个程序作为一个消息使用者,只需一个就可以了,因此使用Queue来实现。

JMS在消息到达消息使用者,有两种——同步和异步。

·          同步是指消息使用者明确地主动地调用从QueueTopic中得到消息,一直进行循环直至一个消息到达,或者也可以设定超时时间。很明显这个方法是比较耗费CPU资源的。

·          异步接受是指消息到达时,主动通知消息使用者,消息使用者可以实现message listener接口。这样每当消息到达时,JMS provider 会通过调用这个listeneronMessage方法来传输这个消息。该方法相对要有效率,邮件发送组件将采取这种方式。

下面描述这个邮件发送组件是如何具体实现的。

在这个组件中,消息生产者应该是具体应用程序。但是组件为了达到通用目的,需要建立一个专门的消息生产者,可以称之为AsyncSender,(异步发送器)。具体应用程序通过调用这个异步发送器,将邮件内容发给JMSQueue,负责邮件发送的类作为这个Queue另外一端的消息使用者,在Queue中有需要发送的邮件出现时就立即将它从Queue中取出,实现邮件发送。


4.1  消息发送器

AsyncSender属于消息生产者它是具体应用系统直接调用的可以使用无状态Session Bean来实现。

AsyncSender中,首先要通过JNDI获得一个ConnectionFactory,创建一个QueueConnection实例,这样就获得与JMS Provider的一个连接。再由这个QueueConnection创建一个QueueSession,这样就启动了一个和JMS Provider连接相关的线程,这个线程可以是发送或接收消息。

AsyncSender还需要通过JNDI获得一个已经存在的Queue。这样,通过将发送消息的线程和这个Queue联系起来,表示向这个Queue中发送消息。

AsyncSenderbean代码如下:

/**

 *  JMS客户端 消息生产者

 * <p>Copyright: Jdon.com Copyright (c) 2003</p>

 * <p>Company: 上海汲道计算机技术有限公司</p>

 * @author banq

 * @version 1.0

 */

public class AsyncSenderBean implements SessionBean {

 

  private final static Logger logger = Logger.getLogger(AsyncSenderBean.class);

 

  SessionContext sessionContext;

  private SessionContext sc;

  private Queue queue;

  private QueueConnectionFactory qFactory;

 

  public void ejbCreate() throws CreateException {

    try {

      ServiceLocator serviceLocator = new ServiceLocator();

        //查询JNDI获得QueueConnectionFactory

      qFactory =

          serviceLocator.getQueueConnectionFactory(

          JNDINames.QUEUE_CONNECTION_FACTORY);

       //查询JNDI 获得已经存在的Queue

      queue = serviceLocator.getQueue(JNDINames.ASYNC_SENDER_QUEUE);

    } catch (ServiceLocatorException sle) {

      throw new EJBException("AsyncSenderEJB.ejbCreate failed", sle);

    }

  }

  //发送邮件的方法

  public void sendAMessage(String msg) {

    QueueSession session = null;

    QueueConnection qConnect = null;

    QueueSender qSender = null;

    try {

      //创建一个QueueConnection

      qConnect = qFactory.createQueueConnection();

      //创建一个QueueSession

      session = qConnect.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

      logger.debug("-->>create sender");

      //创建一个发送者

      qSender = session.createSender(queue);

      TextMessage jmsMsg = session.createTextMessage();

      //设置发送内容

      jmsMsg.setText(msg);

      //向目标queue发送

      qSender.send(jmsMsg);

      logger.debug("-->>send ok msg:"+msg);

    } catch (Exception e) {

      logger.error("sendAMessage error " + e);

      throw new EJBException("askMDBToSendAMessage: Error!", e);

    } finally {

      try {

        if (qConnect != null) {          qConnect.close();        }

      } catch (Exception e) {}

    }

  }

  …

}

其中,QueueConnectionFactoryJNDI名称是java:comp/env/jms/QueueConnection Factory,而QueueJNDI名称是java:comp/env/jms/MailQueue,分别需要在ejb-jar.xml中落实这两个环境变量。

ejb-jar.xml中,有:

<session>

     <display-name>AsyncSender</display-name>

     <ejb-name>AsyncSender</ejb-name>

     <local-home>

        com.jdon.asyncsender.ejb.AsyncSenderLocalHome

     </local-home>

     <local>com.jdon.asyncsender.ejb.AsyncSenderLocal</local>

    <ejb-class>com.jdon.asyncsender.ejb.AsyncSenderBean</ejb-class>

     <session-type>Stateless</session-type>

     <transaction-type>Container</transaction-type>

      <!-- resource-ref一般用来设置数据库资源或JMS Provider资源  -->

     <resource-ref>

          <description />

                <res-ref-name>jms/QueueConnectionFactory</res-ref-name>

                <res-type>javax.jms.QueueConnectionFactory</res-type>

                <res-auth>Container</res-auth>

   </resource-ref>

   <!--  resource-ref指定的资源相关的环境配置 -->

     <resource-env-ref>

          <description />

          <resource-env-ref-name>jms/MailQueue</resource-env-ref-name>

          <resource-env-ref-type>javax.jms.Queue</resource-env-ref-type>

    </resource-env-ref>

</session>

QueueConnectionFactoryQueue与具体JMS容器有关。在JBoss 3.0中,提供了现成的QueueConnectionFactoryQueue,当然也可以参考JBoss手册自己设置建立这两个配置。

这里使用JBossConnectionFactory和已经存在的/testQueue,其JNDI名称写法分别是java:/ConnectionFactoryqueue/testQueue,这些都是JBoss的规定写法。如果是Topic,那么就需要写成topic/testTopictestTopic/testQueueJBoss容器在启动时建立好的一个Topic/Queue。在jboss.xml中,写入下列代码:

  <session>

        <ejb-name>AsyncSender</ejb-name>

        <local-jndi-name>AsyncSenderLocal</local-jndi-name>

        <resource-ref>

             <res-ref-name>jms/QueueConnectionFactory</res-ref-name>

             <jndi-name>java:/ConnectionFactory</jndi-name>

        </resource-ref>

        <resource-env-ref>

             <resource-env-ref-name>jms/MailQueue</resource-env-ref-name>

             <jndi-name>queue/testQueue</jndi-name>

        </resource-env-ref>

  </session>

通过上面开发,消息的生产者功能基本完成,通过调用EJB AsyncSendersendAMessage(String msg)方法,可以将msg发送到JMSQueue中。


4.2  MDB

Queue的另外一端是消息的使用者。MDBMessage-Driven Beans)专门处理JMS异步消息,Session BeanEntity Bean只允许同步地去发送消息和接收消息,不支持异步。MDB是一个message listener,它能够从一个Queue或一个durable subscription中可靠地接收消息。

MDB与一个普通的消息使用者客户端的区别是,EJB容器将自动做下面的事情,无需应用者再自己编程实现:

·          创建一个消息接受者(QueueReceiver/TopicSubscriber)接收消息。在部署时,将destinationConnectionFactoryMDB联合起来。在JBoss中通过指定destination-jndi-name来实现。

·          自动实现message listener接口(无需调用setMessageListener方法)。

·          容器自动指定了消息签收模式。

因此,使用MDB作为消息的使用者就非常简单,而且没有homeremote接口,只有一个bean类。建立MDB MailerBean代码如下:

/**

 * JMS消息使用者 EJB消息Bean

 * <p>Copyright: Jdon.com Copyright (c) 2003</p>

 * <p>Company: 上海汲道计算机技术有限公司</p>

 * @author banq

 * @version 1.0

 */

public class MailerBean implements MessageDrivenBean, MessageListener {

  private final static Logger logger = Logger.getLogger(MailerBean.class);

  MessageDrivenContext messageDrivenContext;

  public void ejbCreate(){

  }

  public void ejbRemove() {}

  //当消息来时,将自动激活这个方法

  public void onMessage(Message msg){

    logger.debug(" --> enter onMessage ..");

    TextMessage textMessage = null;

    String xmlMailMessage = null;

    Mail recMail = null;

    try {

      textMessage = (TextMessage) msg;

      xmlMailMessage = textMessage.getText();

      //XML文本转换成Mail对象实例

      recMail = MailUtil.getMailFromMsg(xmlMailMessage);

      logger.debug(" --> begin connect the server ....");

      sendMail(recMail);

      logger.debug(" --> send mail ok ");

    } catch (JMSException je) {

      logger.error("MailerMDB.onMessag error" + je);

      throw new EJBException("MailerMDB.onMessage" + je);

    } catch (Exception me) {

      logger.error("MailerMDB.onMessag error" + me);

    }

  }

  //发送邮件

  private void sendMail(Mail mail) throws Exception{

    getMailHelper().createAndSendMail(mail);

  }

  private MailHelper getMailHelper(){

    return (new MailHelper());

  }

  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {

    this.messageDrivenContext = messageDrivenContext;

  }

}

MailHelper是一个专门使用J2EE容器的mail service发送邮件的类,代码如下:

public class MailHelper {

   private final static Logger logger = Logger.getLogger(MailHelper.class);

   //创建一个email message 并且使用J2EE mail services发送它

  public void createAndSendMail(Mail mail) throws MailerAppException {

    try {

      logger.debug(" --> lookup mail session");

      InitialContext ic = new InitialContext();

      Session session = (Session) ic.lookup(JNDINames.MAIL_SESSION);

      Message msg = new MimeMessage(session);

 

      logger.debug(" --> beigin to set mail ");

      msg.setFrom(new InternetAddress(mail.getFromAddress()));

      msg.setRecipients(Message.RecipientType.TO,

                        InternetAddress.parse(mail.getToAddress(), false));

      msg.setSubject(mail.getSubject());

      msg.setText(mail.getContent());

      msg.setSentDate(new Date());

      logger.debug(" --> beigin to send now ....");

      Transport.send(msg);

    } catch (Exception e) {

      logger.error("createAndSendMail exception : " + e);

      throw new MailerAppException("Failure while sending mail");

    }

  }

}

由上可见,MDB MailerBean是在Queue中有消息到达时,取出后委托MailHelper使用J2EE容器的Mail Service发送邮件。

这两个类都比较依赖容器,所以要进行容器的配置。

ejb-jar.xml中配置MailerBean如下:

<message-driven>

  <display-name>Mailer</display-name>

    <ejb-name>Mailer</ejb-name>

    <ejb-class>com.jdon.mailer.ejb.MailerBean</ejb-class>

    <transaction-type>Container</transaction-type>

    <message-driven-destination>

        <destination-type>javax.jms.Queue</destination-type>

    </message-driven-destination>

     <!--  使用容器的邮件操作资源 -->

    <resource-ref>

        <description />

          <res-ref-name>mail/DefaultMail</res-ref-name>

          <res-type>javax.mail.Session</res-type>

          <res-auth>Container</res-auth>

    </resource-ref>

</message-driven>

由于使用了容器的邮件发送服务,那么需要指定这个服务的JNDI,在jboss.xml当中加入:

<message-driven>

   <ejb-name>Mailer</ejb-name>

    <destination-jndi-name>queue/testQueue</destination-jndi-name>

    <resource-ref>

         <res-ref-name>mail/DefaultMail</res-ref-name>

         <jndi-name>java:/Mail</jndi-name>

    </resource-ref>

</message-driven>

这表示使用JNDIjava:/Mail的邮件服务资源,那么再部署发布本组件时,就需要在JBoss服务器中配置这个邮件服务资源,具体如何配置参见部署发布的章节。



上一级 首页 下一级