面向函数编程

使用Scala的Akka HTTP,Akka Stream和Reactive Mongo建立REST服务

  该教程演示了如何使用Akka Http(Spray)创建一个简单的Web服务器,展示如何从Spray.io路由转发到Akka。这篇文章包括以下步骤:

  • 将一些虚拟数据放入mongoDB用于测试.
  • 使用Akka Http创建服务器,使用简单的异步来处理请求
  • 使用订制的流程图来创建一个服务器以处理传入的请求
  • 使用Akka-Http创建的http客户端测试这两个服务器

 

加载数据到MongoDB

  首先我们使用一些数据样本

wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip

在终端启动MongoDB:

mongod --dbpath ./data/

使用mongoimport导入数据:

unzip -c stocks.zip | mongoimport --db akka --collection stocks

使用下面查询确认数据导入成功:

jos@Joss-MacBook-Pro.local:~$ mongo akka     

MongoDB shell version: 2.4.8

connecting to: akka

> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )

{

        "_id" : ObjectId("52853800bb1177ca391c17ff"),

        "Ticker" : "A",

        "Country" : "USA",

        "Company" : "Agilent Technologies Inc."

}

 

使用简单异步处理创建一个服务器

  为了能让Akka Http正常工作和可以访问Mongo中数据,我们需要一些附加的库包,如下sbt配置:

import com.typesafe.sbt.SbtAspectj._

 

name := "http-akka"

 

version := "1.0"

 

scalaVersion := "2.11.5"

 

libraryDependencies ++= Seq(

  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",

  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",

  "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",

  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",

  "ch.qos.logback" % "logback-classic" % "1.1.2"

)

 

resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

 

resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"

 

mainClass in (Compile, run) := Some("Boot")

从上面配置中你可以看到以下依赖库:

  • akka-http-core-experimental 包含了所有Http服务器和客户端的库包,这个库包依赖akka-stream。我们也能从系统的class path获得它。
  • reactiemongo 允许我们以reactive方式连接mongo
  • 加入play2-reactivemongo 和play-json能够让从MongoDB获得BSON转为JSON更容易些。
  • logback是日志记录

下面我们看看你如何运行服务器代码,使用这段代码如何查询MongDB,创建一个辅助对象Database:

import reactivemongo.api._

import reactivemongo.api.collections.default.BSONCollection

import reactivemongo.bson.BSONDocument

import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future

 

object Database {

 

  val collection = connect()

 

 

  def connect(): BSONCollection = {

 

    val driver = new MongoDriver

    val connection = driver.connection(List("localhost"))

 

    val db = connection("akka")

    db.collection("stocks")

  }

 

  def findAllTickers(): Future[List[BSONDocument]] = {

    val query = BSONDocument()

    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)

 

    // which results in a Future[List[BSONDocument]]

    Database.collection

      .find(query, filter)

      .cursor[BSONDocument]

      .collect[List]()

  }

 

  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {

    val query = BSONDocument("Ticker" -> ticker)

 

    Database.collection

      .find(query)

      .one

  }

 

}

请注意,这里find函数返回的是一个future,所以调用这个函数不会阻塞,现在我们已经有了一些基本方法,让我们看看使用异步处理器实现的第一个Http服务器:

/**

 * Simple Object that starts an HTTP server using akka-http. All requests are handled

 * through an Akka flow.

 */

object Boot extends App {

 

  // the actor system to use. Required for flowmaterializer and HTTP.

  // passed in implicit

  implicit val system = ActorSystem("Streams")

  implicit val materializer = FlowMaterializer()

 

  // start the server on the specified interface and port.

  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)

  serverBinding2.connections.foreach { connection =>

    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))

   }

 }

在这段代码中,我们创建一个http服务器监听端口是8091,我们使用asyncHandler处理每个连接,这个处理器应该返回一个Future[HttpResponse],看看这个处理器函数的内容:

// With an async handler, we use futures. Threads aren't blocked.

def asyncHandler(request: HttpRequest): Future[HttpResponse] = {

 

  // we match the request, and some simple path checking

  request match {

 

    // match specific path. Returns all the avaiable tickers

    case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {

 

      // make a db call, which returns a future.

      // use for comprehension to flatmap this into

      // a Future[HttpResponse]

      for {

        input <- Database.findAllTickers

      } yield {

        HttpResponse(entity = convertToString(input))

      }

    }

 

    // match GET pat. Return a single ticker

    case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {

 

      // next we match on the query paramter

      request.uri.query.get("ticker") match {

 

          // if we find the query parameter

          case Some(queryParameter) => {

 

            // query the database

            val ticker = Database.findTicker(queryParameter)

 

            // use a simple for comprehension, to make

            // working with futures easier.

            for {

              t <- ticker

            } yield  {

              t match {

                case Some(bson) => HttpResponse(entity = convertToString(bson))

                case None => HttpResponse(status = StatusCodes.OK)

              }

            }

          }

 

          // if the query parameter isn't there

          case None => Future(HttpResponse(status = StatusCodes.OK))

        }

    }

 

    // Simple case that matches everything, just return a not found

    case HttpRequest(_, _, _, _, _) => {

      Future[HttpResponse] {

        HttpResponse(status = StatusCodes.NotFound)

      }

    }

  }

}

这段代码很简单,我们使用模式匹配来匹配一个用户调用的URL,使用Database对象来查询mongo,注意到这个调用是convertToString,这是使用play库包将BSON转换到JSON的帮助工具:

def convertToString(input: List[BSONDocument]) : String = {

   input

     .map(f => convertToString(f))

     .mkString("[", ",", "]")

 }

 

 def convertToString(input: BSONDocument) : String = {

   Json.stringify(BSONFormats.toJSON(input))

 }

启动服务器用浏览器访问:http://localhost:8091/get?ticker=ABCB

 

使用定制流程图处理请求的服务器

  Akka-http内部使用akka-stream处理http连接,这意味着我们很容易通过一个reactive方式使用akka-stream,对于一个线性流程,我们能够使用akka提供的标准流程api,使用akka-stream提供的DSL,你很容易创建一个复杂的流程事件并行处理图。

我们在8090端口建立一个新的服务器绑定:

object Boot extends App {

 

  // the actor system to use. Required for flowmaterializer and HTTP.

  // passed in implicit

  implicit val system = ActorSystem("Streams")

  implicit val materializer = FlowMaterializer()

 

  // start the server on the specified interface and port.

  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)

 

  serverBinding1.connections.foreach { connection =>

    connection.handleWith(broadCastMergeFlow)

   }

 }

这个服务器绑定是和之前案例差不多方式,主要区别是这次我们没有将请求处理传入一个处理器,而是,而是指定了一个名为broadCastMergeFlow的流程实例,其内部流程merge如下:

val bCast = Broadcast[HttpRequest]

 

// some basic steps that each retrieve a different ticket value (as a future)

val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))

val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))

val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))

 

// We'll use the source and output provided by the http endpoint

val in = UndefinedSource[HttpRequest]

val out = UndefinedSink[HttpResponse]

 // when an element is available on one of the inputs, take

// that one, igore the rest

val merge = Merge[String]

// since merge doesn't output a HttpResponse add an additional map step.

val mapToResponse = Flow[String].map[HttpResponse](

(inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)

)

 

 

  // define another flow. This uses the merge function which

  // takes the first available response

  val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {

    implicit builder =>

 

            bCast ~> step1 ~> merge

      in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out

            bCast ~> step3 ~> merge

 

      (in, out)

  }

最重要的是代码最后几行,我们定义了一个图,指明服务器如何处理请求,首先,我们将进来的请求广播到三个并行刘处理,然后调用我们的数据库获得一个ticket,然后将结果merge在一起,创建一个响应,我们获得的ticket将可能是 GOOG, AAPL or MSFT等数据,前后顺序取决于哪个步骤最快了,为了看到结果,我们加入一个sleep到getTickerHandler

def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {

  // query the database

  val ticker = Database.findTicker(tickName)

 

  Thread.sleep(Math.random() * 1000 toInt)

 

  // use a simple for comprehension, to make

  // working with futures easier.

  for {

    t <- ticker

  } yield  {

    t match {

      case Some(bson) => convertToString(bson)

      case None => ""

    }

  }

}

Akka-stream提供了许多疾病构建块,你能使用它们创建流程,这里我们要压缩所有步骤在一起,因此创建流程如下:

// waits for events on the three inputs and returns a response

val zip = ZipWith[String, String, String, HttpResponse] (

  (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)

 

 

// define a flow which broadcasts the request to the three

// steps, and uses the zipWith to combine the elements before

val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {

  implicit builder =>

 

          bCast ~> step1 ~> zip.input1

    in ~> bCast ~> step2 ~> zip.input2 ~> out

          bCast ~> step3 ~> zip.input3

 

    (in, out)

}

当调用10次,结果输出如下:

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

 

使用Akka-http建立的http客户端测试这两个服务器

Akka-http提供了易于在http客户端使用stream/flow处理消息流,下面是完整代码:

import akka.actor.ActorSystem

import akka.http.Http

import akka.stream.FlowMaterializer

import akka.http.model._

import akka.stream.scaladsl._

import akka.stream.scaladsl.Source

import akka.stream.scaladsl.FlowGraphImplicits._

import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future

 

/**

 * Simple HTTP client created with akka-http

 */

object Client extends App {

 

  // the actor system to use. Required for flowmaterializer and HTTP.

  // passed in implicit

  implicit val system = ActorSystem("ServerTest")

  implicit val materializer = FlowMaterializer()

 

  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow

  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow

 

  // define a sink that will process the answer

  // we could also process this as a flow

  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>

    if(res.status == StatusCodes.OK) {

 

      println("Recieved response : " + res);

      res.entity.getDataBytes().map {

        chunk =>

          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))

        }.to(Sink.ignore).run()

    } else

      println(res.status)

  }

 

  // we need to set allow cycles since internally the httpclient

  // has some cyclic flows (apparently)

  // we construct a sink, to which we connect a later to define source.

  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>

    b.allowCycles()

    val source = UndefinedSource[HttpRequest]

    val bcast = Broadcast[HttpRequest]

    val concat = Concat[HttpResponse]

 

    // simple graph. Duplicate the request, send twice.

    // concat the result.

              bcast ~> httpClient1 ~> concat.first

    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer

    source

  }

 

  // make two calls, both return futures, first one shows direct linked sinks and

  // sources. Second one makes yse if our graph.

 

  // make number of calls

  val res = 1 to 5 map( i => {

    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)

  })

  val f = Future.sequence(res)

 

  // make some calls with filled in request URI

  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)

  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)

  val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)

 

  for {

    f2Result <- f

    f2Result <- f3

    f2Result <- f4

    f2Result <- f5

  } yield ({

      println("All calls done")

      system.shutdown()

      system.awaitTermination()

    }

  )

}

使用Scala和Spray实现REST

Scala专题

Akka教程

使用Scala和Spray实现REST

Reactive响应式编程