vertx-lang-kotlin-coroutines

vertx-lang-kotlin-coroutines 集成了 Kotlin 协程 以执行异步操作和处理事件。 以便在不阻塞内核线程的情况下使用类似顺序代码的编程模型。

介绍

Vert.x 不同于许多经典应用平台的一个关键优势是它几乎是完全非阻塞的 (指内核线程)。 这允许基于 Vert.x 的应用程序使用极小数量的内核线程处理极高的并发(比如许多连接和消息), 这为强大的扩容能力扫除了障碍。

非阻塞的特性促生了异步的API,异步API存在很多种变种形式, 比如 callbacks, promises, fibers 和 reactive 扩展。在核心API中,Vert.x使用了callback风格, 但它也支持其他模型比如 RxJava。

在一些情况下,使用异步 API 编程要比使用经典的/顺序风格更加富有挑战性, 特别是当几个操作需要以顺序的形式完成时。 同理,当使用异步 API 时,错误传递通常也更加复杂。

vertx-lang-kotlin-coroutines 使用 协程 。协程是一种非常轻量级的、不对应到内核线程的线程, 因此当一个 协程 需要“阻塞”时,它会 挂起 并释放当前的内核线程, 使另一个协程可以处理事件。

vertx-lang-kotlin-coroutines 使用 kotlinx.coroutines 来实现协程。

注意
vertx-lang-kotlin-coroutines 目前仅在 Kotlin 下工作,并且将在 Kotlin 1.3 脱离 Experimental(实验性) 状态

在 Vert.x 上下文中运行协程

导入 io.vertx.kotlin.coroutines.VertxCoroutine 后,通过 GlobalScope.launch 方法可以在 "Global" 作用域(生命周期与应用一致) 中运行一个协程代码块,

val vertx = Vertx.vertx()

GlobalScope.launch(vertx.dispatcher()) {
  val timerId = awaitEvent<Long> { handler ->
    vertx.setTimer(1000, handler)
  }
  println("Event fired from timer with id $timerId")
}

vertx.dispatcher() 返回了一个协程调度器,以便让协程运行在 Vert.x 的事件循环上。

awaitEvent 函数挂起了协程,当定时器超时时协程会通过被提供的 handler 恢复执行。

下一章节会提供更多有关 handler, 事件和流事件的信息。

继承 CoroutineVerticle

你可以在代码中编写一个继承于 io.vertx.kotlin.coroutines.CoroutineVerticle 的实例,这是专为 Kotlin 协程准备的一个特殊类型的 Verticle。 CoroutineVerticle 类实现了 kotlinx.coroutines.experimental.CoroutineScope 接口,所有协程构建器都默认绑定到 Verticle 的上下文。 你应当重写 Verticle 中的suspend方法 start() , 以及 stop()(可选的):

class MyVerticle : CoroutineVerticle() {
  override suspend fun start() {
    // ...
  }

  override suspend fun stop() {
    // ...
  }
}

以上所有的代码都将会被运行在一个 CoroutineVerticle 实例中,但是你也可以把所有的 <builder> { .. } 都替换为 GlobalScope.<builder> { .. } 以使代码运行在应用内自定义的作用域。

单次获取异步执行的结果

Vert.x 中的许多异步操作的最后一个参数是一个 Handler<AsyncResult<T>> 。 比如在通过 Vert.x Mongo 客户端获取一个对象,或者在事件总线上发送一条消息并等待回复的时候。

可以通过 awaitResult 方法来返回值或者抛出一个异常。

协程将被挂起直到事件被处理,内核线程不会被阻塞。

该方法通过指定一个会在运行时被传入 handler 的异步操作代码块来运行。

示例如下:

suspend fun awaitResultExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    println("Consumer received: ${message.body()}")
    message.reply("pong")
  }

  // 发送一条消息并等待回复
  val reply = awaitResult<Message<String>> { h ->
    vertx.eventBus().request("a.b.c", "ping", h)
  }
  println("Reply received: ${reply.body()}")
}

当块中的代码产生了一个异常时,调用者可以像处理一个普通的异常一样,使用 try/catch 来处理异常:

suspend fun awaitResultFailureExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    // consumer会接收到一个失败
    message.fail(0, "it failed!!!")
  }

  // 发送一条消息并等待回复
  try {
    awaitResult<Message<String>> { h ->
      vertx.eventBus().request("a.b.c", "ping", h)
    }
  } catch (e: ReplyException) {
    // 在这里处理特定的异常回复
    println("Reply failure: ${e.message}")
  }
}

