SOA专题

使用vert.x 2.0, RxJava 和 mongoDB创建一个简单RESTful服务

  这是使用vertx异步编程创建一个RESTful服务,该项目源码下载Github,注意有以下步骤:

  • 使用Maven创建空vert.x项目
  • 在IntelliJ创建一个简单的HTTP服务器
  • 使用vert.x 的MongoDB persistor模块从MongoDB加载数据
  • 通过一个REST接口暴露
  • 用RxJava观察者替代回调

创建项目

创建一个vertx项目,使用Maven命令:

mvn archetype:generate -Dfilter=io.vertx:

将会显示:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.0)
Choose archetype:
1: remote -> io.vertx:vertx-maven-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): :

选择1,然后出现选择版本,选择 2.0.1-final:

Choose io.vertx:vertx-maven-archetype version:
1: 1.0.0-beta1
2: 1.0.0-beta2
3: 1.0.0-beta3
4: 1.0.0-CR1
5: 1.0.0-CR2
6: 2.0.0-final
7: 2.0.1-final
Choose a number: 7:

选择7,回车后,出现定义项目名称和版本:

Define value for property 'groupId': : org.smartjava
Define value for property 'artifactId': : vertx-demo-1
Define value for property 'version': 1.0-SNAPSHOT: :
Define value for property 'package': org.smartjava: :
Confirm properties configuration:
groupId: org.smartjava
artifactId: vertx-demo-1
version: 1.0-SNAPSHOT
package: org.smartjava
Y: : Y

输入你自己的名称,然后输入Y,项目就创建完成:

[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: vertx-maven-archetype:2.0.1-final
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: packageInPathFormat, Value: org/smartjava
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] project created from Archetype in dir: /Users/jos/Dev/playground/vertx-demo-1
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5:37.710s
[INFO] Finished at: Sun Nov 24 14:55:12 CET 2013
[INFO] Final Memory: 9M/24M
[INFO] ------------------------------------------------------------------------

为了测试是否成功,可到创建的目录下运行mvn install 这会下载需要的库包,运行一些测试,安装你的项目到本地仓储,现在我们已经有了一个maven项目。

导入IntelliJ创建http服务器

可以使用Eclipse,两者差不多,在IntelliJ中通过'File->Import Project 导入项目,导入后注意再加入vert.x之类的库包。

在IDE中运行该项目,它能创建一个简单的Http服务器,我们可以从浏览器看到一些输出,便于调试,下面创建 PingVerticle.java代码:

import org.vertx.java.core.Handler;

import org.vertx.java.core.http.HttpServerRequest;

import org.vertx.java.platform.Verticle;

 

public class PingVerticle extends Verticle {

 

  public void start() {

 

    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {

        @Override

        public void handle(HttpServerRequest httpServerRequest) {

            httpServerRequest.response().end("Hello smartjava");

        }

    }).listen(8888);

 

 

    container.logger().info("Webserver started, listening on port: 8888");

 

  }

}

运行该类,然后在浏览器输入localhost:8888, 可以看到输出结果:Hello smartjava

 

从mongoDB 加载数据

安装启动mongoDB,网上教程很多,启动后如下:

$ mkdir data
$ mongod --dbpath ./data/
Sun Nov 24 16:23:51.765 [initandlisten] MongoDB starting : pid=77755 port=27017 dbpath=./data/ 64-bit host=Joss-MacBook-Pro.local
Sun Nov 24 16:23:51.765 [initandlisten] db version v2.4.5
Sun Nov 24 16:23:51.765 [initandlisten] git version: nogitversion
Sun Nov 24 16:23:51.765 [initandlisten] build info: Darwin Joss-MacBook-Pro.local 12.4.0 Darwin Kernel Version 12.4.0: Wed May 1 17:57:12 PDT 2013; root:xnu-2050.24.15~1/RELEASE_X86_64 x86_64 BOOST_LIB_VERSION=1_54
Sun Nov 24 16:23:51.765 [initandlisten] allocator: tcmalloc
Sun Nov 24 16:23:51.765 [initandlisten] options: { dbpath: "./data/" }
Sun Nov 24 16:23:51.766 [initandlisten] journal dir=./data/journal
Sun Nov 24 16:23:51.766 [initandlisten] recover : no journal files present, no recovery needed
Sun Nov 24 16:23:51.779 [FileAllocator] allocating new datafile ./data/local.ns, filling with zeroes...
Sun Nov 24 16:23:51.779 [FileAllocator] creating directory ./data/_tmp
Sun Nov 24 16:23:51.812 [FileAllocator] done allocating datafile ./data/local.ns, size: 16MB, took 0.031 secs
Sun Nov 24 16:23:51.853 [FileAllocator] allocating new datafile ./data/local.0, filling with zeroes...
Sun Nov 24 16:23:52.254 [FileAllocator] done allocating datafile ./data/local.0, size: 64MB, took 0.4 secs
Sun Nov 24 16:23:52.260 [initandlisten] command local.$cmd command: { create: "startup_log", size: 10485760, capped: true } ntoreturn:1 keyUpdates:0 reslen:37 480ms
Sun Nov 24 16:23:52.260 [initandlisten] waiting for connections on port 27017
Sun Nov 24 16:23:52.260 [websvr] admin web console waiting for connections on port 28017

