Spring Reactor介绍

13-10-27 banq
         

Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。

目前reactor是作为Spring.io核心包下面项目。

Reactor 是一个基础性库包

–定位在用户级和低级之间的灰色区域的抽象。

– 能够在Reactor上建立组件和应用核心

– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构

Reactor的应用是reactive的。

– 属于Reactive Extensions in .NET

– 类似Netflix RxJava

– 观察者模式Observer pattern

Reactor应用基于一个Selector发送事件。

– 象一个消息系统中routing topic, 但是它是一个对象

– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑

Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:


Reactor演示代码

Environment env = new Environment();
Reactor reactor = Reactors.reactor()
                               .env(env)
                               .dispatcher(RING_BUFFER)
                               .get();

reactor.on($(“topic”), (Event<String> ev) → {
                             System.out.println(“Hello “ + ev.getData());
                  });

reactor.notify(“topic”, Event.wrap(“John Doe”));
<p>

RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。

reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。

Reactor 的分发器 Dispatchers 类似Akka的分发器

● 分发器管理任务执行,有下面几种:

– ThreadPoolExecutorDispatcher

● 标准的 ThreadPoolExecutor

– BlockingQueueDispatcher

● 能够进行事件轮询

– RingBufferDispatcher

● LMAX Disruptor RingBuffer

– SynchronousDispatcher

Reactor的 Selectors

● Selectors 是一个等式的左边。

– 一个Selector能够被任何对象使用$(obj)创建

(或者: Selectors.object(obj))

– 一个Selector能够从匹配的key中释放数据

– Predicate<T> Selectors 能够创建匹配特定领域准则

(domain-specific criteria)

比如RegexSelector:

reactor.on(R(“some.(.+)”), (Event<String> ev) → {

// s will be 'topic'

String s = ev.getHeaders().get(“group1”);

});

reactor.notify(“some.topic”, Event.wrap(“John Doe”));

其中R(“some.(.*)”)匹配事件发送者“some.topic”。

UriTemplateSelector能够从URI匹配字符串:

reactor.on(U(“/some/{topic}”), (Event<String> ev) → {

// s will be 'topic'

String s = ev.getHeaders().get(“topic”);

});

reactor.notify(“/some/topic”, Event.wrap(“John Doe”));

Reactor 的Stream

● Streams允许基于数据的函数组合composition

– Callback++

– 类似Netflix RxJava Observable, JDK 8 Stream

Stream<String> str;
str.map(String::toUpperCase)
     .filter(new Predicate<String>() {
               public boolean test(String s) { … }
     })
    .consume(s → log.info(“consumed string {}”, s));
<p>

Reactor 的 Promise

允许在Stream之间分享函数

Promise<String> p;
String s = p
        .onSuccess(s → log.info(“consumed string {}”, s))
        .onFailure(t → log.error(t.getMessage(), t))
        .onComplete(t → log.info(“complete”))
        .await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s));
<p>

Reactor 的 Processor

干脆直接将Disruptor API转为Reactor API

对于#UberFastData有超级快性能

Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());
<p>

与Spring整合:

首先使用@EnableReactor 激活reactor

@Configuration
@EnableReactor
public class ReactorConfiguration {

  @Bean
   public Reactor input(Environment env) {
        return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
   }

   @Bean
   public Reactor output(Environment env) {
        return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
}
<p>

然后在监听者或观察者写入:

@Component
public class SimpleHandler {
    @Autowired
    private Reactor reactor;

    @Selector(“test.topic”)
    public void onTestTopic(String s) {
             // Handle data
    }
}
<p>

reactor的groovy整合:

@CompileStatic
def welcome(){
    reactor.on('greetings') { String s ->
            reply “hello $s”
            reply “how are you?”
}
reactor.notify 'greetings', 'Jon'
           reactor.send('greetings', 'Stephane'){
                  println it
            cancel()
           }
}
<p>

[该贴被banq于2013-10-27 10:22修改过]