单次获取事件

可以使用函数 awaitEvent 来处理一个单次事件(如果事件再次发生,它将不会继续被处理)。

suspend fun awaitEventExample() {
  val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
  println("This should be fired in 2s by some time with id=$id")
}

单次获取 worker 结果

处理阻塞计算的结果可以使用函数 awaitBlocking

suspend fun awaitBlockingExample() {
  awaitBlocking {
    Thread.sleep(1000)
    "some-string"
  }
}

事件流

Vert.x API 有许多地方使用处理器(handlers)处理事件流。 以下示例包含了处理事件总线消息和 HTTP 服务请求。

ReceiveChannelHandler 类允许通过suspend方法 receive 接收事件:

suspend fun streamExample() {
  val adapter = vertx.receiveChannelHandler<Message<Int>>()
  vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)

  // 发送15条消息
  for (i in 0..15) vertx.eventBus().send("a.b.c", i)

  // 接收前面10条消息
  for (i in 0..10) {
    val message = adapter.receive()
    println("Received: ${message.body()}")
  }
}

获取 Vert.x 异步操作的完成结果

Vert.x 4 提供了 future 模型并且 Future 包含一个可用来异步获取结果的suspend方法 await()

Vert.x 异步结果实例上的 await 扩展方法挂起协程直到异步操作完成,并返回一个关联的 AsyncResult<T> 对象。

suspend fun awaitingFuture(anotherFuture: Future<String>) {
  // 获取一个 future
  val httpServerFuture = vertx.createHttpServer()
    .requestHandler { req -> req.response().end("Hello!") }
    .listen(8000)

  val httpServer = httpServerFuture.await()
  println("HTTP server port: ${httpServer.actualPort()}")

  // 对于 composite futures 也一样
  val result = CompositeFuture.all(httpServerFuture, anotherFuture).await()
  if (result.succeeded()) {
    println("The server is now running!")
  } else {
    result.cause().printStackTrace()
  }
}

suspend(可挂起)的扩展方法

为了简化使用 Vert.x 异步 API 的协程编写,Vert.x 3 生成了扩展方法。 它让用户不必使用 awaitResult ,这使得代码更加简洁和可读。

Vert.x 4 提供了基于 future 的 API,但那些扩展方法仍会存在, 但将被 弃用

suspend fun generatedSuspendingExtensionMethod() {
  // 使用扩展方法代替 awaitResult
  val client = vertx.createNetClient()
  val socket = client.connect(1234, "localhost").await()
}

channels(通道)

channel类似于 Java 的 BlockingQueue ,不同之处在于它不是阻塞的,而是在如下情况中挂起协程:

  • 向一个满的channel中写入值

  • 从一个空的channel中读取值

可以通过使用 toChannel 扩展方法使 Vert.x 的 ReadStreamWriteStream 适配到channel

这些适配器将会管理背压和流终止

  • ReadStream<T> 适配到 ReceiveChannel<T>

  • WriteStream<T> 适配到 SendChannel<T>

接收数据

当你需要处理一系列互相关联的值时,channel非常有用:

suspend fun handleTemperatureStream() {
  val stream = vertx.eventBus().consumer<Double>("temperature")
  val channel = stream.toChannel(vertx)

  var min = Double.MAX_VALUE
  var max = Double.MIN_VALUE

  // 迭代直到 stream 被关闭
  // 非阻塞的
  for (msg in channel) {
    val temperature = msg.body()
    min = Math.min(min, temperature)
    max = Math.max(max, temperature)
  }

  // stream 现在被关闭了
}

解析协议时,channel也非常有用,下面我们将构建一个非阻塞的 HTTP 请求解析器来展示channel的强大功能。

我们将依靠 RecordParser 来根据 \r\n 切分缓冲流。

下面是这个解析器的一个初始版本,它仅处理 HTTP 的请求行

vertx.createNetServer().connectHandler { socket ->

  // 记录解析器提供了一个以\r\n分隔的缓冲流
  val stream = RecordParser.newDelimited("\r\n", socket)

  // 将 stream 转换为一个 Kotlin channel
  val channel = stream.toChannel(vertx)

  // 启动协程
  launch {

    // 接收请求行
    // 非阻塞
    val line = channel.receive().toString().split(" ")
    val method = line[0]
    val uri = line[1]

    println("Received HTTP request ($method, $uri)")

    // 仍然需要解析标题和正文……
  }
}

解析请求行只需简单地在channel上调用 receive