我们使用的数据样本是http://media.mongodb.org/zips.json,可下载这个文本然后倒入到MongoDB中:

$ wget http://media.mongodb.org/zips.json
--2013-11-24 16:25:45-- http://media.mongodb.org/zips.json
Resolving media.mongodb.org... 54.230.131.14, 54.230.131.51, 54.230.128.129, ...
Connecting to media.mongodb.org|54.230.131.14|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2871006 (2.7M) [application/json]
Saving to: `zips.json'

100%[======================================>] 2,871,006 2.20M/s in 1.2s

2013-11-24 16:25:47 (2.20 MB/s) - `zips.json' saved [2871006/2871006]

$ mongoimport --db vertx --collection zips --file ./zips.json
connected to: 127.0.0.1
Sun Nov 24 16:26:28.337 check 9 29470
Sun Nov 24 16:26:28.458 imported 29470 objects

如果你已经安装了IntelliJ 的mongoDB插件,可从开发工具中看到数据结果。

为了从MongoDB中加载数据,需要mongodb persistor 库包,这个模块的下载地址是:https://github.com/vert-x/mod-mongo-persistor/ 配置Maven:

<dependency>

          <groupId>io.vertx</groupId>

          <artifactId>mod-mongo-persistor</artifactId>

          <version>2.1.0-SNAPSHOT</version>

          <scope>compile</scope>

      </dependency>

代码调用如下:

public void start() {

 

        // load the general config object, loaded by using -config on command line

        JsonObject appConfig = container.config();

 

        // deploy the mongo-persistor module, which we'll use for persistence

        container.deployModule("io.vertx~mod-mongo-persistor~2.1.0-SNAPSHOT", appConfig.getObject("mongo-persistor"));

 

        // create and run the server

        vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {

            @Override

            public void handle(final HttpServerRequest httpServerRequest) {

 

                // we send the response from the mongo query back to the client.

                // first create the query

                JsonObject matcher = new JsonObject().putString("state", "AL");

                JsonObject json = new JsonObject().putString("collection", "zips")

                        .putString("action", "find")

                        .putObject("matcher", matcher);

 

                // send it over the bus

                vertx.eventBus().send("mongodb-persistor", json, new Handler<Message<JsonObject>>() {

 

                    @Override

                    public void handle(Message<JsonObject> message) {

                        // send the response back, encoded as string

                        httpServerRequest.response().end(message.body().encodePrettily());

                    }

                });

            }

        }).listen(8888);

 

        // output that the server is started

        container.logger().info("Webserver started, listening on port: 8888");

    }

通过浏览器访问简单的Http服务器可以得到如下:

说明我们数据库访问一切正常。

REST接口暴露数据

下面我们希望通过RESTful接口暴露这些数据,我们定义URL如下:

* GET /zips
显示所有邮编
* GET /zips/:id
显示某个邮编
* GET /zips?state=:state&city=:city
简单搜索
* POST /zips/:id
更新一个存在的邮编

实现这四个功能,代码大概如下:

RouteMatcher matcher = new RouteMatcher();

//搜索

