深入研究Spring Cloud负载平衡器 – Piotr


Spring Cloud当前即将发生大的变化。虽然几乎所有的Spring Cloud Netflix组件都将在下一版本中删除,但最大的变化似乎是将Ribbon客户端替换为Spring Cloud Load Balancer。当前,关于Spring Cloud Load Balancer的在线文章很少。实际上,该组件仍在积极开发中,因此我们可以在不久的将来期待一些新功能。Netflix Ribbon客户端是稳定的解决方案,但不幸的是它不再开发。但是,它仍被用作所有Spring Cloud项目中的默认负载均衡器,并具有许多有趣的功能,例如与断路器集成或根据来自服务实例的平均响应时间进行负载均衡。目前,Spring Cloud Load Balancer尚不提供此类功能,但是我们可以创建一些自定义代码来实现它们。在本文中,我将向您展示如何将spring-cloud-loadbalancer模块与RestTemplate 对于应用程序之间的通信,如何基于平均响应时间实现自定义负载均衡器,最后如何提供服务地址的静态列表。

您可以在我的GitHub存储库https://github.com/piomin/course-spring-microservices.git中找到与本文相关的源代码片段。该存储库也用于我的在线课程,因此我决定通过新示例对其进行扩展。所有必需的更改都在该存储库中的目录内部通信/内部呼叫者服务中执行。该代码用Kotlin编写。这里有三个应用程序,它们是示例系统的一部分:discovery-server(Spring Cloud Netflix Eureka),inter-callme-service(公开REST API的Spring Boot应用程序),最后是inter-caller-service(调用公开的端点的Spring Boot应用程序inter-callme-service)。

如何开始
为了为我们的应用程序启用Spring Cloud Load Balancer,我们首先需要包括以下启动器对Maven的依赖关系(此模块也可以与其他一些Spring Cloud启动器一起包含在内)。

dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

由于Ribbon仍用作应用程序之间基于REST的通信的默认客户端负载平衡器,因此我们需要在应用程序属性中将其禁用。这是application.yml文件的片段。

spring:
  application:
    name: inter-caller-service
  cloud:
    loadbalancer:
      ribbon:
        enabled: false

对于发现集成,我们还需要包含spring-cloud-starter-netflix-eureka-client。要RestTemplate与客户端负载平衡器一起使用,我们应该定义此类bean并使用进行注释@LoadBalanced。正如你在下面的代码我还设置拦截上RestTemplate,但更多的是在接下来的一节。
@Bean
@LoadBalanced
fun template(): RestTemplate = RestTemplateBuilder()
        .interceptors(responseTimeInterceptor())
        .build()

使流量适应平均响应时间
Spring Cloud Load Balancer提供了简单的循环规则,可在单个服务的多个实例之间实现负载平衡。我们的目标是实施一个规则,该规则可测量每个应用程序的响应时间并根据该时间给出权重。响应时间越长,重量就越少。该规则应随机选择一个可能性,该可能性由其权重决定。要记录每个调用的响应时间,我们需要设置已经提到的实现的拦截器ClientHttpRequestInterceptor。拦截器在每个请求上执行。由于实施非常典型,因此需要一行解释。我从Slf4J中存在的线程作用域变量获取目标应用程序的地址MDC。当然,我也可以基于实现一个简单的线程范围上下文ThreadLocal,但是MDC 此处仅用于简化。

class ResponseTimeInterceptor(private val responseTimeHistory: ResponseTimeHistory) : ClientHttpRequestInterceptor {
 
    private val logger: Logger = LoggerFactory.getLogger(ResponseTimeInterceptor::class.java)
 
    override fun intercept(request: HttpRequest, array: ByteArray,
                           execution: ClientHttpRequestExecution): ClientHttpResponse {
        val startTime: Long = System.currentTimeMillis()
        val response: ClientHttpResponse = execution.execute(request, array) // 1
        val endTime: Long = System.currentTimeMillis()
        val responseTime: Long = endTime - startTime
        logger.info("Response time: instance->{}, time->{}", MDC.get("address"), responseTime)
        responseTimeHistory.addNewMeasure(MDC.get("address"), responseTime) // 2
        return response
    }
}

