面向函数编程
使用Scala的Akka HTTP,Akka Stream和Reactive Mongo建立REST服务
该教程演示了如何使用Akka Http(Spray)创建一个简单的Web服务器,展示如何从Spray.io路由转发到Akka。这篇文章包括以下步骤:
- 将一些虚拟数据放入mongoDB用于测试.
- 使用Akka Http创建服务器,使用简单的异步来处理请求
- 使用订制的流程图来创建一个服务器以处理传入的请求
- 使用Akka-Http创建的http客户端测试这两个服务器
加载数据到MongoDB
首先我们使用一些数据样本:
wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip
在终端启动MongoDB:
mongod --dbpath ./data/
使用mongoimport导入数据:
unzip -c stocks.zip | mongoimport --db akka --collection stocks
使用下面查询确认数据导入成功:
jos@Joss-MacBook-Pro.local:~$ mongo akka
MongoDB shell version: 2.4.8
connecting to: akka
> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )
{
"_id" : ObjectId("52853800bb1177ca391c17ff"),
"Ticker" : "A",
"Country" : "USA",
"Company" : "Agilent Technologies Inc."
}
>
使用简单异步处理创建一个服务器
为了能让Akka Http正常工作和可以访问Mongo中数据,我们需要一些附加的库包,如下sbt配置:
import com.typesafe.sbt.SbtAspectj._
name := "http-akka"
version := "1.0"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",
"org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
"org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",
"com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",
"ch.qos.logback" % "logback-classic" % "1.1.2"
)
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"
mainClass in (Compile, run) := Some("Boot")
从上面配置中你可以看到以下依赖库:
- akka-http-core-experimental 包含了所有Http服务器和客户端的库包,这个库包依赖akka-stream。我们也能从系统的class path获得它。
- reactiemongo 允许我们以reactive方式连接mongo
- 加入play2-reactivemongo 和play-json能够让从MongoDB获得BSON转为JSON更容易些。
- logback是日志记录
下面我们看看你如何运行服务器代码,使用这段代码如何查询MongDB,创建一个辅助对象Database:
import reactivemongo.api._
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object Database {
val collection = connect()
def connect(): BSONCollection = {
val driver = new MongoDriver
val connection = driver.connection(List("localhost"))
val db = connection("akka")
db.collection("stocks")
}
def findAllTickers(): Future[List[BSONDocument]] = {
val query = BSONDocument()
val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)
// which results in a Future[List[BSONDocument]]
Database.collection
.find(query, filter)
.cursor[BSONDocument]
.collect[List]()
}
def findTicker(ticker: String) : Future[Option[BSONDocument]] = {
val query = BSONDocument("Ticker" -> ticker)
Database.collection
.find(query)
.one
}
}
请注意,这里find函数返回的是一个future,所以调用这个函数不会阻塞,现在我们已经有了一些基本方法,让我们看看使用异步处理器实现的第一个Http服务器:
/**
* Simple Object that starts an HTTP server using akka-http. All requests are handled
* through an Akka flow.
*/
object Boot extends App {
// the actor system to use. Required for flowmaterializer and HTTP.
// passed in implicit
implicit val system = ActorSystem("Streams")
implicit val materializer = FlowMaterializer()
// start the server on the specified interface and port.
val serverBinding2 = Http().bind(interface = "localhost", port = 8091)
serverBinding2.connections.foreach { connection =>
connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))
}
}
在这段代码中,我们创建一个http服务器监听端口是8091,我们使用asyncHandler处理每个连接,这个处理器应该返回一个Future[HttpResponse],看看这个处理器函数的内容:
// With an async handler, we use futures. Threads aren't blocked.
def asyncHandler(request: HttpRequest): Future[HttpResponse] = {
// we match the request, and some simple path checking
request match {
// match specific path. Returns all the avaiable tickers
case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {
// make a db call, which returns a future.
// use for comprehension to flatmap this into
// a Future[HttpResponse]
for {
input <- Database.findAllTickers
} yield {
HttpResponse(entity = convertToString(input))
}
}
// match GET pat. Return a single ticker
case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {
// next we match on the query paramter
request.uri.query.get("ticker") match {
// if we find the query parameter
case Some(queryParameter) => {
// query the database
val ticker = Database.findTicker(queryParameter)
// use a simple for comprehension, to make
// working with futures easier.
for {
t <- ticker
} yield {
t match {
case Some(bson) => HttpResponse(entity = convertToString(bson))
case None => HttpResponse(status = StatusCodes.OK)
}
}
}
// if the query parameter isn't there
case None => Future(HttpResponse(status = StatusCodes.OK))
}
}
// Simple case that matches everything, just return a not found
case HttpRequest(_, _, _, _, _) => {
Future[HttpResponse] {
HttpResponse(status = StatusCodes.NotFound)
}
}
}
}
这段代码很简单,我们使用模式匹配来匹配一个用户调用的URL,使用Database对象来查询mongo,注意到这个调用是convertToString,这是使用play库包将BSON转换到JSON的帮助工具:
def convertToString(input: List[BSONDocument]) : String = {
input
.map(f => convertToString(f))
.mkString("[", ",", "]")
}
def convertToString(input: BSONDocument) : String = {
Json.stringify(BSONFormats.toJSON(input))
}
启动服务器用浏览器访问:http://localhost:8091/get?ticker=ABCB
使用定制流程图处理请求的服务器
Akka-http内部使用akka-stream处理http连接,这意味着我们很容易通过一个reactive方式使用akka-stream,对于一个线性流程,我们能够使用akka提供的标准流程api,使用akka-stream提供的DSL,你很容易创建一个复杂的流程事件并行处理图。
我们在8090端口建立一个新的服务器绑定:
object Boot extends App {
// the actor system to use. Required for flowmaterializer and HTTP.
// passed in implicit
implicit val system = ActorSystem("Streams")
implicit val materializer = FlowMaterializer()
// start the server on the specified interface and port.
val serverBinding1 = Http().bind(interface = "localhost", port = 8090)
serverBinding1.connections.foreach { connection =>
connection.handleWith(broadCastMergeFlow)
}
}
这个服务器绑定是和之前案例差不多方式,主要区别是这次我们没有将请求处理传入一个处理器,而是,而是指定了一个名为broadCastMergeFlow的流程实例,其内部流程merge如下:
val bCast = Broadcast[HttpRequest]
// some basic steps that each retrieve a different ticket value (as a future)
val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))
val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))
val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))
// We'll use the source and output provided by the http endpoint
val in = UndefinedSource[HttpRequest]
val out = UndefinedSink[HttpResponse]
// when an element is available on one of the inputs, take
// that one, igore the rest
val merge = Merge[String]
// since merge doesn't output a HttpResponse add an additional map step.
val mapToResponse = Flow[String].map[HttpResponse](
(inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)
)
// define another flow. This uses the merge function which
// takes the first available response
val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {
implicit builder =>
bCast ~> step1 ~> merge
in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out
bCast ~> step3 ~> merge
(in, out)
}
最重要的是代码最后几行,我们定义了一个图,指明服务器如何处理请求,首先,我们将进来的请求广播到三个并行刘处理,然后调用我们的数据库获得一个ticket,然后将结果merge在一起,创建一个响应,我们获得的ticket将可能是 GOOG, AAPL or MSFT等数据,前后顺序取决于哪个步骤最快了,为了看到结果,我们加入一个sleep到getTickerHandler:
def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {
// query the database
val ticker = Database.findTicker(tickName)
Thread.sleep(Math.random() * 1000 toInt)
// use a simple for comprehension, to make
// working with futures easier.
for {
t <- ticker
} yield {
t match {
case Some(bson) => convertToString(bson)
case None => ""
}
}
}
Akka-stream提供了许多疾病构建块,你能使用它们创建流程,这里我们要压缩所有步骤在一起,因此创建流程如下:
// waits for events on the three inputs and returns a response
val zip = ZipWith[String, String, String, HttpResponse] (
(inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)
// define a flow which broadcasts the request to the three
// steps, and uses the zipWith to combine the elements before
val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {
implicit builder =>
bCast ~> step1 ~> zip.input1
in ~> bCast ~> step2 ~> zip.input2 ~> out
bCast ~> step3 ~> zip.input3
(in, out)
}
当调用10次,结果输出如下:
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
使用Akka-http建立的http客户端测试这两个服务器
Akka-http提供了易于在http客户端使用stream/flow处理消息流,下面是完整代码:
import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.stream.scaladsl._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.FlowGraphImplicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
* Simple HTTP client created with akka-http
*/
object Client extends App {
// the actor system to use. Required for flowmaterializer and HTTP.
// passed in implicit
implicit val system = ActorSystem("ServerTest")
implicit val materializer = FlowMaterializer()
val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow
val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow
// define a sink that will process the answer
// we could also process this as a flow
val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
if(res.status == StatusCodes.OK) {
println("Recieved response : " + res);
res.entity.getDataBytes().map {
chunk =>
System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))
}.to(Sink.ignore).run()
} else
println(res.status)
}
// we need to set allow cycles since internally the httpclient
// has some cyclic flows (apparently)
// we construct a sink, to which we connect a later to define source.
val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>
b.allowCycles()
val source = UndefinedSource[HttpRequest]
val bcast = Broadcast[HttpRequest]
val concat = Concat[HttpResponse]
// simple graph. Duplicate the request, send twice.
// concat the result.
bcast ~> httpClient1 ~> concat.first
source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer
source
}
// make two calls, both return futures, first one shows direct linked sinks and
// sources. Second one makes yse if our graph.
// make number of calls
val res = 1 to 5 map( i => {
Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)
})
val f = Future.sequence(res)
// make some calls with filled in request URI
val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)
val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)
val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)
for {
f2Result <- f
f2Result <- f3
f2Result <- f4
f2Result <- f5
} yield ({
println("All calls done")
system.shutdown()
system.awaitTermination()
}
)
}