resilience4j不够用?自制分布式断路器来帮忙 -Nicolas

20-04-30 banq

当服务的多个实例可以调用指定的外部服务,在这些服务实例中都要定制断路策略很浪费,比如调用外部服务一段时间后进行关闭处理逻辑等。他们可以统一共享调用同一个外部服务的统计信息,这样一个调用失败以后,其他服务实例就不要再重试一遍,这是使用分布式断路器的地方。

由于找不到现有解决方案,我们决定自行尝试一下。简而言之,我们从ratelimitj的启发中快速构建了一个分布式断路器。通话统计信息是使用Redis共享的,它就像一个超级按钮。我们计划将其开源,但这将是另一篇文章的主题;-) 这里分享思路。

1. 共享的统计信息:指标,跟踪和监视

我们对指标的需求非常简单:对于每个队列和每种命令,我们都希望跟踪,并了解其中有多少成功了,或失败和重试或最终移入隔离区。这将使我们能够了解流量并在需要时调整参数。

鉴于我们在Java / Kotlin应用程序中使用的是Spring Boot,因此这里没有做任何决定:我们将照常使用Micrometer来发布带有适当标签的量规,然后在Datadog中遵循这些指标,这是一个(好)监控SAAS。

2. 可视化

尽管有日志,指标和警报是很明显的,但我们希望有一种方法可以随时可视化计划或隔离的任务及其尝试次数和任何可能的错误,以及采取行动来执行这些任务(删除它们) ,重新安排时间,等等)。

3.总体设计思路:

第一个设计决策是关于如何表示命令,执行请求和隔离的。以下是做出的主要决定和最终决定:

  • 命令是一块代码,可以通过名称找到,并且通过提供一个其(Java)的地图的参数执行。由于我们使用的是Spring,因此命令是Spring组件,并且Spring上下文充当命令注册表。
  • 命令执行请求包括一个命令的名称和参数,例如:(名称= pushTransactionForInvoice,ARGS = {invoiceId,的transactionId})
  • 命令与“逻辑”队列相关联,每个队列对应于一个外部服务。因此,该“逻辑”队列是跟踪呼叫并应用速率限制和断路逻辑的单元。
  • 计划后,命令执行请求将作为任务存储在PostgreSQL表(队列表)中,并具有以下核心详细信息:任务ID,计划执行日期/时间,“逻辑”队列名称,命令名称,命令参数,命令重量(限制wrt速率,请参阅“算法”部分
  • 还存储了与命令的执行相关的其他详细信息:任务状态(PENDING或LOCKED),到目前为止的执行次数,最新的执行错误(如果有)。
  • 最后,存储“ 下一个任务 s” 的列表,这些列表是仅在成功完成当前任务后才执行的命令。稍后对此有更多详细信息。
  • 命令的参数和要执行的下一个任务的列表存储为JSONB,以说明它们的可变性。
  • 该隔离被表示为第二表,存储几乎快要任务,加上上次执行尝试的时间相同的细节。

4.为指定服务配置新的命令队列

//将请求定义为命令 放入队列中
const val MY_QUEUE_NAME: String = "myServiceQueue"

@Configuration
class MyServiceQueueConfiguration {

    @Bean(MY_QUEUE_NAME)
    fun myServiceQueue(commandExecutionQueueFactory: CommandExecutionQueueFactory) =
        commandExecutionQueueFactory.createQueue(
                MY_QUEUE_NAME,

                // optionally redefine part or totality of the default policy
                DEFAULT_EXECUTION_POLICY.copy(
                        concurrency = 4,
                        delayBeforeConsideringTask = Duration.ofSeconds(5),
                        maxRetriesBeforeQuarantine = 10,

                        // optional, none by default
                        rateLimits = RateLimits(
                                2300 executionsOver Duration.ofMinutes(15),
                                4500 executionsOver Duration.ofMinutes(30),
                                8800 executionsOver Duration.ofHours(1)
                        ),

                        // optional, none by default
                        circuitBreaking = CircuitBreaking(
                                failureRateThreshold = 0.5,
                                windowDuration = Duration.ofMinutes(10),
                                // will tell that some exceptions are to be considered as provider failures
                                considerExceptionAsFailureIf = someExceptionPredicate()
                        )
                ),

                // optional, a probe that will be queried to know whether to pause task consumption
                // (may query a feature flag, a state defined via some UI, etc.)
                somePauseProbe()
        )
}

您可以在此处找到所有可用的队列选项

将命令执行请求定义为一个简单的对象,其中包含要执行的命令的名称和一个(Java)参数映射。

5. 命令执行队列的核心逻辑

class CommandQueue(...) {
    // ...
    
    override fun schedule(command: CommandSpecification) {
        // add task to queue, log details, emit metrics
        schedule(ScheduledTask(
                command,
                queueName,
                clock,
                // this is the important part for deduplication to work
                scheduledExecutionDate = executionPolicy.computeNextExecutionDate(clock, tries = 0)
        ), command.deduplicate)
    }

    // ...

    override fun processCommands(): Boolean {
        val circuitBreaker = circuitBreaker()
        if (circuitBreaker.isOpen()) {
            return false
        }

        val task = taskRepository.tryLockingTaskWithEarliestScheduleOlderThan(queueName, LocalDateTime.now(clock))
                ?: return false

        val command = commandRegistry.get(task.commandName)

        // each case: decides what to do with task, log details, emit metrics
        val executionResult = when {
            command == null -> commandNotFound(task)
            violatesRateLimit(task.weight) -> rateLimited(task)
            !running -> aborted(task)
            else -> executeCommand(command, task)
        }

        registerCall(circuitBreaker, executionResult)
        // remove task, or move it to quarantine, or update number of tries
        handleExecutionResult(executionResult)
        return executionResult.commandExecuted
    }
    
    // ...
}

6.缺陷

轮询PostgreSQL表也不是一个好主意。但是,每个“逻辑”队列每秒最多只能轮询一次。

谈到队列,它们都在同一张表中进行管理,考虑到更多的使用情况,这可能是性能问题。如果发生这种情况,我们可以将表专用于每个队列,而当要添加队列时,我们需要付出更多配置的代价(现在我们需要创建表)。

我们系统的一个更实际的限制是它目前仅处理同步操作,但是我们可以对其进行调整,以便以异步方式接收命令执行的结果。

更多点击标题见原文

                   

猜你喜欢