在反应式编程中使用BlockHound检查调用链的堵塞 - frankel


反应式Reactive编程是基于异步消息传递。不同的框架/库在它们的方法上会有所不同:例如,在 Project Reactor 中,API 调用不是阻塞请求-响应调用,而是对发布者将在未来交付的消息的订阅。
标准调用链很少只涉及发布者和订阅者。一般来说,它们之间有多个步骤。每个中间步骤,称为处理器,充当下一步的订阅者和前一个步骤的生产者。当目标生产者发出时,它会向其订阅者发送一条消息,该订阅者向其订阅者发送一条消息,依此类推,直到链中的第一个订阅者。
如果链中的一个调用被阻塞,它会“冻结”整个链,直到工作完成。在这种情况下,它会大大减少反应式方法的好处。进入BlockHound
BlockHound 将透明地检测 JVM 类并拦截阻塞调用(例如 IO),如果它们是从标记为“非阻塞操作”的线程(即实现 ReactorNonBlocking标记接口的线程,如由 执行的Schedulers.parallel()启动的线程)
简而言之,BlockHound 在不应该存在的地方检查阻塞调用。它允许检查阻塞调用是否发生在不应发生的线程中,并在发生时在运行时抛出异常。
 
使用 BlockHound
使用 BlockHound 很简单:

  • 将依赖项添加到您的类路径
  • 调用 BlockHound.install()

  
阻塞只对“主”线程的调用
BlockHound 不会抛出每个阻塞调用,而只会抛出发生在非阻塞线程上的调用。要将线程标记为非阻塞:
public static void main(String[] args) throws InterruptedException {
    BlockHound.install(builder -> {
        builder.nonBlockingThreadPredicate(current ->
                current.or(thread -> thread.getName().equals("main"))); //该main线程被标记为非阻塞
    });
    Thread.currentThread().sleep(200);
}

运行上面的代码预期会抛出以下内容:
Exception in thread "main" reactor.blockhound.BlockingOperationError: \
  Blocking call! java.lang.Thread.sleep
    at java.base/java.lang.Thread.sleep(Thread.java)
    at ch.frankel.blog.blockhound.B.main(B.java:11)

 

BlockHound 的好坏取决于它的配置
虽然它的工件groupId是io.projectreactor,但 BlockHound 是通用的:

  • JAR 是独立的。唯一的依赖是 ByteBuddy,它被 BlockHound 遮蔽。
  • BlockHound 提供了一个集成抽象。集成允许提供配置位。它可以是通用的或特定于框架的,例如RxJava、Reactor 等。
  • 使用 Java 的服务加载器加载集成

 
结论
使用 BlockHound很简单,即,BlockHound.install()。一个简短的解释是 BlockHound 是一个 Java 代理。
点击标题