当然,计算平均响应时间只是我们工作的一部分。最重要的是自定义负载均衡器的实现,如下所示。它应该实现interface ReactorServiceInstanceLoadBalancer。它需要注入ServiceInstanceListSupplierbean来以重写方法获取给定服务的可用实例列表choose。选择正确的实例时,我们正在分析ResponseTimeHistoryby 保存的每个实例的平均响应时间ResponseTimeInterceptor。首先,我们的负载均衡器的作用就像简单的循环轮询。

class WeightedTimeResponseLoadBalancer(
        private val serviceInstanceListSupplierProvider: ObjectProvider<ServiceInstanceListSupplier>,
        private val serviceId: String,
        private val responseTimeHistory: ResponseTimeHistory) : ReactorServiceInstanceLoadBalancer {
 
    private val logger: Logger = LoggerFactory.getLogger(WeightedTimeResponseLoadBalancer::class.java)
    private val position: AtomicInteger = AtomicInteger()
 
    override fun choose(request: Request<*>?): Mono<Response<ServiceInstance>> {
        val supplier: ServiceInstanceListSupplier = serviceInstanceListSupplierProvider
                .getIfAvailable { NoopServiceInstanceListSupplier() }
        return supplier.get().next()
                .map { serviceInstances: List<ServiceInstance> -> getInstanceResponse(serviceInstances) }
    }
 
    private fun getInstanceResponse(instances: List<ServiceInstance>): Response<ServiceInstance> {
        return if (instances.isEmpty()) {
            EmptyResponse()
        } else {
            val address: String? = responseTimeHistory.getAddress(instances.size)
            val pos: Int = position.incrementAndGet()
            var instance: ServiceInstance = instances[pos % instances.size]
            if (address != null) {
                val found: ServiceInstance? = instances.find { "${it.host}:${it.port}" == address }
                if (found != null)
                    instance = found
            }
            logger.info("Current instance: [address->{}:{}, stats->{}ms]", instance.host, instance.port,
                    responseTimeHistory.stats["${instance.host}:${instance.port}"])
            MDC.put("address", "${instance.host}:${instance.port}")
            DefaultResponse(instance)
        }
    }
}

这是ResponseTimeHistorybean 的实现,它负责存储度量并根据计算的权重选择服务实例。

class ResponseTimeHistory(private val history: MutableMap<String, Queue<Long>> = mutableMapOf(),
                          val stats: MutableMap<String, Long> = mutableMapOf()) {
 
    private val logger: Logger = LoggerFactory.getLogger(ResponseTimeHistory::class.java)
 
    fun addNewMeasure(address: String, measure: Long) {
        var list: Queue<Long>? = history[address]
        if (list == null) {
            history[address] = LinkedList<Long>()
            list = history[address]
        }
        logger.info("Adding new measure for->{}, measure->{}", address, measure)
        if (measure == 0L)
            list!!.add(1L)
        else list!!.add(measure)
        if (list.size > 9)
            list.remove()
        stats[address] = countAvg(address)
        logger.info("Counting avg for->{}, stat->{}", address, stats[address])
    }
 
    private fun countAvg(address: String): Long {
        val list: Queue<Long>? = history[address]
        return list?.sum()?.div(list.size) ?: 0
    }
 
    fun getAddress(numberOfInstances: Int): String? {
        if (stats.size < numberOfInstances)
            return null
        var sum: Long = 0
        stats.forEach { sum += it.value }
        var r: Long = Random.nextLong(100)
        var current: Long = 0
        stats.forEach {
            val weight: Long = (sum - it.value)*100 / sum
            logger.info("Weight for->{}, value->{}, random->{}", it.key, weight, r)
            current += weight
            if (r <= current)
                return it.key
        }
        return null
    }
 
}

自定义LOADBALANCER
我们的加权响应时间规则机制的实现已经准备就绪,因此最后一步是将其应用于Spring Cloud Load Balancer。为此,我们需要使用ReactorLoadBalancerbean声明创建一个专用的配置类,如下所示。

class CustomCallmeClientLoadBalancerConfiguration(private val responseTimeHistory: ResponseTimeHistory) {
 
    @Bean
    fun loadBalancer(environment: Environment, loadBalancerClientFactory: LoadBalancerClientFactory):
            ReactorLoadBalancer<ServiceInstance> {
        val name: String? = environment.getProperty("loadbalancer.client.name")
        return WeightedTimeResponseLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier::class.java),
                name!!, responseTimeHistory)
    }
}