下一步是通过接收分块来解析 HTTP 头,直到遇到一个空白行。

// 接收 HTTP 头
val headers = HashMap<String, String>()
while (true) {

  // 非阻塞
  val header = channel.receive().toString()

  // 完成头解析
  if (header.isEmpty()) {
    break
  }

  val pos = header.indexOf(':')
  headers[header.substring(0, pos).toLowerCase()] = header.substring(pos + 1).trim()
}

println("Received HTTP request ($method, $uri) with headers ${headers.keys}")

最终我们用处理一个可选的请求体来终止解析器

// 接收请求体
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]

val body: Buffer?
if (transferEncoding == "chunked") {

  // 处理分块编码,例如
  // 5\r\n
  // HELLO\r\n
  // 0\r\n
  // \r\n

  body = Buffer.buffer()
  while (true) {

    // 解析长度块
    // 非阻塞
    val len = channel.receive().toString().toInt(16)
    if (len == 0) {
      break
    }

    // 翻转stream以解析确切大小的块
    stream.fixedSizeMode(len + 2)

    // 接收数据块并添加到末尾
    // 非阻塞
    val chunk = channel.receive()
    body.appendBuffer(chunk, 0, chunk.length() - 2)

    // stream被翻转回\r\n分隔符以解析下一个块
    stream.delimitedMode("\r\n")
  }
} else if (contentLength != null) {

  // 翻转stream以解析确切大小的块
  stream.fixedSizeMode(contentLength.toInt())

  // 非阻塞
  body = channel.receive()
} else {
  body = null
}

val bodySize = body?.length() ?: 0
println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size $bodySize")

发送数据

使用channel发送数据非常简单清晰:

suspend fun sendChannel(httpResponse: HttpServerResponse) {
  val channel = httpResponse.toChannel(vertx)

  while (true) {
    val buffer = readBuffer()

    // 广播 temperature
    // 非阻塞但是可以被挂起
    channel.send(buffer)

    //等待1秒
    awaitEvent<Long> { vertx.setTimer(1000, it) }
  }
}

SendChannel#sendWriteStream#write 都是非阻塞操作,然而不同于 SendChannel#send 可以在channel满时暂停执行,不使用channel的情况看起来像

fun broadcastTemperature(httpResponse: HttpServerResponse) {
// 检查是否可以向 stream 中写入
  if (httpResponse.writeQueueFull()) {

    // 这时我们不能写入,所以我们设置了一个引流处理程序,当我们可以再次写的时候被调用
    httpResponse.drainHandler { broadcastTemperature(httpResponse) }
  } else {

    // 读入 temperature
    val temperature = readBuffer()

    // 将它写回 stream
    httpResponse.write(temperature)

    // 等待1秒
    vertx.setTimer(1000) {
      broadcastTemperature()
    }
  }
}

延迟,取消和超时

Vertx 调度器通过Vert.x timers为协程的 delay 函数提供了完整的支持:

launch {
  // 设置一个1秒的 Vertx timer计时器
  delay(1000)
}

定时器支持取消

val job = launch {
  // 设置一个1秒的 Vertx timer计时器
  while (true) {
    delay(1000)
    // 做一些周期性的工作
  }
}

// 一段时间后
job.cancel()

取消操作是 协作的

你也可以使用 withTimeout 来设定一个超时值

launch {
  try {
    val id = withTimeout<String>(1000) {
      awaitEvent { anAsyncMethod(it) }
    }
  } catch (e: TimeoutCancellationException) {
    // 被取消
  }
}

协程构建器

Vert.x 适用于任何协程构建器,如 launchasyncproduce …… ,只要 CoroutineScope 实例是有效的。 下面是几个注意事项:

  • 不要在 Vert.x 事件循环线程中使用 runBlocking ,因为这个方法不需要提供 CoroutineScope

  • 为了避免内存泄漏,请始终使用 coroutineScope {..} 来定义一个子作用域。这样,如果作用域中的一个协程失败,所有在该作用域中的协程也都会被取消。

协程互操作性

Vert.x 集成被设计成可与 Kotlin 协程全面互通

  • 当使用 vertx 调度器时,kotlinx.coroutines.experimental.sync.Mutex 将在事件循环线程上执行

RxJava 互操作性

模块 vertx-lang-kotlin-coroutines 没有提供与 RxJava 的特定集成,然而 Kotlin 协程提供与 RxJava 的集成, 它可以很好地和 vertx-lang-kotlin-coroutines 一起工作。

可以在这里了解更多: Coroutines for reactive streams