使用Java 9 Flow进行响应式编程

18-11-28 banq
         

在本指南中,您将学习Java 9中的Flow API如何帮助您使用新的Publisher和Subscriber构建反应模式。阅读之后,您应该能够理解这种新的编程风格及其优缺点。

本指南重点介绍新的  Flow API,它使我们能够仅使用JDK采用Reactive Programming,而不需要其他库,如RxJava或Project Reactor等。

但是,在看了API之后,你很快就会发现这个API由几个接口和一个实现组成:

  • 接口  Flow.Publisher <T>定义了生成项目和控制信号的方法。
  • Flow.Subscriber <T> 接口  定义了接收这些消息和信号的方法。
  • Flow.Subscription接口   定义了链接发布服务器和订阅服务器的方法。
  • 接口  Flow.Processor <T,R>定义了一些方法来执行一些高级操作,例如将项目的转换从发布者链接到订阅者。
  • 最后,SubmissionPublisher类  <T>实现了Flow.Publisher <T>,它是一个灵活的项目生成器,符合Reactive Streams计划。

即使没有很多类可以使用,包括Java 9中的这个API也是一个重大变化:第三方可以为依赖于这些接口的库提供Reactive支持,例如从JDBC驱动程序到RabbitMQ的反应式实现。

从Pull 到Push 再到Pull-Push

反应式编程主要是消费者控制数据流,由于它集成在主要框架和库分发版(例如Java 9或Spring 5)中,它现在变得流行,并且分布式系统的兴起带来了大量的数据需要相互关联传达。

回顾过去有助于我们了解其崛起。几年前,从消费者那里获取数据的最流行的技术是基于Pull 拉的机制。客户端定期轮询数据,如果可用,他们会读取数据。优点是,在资源较少的情况下,他们可以控制数据流(停止轮询); 主要缺点是在没有任何消耗时通过轮询数据浪费处理时间和/或网络资源。

随着时间的推移,趋势发生了变化,从生产者那里推送数据并让消费者照顾它变得很受欢迎。问题在于消费者可能拥有比生产者更有限的资源,在消费者缓慢和数据丢失的情况下最终会得到完整的输入缓冲。如果只发生在我们订阅者中的一小部分用户,这可能会很好,但是如果它发生在大多数用户身上呢?我们可以做得更好,减缓我们的生产者发布...

Reactive Programming附带的混合推拉Pull-Push方法试图带来两全其美:它让消费者负责请求数据并控制来自发布者的流量,这也可以在阻止或丢弃数据时做出决定资源。我们将在下面看到一个很好的实际例子。

Flow和Stream的区别

应式编程并不是取代函数编程。两者兼容并且完美地协同工作。虽然Java 8中引入的Streams API非常适合处理数据流(map,reduce和所有变体),但Flow API会在通信方面(请求,减速,丢弃,阻塞等)发挥作用。

您可以将Streams用作Publisher的数据源,根据需要阻止它们或删除项目。您也可以在订阅者身边使用它们,例如,在收到某些项目后执行聚合。更不用说所有其他的编程逻辑,其中反应流不适合,但它可以用函数式编写,并且比命令式编程的可读性和维护更容易十倍。

有个困扰:如果你需要两个系统之间交换并转换数据怎么办?Stream和Flow如何一起工作?在这种情况下,我们可以使用Java 8函数将源映射到目标(转换它)但我们不能在发布者和订阅者之间使用Stream,对吗?

我们可能会想到在两者之间创建一个订阅者,它从原始发布者那里获取项目,转换它,然后像发布者那样发布。好消息:这就是Java 9的Flow.Processor<T, R>模式, 所以我们只需要实现该接口并在那里编写函数来转换数据。

就个人而言,我不喜欢全反应,  过度反应或成为反应性布道者(我无法决定具体的术语)。尽量不要为此疯狂。

案例

本指南中包含的示例代码模拟了Magazine Publisher用例。发布者只有两个订阅者。

出版商将为每位订阅者制作一系列20种杂志。他们知道他们的读者在交货时通常不在家,他们想避免邮递员退回杂志或扔掉杂志。这可能发生,因为发布者知道订阅者的邮箱通常很小,无法放置更多邮件(订阅者的缓冲区)。

取而代之的是,他们实施了一个非常创新的交付系统:用户在他们在家时打电话给他们,他们几乎立即交付一本杂志。出版商计划在办公室为每位订阅者保留一个小盒子,以防有些人在发布杂志后不立即致电该杂志。经理认为在发行人办公室为每个订户保留最多8个杂志的空间已经足够了(注意缓冲区现在是如何在出版商那边)。

然后,其中一名工人前往经理办公室,警告他们不同的情况:

  • 如果订阅者的速度足以要求发送,就像发布者正在打印新杂志一样快,就不会有空间问题。
  • 如果订阅者没有像打印杂志那样以相同的速度打电话,那么这些盒子可能会变满。工人要求管理者应如何  作出反应,以这种情况下:

    1. 将每个用户的盒子大小增加到20,这将解决问题(即发布者方面的更多资源)。
    2. 停止打印直到情况得到修复(订阅者请求至少一个)然后放慢速度,这会损害一些可能足够快以使他们的盒子保持空白的订阅者。
    3. 在生产后立即将任何不适合订户盒的杂志扔到回收站(丢弃)。
    4. 中间解决方案:如果任何一个框已满,请在打印下一个数字之前等待最长时间。如果在那之后还没有空间,那么他们将回收(丢弃)新号码。

经理解释说,他们买不起第一个解决方案,花费这么多资源只是为了应对缓慢的用户将是一种浪费,并决定选择礼貌等待(d),这可能会损害一些用户,但只是为了减少时间。营销团队决定将这种方法称为Reactive Magazine Publishing,因为它“适应了他们的读者”。提升上述分析的工人成为本月的员工。

(晕倒,这个案例用例够复杂的,业务复杂,技术复杂,不适合学习!需要详细了解可点击标题看原文)

 在GitHub上找到源码:Java 9 Flow - Reactive

         

1