使用Spring Boot + Redis 进行实时流处理 - vinsguru


Redis用于高速缓存数据。除了主/从复制,发布/订阅功能和时间序列数据支持外,还添加了对流的支持。Kafka的问题是很难配置。基础架构维护非常具有挑战性。但是Redis非常容易并且重量轻。
 
样例应用:

  • 我们的发布者将发布与Redis购买相关的一些偶数。让我们称它们为购买事件流。
  • 消费者群体有兴趣听这些事件。这可能是用于计算收入或处理付款或发送电子邮件!
    • 当您需要执行所有这些操作时,例如:付款处理和发送电子邮件,则每个用户都需要一个单独的消费者组。
  • 消费者将消费事件,他们可以做任何事情。在我们的案例中,我们只是找到用户支付的价格,然后按类别计算收入。
    • 可以将其记录在单独的数据库中。但是为了简单起见,我将在Redis中将此信息记录为SortedSet。

源代码:
完整的源代码在这里
 
生产者
理想情况下,生产者和消费者将是两个不同的微服务/应用程序。在这里,我们两个人都将在同一个项目中。但是,我们基于名为“ app.role ”的自定义属性来控制应用程序的行为,使其像生产者还是消费者。基于该值,将在Spring中创建相应的组件。
@Service
@ConditionalOnProperty(name="app.role", havingValue="producer")
public class PurchaseEventProducer {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Value(
"${stream.key}")
    private String streamKey;

    @Autowired
    private ProductRepository repository;

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Scheduled(fixedRateString=
"${publish.rate}")
    public void publishEvent(){
        Product product = this.repository.getRandomProduct();
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
               
"Total Events :: " + atomicInteger.get()
        );
    }

}

  • publishEvent方法定期发布一些随机购买的产品。
  • showPublishedEventsSoFar方法仅显示到目前为止已下的订单数。

 
消费者
我们的发布者已经准备好。让我们创建一个消费者。要使用RedisStreams,我们需要实现StreamListener接口。
@Service
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() +
" - consumed :" +
                record.getValue()
        );
        this.redisTemplate
                .opsForZSet()
                .incrementScore(
"revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
               
"Total Consumed :: " + atomicInteger.get()
        );
    }

}

  • 我们只简单地显示消费记录。
  • 然后,我们获得支付的价格并将其添加到redis排序集中。
  • 像发布者一样,我们会定期显示此使用者消耗的事件数。

 
Redis流配置
创建使用者后,我们需要通过将上述使用者添加到StreamMessageListenerContainer实例中来创建订阅。
@Configuration
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class RedisStreamConfig {

    @Value(
"${stream.key}")
    private String streamKey;

    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        var options = StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .pollTimeout(Duration.ofSeconds(1))
                            .targetType(Product.class)
                            .build();
        var listenerContainer = StreamMessageListenerContainer
                                    .create(redisConnectionFactory, options);
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }

}


详细运行配置见原文