使用 db-scheduler 和 Spring 的事务性分阶段作业


在 web 应用程序中,除了更新数据库之外,请求处理通常具有次要效果,例如更新另一个数据源或发送电子邮件。但是很难可靠地控制二次效应发生的时间和条件。

在这篇博文中,我将向您展示如何使用db-scheduler和 Spring 事务通过使辅助更新成为主数据库中的事务分阶段作业来增强控制。

什么是事务性分阶段作业?
这个想法本质上是不是在请求处理代码中直接执行次要更新(同步或异步),而是将这样做的意图存储在数据库中并将其绑定到与主要更新相同的本地事务。然后在提交时由后台处理框架获取此意图。

因此提交或回滚将影响这两个更新。我们将预期的二次更新称为事务性分阶段作业。
它将实现与跨越数据库和事务感知消息队列的分布式事务类似的保证。

案例
让我们用一个虚构的例子来做铺垫。我们向我们的网络应用程序请求创建新用户。新用户被保存到数据库中并发送一封欢迎他们的电子邮件。理想情况下,电子邮件只发送给成功注册的用户,所有新用户都会收到电子邮件。我们怎样才能使它可靠?
为了说明,这里有一些选项:

  • 选项 1 —创建用户后发送电子邮件。弱点:如果操作之间出现故障,则可能会在未发送电子邮件的情况下创建用户。
  • 选项 2 — 在创建用户之前发送电子邮件。弱点:如果操作之间出现故障,则可能会向不存在的用户发送电子邮件。
  • 选项 3 — 跨越本地事务中的操作并在成功创建用户后发送电子邮件。如果电子邮件发送失败,事务回滚。弱点:如果提交前出现故障,可能会向不存在的用户发送电子邮件。

两者都不能保证在成功创建用户后会尝试发送电子邮件。在这种情况下,事务性分阶段作业可能会有所帮助。

使用事务性分阶段作业模式,我们从上面的示例中得到另一个选项。

  • 选项 4 — 在本地事务中跨越创建用户和分阶段电子邮件发送作业的操作。对可见的分阶段作业进行后台进程轮询并执行它们。

这将保证我们将始终在事务提交(并创建用户)时尝试发送电子邮件。它可能仍然会失败,但由于它现在是一项持久性工作,我们可以控制如果它失败应该发生什么。例如:重试。

db-scheduler非常适合,因为它实际上是一个用于处理存储在数据库表中的持久作业的库。在继续实施该示例之前,让我们简要了解一下它是如何工作的。

什么是db-scheduler?
db-scheduler是一个持久化的Java工作调度器,它的建立是为了简单地使用和理解,并易于嵌入到现有的应用程序中。它使用一个RDBMS来实现持久性和可靠性。它同时支持一次性和重复性的作业(又称任务)。

作业的调度实例(又称执行)被跟踪在一个单一的表,即 scheduled_tasks。调度器每隔<polling-interval>(默认为10s)检查一次到期的执行,对于任何到期的执行,它在一个线程池中找到并运行合适的任务(即执行)。当它完成后,一个CompletionalHandler决定下一步应该发生什么。例如,根据指定的时间表删除或重新安排。

一个任务是一个Java类,有以下组成部分:

  • 一个名称,即一个唯一标识该任务的字符串。用于将从数据库中获取的执行映射到它们的Java实现。
  • 一个ExecutionHandler。定义了当调度器执行任务实例时应该运行的代码。
  • 一个FailureHandler。定义当ExecutionHandler抛出异常时应该发生什么。默认为无限次重试,有固定的延迟。
  • 一个DeadExecutionHandler。定义了当调度器检测到一个已经被选中的执行,但缺少表示正在执行的心跳时,应该发生什么。例如,由非优雅的关闭引起的。默认为尽快重试。

Execution 执行是一个任务的实例,计划在某个时间点上运行。它有以下组成部分:

  • 一个任务(以任务名的形式)。
  • 该任务实例的一个字符串ID。用于识别和去重。
  • 一个执行时间,指定它应该何时运行。
  • data 是这个特定实例的可选,即一个有效载荷。默认情况下,数据是使用Java序列化的。对于Spring Boot,建议使用JSON序列化。
  • 好了,这是对db-scheduler如何工作的一个非常简短的总结。更多细节见README。现在,让我们来看看如何在db-scheduler中实现上面的事务性分阶段作业。

