Spring专题

Spring 3 HornetQ 2.1整合教程源码下载

Spring 3 HornetQ 2.1整合教程源码下载

HornetQ是一个开源项目,目标是构建一个多协议、可嵌入、高性能、支持集群的异步消息系统。它用Java编写,可运行在任何Java 5或更高版本的平台上。HornetQ的高性能日志(journal)使持久化消息的性能达到通常只有非持久化消息才能达到的水平,而非持久化消息的性能同样非常高。HornetQ提供服务器复制与故障转移、消息负载均衡,并提供全面的管理API来管理和监控所有HornetQ服务器。

配置applicationContext.xml:

<!-- Naming bean from the org.jnp package; Spring calls start() after creation and stop() on shutdown. -->
<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop" />

<!-- Naming server wired to the naming bean above: bound to localhost on port 1099, with RMI on localhost:1098.
     Started and stopped by Spring through init-method/destroy-method. -->
<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
 <property name="namingInfo" ref="namingServerImpl" />
 <property name="port" value="1099" />
 <property name="bindAddress" value="localhost" />
 <property name="rmiPort" value="1098" />
 <property name="rmiBindAddress" value="localhost" />
</bean>

<!-- Platform MBean server, obtained through the static factory method ManagementFactory.getPlatformMBeanServer(). -->
<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />

<!-- File-based HornetQ configuration; start()/stop() are driven by the Spring lifecycle.
     NOTE(review): no file location is set here, so it presumably uses HornetQ's default configuration file - confirm. -->
<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />

<!-- Security manager handed to the core server below. -->
<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />

<!-- The core server: built from the file configuration, the platform MBean server and the security manager.
     It declares no init-method/destroy-method of its own; the JMS server bean that wraps it does. -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
 <constructor-arg ref="fileConfiguration" />
 <constructor-arg ref="mbeanServer" />
 <constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>

<!-- The JMS server: wraps the core server and is started/stopped by Spring.
     depends-on makes Spring initialize the naming server first. -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
 <constructor-arg ref="hornetQServerImpl" />
</bean>

JmsTemplate连接工厂配置:

<!-- HornetQ connection factory built from a single transport configuration:
     the Netty connector factory with its "port" parameter set to 5445. -->
<bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory" >
 <constructor-arg>
  <bean class="org.hornetq.api.core.TransportConfiguration">
   <!-- First argument: fully qualified class name of the connector factory. -->
   <constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
   <!-- Second argument: connector parameters as a String-to-Object map. -->
   <constructor-arg>
    <map key-type="java.lang.String" value-type="java.lang.Object">
     <entry key="port" value="5445"></entry>
    </map>
   </constructor-arg>
  </bean>
 </constructor-arg>
</bean>

配置JmsTemplate,使其使用上面定义的连接工厂:

<!-- Spring JmsTemplate that uses the "connectionFactory" bean. -->
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="connectionFactory" ref="connectionFactory"></property>
</bean>

JNDI配置:

<!-- Looks up the object bound under java:/ConnectionFactory in JNDI and exposes it as a bean.
     depends-on makes Spring initialize the JMS server before this lookup. -->
<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>java:/ConnectionFactory</value>
 </property>
</bean>

配置Queue,在hornetq-jms.xml 加入:

<!-- Queue "Notifications" with the entry /queue/Notifications, the same name the Spring JNDI lookup uses. -->
<queue name="Notifications">
 <entry name="/queue/Notifications"/>
</queue>

为了使用这个Queue,配置JNDI:

<!-- Looks up /queue/Notifications in JNDI and exposes it as the "notificationsQueue" bean.
     depends-on makes Spring initialize the JMS server before this lookup. -->
<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
<property name="jndiName">
<value>/queue/Notifications</value>
</property>
</bean>

队列Queue的生产者:

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsProducer")
public class NotificationsProducer {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;
 private Session notificationsQueueSession;
 private MessageProducer notificationsQueueProducer;

 

 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  notificationsQueueProducer = notificationsQueueSession.createProducer(notificationsQueue);
  notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

 public void sendNotification(final String message) throws Exception {

  TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
  notificationsQueueProducer.send(textMessage);

 }

}

消费者:

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsConsumer")
public class NotificationsConsumer implements MessageListener {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;

 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
  notificationsQueueConsumer.setMessageListener(this);
  notificationsQueueConnection.start();
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

 @Override
 public void onMessage(Message message) {
  if (message instanceof TextMessage) {
   try {
    String text = ((TextMessage) message).getText();
    System.out.println("The Notification Message is : \n" + text);
   } catch (JMSException ex) {
     throw new RuntimeException(ex);
   }
  } else {
    throw new IllegalArgumentException("Message must be of type TextMessage");
  }
 }

}

服务代码:

package com.javacodegeeks.gwtspring.server.services;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.javacodegeeks.gwtspring.server.dao.EmployeeDAO;
import com.javacodegeeks.gwtspring.server.utils.NotificationsProducer;
import com.javacodegeeks.gwtspring.shared.dto.EmployeeDTO;
import com.javacodegeeks.gwtspring.shared.services.EmployeeService;

@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {

 @Autowired
 private EmployeeDAO employeeDAO;

 @Autowired
 NotificationsProducer notificationsProducer;

 @PostConstruct
 public void init() throws Exception {
 }

 @PreDestroy
 public void destroy() {
 }

 @Transactional(propagation=Propagation.SUPPORTS, rollbackFor=Exception.class)
 public EmployeeDTO findEmployee(long employeeId) {

  return employeeDAO.findById(employeeId);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO == null) {
   employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
   employeeDAO.persist(employeeDTO);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void updateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null) {
   employeeDTO.setEmployeeName(name);
   employeeDTO.setEmployeeSurname(surname);
   employeeDTO.setJob(jobDescription);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void deleteEmployee(long employeeId) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null)
   employeeDAO.remove(employeeDTO);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveOrUpdateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);

  employeeDAO.merge(employeeDTO);

  notificationsProducer.sendNotification("Save Or Update Employee with values : \nID : " + employeeId + "\nName : " + name + "\nSurname : " + surname + "\nJob description : " + jobDescription);

 }

}

部署运行,浏览器访问:http://localhost:8080/GWTSpringInfinispanHornetQ/

输出结果:

The Notification Message is :
Save Or Update Employee with values :
ID : xxx
Name : xxx
Surname : xxx
Job description : xxx