使用RxNetty访问Streaming API案例

14-12-24 banq
              

Accessing Meetup's streaming API with RxNetty介绍了如何使用响应式Reactive方式访问社会媒体API,通过非堵塞的RxNetty实时加载meetup.com的事件并进行处理,RxNetty是结合Netty框架的强大和RxJava的灵活。

Meetup社会媒体网站提供了一个公共Streaming API,实时发布全世界的每个会议,浏览URL: stream.meetup.com/2/open_events 可以得到这些流事件,每次有人创建了包含JSON的新事件会从服务器推送到浏览器,这意味着从来不会中断,只要我们需要随时可以接受到事件数据,每个会议事件已标准的JSON文档发布,类似如下:

{ "id" : "219088449",
  "name" : "Silver Wings Brunch",
  "time" : 1421609400000,
  "mtime" : 1417814004321,
  "duration" : 900000,
  "rsvp_limit" : 0,
  "status" : "upcoming",
  "event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/",
  "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co",
              "state" : "CA"
              ...
  },
  "venue" : { "address_1" : "26860 Ortega Highway",
              "city" : "San Juan Capistrano",
              "country" : "US"
              ...
  },
  "venue_visibility" : "public",
  "visibility" : "public",
  "yes_rsvp_count" : 1
  ...
}
<p>

每次我们的long-polling HTTP连接(Transfer-Encoding:chunked 响应头)都会推送过来一段段JSON,我们需要分析它们并做一些工作,如果你讨厌callback回调,还可以使用另外一个替换Observable<Event>。

第一步:使用RxNetty接受数据

我们不能使用普通的Http客户端,因为它们只专注于请求/响应方式,在这里没有响应,我们只是简单地永远打开一个连接,然后消费接受数据,RxJava有一个out-of-the-box库包:RxApacheHttp,但是它只假设是text/event-stream 内容类型content type,而我们要使用更底层的RxNetty库,这是一个Netty包装,能够执行TCP/IP(包括Http)和UDP客户端与服务器的通讯,但是它不是面向流而是面向数据包,这样我们假设Meetup每次推送的是一个Netty事件,如下代码:

HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443)
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .withSslEngineFactory(DefaultFactories.trustAll())
        .build();
 
final Observable<HttpClientResponse<ByteBuf>> responses = 
    httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
final Observable<ByteBuf> byteBufs = 
    responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable<String> chunks = 
    byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));
<p>

首先我们创建一个HttpClient,然后设置SSL(trustAll不一定是生产环境最好的设置),然后我们通过submit提交请求,获得一个响应Observable<HttpClientResponse<ByteBuf>>,ByteBuf是Netty的基于连续发送或接受字节bytes抽象,这个可观察者observable会立即告诉我们关于从Meetup接受到每片数据,在释放ByteBuf以后转为String字符,其中包含了前面我们需要的JSON数据。

第二步:JSON文档提取

Netty是强大的,因为它不会将复杂通过漏洞泄漏,每次我们通过TCP/IP接受到什么东西,它就会提示我们,你可以相信它,当服务器发送100字节,客户端的Netty也会提示接受到100字节,TCP/IP栈的数据是一种流,当Meetup发送一个事件,我们也许在chunks的observable中只接受到一个String, 如果是多个事件,chunks会发射多个String,最坏情况,如果Meetup接着同时发送了两个事件,它们也是可以放入一个数据包中,在这种情况,chunks会发送一个String,其中带有两个独立的JSON文档,这样我们就不能假设JSON字符串和接受到的网络数据包是对应的,我们只知道独立的JSON文档代表一行行事件,RxJavaString增加了支持这个方式:

Observable<String> jsonChunks = StringObservable.split(chunks, "\n");
<p>

实际上有更简单的StringObservable.byLine(chunks),但是它使用与平台无关的行结束符号。

如下图:


现在我们可以安全地解析jsonChunks发送的每个String了。

第三步:解析JSON

通常JSON解析需要我们手工一个个将JSON转换成POJO对象,这里有另外一个方法,我们只要将样本Meetup的straming API产生的JSON样本放在我们的项目文件中:src/main/json/meetup/event.json,然后使用jsonschema2pojo-maven-plugin,它能根据样本自动产生Java POJO源码。

<plugin>
    <groupId>org.jsonschema2pojo</groupId>
    <artifactId>jsonschema2pojo-maven-plugin</artifactId>
    <version>0.4.7</version>
    <configuration>
        <sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory>
        <targetPackage>com.nurkiewicz.meetup.generated</targetPackage>
        <includeHashcodeAndEquals>true</includeHashcodeAndEquals>
        <includeToString>true</includeToString>
        <initializeCollections>true</initializeCollections>
        <sourceType>JSON</sourceType>
        <useCommonsLang3>true</useCommonsLang3>
        <useJodaDates>true</useJodaDates>
        <useLongIntegers>true</useLongIntegers>
        <outputDirectory>target/generated-sources</outputDirectory>
    </configuration>
    <executions>
        <execution>
            <id>generate-sources</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
        </execution>
<p>

这样,Maven将创建Event.java, Venue.java, Group.java,这些都兼容于Jackson:

private Event parseEventJson(String jsonStr) {
    try {
        return objectMapper.readValue(jsonStr, Event.class);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
<p>

这样和我们的前面代码可以配合一起如下:

final Observable<Event> events = jsonChunks.map(this::parseEventJson);

<p>

[该贴被banq于2014-12-24 09:16修改过]

              

2