使用RxNetty访问Streaming API案例

Accessing Meetup's streaming API with RxNetty介绍了如何使用响应式Reactive方式访问社会媒体API,通过非堵塞的RxNetty实时加载meetup.com的事件并进行处理,RxNetty是结合Netty框架的强大和RxJava的灵活。

Meetup社会媒体网站提供了一个公共Streaming API,实时发布全世界的每个会议,浏览URL: stream.meetup.com/2/open_events 可以得到这些流事件,每次有人创建了包含JSON的新事件会从服务器推送到浏览器,这意味着从来不会中断,只要我们需要随时可以接受到事件数据,每个会议事件已标准的JSON文档发布,类似如下:


{ "id" : "219088449",
"name" : "Silver Wings Brunch",
"time" : 1421609400000,
"mtime" : 1417814004321,
"duration" : 900000,
"rsvp_limit" : 0,
"status" : "upcoming",
"event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/",
"group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co",
"state" : "CA"
...
},
"venue" : { "address_1" : "26860 Ortega Highway",
"city" : "San Juan Capistrano",
"country" : "US"
...
},
"venue_visibility" : "public",
"visibility" : "public",
"yes_rsvp_count" : 1
...
}

每次我们的long-polling HTTP连接(Transfer-Encoding:chunked 响应头)都会推送过来一段段JSON,我们需要分析它们并做一些工作,如果你讨厌callback回调,还可以使用另外一个替换Observable<Event>。

第一步:使用RxNetty接受数据
我们不能使用普通的Http客户端,因为它们只专注于请求/响应方式,在这里没有响应,我们只是简单地永远打开一个连接,然后消费接受数据,RxJava有一个out-of-the-box库包:RxApacheHttp,但是它只假设是text/event-stream 内容类型content type,而我们要使用更底层的RxNetty库,这是一个Netty包装,能够执行TCP/IP(包括Http)和UDP客户端与服务器的通讯,但是它不是面向流而是面向数据包,这样我们假设Meetup每次推送的是一个Netty事件,如下代码:


HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443)
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.withSslEngineFactory(DefaultFactories.trustAll())
.build();

final Observable<HttpClientResponse<ByteBuf>> responses =
httpClient.submit(HttpClientRequest.createGet(
"/2/open_events"));
final Observable<ByteBuf> byteBufs =
responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable<String> chunks =
byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

首先我们创建一个HttpClient,然后设置SSL(trustAll不一定是生产环境最好的设置),然后我们通过submit提交请求,获得一个响应Observable<HttpClientResponse<ByteBuf>>,ByteBuf是Netty的基于连续发送或接受字节bytes抽象,这个可观察者observable会立即告诉我们关于从Meetup接受到每片数据,在释放ByteBuf以后转为String字符,其中包含了前面我们需要的JSON数据。


第二步:JSON文档提取
Netty是强大的,因为它不会将复杂通过漏洞泄漏,每次我们通过TCP/IP接受到什么东西,它就会提示我们,你可以相信它,当服务器发送100字节,客户端的Netty也会提示接受到100字节,TCP/IP栈的数据是一种流,当Meetup发送一个事件,我们也许在chunks的observable中只接受到一个String, 如果是多个事件,chunks会发射多个String,最坏情况,如果Meetup接着同时发送了两个事件,它们也是可以放入一个数据包中,在这种情况,chunks会发送一个String,其中带有两个独立的JSON文档,这样我们就不能假设JSON字符串和接受到的网络数据包是对应的,我们只知道独立的JSON文档代表一行行事件,RxJavaString增加了支持这个方式:


Observable<String> jsonChunks = StringObservable.split(chunks, "\n");

实际上有更简单的StringObservable.byLine(chunks),但是它使用与平台无关的行结束符号。
如下图:


现在我们可以安全地解析jsonChunks发送的每个String了。


第三步:解析JSON
通常JSON解析需要我们手工一个个将JSON转换成POJO对象,这里有另外一个方法,我们只要将样本Meetup的straming API产生的JSON样本放在我们的项目文件中:src/main/json/meetup/event.json,然后使用jsonschema2pojo-maven-plugin,它能根据样本自动产生Java POJO源码。


<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>0.4.7</version>
<configuration>
<sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory>
<targetPackage>com.nurkiewicz.meetup.generated</targetPackage>
<includeHashcodeAndEquals>true</includeHashcodeAndEquals>
<includeToString>true</includeToString>
<initializeCollections>true</initializeCollections>
<sourceType>JSON</sourceType>
<useCommonsLang3>true</useCommonsLang3>
<useJodaDates>true</useJodaDates>
<useLongIntegers>true</useLongIntegers>
<outputDirectory>target/generated-sources</outputDirectory>
</configuration>
<executions>
<execution>
<id>generate-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>

这样,Maven将创建Event.java, Venue.java, Group.java,这些都兼容于Jackson:


private Event parseEventJson(String jsonStr) {
try {
return objectMapper.readValue(jsonStr, Event.class);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

这样和我们的前面代码可以配合一起如下:


final Observable<Event> events = jsonChunks.map(this::parseEventJson);


[该贴被banq于2014-12-24 09:16修改过]