Spring Cloud Stream事件路由 - spring.io


Spring Cloud Stream(SCSt)的事件路由有以下功能:a)将事件路由到特定事件订阅者,或b)将事件订阅者产生的事件路由到特定目的地。
让我们快速看一下基于注释的编程模型中的工作方式。在本文中,我们将其称为路由“ TO”和路由“ FROM”。
为了路由到事件订阅者,我们使用condition了StreamListener注释的属性,如下所示:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

是有关此方法的更多详细信息。
而且,为了从事件订阅者进行路由,我们使用了动态绑定目标 -这种方法允许框架根据单个事件中提供的某些指令将框架绑定到目标。

具有函数的事件路由
使用函数性方法,我们可以通过一些附加函数以更简洁明了的方式完成上述所有操作。

路由“ TO”
可以通过依赖Spring Cloud Function(SCF)中可用的路由功能来实现路由“ TO”功能。您可以通过设置spring.cloud.stream.function.routing.enabled属性来显式启用路由,也可以通过设置spring.cloud.function.routing-expression属性并使用Spring Expression Language(SpEL)提供路由指令来隐式启用路由。路由指令应导致路由到“ TO”的功能的定义。
对于路由目的,路由目的地的名称是functionRouter-in-0(见RoutingFunction.FUNCTION_NAME和描述的绑定命名约定在这里)。
当一个消息被发送到该目的地,路由功能尝试确定哪些实际功能需要来处理这样的事件。它首先试图访问spring.cloud.function.routing-expression消息报头,并且如果提供,确定实际的函数调用的名称。这是最动态的方法。第二种最动态的方法是提供spring.cloud.function.definition标头,其中应包含将“ TO”路由到的函数的定义。两种方法都需要通过设置spring.cloud.stream.function.routing.enabled属性来明确启用路由功能。
至于以前版本中没有的其他功能,spring.cloud.function.routing-expression也可以用作应用程序属性。例如,请考虑无论传入事件如何,表达式都相同的情况,如本文前面显示的基于注释的示例(例如,spring.cloud.function.routing-expression=headers['type']=='order')。对于这种方法,您无需显式启用路由功能,因为spring.cloud.function.routing-expression作为应用程序属性具有相同的效果。
尽管很简单,但以下是上述方法之一的完整示例:

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
      "--spring.cloud.function.routing-expression="
      +
"T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println(
"EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println(
"ODD: " + value);
  }
}

通过发送消息到functionRouter-in-0,这是由rabbit或kafka绑定暴露的,基于消息系统时间的nanoTime()方法返回值,消息将被路由到Consumer相应的'even'或'odd'

路由“ FROM”
和以前一样,路由“ FROM”依赖于SCSt的“动态绑定目标”功能。但是,与路由“ TO”一样,还有许多其他功能。
以下示例显示了基础知识:

@Autowired
private BinderAwareChannelResolver resolver;

public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . .
// your code
     destination.send(outgoingMessage);
}

您所需要的只是BinderAwareChannelResolver的引用(在后面的示例中自动注入)。然后,您可以使用一些逻辑来确定目标名称(在本例中,我们使用“类型”标头的值)。确定目的地名称后,您可以通过使用该BinderAwareChannelResolver.resolveDestination(..)操作并向其发送消息来获取对其的引用。这就是全部。
上述方法的缺点是某些特定于框架的抽象会泄漏到您的代码中。看一下您需要了解BinderAwareChannelResolver和的MessageChannel事实。实际上,前面示例中的大多数代码都是样板代码。
一种更动态,更少泄漏的方法是依靠spring.cloud.stream.sendto.destination属性,这有效地完成了上述所有操作-但在幕后。下面的示例演示如何使用此方法:

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
        .withPayload(
"Hello")
        .setHeader(
"spring.cloud.stream.sendto.destination", "even")
        .build();
       return outgoingMessage;
     };
  }
}

我们不再需要注入BinderAwareChannelResolver执行解析MessageChannel。我们只需创建一个新Message,指定一个头部header:框架使用这个标头即可动态解析目标。

路由源
最后但并非最不重要的一点,让我们看一下路由“ FROM”的另一个流行用例,其中数据源起源于SCSt的上下文之外,但需要路由到适当的目的地:

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    private final EmitterProcessor<?> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get(
"id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader(
"spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然后,我们可以通过运行以下curl命令来查看结果:

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080

在这里,我们借助Supplier<Flux<?>>bean 既使用函数性方法又使用反应式范例的混合。我们有一个简单的MVC控制器,我们希望根据内容的'id'属性值将请求路由到下游。尽管EmitterProcessor此处的详细信息及其用法是另一篇文章的主题,但重要的是它演示了一个功能齐全的应用程序,其中HTTP请求被动态路由到目标绑定程序管理的目的地。
在GitHub上查看Spring Cloud Stream