Spring 3 HornetQ 2.1整合教程源码下载
HornetQ是一个开源的项目,一个多协议,可嵌入,高性能,集群的异步消息传递系统。它是用Java编写,运行在一个Java5或更高版本平台上。 HornetQ的高性能的日志消息持久性能等同于正常的消息非持久的性能。消息非持久性的性能更是非常高的。HornetQ提供服务器复制和失败容错,消息负载平衡,并提供全面的管理API来管理和监控所有HornetQ的服务器。
配置applicationContext.xml:
<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop" />
<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
<property name="namingInfo" ref="namingServerImpl" />
<property name="port" value="1099" />
<property name="bindAddress" value="localhost" />
<property name="rmiPort" value="1098" />
<property name="rmiBindAddress" value="localhost" />
</bean>
<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />
<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />
<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />
<!-- The core server -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
<constructor-arg ref="fileConfiguration" />
<constructor-arg ref="mbeanServer" />
<constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>
<!-- The JMS server -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
<constructor-arg ref="hornetQServerImpl" />
</bean>
JmsTemplate连接工厂配置:
<bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory" >
<constructor-arg>
<bean class="org.hornetq.api.core.TransportConfiguration">
<constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
<constructor-arg>
<map key-type="java.lang.String" value-type="java.lang.Object">
<entry key="port" value="5445"></entry>
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
jmsTemplate提供JMS连接:
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
JNDI配置:
<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
<property name="jndiName">
<value>java:/ConnectionFactory</value>
</property>
</bean>
配置Queue,在hornetq-jms.xml 加入:
<queue name="Notifications">
<entry name="/queue/Notifications"/>
</queue>
为了使用这个Queue,配置JNDI:
<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
<property name="jndiName">
<value>/queue/Notifications</value>
</property>
</bean>
队列Queue的生产者:
package com.javacodegeeks.gwtspring.server.utils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("notificationsProducer")
public class NotificationsProducer {
@Autowired
Queue notificationsQueue;
@Autowired
ConnectionFactory inVMConnectionFactory;
private Connection notificationsQueueConnection;
private Session notificationsQueueSession;
private MessageProducer notificationsQueueProducer;
@PostConstruct
public void init() throws Exception {
notificationsQueueConnection = inVMConnectionFactory.createConnection();
notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
notificationsQueueProducer = notificationsQueueSession.createProducer(notificationsQueue);
notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
@PreDestroy
public void destroy() throws Exception {
if(notificationsQueueConnection != null)
notificationsQueueConnection.close();
}
public void sendNotification(final String message) throws Exception {
TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
notificationsQueueProducer.send(textMessage);
}
}
消费者:
package com.javacodegeeks.gwtspring.server.utils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("notificationsConsumer")
public class NotificationsConsumer implements MessageListener {
@Autowired
Queue notificationsQueue;
@Autowired
ConnectionFactory inVMConnectionFactory;
private Connection notificationsQueueConnection;
@PostConstruct
public void init() throws Exception {
notificationsQueueConnection = inVMConnectionFactory.createConnection();
Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
notificationsQueueConsumer.setMessageListener(this);
notificationsQueueConnection.start();
}
@PreDestroy
public void destroy() throws Exception {
if(notificationsQueueConnection != null)
notificationsQueueConnection.close();
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println("The Notification Message is : \n" + text);
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
} else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
服务代码:
package com.javacodegeeks.gwtspring.server.services;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.javacodegeeks.gwtspring.server.dao.EmployeeDAO;
import com.javacodegeeks.gwtspring.server.utils.NotificationsProducer;
import com.javacodegeeks.gwtspring.shared.dto.EmployeeDTO;
import com.javacodegeeks.gwtspring.shared.services.EmployeeService;
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {
@Autowired
private EmployeeDAO employeeDAO;
@Autowired
NotificationsProducer notificationsProducer;
@PostConstruct
public void init() throws Exception {
}
@PreDestroy
public void destroy() {
}
@Transactional(propagation=Propagation.SUPPORTS, rollbackFor=Exception.class)
public EmployeeDTO findEmployee(long employeeId) {
return employeeDAO.findById(employeeId);
}
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
public void saveEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
if(employeeDTO == null) {
employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
employeeDAO.persist(employeeDTO);
}
}
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
public void updateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
if(employeeDTO != null) {
employeeDTO.setEmployeeName(name);
employeeDTO.setEmployeeSurname(surname);
employeeDTO.setJob(jobDescription);
}
}
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
public void deleteEmployee(long employeeId) throws Exception {
EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
if(employeeDTO != null)
employeeDAO.remove(employeeDTO);
}
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
public void saveOrUpdateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
EmployeeDTO employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
employeeDAO.merge(employeeDTO);
notificationsProducer.sendNotification("Save Or Update Employee with values : \nID : " + employeeId + "\nName : " + name + "\nSurname : " + surname + "\nJob description : " + jobDescription);
}
}
部署运行,浏览器访问:http://localhost:8080/GWTSpringInfinispanHornetQ/
输出结果:
The Notification Message is :
Save Or Update Employee with values :
ID : xxx
Name : xxx
Surname : xxx
Job description : xxx