vertx-lang-kotlin-coroutines
vertx-lang-kotlin-coroutines
集成了 Kotlin 协程 以执行异步操作和处理事件。
以便在不阻塞内核线程的情况下使用类似顺序代码的编程模型。
介绍
Vert.x 不同于许多经典应用平台的一个关键优势是它几乎是完全非阻塞的 (指内核线程)。 这允许基于 Vert.x 的应用程序使用极小数量的内核线程处理极高的并发(比如许多连接和消息), 这为强大的扩容能力扫除了障碍。
非阻塞的特性促生了异步的API,异步API存在很多种变种形式, 比如 callbacks, promises, fibers 和 reactive 扩展。在核心API中,Vert.x使用了callback风格, 但它也支持其他模型比如 RxJava。
在一些情况下,使用异步 API 编程要比使用经典的/顺序风格更加富有挑战性, 特别是当几个操作需要以顺序的形式完成时。 同理,当使用异步 API 时,错误传递通常也更加复杂。
vertx-lang-kotlin-coroutines
使用 协程 。协程是一种非常轻量级的、不对应到内核线程的线程,
因此当一个 协程 需要“阻塞”时,它会 挂起 并释放当前的内核线程,
使另一个协程可以处理事件。
vertx-lang-kotlin-coroutines
使用 kotlinx.coroutines 来实现协程。
注意
|
vertx-lang-kotlin-coroutines 目前仅在 Kotlin 下工作,并且将在 Kotlin 1.3
脱离 Experimental(实验性) 状态
|
在 Vert.x 上下文中运行协程
导入 io.vertx.kotlin.coroutines.VertxCoroutine
后,通过 GlobalScope.launch
方法可以在 "Global" 作用域(生命周期与应用一致)
中运行一个协程代码块,
val vertx = Vertx.vertx()
GlobalScope.launch(vertx.dispatcher()) {
val timerId = awaitEvent<Long> { handler ->
vertx.setTimer(1000, handler)
}
println("Event fired from timer with id $timerId")
}
vertx.dispatcher()
返回了一个协程调度器,以便让协程运行在 Vert.x 的事件循环上。
awaitEvent
函数挂起了协程,当定时器超时时协程会通过被提供的 handler
恢复执行。
下一章节会提供更多有关 handler, 事件和流事件的信息。
继承 CoroutineVerticle
你可以在代码中编写一个继承于 io.vertx.kotlin.coroutines.CoroutineVerticle
的实例,这是专为 Kotlin 协程准备的一个特殊类型的 Verticle。
CoroutineVerticle
类实现了 kotlinx.coroutines.experimental.CoroutineScope
接口,所有协程构建器都默认绑定到 Verticle 的上下文。
你应当重写 Verticle 中的suspend方法 start()
,
以及 stop()
(可选的):
class MyVerticle : CoroutineVerticle() {
override suspend fun start() {
// ...
}
override suspend fun stop() {
// ...
}
}
以上所有的代码都将会被运行在一个 CoroutineVerticle
实例中,但是你也可以把所有的 <builder> { .. }
都替换为
GlobalScope.<builder> { .. }
以使代码运行在应用内自定义的作用域。
单次获取异步执行的结果
Vert.x 中的许多异步操作的最后一个参数是一个 Handler<AsyncResult<T>>
。
比如在通过 Vert.x Mongo 客户端获取一个对象,或者在事件总线上发送一条消息并等待回复的时候。
可以通过 awaitResult
方法来返回值或者抛出一个异常。
协程将被挂起直到事件被处理,内核线程不会被阻塞。
该方法通过指定一个会在运行时被传入 handler 的异步操作代码块来运行。
示例如下:
suspend fun awaitResultExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
println("Consumer received: ${message.body()}")
message.reply("pong")
}
// 发送一条消息并等待回复
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().request("a.b.c", "ping", h)
}
println("Reply received: ${reply.body()}")
}
当块中的代码产生了一个异常时,调用者可以像处理一个普通的异常一样,使用
try
/catch
来处理异常:
suspend fun awaitResultFailureExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
// consumer会接收到一个失败
message.fail(0, "it failed!!!")
}
// 发送一条消息并等待回复
try {
awaitResult<Message<String>> { h ->
vertx.eventBus().request("a.b.c", "ping", h)
}
} catch (e: ReplyException) {
// 在这里处理特定的异常回复
println("Reply failure: ${e.message}")
}
}
单次获取事件
可以使用函数 awaitEvent
来处理一个单次事件(如果事件再次发生,它将不会继续被处理)。
suspend fun awaitEventExample() {
val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
println("This should be fired in 2s by some time with id=$id")
}
单次获取 worker 结果
处理阻塞计算的结果可以使用函数 awaitBlocking
:
suspend fun awaitBlockingExample() {
awaitBlocking {
Thread.sleep(1000)
"some-string"
}
}
事件流
Vert.x API 有许多地方使用处理器(handlers)处理事件流。 以下示例包含了处理事件总线消息和 HTTP 服务请求。
ReceiveChannelHandler
类允许通过suspend方法 receive
接收事件:
suspend fun streamExample() {
val adapter = vertx.receiveChannelHandler<Message<Int>>()
vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)
// 发送15条消息
for (i in 0..15) vertx.eventBus().send("a.b.c", i)
// 接收前面10条消息
for (i in 0..10) {
val message = adapter.receive()
println("Received: ${message.body()}")
}
}
获取 Vert.x 异步操作的完成结果
Vert.x 4 提供了 future 模型并且 Future
包含一个可用来异步获取结果的suspend方法 await()
。
Vert.x 异步结果实例上的 await
扩展方法挂起协程直到异步操作完成,并返回一个关联的 AsyncResult<T>
对象。
suspend fun awaitingFuture(anotherFuture: Future<String>) {
// 获取一个 future
val httpServerFuture = vertx.createHttpServer()
.requestHandler { req -> req.response().end("Hello!") }
.listen(8000)
val httpServer = httpServerFuture.await()
println("HTTP server port: ${httpServer.actualPort()}")
// 对于 composite futures 也一样
val result = CompositeFuture.all(httpServerFuture, anotherFuture).await()
if (result.succeeded()) {
println("The server is now running!")
} else {
result.cause().printStackTrace()
}
}
suspend(可挂起)的扩展方法
为了简化使用 Vert.x 异步 API 的协程编写,Vert.x 3 生成了扩展方法。
它让用户不必使用 awaitResult
,这使得代码更加简洁和可读。
Vert.x 4 提供了基于 future 的 API,但那些扩展方法仍会存在, 但将被 弃用 。
suspend fun generatedSuspendingExtensionMethod() {
// 使用扩展方法代替 awaitResult
val client = vertx.createNetClient()
val socket = client.connect(1234, "localhost").await()
}
channels(通道)
channel类似于 Java 的 BlockingQueue
,不同之处在于它不是阻塞的,而是在如下情况中挂起协程:
-
向一个满的channel中写入值
-
从一个空的channel中读取值
可以通过使用 toChannel
扩展方法使 Vert.x 的 ReadStream
和 WriteStream
适配到channel
这些适配器将会管理背压和流终止
-
ReadStream<T>
适配到ReceiveChannel<T>
-
WriteStream<T>
适配到SendChannel<T>
接收数据
当你需要处理一系列互相关联的值时,channel非常有用:
suspend fun handleTemperatureStream() {
val stream = vertx.eventBus().consumer<Double>("temperature")
val channel = stream.toChannel(vertx)
var min = Double.MAX_VALUE
var max = Double.MIN_VALUE
// 迭代直到 stream 被关闭
// 非阻塞的
for (msg in channel) {
val temperature = msg.body()
min = Math.min(min, temperature)
max = Math.max(max, temperature)
}
// stream 现在被关闭了
}
解析协议时,channel也非常有用,下面我们将构建一个非阻塞的 HTTP 请求解析器来展示channel的强大功能。
我们将依靠 RecordParser
来根据 \r\n
切分缓冲流。
下面是这个解析器的一个初始版本,它仅处理 HTTP 的请求行
vertx.createNetServer().connectHandler { socket ->
// 记录解析器提供了一个以\r\n分隔的缓冲流
val stream = RecordParser.newDelimited("\r\n", socket)
// 将 stream 转换为一个 Kotlin channel
val channel = stream.toChannel(vertx)
// 启动协程
launch {
// 接收请求行
// 非阻塞
val line = channel.receive().toString().split(" ")
val method = line[0]
val uri = line[1]
println("Received HTTP request ($method, $uri)")
// 仍然需要解析标题和正文……
}
}
解析请求行只需简单地在channel上调用 receive
。
下一步是通过接收分块来解析 HTTP 头,直到遇到一个空白行。
// 接收 HTTP 头
val headers = HashMap<String, String>()
while (true) {
// 非阻塞
val header = channel.receive().toString()
// 完成头解析
if (header.isEmpty()) {
break
}
val pos = header.indexOf(':')
headers[header.substring(0, pos).toLowerCase()] = header.substring(pos + 1).trim()
}
println("Received HTTP request ($method, $uri) with headers ${headers.keys}")
最终我们用处理一个可选的请求体来终止解析器
// 接收请求体
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]
val body: Buffer?
if (transferEncoding == "chunked") {
// 处理分块编码,例如
// 5\r\n
// HELLO\r\n
// 0\r\n
// \r\n
body = Buffer.buffer()
while (true) {
// 解析长度块
// 非阻塞
val len = channel.receive().toString().toInt(16)
if (len == 0) {
break
}
// 翻转stream以解析确切大小的块
stream.fixedSizeMode(len + 2)
// 接收数据块并添加到末尾
// 非阻塞
val chunk = channel.receive()
body.appendBuffer(chunk, 0, chunk.length() - 2)
// stream被翻转回\r\n分隔符以解析下一个块
stream.delimitedMode("\r\n")
}
} else if (contentLength != null) {
// 翻转stream以解析确切大小的块
stream.fixedSizeMode(contentLength.toInt())
// 非阻塞
body = channel.receive()
} else {
body = null
}
val bodySize = body?.length() ?: 0
println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size $bodySize")
发送数据
使用channel发送数据非常简单清晰:
suspend fun sendChannel(httpResponse: HttpServerResponse) {
val channel = httpResponse.toChannel(vertx)
while (true) {
val buffer = readBuffer()
// 广播 temperature
// 非阻塞但是可以被挂起
channel.send(buffer)
//等待1秒
awaitEvent<Long> { vertx.setTimer(1000, it) }
}
}
SendChannel#send
和 WriteStream#write
都是非阻塞操作,然而不同于
SendChannel#send
可以在channel满时暂停执行,不使用channel的情况看起来像
fun broadcastTemperature(httpResponse: HttpServerResponse) {
// 检查是否可以向 stream 中写入
if (httpResponse.writeQueueFull()) {
// 这时我们不能写入,所以我们设置了一个引流处理程序,当我们可以再次写的时候被调用
httpResponse.drainHandler { broadcastTemperature(httpResponse) }
} else {
// 读入 temperature
val temperature = readBuffer()
// 将它写回 stream
httpResponse.write(temperature)
// 等待1秒
vertx.setTimer(1000) {
broadcastTemperature()
}
}
}
延迟,取消和超时
Vertx 调度器通过Vert.x timers为协程的 delay
函数提供了完整的支持:
launch {
// 设置一个1秒的 Vertx timer计时器
delay(1000)
}
定时器支持取消
val job = launch {
// 设置一个1秒的 Vertx timer计时器
while (true) {
delay(1000)
// 做一些周期性的工作
}
}
// 一段时间后
job.cancel()
取消操作是 协作的
你也可以使用 withTimeout
来设定一个超时值
launch {
try {
val id = withTimeout<String>(1000) {
awaitEvent { anAsyncMethod(it) }
}
} catch (e: TimeoutCancellationException) {
// 被取消
}
}
协程构建器
Vert.x 适用于任何协程构建器,如 launch
, async
, produce
…… ,只要 CoroutineScope
实例是有效的。
下面是几个注意事项:
-
不要在 Vert.x 事件循环线程中使用
runBlocking
,因为这个方法不需要提供CoroutineScope
。 -
为了避免内存泄漏,请始终使用
coroutineScope {..}
来定义一个子作用域。这样,如果作用域中的一个协程失败,所有在该作用域中的协程也都会被取消。
协程互操作性
Vert.x 集成被设计成可与 Kotlin 协程全面互通
-
当使用 vertx 调度器时,
kotlinx.coroutines.experimental.sync.Mutex
将在事件循环线程上执行
RxJava 互操作性
模块 vertx-lang-kotlin-coroutines
没有提供与 RxJava 的特定集成,然而 Kotlin 协程提供与 RxJava 的集成,
它可以很好地和 vertx-lang-kotlin-coroutines
一起工作。
可以在这里了解更多: Coroutines for reactive streams 。