使用RSocket进行服务通信的反应性服务 - 负载平衡和可恢复性 | Rafał Kowalski


RSocket可以彻底改变分布式系统中的机器到机器通信。在以下段落中,我们将讨论云中的负载平衡问题以及我们将介绍有助于处理网络问题的可恢复性功能,尤其是在物联网系统中。
请注意,本文中提供的代码示例可在GitHub上获得

高可用性和负载平衡是企业级系统的关键部分
应用程序可用性和可靠性是银行和保险等许多业务领域的重要组成部分。在这些要求苛刻的行业中,即使在高流量,网络延迟增加或自然灾害期间,服务也必须全天候运营。为确保最终用户始终可以使用该软件,通常会在多个可用区域中以冗余方式部署该软件。
在这种情况下,每个微服务的至少两个实例部署在至少两个可用区中。这种技术有助于我们的系统恢复弹性并增加其容量 - 微服务的多个实例能够处理明显更高的负载。那么诀窍在哪里?冗余引入了额外的复杂性。作为工程师,我们必须确保传入流量分布在所有可用实例中。解决此问题有两种主要技术:服务器负载平衡和客户端负载平衡。
第一种方法基于请求者不知道响应者的IP地址的假设。取而代之的是,请求者与负载均衡器进行通信,负载均衡器负责将请求分布在与其连接的微服务上。这种设计在云时代相当容易采用。IaaS提供商通常拥有内置的可靠解决方案,例如Amazon Web Services中提供的Elastic Load Balancer。这种技术的主要缺点是我们必须配置和部署额外的资源,如果我们的系统由数百个微服务组成,这可能会很痛苦。此外,它可能会影响延迟 - 每个请求在负载均衡器上都有额外的“网络跳跃”。
第二种技术颠倒了这种关系。请求者不知道用于连接到响应者的中心点,而是知道给定微服务的每个实例的IP地址。拥有这些知识后,客户端可以选择发送请求的响应者实例或打开连接。此策略不需要任何额外资源,但我们必须确保请求者具有响应者的所有实例的IP地址,客户端负载平衡模式的主要好处是它的性能 - 通过减少一个额外的“网络跳跃”,我们可以显着减少延迟。这是RSocket实现客户端负载平衡模式的关键原因之一。

RSocket中的客户端负载平衡
在代码级别上,在RSocket中实现客户端负载平衡非常简单。该机制依赖于LoadBalancedRSocketMono作为一包可用RSocket实例的对象,由RSocket供应商提供。要访问RSockets,我们必须订阅LoadBalancedRSocketMono哪个onNext信号发出完全成熟的RSocket实例。此外,它计算每个RSocket的统计数据,以便能够估计每个实例的负载,并在此基础上选择在给定时间点具有最佳性能的实例。
该算法考虑了多个参数,如延迟,维护连接数以及许多待处理请求。每个RSocket的运行状况由可用性参数反映 - 该值从0到1取值,其中0表示给定实例无法处理任何请求,1表示分配给完全运行的套接字。下面的代码片段显示了负载均衡的RSocket的基本示例,它连接到响应者的三个不同实例并执行100个请求。每次它从LoadBalancedRSocketMono对象中获取RSocket 。

@Slf4j
public class LoadBalancedClient {

    static final int[] PORTS = new int[]{7000, 7001, 7002};

    public static void main(String[] args) {

        List<RSocketSupplier> rsocketSuppliers = Arrays.stream(PORTS)
                .mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
                        .transport(TcpClientTransport.create(HOST, port))
                        .start()))
                .collect(Collectors.toList());

        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create((Publisher<Collection<RSocketSupplier>>) s -> {
            s.onNext(rsocketSuppliers);
            s.onComplete();
        });

        Flux.range(0, 100)
                .flatMap(i -> balancer)
                .doOnNext(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")).block())
                .blockLast();
    }

}

如果云中的机器到机器通信,实时流数据并不是什么大问题,但如果我们考虑位于无法访问稳定、可靠的互联网连接的区域的物联网设备,问题就会变得更多复杂。我们可以轻松确定在这样的系统中可能遇到的两个主要问题:网络延迟和连接稳定性。从软件的角度来看,我们可以用第一个做很多事情,但我们可以尝试处理后者。让我们用RSocket来解决这个问题,从选择合适的交互模型开始。在这种情况下最合适的是request stream方法,其中部署在云中的微服务是请求者,温度传感器是响应者。在选择交互模型后,我们应用了可恢复性机制。在RSocket中,我们通过resume()调用的方法来实现RSocketFactory,如下例所示:

@Slf4j
public class ResumableRequester {

    private static final int CLIENT_PORT = 7001;

    public static void main(String[] args) {
        RSocket socket = RSocketFactory.connect()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .transport(TcpClientTransport.create(HOST, CLIENT_PORT))
                .start()
                .block();
        socket.requestStream(DefaultPayload.create("dummy"))
                .map(payload -> {
                    log.info(
"Received data: [{}]", payload.getDataUtf8());
                    return payload;
                })
                .blockLast();

    }
}

@Slf4j
public class ResumableResponder {

    private static final int SERVER_PORT = 7000;
    static final String HOST = "localhost";
    static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);

    public static void main(String[] args) throws InterruptedException {
        RSocketFactory.receive()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestStream(Payload payload) {
                        log.info(
"Received 'requestStream' request with payload: [{}]", payload.getDataUtf8());
                        return Flux.interval(Duration.ofMillis(1000))
                                .map(t -> DefaultPayload.create(t.toString()));
                    }
                }))
                .transport(TcpServerTransport.create(HOST, SERVER_PORT))
                .start()
                .subscribe();
        log.info(
"Server running");

        Thread.currentThread().join();
    }
}

请注意,要运行提供的示例,您需要在计算机上安装“socat”,请参阅自述文件以获取更多详细信息

请求者和响应者端的机制类似,它基于一些组件。首先,有一个ResumableFramesStore作为帧的缓冲区。默认情况下,它将它们存储在内存中,但我们可以通过实现ResumableFramesStore接口轻松调整它以满足我们的需求(例如,将帧存储在分布式缓存中,如Redis)。存储保存在保持活动帧之间发出的数据,这些帧是定期来回发送的,并指示对等体之间的连接是否稳定。
此外,保持活动帧包含令牌,该令牌确定请求者和响应者的最后接收位置。当对等方想要恢复连接时,它会发送具有隐含位置的恢复帧。隐含位置是根据上次接收的位置(与我们在保持活动帧中看到的值相同)加上从该时刻收到的帧的长度计算得出的。该算法适用于通信双方,在恢复帧中,它由最后接收的服务器位置和第一个客户端可用位置令牌反映。(恢复操作的整个流程点击标题见原文配图)
通过采用RSocket协议中内置的可恢复性机制,我们可以相对较低的努力减少网络问题的影响。如上例所示,可恢复性在数据流应用程序中可能非常有用,尤其是在设备进行云通信的情况下。

总结
在本文中,我们讨论了RSocket协议的更高级功能,这些功能有助于减少网络对系统操作性的影响。我们介绍了客户端负载均衡模式和可恢复性机制的实现。这些功能与强大的交互模型相结合构成了协议的核心。

(banq注:重试 幂等这些机制在Rsocket实现是否需要考虑?)