可以使用注释将自定义配置传递给负载均衡器@LoadBalancerClient。客户端名称应与发现中注册的名称相同。目前,这部分代码已在GitHub存储库中注释掉,因此,如果要启用它以进行测试,则只需取消注释即可。

@SpringBootApplication
@LoadBalancerClient(value = "inter-callme-service", configuration = [CustomCallmeClientLoadBalancerConfiguration::class])
class InterCallerServiceApplication {
 
    @Bean
    fun responseTimeHistory(): ResponseTimeHistory = ResponseTimeHistory()
 
    @Bean
    fun responseTimeInterceptor(): ResponseTimeInterceptor = ResponseTimeInterceptor(responseTimeHistory())
 
    // THE REST OF IMPLEMENTATION...
}

定制提供者实例列表
当前,Spring Cloud Load Balancer不支持在配置属性中设置的静态实例列表(与Netflix Ribbon不同)。我们可以轻松添加这样的机制。如下所示,将定义每个服务的实例的静态列表。

spring:
  application:
    name: inter-caller-service
  cloud:
    loadbalancer:
      ribbon:
        enabled: false
      instances:
        - name: inter-callme-service
          servers: localhost:59600, localhost:59800

第一步,我们应该定义一个实现接口ServiceInstanceListSupplier并覆盖两个方法的类:getServiceId()和get()。的以下实现ServiceInstanceListSupplier从应用程序属性到获取服务地址列表@ConfigurationProperties。

class StaticServiceInstanceListSupplier(private val properties: LoadBalancerConfigurationProperties,
                                        private val environment: Environment) : ServiceInstanceListSupplier {
 
    override fun getServiceId(): String = environment.getProperty("loadbalancer.client.name")!!
 
    override fun get(): Flux<MutableList<ServiceInstance>> {
        val serviceConfig: LoadBalancerConfigurationProperties.ServiceConfig? =
                properties.instances.find { it.name == serviceId }
        val list: MutableList<ServiceInstance> =
                serviceConfig!!.servers.split(",", ignoreCase = false, limit = 0)
                        .map { StaticServiceInstance(serviceId, it) }.toMutableList()
        return Flux.just(list)
    }
 
}

这是带有属性的配置类的实现。
@Configuration
@ConfigurationProperties("spring.cloud.loadbalancer")
class LoadBalancerConfigurationProperties {
 
    val instances: MutableList<ServiceConfig> = mutableListOf()
 
    class ServiceConfig {
        var name: String = ""
        var servers: String = ""
    }
 
}

与前面的示例相同,我们还应该ServiceInstanceListSupplier在自定义配置类中将Bean的实现注册为Bean。
class CustomCallmeClientLoadBalancerConfiguration) {
 
    @Bean
    fun discoveryClientServiceInstanceListSupplier(discoveryClient: ReactiveDiscoveryClient, environment: Environment,
        zoneConfig: LoadBalancerZoneConfig, context: ApplicationContext,
        properties: LoadBalancerConfigurationProperties): ServiceInstanceListSupplier {
        val delegate = StaticServiceInstanceListSupplier(properties, environment)
        val cacheManagerProvider = context.getBeanProvider(LoadBalancerCacheManager::class.java)
        return if (cacheManagerProvider.ifAvailable != null) {
            CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.ifAvailable)
        } else delegate
    }
}

测试中
要测试针对本文目的实现的解决方案,您应该:

  1. 运行发现服务器实例(仅在StaticServiceInstanceListSupplier禁用时)
  2. 运行两个实例inter-callme-service(对于一个选定的实例,使用VM参数激活随机延迟-Dspring.profiles.active=delay)
  3. 运行的实例inter-caller-service,该实例在端口上可用8080
  4. 例如,使用命令将一些测试请求发送到呼叫者间服务 curl -X POST http://localhost:8080/caller/random-send/12345

下图显示了我们的测试场景。

结论
当前,Spring Cloud Load Balancer并没有提供像Netflix Ribbon客户端那样的用于服务间通信的有趣功能。当然,Spring Team仍在积极开发它。好消息是我们可以轻松自定义Spring Cloud Load Balancer来添加一些自定义功能。在本文中,我演示了如何提供更高级的负载平衡算法或如何创建自定义实例列表供应商。