Sirix.io是如何基于Vert.x和Kotlin协程构建异步RESTful API

18-12-30 banq
                   

Sirix是一个存储系统,它的核心是日志结构,读取可以是随机的,并且在事务提交期间将写入批处理并同步到磁盘。数据永远不会写回到同一个地方,因此不会就地修改,相反,Sirix在记录级别使用写时复制(COW)(因此,它创建页面片段并且通常不复制整个页面),每次必须修改页面时,已更改的记录都会写入新位置,确切复制哪些记录取决于所使用的版本控制算法。

对数据库/资源​​的更改发生在资源绑定事务中。因此,必须打开ResourceManager才能创建写入事务。在任何时候,只允许与N读取事务同时进行一次写入事务。每个事务都绑定到一个修订版,而它们可以在任何修订版上打开,无论哪个修订版都无关紧要。

Vert.x在Node.js和JVM之后进行了严格的建模。Vert.x中的所有内容都应该是非阻塞的。因此,称为事件循环的单个线程可以处理大量请求。阻止调用必须在特殊的线程池上处理。默认值是每个CPU两个事件循环(多反应器模式)。

我们正在使用Kotlin,因为它简单而简洁。其中一个非常有趣的功能是协同程序。从概念上讲,它们就像非常轻量级的线程 另一方面,创建线程非常昂贵。关于协同程序的一个很酷的事情是,它们允许编写几乎像顺序的异步代码。每当一个协程将被挂起时,底层线程不会被阻塞并且可以被重用。在引擎盖下,每个挂起函数通过Kotlin编译器获得另一个参数,这是一个延续,它存储恢复函数的位置(正常恢复,恢复异常)。

Keycloak用作OAuth2授权服务器(密码凭据流量),因为我们决定不自己实现授权。

为了获得访问令牌,首先必须针对POST / login进行请求- 使用身份中作为JSON对象发送的用户名/密码凭证进行路由。实现代码:

post("/login").produces("application/json").coroutineHandler { rc ->
    val userJson = rc.bodyAsJson
    val user = keycloak.authenticateAwait(userJson)
    rc.response().end(user.principal().toString())
}

coroutine-handler是一个简单的扩展函数:

/* An extension method for simplifying coroutines usage with Vert.x Web routers. */
private fun Route.coroutineHandler(fn: suspend (RoutingContext) -> Unit) {
  handler { ctx ->
    launch(ctx.vertx().dispatcher()) {
      try {
        fn(ctx)
      } catch (e: Exception) {
        ctx.fail(e)
      }
    }
  }
}

协程序在Vert.x事件循环(调度程序)上启动。

这是为了执行更长的运行处理程序:

vertxContext.executeBlockingAwait(Handler < Future < Nothing >> {
  //更长时间运行任务
})

Vert.x为这类任务使用不同的线程池。因此,该任务在另一个线程中执行。请注意当协程被暂停,事件循环不会被阻止。

现在我们再次将焦点转移到我们的API,并展示它是如何设计的。我们首先需要设置我们的服务器和Keycloak。

一旦两个服务器都启动并运行,我们就能够编写一个简单的HTTP客户端。我们首先必须让/login使用指定的“用户名/密码”JSON-Object 从端点获取令牌。在Kotlin中使用异步HTTP客户端(来自Vert.x),它看起来像这样:

val server = "https://localhost:9443"

val credentials = json {
  obj("username" to "testUser",
      "password" to "testPass")
}

val response = client.postAbs("$server/login").sendJsonAwait(credentials)

if (200 == response.statusCode()) {
  val user = response.bodyAsJsonObject()
  val accessToken = user.getString("access_token")
}

然后,必须在Authorization HTTP-Header中为每个后续请求发送此访问令牌。存储第一个资源看起来像这样(简单的HTTP PUT-Request):

val xml = """
    <xml>
      foo
      <bar/>
    </xml>
""".trimIndent()

var httpResponse = client.putAbs("$server/database/resource1").putHeader(HttpHeaders.AUTHORIZATION.toString(), "Bearer $accessToken").sendBufferAwait(Buffer.buffer(xml))

if (200 == response.statusCode()) {
  println("Stored document.")
} else {
  println("Something went wrong ${response.message}")
}

首先,创建一个名称database带有一些元数据的空数据库,然后使用名称存储XML片段resource1。PUT HTTP-Request是幂等的。具有相同URL端点的另一个PUT-Request将删除以前的数据库和资源并重新创建数据库/资源​

HTTP-Response应为200,产生HTTP-body:

<rest:sequence xmlns:rest="https://sirix.io/rest">
  <rest:item>
    <xml rest:id="1">
      foo
      <bar rest:id="3"/>
    </xml>
  </rest:item>
