Redis用于高速缓存数据。除了主/从复制,发布/订阅功能和时间序列数据支持外,还添加了对流的支持。Kafka的问题是很难配置。基础架构维护非常具有挑战性。但是Redis非常容易并且重量轻。
样例应用:
- 我们的发布者将发布与Redis购买相关的一些偶数。让我们称它们为购买事件流。
- 消费者群体有兴趣听这些事件。这可能是用于计算收入或处理付款或发送电子邮件!
- 当您需要执行所有这些操作时,例如:付款处理和发送电子邮件,则每个用户都需要一个单独的消费者组。
- 消费者将消费事件,他们可以做任何事情。在我们的案例中,我们只是找到用户支付的价格,然后按类别计算收入。
- 可以将其记录在单独的数据库中。但是为了简单起见,我将在Redis中将此信息记录为SortedSet。
源代码:
完整的源代码在这里。
生产者
理想情况下,生产者和消费者将是两个不同的微服务/应用程序。在这里,我们两个人都将在同一个项目中。但是,我们基于名为“ app.role ”的自定义属性来控制应用程序的行为,使其像生产者还是消费者。基于该值,将在Spring中创建相应的组件。
@Service @ConditionalOnProperty(name="app.role", havingValue="producer") public class PurchaseEventProducer {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Value("${stream.key}") private String streamKey;
@Autowired private ProductRepository repository;
@Autowired private ReactiveRedisTemplate<String, String> redisTemplate;
@Scheduled(fixedRateString= "${publish.rate}") public void publishEvent(){ Product product = this.repository.getRandomProduct(); ObjectRecord<String, Product> record = StreamRecords.newRecord() .ofObject(product) .withStreamKey(streamKey); this.redisTemplate .opsForStream() .add(record) .subscribe(System.out::println); atomicInteger.incrementAndGet(); }
@Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Events :: " + atomicInteger.get() ); }
}
|
- publishEvent方法定期发布一些随机购买的产品。
- showPublishedEventsSoFar方法仅显示到目前为止已下的订单数。
消费者
我们的发布者已经准备好。让我们创建一个消费者。要使用RedisStreams,我们需要实现StreamListener接口。
@Service @ConditionalOnProperty(name="app.role", havingValue="consumer") public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Autowired private ReactiveRedisTemplate<String, String> redisTemplate;
@Override @SneakyThrows public void onMessage(ObjectRecord<String, Product> record) { System.out.println( InetAddress.getLocalHost().getHostName() + " - consumed :" + record.getValue() ); this.redisTemplate .opsForZSet() .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice()) .subscribe(); atomicInteger.incrementAndGet(); }
@Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Consumed :: " + atomicInteger.get() ); }
}
|
- 我们只简单地显示消费记录。
- 然后,我们获得支付的价格并将其添加到redis排序集中。
- 像发布者一样,我们会定期显示此使用者消耗的事件数。
Redis流配置
创建使用者后,我们需要通过将上述使用者添加到StreamMessageListenerContainer实例中来创建订阅。
@Configuration @ConditionalOnProperty(name="app.role", havingValue="consumer") public class RedisStreamConfig {
@Value("${stream.key}") private String streamKey;
@Autowired private StreamListener<String, ObjectRecord<String, Product>> streamListener;
@Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(Product.class) .build(); var listenerContainer = StreamMessageListenerContainer .create(redisConnectionFactory, options); var subscription = listenerContainer.receiveAutoAck( Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; }
}
|
详细运行配置见原文