使用RSocket进行服务通信的反应性服务简介 - Rafał Kowalski


在本文中,我们将讨论微服务架构中的通信问题,以及如何使用RSocket解决这些问题。我们介绍了它的API和支持简单的“hello world”示例和基本背压机制实现的交互模型。
RSocket是一种新的,消息驱动的二进制协议,它标准化了云中的通信方法。它有助于以一致的方式解决常见的应用程序问题,并且它支持多种语言(例如java,js,python)和传输层(TCP,WebSocket,Aeron)。
RSocket中的交互被分解为帧。每个帧由一个帧头组成,该帧头包含流id,帧类型定义和特定于帧类型的其他数据。帧头之后是元数据和有效载荷 - 这些部分携带用户指定的数据。
存在多种类型的帧,表示交互模型的不同动作和可用方法。我们不打算涵盖所有这些内容,因为它们在官方文档(http://rsocket.io/docs/Protocol)中有详细描述
应该注意,在连接建立阶段之后,RSocket不区分客户端和服务器。每一方都可以开始将数据发送到另一方 - 它使协议几乎完全对称。

性能
帧作为字节流发送。它使得RSocket方式比典型的基于文本的协议更有效。从开发人员的角度来看,当JSON在网络中来回飞行时,调试系统会更容易,但是会对性能的影响。RSocket协议没有强加任何指定的序列化/反序列化机制,它将帧视为可以转换为任何东西的一个比特包。这使得我们可以使用JSON序列化或更高效的解决方案,如Protobuf或AVRO。
对RSocket性能产生巨大影响的第二个因素是多路复用。该协议在单个物理连接的顶部创建逻辑流(信道)。每个流都有其唯一的ID,在某种程度上,它可以被解释为消息传递系统的队列。此外,RSocket本身支持传输大型有效负载。在这种情况下,有效载荷帧被分成几个帧,其中有一个额外的标志 - 给定片段的序数。

反应性和流量控制
RSocket协议完全符合Reactive Manifesto中所述的原则。它在资源方面的异步特性和节俭有助于减少最终用户所经历的延迟和基础架构的成本。由于流式传输,我们不需要将数据从一个服务提取到另一个服务,而是在数据可用时推送数据。它是一种非常强大的机制,但也可能存在风险。
让我们考虑一个简单的场景:在我们的系统中,我们将事件从服务A流式传输到服务B.在接收器端执行的操作非常重要,需要一些计算时间。如果服务A推送事件的速度快于B能够处理它们的速度,最终B将耗尽资源,发送方将杀死了接收方,由于RSocket使用Reactor,它内置了对流量控制的支持,这有助于避免这种情况。
我们可以轻松提供背压机制实施,根据我们的需求进行调整。接收方可以指定它想要消耗多少数据,并且在通知发送方已准备好处理更多数据之前不会获得更多数据。另一方面,为了限制来自请求者的传入帧的数量,RSocket实现了一种租用机制。响应者可以指定请求者在定义的时间范围内可以发送多少请求。

API
如前一节所述,RSocket使用Reactor,因此在API级别上我们主要操作Mono和Flux对象。它也完全支持无功信号 - 我们可以轻松地对不同事件实现“反应” - onNext,onError,onClose等。
以下段落将介绍API和RSocket中可用的每个交互选项。讨论将以代码片段和所有示例的描述为后盾。在我们进入交互模型之前,值得描述API基础知识,因为它将出现在多个代码示例中。

使用RSocketFactory建立连接
在对等体之间建立RSocket连接相当容易。API提供了(RSocketFactory)工厂和工厂方法receive和connect ,用来分别在客户端和服务器端创建RSocket和CloseableChannel实例。
通信双方(请求者和响应者)中存在的第二个共同属性是transport。RSocket可以使用多个解决方案作为transport层(TCP,WebSocket,Aeron)。无论您选择哪种API,都会提供工厂方法,允许您调整和调整连接。

RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create(HOST, PORT))
        .start()
        .block();

RSocketFactory.receive()
    .acceptor(new HelloWorldSocketAcceptor())
    .transport(TcpServerTransport.create(HOST, PORT))
    .start()
    .subscribe();

在响应接收端者,我们必须创建一个套接字接受器实例。SocketAcceptor是一个提供对等体之间契约的接口。它有一个accept接受RSocket发送请求的方法,并返回一个RSocket实例,用于处理来自对等体的请求。除了提供契约外,SocketAcceptor还允许我们访问设置框架内容。在API级别,它与ConnectionSetupPayload对象有关。

public interface SocketAcceptor {
   Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}

如上所示,在对等体之间建立连接相对容易,特别是对于之前使用过WebSockets的人来说 - 就API而言,两种解决方案都非常相似。

交互模型​​​​​​​
设置连接后,我们可以继续进行交互模型。RSocket支持以下操作:

  1. fire-forget
  2. request-resonse 传统请求响应模型
  3. request-stream
  4. channel

​​​​​​​
在fire-forget模型中,还包括元数据推送,发送方都不关心操作的结果 , 它在返回类型(Mono)。在发生fire-forget时,完全成熟的帧被发送到接收器,而对于元数据推送动作,帧则不包含有效载荷 - 它仅包括头部和元数据,这种轻量级消息可用于向IoT设备的移动或对等通信发送通知。
RSocket还能够模仿HTTP行为。它支持请求 - 响应语义,并且可能是您将与RSocket一起使用的主要交互类型。在流上下文中,这种操作可以表示为由单个对象组成的流。在这种情况下,客户端正在等待响应帧,但它以完全非阻塞的方式执行。
云应用程序中更有趣的是请求流request-stream和请求通道channel交互,它们对数据流进行操作,通常是无限的。在请求流操作的情况下,请求者将单个帧发送给响应者并获回数据流。这种交互方法使服务能够将策略切换pull data到push data,而不是向响应者发送定期请求查询请求者,而是订阅流并对传入数据做出反应 - 将在可用时自动到达时发生反应。
由于多路复用和双向数据传输支持,我们可以使用请求通道方法更进一步。RSocket能够将数据从请求者流式传输到响应者,反之则使用单个物理连接。当请求者更新订阅时,这种交互可能很有用 - 例如,更改订阅标准。如果没有双向通道,客户端将不得不取消流并使用新参数重新请求它。
在API中,交互模型的所有操作都由下面显示的RSocket接口的方法表示。

public interface RSocket extends Availability, Closeable {

    Mono<Void> fireAndForget(Payload payload);

    Mono<Payload> requestResponse(Payload payload);

    Flux<Payload> requestStream(Payload payload);

    Flux<Payload> requestChannel(Publisher<Payload> payloads);

    Mono<Void> metadataPush(Payload payload);
}

在这个例子中,我们正在请求数据流,但是为了确保传入的帧不会杀死请求者,我们将背压机制放在适当的位置。为了实现这种机制,我们使用request_n框架.在订阅[ onSubscribe(Subscription s)] 的开始时,我们正在请求5个对象,然后我们在onNext(Payload payload)中计算收到的项目。当所有预期的帧到达请求者时,我们正在请求接下来的5个对象 - 再次使用subscription.request(n)方法。
本节介绍的背压机制的实施是非常基础的。在生产中,我们应该基于更准确的度量提供更复杂的解决方案,例如预测/平均计算时间。毕竟,背压机制不会使过度生产响应者的问题消失。它将问题转移到响应方,在那里可以更好地处理。关于背压的进一步阅读GitHub上


请注意,此处提供了完整的工作示例→