使用Rqueue框架基于Redis和Spring Boot执行异步任务 -sonus21


在本文中,我们将学习如何使用Spring Boot 2.x和Redis执行异步任务,最后的代码演示了本文中描述的步骤。
一个典型的API调用包括五件事:

  1. 执行一个或多个数据库(RDBMS / NoSQL)查询。
  2. 在某些缓存系统(内存中,分布式等)上的一项或多项操作。
  3. 一些计算(可能是做一些数学运算的一些数据处理)。
  4. 调用其他服务(内部/外部)。
  5. 安排一个或多个任务在以后或立即在后台执行。

出于多种原因,可以在以后的某个时间安排任务。例如,必须在订单创建或装运后7天生成发票。同样,不需要立即发送电子邮件通知,因此我们可以延迟它们。 
考虑到这些实际示例,有时,我们需要异步执行任务以减少API响应时间。例如,我们收到一个立即删除1K +记录的请求,如果我们在同一API调用中删除所有这些记录,那么肯定会增加API响应时间。为了减少API响应时间,我们可以在后台运行一个任务,以删除这些记录。 
 
Cron计划缺点
每当我们计划任务以给定时间或特定间隔运行时,我们就会使用计划在特定时间或间隔进行的cron作业。我们可以使用UNIX风格的crontabs,Chronos等其他工具来运行计划任务。如果我们使用的是Spring框架,则可以使用现成的Scheduled注释。 
大多数cron作业会查找何时需要采取特定措施的记录,例如,在7天后查找所有发货以及未生成发票的记录。这些调度机制中的大多数都存在缩放问题,因为我们在其中扫描数据库以查找相关的行/记录。
在许多情况下,这会导致全表扫描表现很差,想象一下实时应用程序和此批处理系统使用相同数据库的情况。
由于它不可扩展,因此我们需要一些可扩展的系统,该系统可以在给定的时间或间隔执行任务,而不会出现任何性能问题。
有许多以这种方式扩展的方法,例如以批处理方式运行任务或在用户/区域的特定子集上运行任务。另一种方法是在给定时间运行特定任务,而不依赖于其他任务,例如无服务器功能。一个延迟队列可以的情况下,一旦定时器达到预定时间的作业将被触发使用。有许多可用的排队系统/软件,但很少有提供此功能的系统,例如SQS 它提供15分钟的延迟,而不是7个小时或7天之类的任意延迟。
 
Rqueue框架
Rqueue是为Spring框架构建的消息代理,将数据存储在Redis中,并提供了一种在任意延迟下执行任务的机制。由于Redis与其他广泛使用的排队系统(例如Kafka或SQS)相比,具有一些优势,因此Rqueue得到了Redis的支持。在大多数Web应用程序的后端中,Redis用于存储缓存的数据或其他目的。在当今世界上,有8.4% 的Web应用程序正在使用Redis数据库。
通常,对于队列,我们​​使用Kafka,SQS或其他一些系统。这些系统带来了不同维度的额外开销,例如,使用Rqueue和Redis可以将金钱减少为零。
除了成本外,如果我们使用Kafka,那么我们需要进行基础架构设置,维护,即需要更多操作,因为大多数应用程序已经在使用Redis,因此我们不会有操作开销。实际上,相同的Redis服务器/群集可与Rqueue一起使用,因为 Rqueue支持任意延迟。
这篇文章的完整代码可以在我的GitHub repo中找到。 
 
消息传递
Rqueue保证至少一次发送消息,因为长时间的数据不会在数据库中丢失。您可以在这里阅读更多有关此内容:Rqueue简介
我们将需要的工具:
  • Any IDE
  • Gradle 
  • Java
  • Redis 

依赖:
  1. Spring Data Redis
  2. Spring Web
  3. Lombok 

我们将使用Rqueue库以任意延迟执行任何任务。Rqueue是基于Spring的异步任务执行器,可以在任何延迟下执行任务。它是由Spring消息传递库构建的,并由Redis支持。
我们将添加Rqueue Spring Boot starter 2.7.0依赖项:

dependencies {  
  implementation 'org.springframework.boot:spring-boot-starter-data-redis'
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE'
  compileOnly 'org.projectlombok:lombok'   
  annotationProcessor 'org.projectlombok:lombok'
  providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'  
  }
}
出于测试目的,我们将启用Spring Web MVC功能,以便我们可以发送测试请求。 
 
创建任务
使用Rqueue添加任务非常简单,我们只需要使用RqueueListener注释一个方法即可。RqueuListener批注具有多个可以根据用例设置的字段,例如,设置deadLetterQueue可以将任务推送到另一个队列,否则在失败时将丢弃该任务。我们还可以使用numRetries字段设置任务应重试多少次。
创建一个Java文件名MessageListener并添加一些方法来执行任务。

import com.github.sonus21.rqueue.annotation.RqueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "${email.queue.name}")
  public void sendEmail(Email email) {
    log.info(
"Email {}", email);
  }

  @RqueueListener(value =
"${invoice.queue.name}")
  public void generateInvoice(Invoice invoice) {
    log.info(
"Invoice {}", invoice);
  }
}

 
任务提交
可以使用RqueueMessageEnqueuer bean提交任务。 它有多种方法可以根据用例排队任务,例如重试使用,重试计数和延迟任务的延迟。
我们需要AutoWire RqueueMessageEnqueuer或使用构造函数来注入此bean。
创建用于测试目的的控制器:
我们将计划在接下来的30秒内完成发票生成,为此,我们将提交一个延迟30000(毫秒)的任务。另外,我们将尝试发送将在后台完成的电子邮件。为此,我们将添加两个GET方法sendEmail和generateInvoice,我们也可以使用POST。
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class Controller {
  private @NonNull RqueueMessageEnqueuer rqueueMessageEnqueuer;

  @Value("${email.queue.name}")
  private String emailQueueName;

  @Value(
"${invoice.queue.name}")
  private String invoiceQueueName;

  @Value(
"${invoice.queue.delay}")
  private Long invoiceDelay;

  @GetMapping(
"email")
  public String sendEmail(
      @RequestParam String email, @RequestParam String subject, @RequestParam String content) {
    log.info(
"Sending email");
    rqueueMessageEnqueuer.enqueue(emailQueueName, new Email(email, subject, content));
    return
"Please check your inbox!";
  }

  @GetMapping(
"invoice")
  public String generateInvoice(@RequestParam String id, @RequestParam String type) {
    log.info(
"Generate invoice");
    rqueueMessageEnqueuer.enqueueIn(invoiceQueueName, new Invoice(id, type), invoiceDelay);
    return
"Invoice would be generated in " + invoiceDelay + " milliseconds";
  }
}

application.properties:

email.queue.name=email-queue
invoice.queue.name=invoice-queue
# 30 seconds delay for invoice
invoice.queue.delay=300000

运行测:http://localhost:8080/email?email=xample@exampl.com&subject=%22test%20email%22&content=%22testing%20email%22

30秒后发票:
http://localhost:8080/invoice?id=INV-1234&type=PROFORMA
 
总之,我们可以使用Rqueue调度任务,而无需花费很多锅炉代码。在配置和使用Rqueue库时,我们需要考虑一些事项。重要的一项是任务是否是延迟的任务。默认情况下,假定任务需要尽快执行。
完整的代码可以在我的Github帐户中找到https://github.com/sonus21/rqueue-task-exector
Rqueue库代码:https://github.com/sonus21/rqueue