NoSQL专题
基于Spring+redis实现pub/sub
Pub / Sub(publisher/subcriber发布和订阅)消息是许多软件架构的重要组成部分。消息传递解决方案提供了高性能,可扩展性,队列持久性和耐用性,故障转移支持等,以及许多更漂亮具备的功能,在Java世界中大多总是使用JMS实现。后来使用Apache ActiveMQ的,有时只是需要简单的排队支持,而Apache ActiveMQ显得过于复杂。
在这个案例中,使用Redis作为pub/sub的队列,将消息存放在Redis内存中。
首先,配置Maven:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0</modelversion>
<groupid>com.example.spring</groupid>
<artifactid>redis</artifactid>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceencoding>UTF-8</project.build.sourceencoding>
<spring.version>3.1.1.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupid>org.springframework.data</groupid>
<artifactid>spring-data-redis</artifactid>
<version>1.0.1.RELEASE</version>
</dependency>
<dependency>
<groupid>cglib</groupid>
<artifactid>cglib-nodep</artifactid>
<version>2.2</version>
</dependency>
<dependency>
<groupid>log4j</groupid>
<artifactid>log4j</artifactid>
<version>1.2.16</version>
</dependency>
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>2.0.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupid>org.springframework</groupid>
<artifactid>spring-core</artifactid>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupid>org.springframework</groupid>
<artifactid>spring-context</artifactid>
<version>${spring.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupid>org.apache.maven.plugins</groupid>
<artifactid>maven-compiler-plugin</artifactid>
<version>2.3.2</version>
<configuration>
<source>1.6
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
使用 Jedis作为Java客户端,需要三个组件:
- 连接Redis的工厂 -> JedisConnectionFactory
- redis操作模板-> RedisTemplate
- 消息监听者-> RedisMessageListenerContainer
下面配置Spring的Java类:
package com.example.redis.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.example.redis.IRedisPublisher;
import com.example.redis.impl.RedisMessageListener;
import com.example.redis.impl.RedisPublisherImpl;
@Configuration
@EnableScheduling
public class AppConfig {
@Bean
JedisConnectionFactory jedisConnectionFactory() {
return new JedisConnectionFactory();
}
@Bean
RedisTemplate< String, Object > redisTemplate() {
final RedisTemplate< String, Object > template = new RedisTemplate< String, Object >();
template.setConnectionFactory( jedisConnectionFactory() );
template.setKeySerializer( new StringRedisSerializer() );
template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
return template;
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter( new RedisMessageListener() );
}
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory( jedisConnectionFactory() );
container.addMessageListener( messageListener(), topic() );
return container;
}
@Bean
IRedisPublisher redisPublisher() {
return new RedisPublisherImpl( redisTemplate(), topic() );
}
@Bean
ChannelTopic topic() {
return new ChannelTopic( 'pubsub:queue' );
}
}
下面做一个消息发布在Producer,每100ms发布一个字符串消息:
package com.example.redis.impl;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.scheduling.annotation.Scheduled;
import com.example.redis.IRedisPublisher;
public class RedisPublisherImpl implements IRedisPublisher {
private final RedisTemplate< String, Object > template;
private final ChannelTopic topic;
private final AtomicLong counter = new AtomicLong( 0 );
public RedisPublisherImpl( final RedisTemplate< String, Object > template,
final ChannelTopic topic ) {
this.template = template;
this.topic = topic;
}
@Scheduled( fixedDelay = 100 )
public void publish() {
template.convertAndSend( topic.getTopic(), 'Message ' + counter.incrementAndGet() +
', ' + Thread.currentThread().getName() );
}
}
消息监听者:
package com.example.redis.impl;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage( final Message message, final byte[] pattern ) {
System.out.println( 'Message received: ' + message.toString() );
}
}
客户端调用运行:
public class RedisPubSubStarter {
public static void main(String[] args) {
new AnnotationConfigApplicationContext( AppConfig.class );
}
}
输出:
...
Message received: Message 1, pool-1-thread-1
Message received: Message 2, pool-1-thread-1
Message received: Message 3, pool-1-thread-1
Message received: Message 4, pool-1-thread-1
Message received: Message 5, pool-1-thread-1
Message received: Message 6, pool-1-thread-1
Message received: Message 7, pool-1-thread-1
Message received: Message 8, pool-1-thread-1
Message received: Message 9, pool-1-thread-1
Message received: Message 10, pool-1-thread-1
Message received: Message 11, pool-1-thread-1
Message received: Message 12, pool-1-thread-1
Message received: Message 13, pool-1-thread-1
Message received: Message 14, pool-1-thread-1
Message received: Message 15, pool-1-thread-1
Message received: Message 16, pool-1-thread-1
...