一个使用db-scheduler和Spring Boot的解决方案
我们将使用spring-boot-autostarter(由evenh贡献)来在Spring Boot项目中设置db-scheduler。该启动器会将Spring上下文中发现的任何任务类与Scheduler实例进行采集和注册,并启动它。maven的依赖性:

<dependency>
    <groupId>com.github.kagkarlsson</groupId>
    <artifactId>db-scheduler-spring-boot-starter</artifactId>
    <version>12.1.0</version>
</dependency>


有了这个依赖关系,让我们注册一个任务,向新用户发送电子邮件。ExecutionHandler是提供给execute方法的lambda。其他处理程序保持默认状态。

@Configuration
public class SendUserEmailConfiguration {

  public static final TaskWithDataDescriptor<EmailData> EMAIL_NEW_USER_TASK =
      new TaskWithDataDescriptor<>("email-new-user", EmailData.class);

  @Bean
  public Task<EmailData> sendEmailTask() {
    return Tasks.oneTime(EMAIL_NEW_USER_TASK)
        .execute(
            (TaskInstance<EmailData> inst, ExecutionContext ctx) -> {
              EmailData emailData = inst.getData();
              sendWelcomeEmail(emailData);          
// dummy implementation
            });
  }

上面使用的EmailData类是持有与背景工作相关数据的类。定义为::

public class EmailData implements Serializable {
    // json serialization can also be used
    private static final long serialVersionUID = 1L; 
    public final String username;
    public final String emailAddress;

    public EmailData(String username, String emailAddress) {
        this.username = username;
        this.emailAddress = emailAddress;
    }
}

最后,让我们定义请求处理代码,它将在每次新用户注册时运行,即上面的选项4。

public UserController(SchedulerClient schedulerClient, TransactionTemplate tx) {
  this.schedulerClient = schedulerClient;
  this.tx = tx;
}

@PostMapping
public void registerNewUser(@RequestBody NewUser newUser) {
  String username = newUser.getUsername();
  String emailAddress = newUser.getEmailAddress();

  // begin transaction (tx)
  tx.executeWithoutResult(  
// 3
      (TransactionStatus status) -> {
        userDao.createUser(username, emailAddress);  
// 1

        EmailData data = new EmailData(username, emailAddress);
        TaskInstance<EmailData> newEmailInstance =
            EMAIL_NEW_USER_TASK.instance(UUID.randomUUID().toString(), data);

       
// Schedule the INTENT of sending an email
       
// This will insert a new job in the table backing db-scheduler
        schedulerClient.schedule(newEmailInstance, Instant.now());  
// 2

        doSomeOtherStuffThatMightFail();
      });
}

上面的例子有三个重要部分。

  • userDao.createUser是在数据库中持久化新用户所必需的虚构操作。
  • SchedulerClient是用来安排尽快发送电子邮件的意图。db-scheduler中的调度基本上意味着在scheduler_tasks表中插入一条新行。
  • Spring的TransactionTemplate被用来以编程方式控制事务边界。在这里,我们确保它横跨两个数据库操作(userDao.createUser和schedulerClient.schedule)。这依赖于一个事实,即SchedulerClient被送入一个具有事务感知能力的数据源,在Spring的案例中是TransactionAwareDataSourceProxy。

如果事务提交了,发送邮件的执行就变得可见,并被选中执行。如果事务回滚,调度器将永远看不到该执行。请看README中的更多配置选项(例如自定义FailureHandler或.enableImmediateExecution())。

概括
在这篇博文中,我展示了如何通过使用与主更新相同的本地事务将它们暂存为数据库中的持久作业来实现可靠的辅助更新。db-scheduler 是一个开源的 Java 作业调度程序,可以很容易地促进这一点,因为它被设计为将数据库用作作业队列并且了解 Spring 事务。

如果您想试用上面的示例,请查看官方示例TransactionallyStagedJobConfiguration.java,它是此处示例的变体。如前所述,这类似于微服务架构中的事务性发件箱模式。db-scheduler 也可用于实现类似Saga 模式的东西,我们需要以调用其他服务的形式编排多个二次更新。