matcher.get("/zips", new Handler<HttpServerRequest>() {...}

 //查询指定的邮编

matcher.get("/zips/:id", new Handler<HttpServerRequest>() {...}

//更新一个存在的邮编

matcher.post("/zips/:id", new Handler<HttpServerRequest>() {...}

vertx.createHttpServer().requestHandler(matcher).listen(8888);

 

具体实现可见源码项目。为了后面讲解,这里贴出post案例代码:

matcher.post("/zips/:id", new Handler<HttpServerRequest>() {

            public void handle(final HttpServerRequest req) {

 

                // process the body

                req.bodyHandler(new Handler<Buffer>() {

 

                    @Override

                    public void handle(Buffer event) {

 

                        final String body = event.getString(0,event.length());

 

                        // 创建程序

                        JsonObject newObject = new JsonObject(body);

                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));

                        JsonObject json = new JsonObject().putString("collection", "zips")

                                .putString("action", "update")

                                .putObject("criteria", matcher)

                                .putBoolean("upsert", false)

                                .putBoolean("multi",false)

                                .putObject("objNew",newObject);

 

                        // 使用回调函数访问数据库

                       vertx.eventBus().send("mongodb-persistor", json,
new Handler<Message<JsonObject>>() {

                                  //  数据库访问成功回调此方法

                            public void handle(Message<JsonObject> event) {

                                  //  将数据输出                   

                       

                                req.response().end(body);

                            }

                        });

                    }

                });

            }

        });


 

使用RxJava observers替代回调

首先导入RxJava包:

<dependency>

          <groupId>io.vertx</groupId>

          <artifactId>mod-rxjava</artifactId>

          <version>1.0.0-beta2-SNAPSHOT</version>

          <scope>compile</scope>

      </dependency>

使用RxJava Observables容易组合流程和异步数据序列,在前面 'post' 代码中我们有一个嵌入的回调, 使用observables 很容易完成这些,代码更加简单。

我们完成的任务如下:

首先我们获得前端要更新的新数据,查询数据库是否存在,有我们就用新数据更新,成功后然后我们返回新数据。

Post处理过程:

  1. 首先获取请求内容
  2. 有了内容更新数据库字段
  3. 成功后从数据库获得最新版本
  4. 将新版本内容发送回响应

代码如下:

matcher.post("/rxzips/:id", new Handler<HttpServerRequest>() {

            public void handle(final HttpServerRequest req) {

                //创建一个an observable.

                Observable<Buffer> reqDataObservable = RxSupport.toObservable(req);

 

                //获得要更新的数据

                Observable<RxMessage<JsonObject>> updateObservable = reqDataObservable.flatMap(new Func1<Buffer, Observable<RxMessage<JsonObject>>>() {

                    @Override

                    public Observable<RxMessage<JsonObject>> call(Buffer buffer) {

                        System.out.println("buffer = " + buffer);

                        // 创建消息

                        JsonObject newObject = new JsonObject(buffer.getString(0, buffer.length()));

                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));

                        JsonObject json = new JsonObject().putString("collection", "zips")

                                .putString("action", "update")

                                .putObject("criteria", matcher)

                                .putBoolean("upsert", false)

                                .putBoolean("multi", false)

                                .putObject("objNew", newObject);

 

                        // 返回一个 observable 用于下一个函数

                        return rxEventBus.send("mongodb-persistor", json);

                    }

                });

 

                // 使用之前的获得数据,进行数据库更新

                Observable<RxMessage<JsonObject>> getLatestObservable = updateObservable.flatMap(
new Func1<RxMessage<JsonObject>,
Observable<RxMessage<JsonObject>>>() {

               

                    public Observable<RxMessage<JsonObject>> call(
RxMessage<JsonObject> jsonObjectRxMessage) {

                        System.out.println("jsonObjectRxMessage = " + jsonObjectRxMessage);

                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));

                        JsonObject json2 = new JsonObject().putString("collection", "zips")

                                .putString("action", "find")

                                .putObject("matcher", matcher);

                       //再返回一个observable 用于下一个函数

                        return rxEventBus.send("mongodb-persistor", json2);

                    }

                });

 

                // 我们已经得到数据库更新完成的最新的数据,返回给前端。

                getLatestObservable.subscribe(new Action1<RxMessage<JsonObject>>() {

                    @Override

                    public void call(RxMessage<JsonObject> jsonObjectRxMessage) {

                        req.response().end(jsonObjectRxMessage.body().encodePrettily());

                    }

                });

            }

        });

分开讲解如下:

reqDataObservable = RxSupport.toObservable(req)
当数据在我们的请求的缓冲区中可用后会得到通知。

有了数据,我们要做些事情:

reqDataObservable.flatMap

这使得我们可以指定对某些数据创建的可观察对象。作为代替嵌套的回调,我们只是通过不同的异步调用指定了数据流。当接收数据时,我们用它来更新数据库。

这里我们使用了rxEventBus.send,它不会立即调用,但是一旦调用就返回新的一个可观察者observable。这个新的可观察者再被下一个函数使用。

比如我们第一次flatMap得到updateObservable可观察者对象,然再从这个updateObservable.flatMap中进行rxEventBus.send,返回了一个getLatestObservable新的可观察者对象,这个getLatestObservable又被以getLatestObservable.subscribe调用,返回给前端更新完成的数据。

总结:

  • 我们使用observables 使得异步执行更容易。
  • 我们使用 flatmap将来自一个异步调用的结果传入另外一个,避免了嵌套回调。
  • 使用 rxSupport和rxEventbus (rxJava vert.x 扩展)容易创建rxJava 的observables
  • 在一个observable链条中我们获取最后一个以便获得查询结果。

本项目可在github下

RxJava专题

Reactive

Vert.x入门教程

并发编程

异步编程

EDA