</rest:sequence>

以上是从存储系统为元素节点序列化生成ID。

然后通过GET HTTP-Request,https://localhost:9443/database/resource1我们还可以再次检索存储的资源。

然而,到目前为止,这并不是很有趣。我们可以更新资源POST-Request。假设我们像以前一样检索了访问令牌,我们可以简单地执行POST-Request并使用我们之前收集的有关node-ID的信息:

val xml = """
    <test>
      yikes
      <bar/>
    </test>
""".trimIndent()

val url = "$server/database/resource1?nodeId=3&insert=asFirstChild"

val httpResponse = client.postAbs(url).putHeader(HttpHeaders.AUTHORIZATION
                         .toString(), "Bearer $accessToken").sendBufferAwait(Buffer.buffer(xml))

有趣的部分是URL,我们用作端点。我们简单地说,选择ID为3的节点,然后将给定的XML片段作为第一个子片段插入。这将生成以下序列化XML文档:

<rest:sequence xmlns:rest="https://sirix.io/rest">
  <rest:item>
    <xml rest:id="1">
      foo
      <bar rest:id="3">
        <test rest:id="4">
          yikes
          <bar rest:id="6"/>
        </test>
      </bar>
    </xml>
  </rest:item>
</rest:sequence>

每个PUT-以及POST请求都隐含commits了底层事务。因此,我们现在能够再次发送第一个GET请求来检索整个资源的内容,例如通过指定一个简单的XPath查询,在所有版本中选择根节点GET https://localhost:9443/database/resource1?query=/xml/all-time::*并获得以下XPath结果:

<rest:sequence xmlns:rest="https://sirix.io/rest">
  <rest:item rest:revision="1" rest:revisionTimestamp="2018-12-20T18:44:39.464Z">
    <xml rest:id="1">
      foo
      <bar rest:id="3"/>
    </xml>
  </rest:item>
  <rest:item rest:revision="2" rest:revisionTimestamp="2018-12-20T18:44:39.518Z">
    <xml rest:id="1">
      foo
      <bar rest:id="3">
        <xml rest:id="4">
          foo
          <bar rest:id="6"/>
        </xml>
      </bar>
    </xml>
  </rest:item>
</rest:sequence>

一般来说,我们支持几个额外的时间XPath轴:future ::,future-or-self ::,past ::,past-or-self ::,previous ::,previous-or-self ::,next ::, next-or-self ::,first ::,last ::,all-time ::

通过在GET请求中指定序列化(开始和结束修订参数)的一系列修订,可以实现相同的目的:

GET https://localhost:9443/database/resource1?start-revision=1&end-revision=2

或通过时间戳:

GET https://localhost:9443/database/resource1?start-revision-timestamp=2018-12-20T18:00:00&end-revision-timestamp=2018-12-20T19:00:00

我们肯定也能够通过更新XQuery表达式(不是非常RESTful)或使用简单的DELETEHTTP请求来删除资源或其任何子树:

val url = "$server/database/resource1?nodeId=3"

val httpResponse = client.deleteAbs(url).putHeader(HttpHeaders.AUTHORIZATION
                         .toString(), "Bearer $accessToken").sendAwait()

if (200 == httpResponse.statusCode()) {
  ...
}

这将删除ID为3的节点,在我们的例子中,因为它是整个子树的元素节点。肯定它已作为修订版3提交,因此所有旧版本仍然可以查询整个子树(或者在第一个修订版中,它只是名称为“bar”而没有任何子树的元素)。

如果我们想得到一个差异,目前以XQuery Update语句的形式(但我们可以以任何格式序列化它们),只需调用XQuery函数sdb:diff,该函数定义为:

sdb:diff($coll as xs:string, $res as xs:string, $rev1 as xs:int, $rev2 as xs:int) as xs:string

例如,通过我们上面创建的数据库/ resource1这样的GET请求,我们可以发出以下请求:

GET https://localhost:9443/?query=sdb%3Adiff%28%27database%27%2C%27resource1%27%2C1%2C2%29

请注意,query-String必须进行URL编码,然后对其进行解码

sdb:diff('database','resource1',1,2)

在我们的示例中,diff的输出是包含在封闭sequence-element中的XQuery-Update语句:

<rest:sequence xmlns:rest="https://sirix.io/rest">
  let $doc := sdb:doc('database','resource1', 1)
  return (
    insert nodes <xml>foo<bar/></xml> as first into sdb:select-node($doc, 3)
  )
</rest:sequence>

这意味着resource1从database第一次修订中打开,然后将子树<xml>foo<bar/></xml>附加到具有稳定节点ID 3作为第一子节点的节点。