Vert.x Core 文档手册
Vert.x 的核心 Java API 被我们称为 Vert.x Core
Vert.x Core 提供了下列功能:
-
编写 TCP 客户端和服务端
-
编写支持 WebSocket 的 HTTP 客户端和服务端
-
事件总线
-
共享数据 —— 本地的Map和分布式集群Map
-
周期性、延迟性动作
-
部署和撤销 Verticle 实例
-
数据报套接字
-
DNS客户端
-
文件系统访问
-
高可用性
-
本地传输
-
集群
Vert.x Core 中的功能相当底层,不包含诸如数据库访问、授权或高层 Web 应用的功能。 您可以在 Vert.x ext (扩展包)(译者注:Vert.x的扩展包是Vert.x的子项目集合,类似 Web、 Web Client、 Databases等)中找到这些功能。
Vert.x Core 小巧而轻便,您可以只使用您需要的部分, 它可整体嵌入现存应用中。 Vert.x没有强制要求使用特定的方式构造应用。
Vert.x也支持在其他语言中使用Vert.x Core, 而且在使用诸如 JavaScript 或 Ruby 等语言编写Vert.x代码时,无需直接调用 Java的API;毕竟不同的语言有不同的代码风格, 若强行让 Ruby 开发人员遵循 Java 的代码风格会很怪异, 所以我们根据 Java API 自动生成了适应不同语言代码风格的 API。
从现在开始,我们将仅使用 core 以指代 Vert.x core 。
如果您在使用 Maven 或 Gradle(译者注:两种常用的项目构建工具), 将以下依赖项添加到项目描述文件的 dependencies 节点即可使用 Vert.x Core 的API:
-
Maven (您的
pom.xml
中)
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle (您的
build.gradle
中)
dependencies {
compile 'io.vertx:vertx-core:4.0.3'
}
接下来讨论 Vert.x Core 的概念和特性。
故事从 Vert.x 开始
使用Vert.x进行开发离不开 Vertx
对象。
它是 Vert.x 的控制中心,也是您做几乎一切事情的基础,包括创建客户端和服务器、 获取事件总线的引用、设置定时器等等。
那么如何获取它的实例呢?
如果您用嵌入方式使用Vert.x,可通过以下代码创建实例:
Vertx vertx = Vertx.vertx();
注意
|
大部分应用将只会需要一个Vert.x实例,但如果您有需要也可创建多个Vert.x实例, 如:隔离的事件总线或不同组的客户端和服务器。 |
创建 Vertx 对象时指定配置项
如果缺省的配置不适合您,可在创建 Vertx
对象的同时指定配置项:
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));
VertxOptions
对象有很多配置,包括集群、高可用、池大小等。在Javadoc中描述了所有配置的细节。
创建集群模式的 Vert.x 对象
如果您想创建一个 集群模式的 Vert.x 对象(参考 event bus 章节了解更多事件总线集群细节),
那么通常情况下您将需要使用另一种异步的方式来创建 Vertx
对象。
这是因为让不同的 Vert.x 实例组成一个集群需要一些时间(也许是几秒钟)。 在这段时间内,我们不想去阻塞调用线程,所以我们将结果异步返回给您。
译者注:这里给个示例:
// 注意要添加对应的集群管理器依赖,详情见集群管理器章节
VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result(); // 获取到了集群模式下的 Vertx 对象
// 做一些其他的事情
} else {
// 获取失败,可能是集群管理器出现了问题
}
});
是流式的吗?
您也许注意到前边的例子里使用了一个 流式(Fluent) 的API。
一个流式的API表示将多个方法的调用链在一起。例如:
request.response().putHeader("Content-Type", "text/plain").end("some text");
这是贯穿 Vert.x API 中的一个通用模式,所以请适应这种代码风格。
流式调用可以让代码更为简洁。 当然,Vert.x并 不强制 您用这种方式书写代码,如果您更倾向于用以下非流式编码, 您可以忽略它:
HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.write("some text");
response.end();
Don’t call us, we’ll call you.
Vert.x 的 API 大部分都是 事件驱动 的。这意味着当您感兴趣的事情发生时, 会以事件的形式发送给您。
以下是一些事件的例子:
-
触发一个计时器
-
Socket 收到了一些数据
-
从磁盘中读取了一些数据
-
发生了一个异常
-
HTTP 服务器收到了一个请求
Vert.x API调用您提供的 处理器 来处理事件。 例如每隔一秒发送一个事件的计时器:
vertx.setPeriodic(1000, id -> {
// 这个处理器将会每隔一秒被调用一次
System.out.println("timer fired!");
});
又比如收到一个 HTTP 请求:
server.requestHandler(request -> {
// 服务器每次收到一个HTTP请求时这个处理器将被调用
request.response().end("hello world!");
});
稍后当Vert.x有事件要传给您的处理器时,它会 异步地 调用这个处理器。
由此,下面会引入Vert.x中一些重要的概念。
不要阻塞我!
Vert.x中的几乎所有API都不会阻塞调用线程,除了个别特例(如以 "Sync" 结尾的某些文件系统操作)。
可以立即提供结果的API会立即返回,否则您需要提供一个处理器(Handler
)
来接收稍后回调的事件。
因为Vert.x API不会阻塞线程, 所以通过Vert.x您可以只使用少量的线程来处理大量的并发。
当使用传统的阻塞式API做以下操作时,调用线程可能会被阻塞:
-
从 Socket 中读取数据
-
写数据到磁盘
-
发送消息给接收者并等待回复
-
其他很多情况
在上述情况下,线程在等待处理结果时它不能做任何事,此时这些线程并无实际用处。
这意味着如果使用阻塞式API处理大量并发, 需要大量线程来防止应用程序停止运转。
而这些线程使用的内存(例如它们的栈)和线程上下文切换开销很可观。
这意味着,阻塞式的方式对于现代应用程序所需要的并发级别来说是难于扩展的。
Reactor 模式和 Multi-Reactor 模式
我们前边提过 Vert.x 的 API 都是事件驱动的,当有事件时 Vert.x 会将事件传给处理器来处理。
在多数情况下,Vert.x使用被称为 Event Loop 的线程来调用您的处理器。
由于Vert.x或应用程序的代码块中没有阻塞,Event Loop 可以在事件到达时快速地分发到不同的处理器中。
由于没有阻塞,Event Loop 可在短时间内分发大量的事件。 例如,一个单独的 Event Loop 可以非常迅速地处理数千个 HTTP 请求。
我们称之为 Reactor 模式(译者注:Reactor Pattern 翻译成了 反应器模式)。
您之前也许听说过它,例如 Node.js 实现了这种模式。
在一个标准的Reactor实现中,有 一个独立的 Event Loop 会循环执行, 处理所有到达的事件并传递给处理器处理。
单一线程的问题在于它在任意时刻只能运行在一个核上, 如果您希望单线程反应器应用(如您的 Node.js 应用)扩展到多核服务器上, 则需要启动并且管理多个不同的进程。
Vert.x的工作方式有所不同。每个 Vertx
实例维护的是 多个Event Loop 线程。
默认情况下,我们会根据机器上可用的核数量来设置 Event Loop 的数量,您亦可自行设置。
这意味着 Vertx 进程能够在您的服务器上扩展,与 Node.js 不同。
我们将这种模式称为 Multi-Reactor 模式(多反应器模式),区别于单线程的 Reactor 模式(反应器模式)。
注意
|
即使一个 Vertx 实例维护了多个 Event Loop,任何一个特定的处理器永远不会被并发执行。
大部分情况下(除了 Worker Verticle 以外)
它们总是在同一个 Event Loop 线程中被调用。
|
黄金法则:不要阻塞Event Loop
尽管Vert.x 的 API 都是非阻塞式的,且不会阻塞 Event Loop, 但是用户编写的处理器中可能会阻塞 Event Loop。
如果这样做,该 Event Loop 在被阻塞时就不能做任何事情;如果您阻塞了 Vertx
实例中的所有Event Loop,那么您的应用就会完全停止!
所以不要这样做!这是一个警告!
这些阻塞做法包括:
-
Thead.sleep()
-
等待一个锁
-
等待一个互斥信号或监视器(例如同步的代码块)
-
执行一个长时间数据库操作并等待其结果
-
执行一个复杂的计算,占用了可感知的时长
-
在循环语句中长时间逗留
如果上述任何一种情况停止了 Event Loop 并占用了 显著执行时间 , 那您应该去面壁(译者注:原文此处为 Naughy Step,英国父母会在家里选择一个角落作为小孩罚站或静坐的地方,被称为 naughty corner 或 naughty step),等待下一步的指示。
所以,什么是 显著执行时间 ?
您要等多久?它取决于您的应用程序和所需的并发数量。
如果您只有单个 Event Loop,而且您希望每秒处理10000个 HTTP 请求, 很明显的是每一个请求处理时间不可以超过0.1毫秒,所以您不能阻塞任何过多(大于0.1毫秒)的时间。
这个数学题并不难,将留给读者作为练习。
如果您的应用程序没有响应,可能这是一个迹象,表明您在某个地方阻塞了Event Loop。 为了帮助您诊断类似问题,若 Vert.x 检测到 Event Loop 有一段时间没有响应,将会自动记录这种警告。 若您在日志中看到类似警告,那么您需要检查您的代码。比如:
Thread vertx-eventloop-thread-3 has been blocked for 20458 ms
Vert.x 还将提供堆栈跟踪,以精确定位发生阻塞的位置。
如果想关闭这些警告或更改设置,您可以在创建 Vertx
对象之前在
VertxOptions
中完成此操作。
Future的异步结果
Vert.x 4使用future承载异步结果。
异步的方法会返回一个 Future
对象,其包含
成功 或 失败 的异步结果。
我们不能直接操作future的异步结果,而应该设置future的handler; 当future执行完毕,结果可用时,会调用handler进行处理。
FileSystem fs = vertx.fileSystem();
Future<FileProps> future = fs.props("/my_file.txt");
future.onComplete((AsyncResult<FileProps> ar) -> {
if (ar.succeeded()) {
FileProps props = ar.result();
System.out.println("File size = " + props.size());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
注意
|
Vert.x 3的API只提供了回调模式;为了减少从Vert.x 3迁移到Vert.x 4的工作量,Vert.x 4为每个异步方法都保留了回调版本。
如上面样例代码的 props 方法,提供了带回调参数的版本
props
|
Future组合
compose
方法作用于顺序组合 future:
-
若当前future成功,执行
compose
方法指定的方法,该方法返回新的future;当返回的新future完成时,future组合成功; -
若当前future失败,则future组合失败。
FileSystem fs = vertx.fileSystem();
Future<Void> future = fs
.createFile("/foo")
.compose(v -> {
// createFile文件创建完成后执行
return fs.writeFile("/foo", Buffer.buffer());
})
.compose(v -> {
// writeFile文件写入完成后执行
return fs.move("/foo", "/bar");
});
这里例子中,有三个操作被串起来了:
-
一个文件被创建(
createFile
) -
一些东西被写入到文件(
writeFile
) -
文件被移走(
move
)
如果这三个步骤全部成功,则最终的 Future
(future
)会是成功的;
其中任何一步失败,则最终 Future
就是失败的。
除了上述方法, Future
还提供了更多方法:map
,recover
,otherwise
,以及 flatMap
(等同 compose
方法)。
Future协作
Vert.x 中的 futures
支持协调多个Future,
支持并发组合(并行执行多个异步调用)和顺序组合
(依次执行异步调用)。
CompositeFuture.all
方法接受多个 Future
对象作为参数(最多6个,或者传入 List
)。
当所有的 Future
都成功完成,该方法将返回一个 成功的 Future
;当任一个 Future
执行失败,则返回一个 失败的 Future
:
Future<HttpServer> httpServerFuture = httpServer.listen();
Future<NetServer> netServerFuture = netServer.listen();
CompositeFuture.all(httpServerFuture, netServerFuture).onComplete(ar -> {
if (ar.succeeded()) {
// 所有服务器启动完成
} else {
// 有一个服务器启动失败
}
});
所有被合并的 Future
中的操作同时运行。当组合的处理操作完成时,该方法返回的 Future
上绑定的处理器(Handler
)会被调用。
只要有一个操作失败(其中的某一个 Future
的状态被标记成失败),
则返回的 Future
会被标记为失败。如果所有的操作都成功,
则返回的 Future
将会成功完成。
您可以传入一个 Future
列表(可能为空):
CompositeFuture.all(Arrays.asList(future1, future2, future3));
all
方法的合并会 等待 所有的 Future 成功执行(或任一失败),
而 any
方法的合并会 等待 第一个成功执行的Future。CompositeFuture.any
方法接受多个 Future
作为参数(最多6个,或传入 List
)。当任意一个 Future
成功得到结果,则该 Future
成功;
当所有的 Future
都执行失败,则该 Future
失败。
CompositeFuture.any(future1, future2).onComplete(ar -> {
if (ar.succeeded()) {
// 至少一个成功
} else {
// 所有的都失败
}
});
它也可使用 Future
列表传参:
CompositeFuture.any(Arrays.asList(f1, f2, f3));
join
方法的合并会 等待 所有的 Future
完成,无论成败。
CompositeFuture.join
方法接受多个 Future
作为参数(最多6个),并将结果归并成一个 Future
。
当全部 Future
成功执行完成,得到的 Future
是成功状态的;当至少一个 Future
执行失败时,
得到的 Future
是失败状态的。
CompositeFuture.join(future1, future2, future3).onComplete(ar -> {
if (ar.succeeded()) {
// 所有都成功
} else {
// 全部完成(无论成功还是失败),且至少一个失败
}
});
它也可使用 Future
列表传参:
CompositeFuture.join(Arrays.asList(future1, future2, future3));
兼容CompletionStage
JDK的 CompletionStage
接口用于组合异步操作,
Vert.x的 Future
API可兼容 CompletionStage
。
我们可以用 toCompletionStage
方法将Vert.x的 Future
对象转为 CompletionStage
对象,如:
Future<String> future = vertx.createDnsClient().lookup("vertx.io");
future.toCompletionStage().whenComplete((ip, err) -> {
if (err != null) {
System.err.println("Could not resolve vertx.io");
err.printStackTrace();
} else {
System.out.println("vertx.io => " + ip);
}
});
相应地,可使用 Future.fromCompletionStage
方法将 CompletionStage
对象转为Vert.x的 Future
对象。
Future.fromCompletionStage
有两个重载方法:
-
第一个重载方法只接收一个
CompletionStage
参数,会在执行CompletionStage
实例的线程中调用Future
的方法; -
第二个重载方法额外多接收一个
Context
参数,会在Vert.x的Context中调用Future
的方法。
重要
|
由于Vert.x的 Future 通常会与Vert.x的代码、库以及客户端等一起使用,为了与Vert.x的线程模型更好地配合,
大部分场景下应使用 Future.fromCompletionStage(CompletionStage, Context) 方法。
|
下面的例子展示了如何将 CompletionStage
对象转为Vert.x的 Future
对象,这里选择使用Vert.x的Context执行:
Future.fromCompletionStage(completionStage, vertx.getOrCreateContext())
.flatMap(str -> {
String key = UUID.randomUUID().toString();
return storeInDb(key, str);
})
.onSuccess(str -> {
System.out.println("We have a result: " + str);
})
.onFailure(err -> {
System.err.println("We have a problem");
err.printStackTrace();
});
Verticles
Vert.x 通过开箱即用的方式提供了一个简单便捷的、可扩展的、类似 Actor Model 的部署和并发模型机制。 您可以用此模型机制来保管您自己的代码组件。
这个模型是可选的,Vert.x 并不强制使用这种方式创建应用程序。
这个模型并不是严格的 Actor 模式实现,但它确实有相似之处, 特别是在并发、扩展性和部署等方面。
使用该模型,需要将应用代码编写成多个 Verticle。
Verticle 是由 Vert.x 部署和运行的代码块。默认情况一个 Vert.x 实例维护了N个 Event Loop 线程(默认情况下N = CPU核数 x 2)。Verticle 实例可使用任意 Vert.x 支持的编程语言编写, 而且一个简单的应用程序也可以包含多种语言编写的 Verticle。
您可以将 Verticle 想成 Actor Model 中的 Actor。
一个应用程序通常是由在同一个 Vert.x 实例中同时运行的许多 Verticle 实例组合而成。 不同的 Verticle 实例通过向 Event Bus 上发送消息来相互通信。
编写 Verticle
Verticle 的实现类必须实现 Verticle
接口。
如果您喜欢的话,可以直接实现该接口,但是通常直接从抽象类
AbstractVerticle
继承更简单。
这儿有一个例子:
public class MyVerticle extends AbstractVerticle { // Verticle部署时调用 public void start() { } // 可选 - Verticle撤销时调用 public void stop() { } }
通常您需要像上边例子一样重写 start
方法。
当 Vert.x 部署 Verticle 时,它的 start
方法将被调用,这个方法执行完成后 Verticle
就变成已启动状态。
您同样可以重写 stop
方法,当Vert.x 撤销一个 Verticle 时它会被调用,
这个方法执行完成后 Verticle 就变成已停止状态了。
Verticle 异步启动和停止
有些时候您的 Verticle 启动会耗费一些时间,您想要在这个过程做一些事,
并且您做的这些事并不想等到Verticle部署完成过后再发生。例如,您想在 start
方法中启动一个 HTTP 服务并在 listen
方法中处理一个异步结果。
您不能在您的 start
方法中阻塞等待其他的 Verticle 部署完成,这样做会破坏 黄金法则。
所以您要怎么做?
您可以实现 异步版本 的 start
方法来实现,它接收一个 Promise
参数。
方法执行完时,Verticle 实例并没有部署好(状态不是 deployed)。
当所有您需要做的事(如:启动HTTP服务)完成后,就可以调用 Future
的 complete
(或 fail
)
方法来标记启动完成或失败了。
这儿有一个例子:
public class MyVerticle extends AbstractVerticle { private HttpServer server; public void start(Promise<Void> startPromise) { server = vertx.createHttpServer().requestHandler(req -> { req.response() .putHeader("content-type", "text/plain") .end("Hello from Vert.x!"); }); // Now bind the server: server.listen(8080, res -> { if (res.succeeded()) { startPromise.complete(); } else { startPromise.fail(res.cause()); } }); } }
同样的,这儿也有一个异步版本的 stop
方法,如果您想做一些耗时的 Verticle清理工作,
您可以使用它。
public class MyVerticle extends AbstractVerticle { public void start() { // 做一些事 } public void stop(Promise<Void> stopPromise) { obj.doSomethingThatTakesTime(res -> { if (res.succeeded()) { stopPromise.complete(); } else { stopPromise.fail(); } }); } }
注意
|
在Verticle中启动的HTTP服务,无需在 stop 方法中手动停止;
Vert.x在撤销Verticle时会自动停止运行中的服务。
|
Verticle 种类
这儿有两种 Verticle:
- Standard Verticles
-
这是最常用的一类 Verticle —— 它们永远运行在 Event Loop 线程上。 更多讨论详见稍后的章节。
- Worker Verticles
-
这类 Verticle 会运行在 Worker Pool 中的线程上。 一个实例绝对不会被多个线程同时执行。
Standard verticles
当 Standard Verticle 被创建时,它会被分派给一个 Event Loop 线程,并在这个 Event Loop 中执行它的 start
方法。
当您在一个 Event Loop 上调用了 Core API 中的方法并传入了处理器时,Vert.x
将保证用与调用该方法时相同的 Event Loop 来执行这些处理器。
这意味着我们可以保证您的 Verticle 实例中 所有的代码都是在相同Event Loop中执行 (只要您不创建自己的线程来调用它!)
同样意味着您可以将您的应用中的所有代码用单线程方式编写,让 Vert.x 去考虑线程和扩展问题。您不用再考虑 synchronized 和 volatile 的问题, 也可以避免传统的多线程应用经常会遇到的竞态条件和死锁的问题。
Worker verticles
Worker Verticle 和 Standard Verticle 很像,但它并不是由一个 Event Loop 来执行, 而是由Vert.x中的 Worker Pool 中的线程执行。
Worker Verticle 设计用于调用阻塞式代码,它不会阻塞任何 Event Loop。
如果您不想使用 Worker Verticle 来运行阻塞式代码, 您还可以在一个Event Loop中直接使用 内联阻塞式代码
您需要通过 setWorker
方法来将 Verticle 部署成一个 Worker Verticle:
DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
在 Vert.x 中,Worker Verticle 实例绝对不会同时被多个线程执行, 但它可以在不同时间由不同线程执行。
编程方式部署Verticle
部署Verticle可以使用任意一个 deployVerticle
方法,
并传入一个 Verticle名称或Verticle 实例。
注意
|
通过 Verticle 实例 来部署 Verticle 仅限Java语言。 |
Verticle myVerticle = new MyVerticle();
vertx.deployVerticle(myVerticle);
您同样可以指定 Verticle 的 名称 来部署它。
这个 Verticle 的名称会用于查找实例化 Verticle的特定
VerticleFactory
。
不同的 Verticle Factory 可用于实例化不同语言的 Verticle,也可用于其他目的, 例如加载服务、运行时从Maven中获取Verticle实例等。
因此可以部署任何使用Vert.x支持的语言编写的Verticle。
下面的例子展示了如何部署多个不同语言编写的 Verticle :
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
// 部署JavaScript的Verticle
vertx.deployVerticle("verticles/myverticle.js");
// 部署Ruby的Verticle
vertx.deployVerticle("verticles/my_verticle.rb");
Verticle名称到Factory的映射规则
使用名称部署Verticle时,会通过名称来选择一个用于实例化 Verticle 的 Verticle Factory。
Verticle 名称可以增加一个以冒号结尾的前缀,这个前缀用于查找Factory,如:
js:foo.js // 使用JavaScript的Factory groovy:com.mycompany.SomeGroovyCompiledVerticle // 用Groovy的Factory service:com.mycompany:myorderservice // 用Service的Factory
如果不指定前缀,Vert.x将根据Verticle名称的后缀来查找对应Factory,如:
foo.js // 将使用JavaScript的Factory SomeScript.groovy // 将使用Groovy的Factory
若前缀后缀都没指定,Vert.x将假定Verticle名称是一个Java 全限定类名(FQCN), 并尝试实例化它。
如何定位Verticle Factory?
大部分Verticle Factory会从 classpath 中加载,并在 Vert.x 启动时注册。
您同样可以使用编程的方式去注册或注销Verticle Factory:通过 registerVerticleFactory
方法和
unregisterVerticleFactory
方法。
等待部署完成
Verticle是异步部署的,换而言之,可能在 deploy
方法调用返回后一段时间才会完成部署。
如果您想要在部署完成时收到通知,则可以指定一个完成处理器:
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
if (res.succeeded()) {
System.out.println("Deployment id is: " + res.result());
} else {
System.out.println("Deployment failed!");
}
});
如果部署成功,这个完成处理器的结果中将会包含部署ID的字符串。
这个部署ID可以用于撤销部署。
撤销Verticle
我们可以通过 undeploy
方法来撤销部署好的 Verticle。
撤销操作也是异步的,因此若您想要在撤销完成后收到通知,则可以指定另一个完成处理器:
vertx.undeploy(deploymentID, res -> {
if (res.succeeded()) {
System.out.println("Undeployed ok");
} else {
System.out.println("Undeploy failed!");
}
});
设置 Verticle 实例数量
使用名称部署 Verticle 时,可以指定需要部署的 Verticle 实例的数量。
DeploymentOptions options = new DeploymentOptions().setInstances(16);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
这个功能对于跨多核扩展时很有用。例如,您有一个带Web服务的Verticle需要部署在多核的机器上, 您可以部署多个实例来利用所有的核。
向 Verticle 传入配置
可在部署时传给 Verticle 一个 JSON 格式的配置
JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
System.out.println("Configuration: " + config().getString("name"));
高可用性
Verticle可以启用高可用方式(HA)部署。在这种方式下,当其中一个部署在 Vert.x 实例中的 Verticle 突然挂掉,这个 Verticle 可以在集群环境中的另一个 Vert.x 实例中重新部署。
若要启用高可用方式运行一个 Verticle,仅需要追加 -ha
参数:
vertx run my-verticle.js -ha
当启用高可用方式时,不需要追加 -cluster
参数。
关于高可用的功能和配置的更多细节可参考 高可用和故障转移
从命令行运行Verticle
您可以在 Maven 或 Gradle 项目中以正常方式添加 Vert.x Core 为依赖,在项目中直接使用 Vert.x。
您也可以从命令行直接运行 Vert.x 的 Verticle。
为此,您需要下载并安装 Vert.x 的发行版,并且将安装的 bin
目录添加到您的
PATH
环境变量中,并确保您的 PATH
中设置了Java 8的JDK环境。
注意
|
在 PATH 设置JDK是为了支持Java代码的运行时编译(on the fly compilation)。
|
现在您可以使用 vertx run
命令运行Verticle了,下面是一些例子:
# 运行JavaScript的Verticle vertx run my_verticle.js # 运行Ruby的Verticle vertx run a_n_other_verticle.rb # 使用集群模式运行Groovy的Verticle vertx run FooVerticle.groovy -cluster
您甚至可以不必编译 Java 源代码,直接运行它:
vertx run SomeJavaSourceFile.java
Vert.x 在运行Java 源代码文件之前将执行运行时编译, 这对于快速原型制作和演示很有用,而且不需要配置 Maven 或 Gradle 就能跑起来!
欲了解有关在命令行执行 vertx
可用的各种选项完整信息,
可以直接在命令行键入 vertx
查看帮助。
退出 Vert.x 环境
Vert.x 实例维护的线程不是守护线程,因此它们会阻止JVM退出。
如果您通过嵌入式的方式使用 Vert.x 并且完成了操作,您可以调用 close
方法关闭它。
这将关闭所有内部线程池并关闭其他资源,允许JVM退出。
Context 对象
当 Vert.x 传递一个事件给处理器或者调用 Verticle
的 start 或 stop 方法时,
它会关联一个 Context
对象来执行。通常来说这个context会是一个
event-loop context,它绑定到了一个特定的 Event Loop 线程上。所以在该context上执行的操作总是
在同一个 Event Loop 线程中。对于运行内联的阻塞代码的 Worker Verticle 来说,会关联一个
Worker Context,并且所有的操作运都会运行在 Worker 线程池的线程上。(译者注:每个 Verticle
在部署的时候都会被分配一个 Context
(根据配置不同,可以是Event Loop Context 或者 Worker Context),之后此 Verticle
上所有的普通代码都会在此 Context
上执行(即对应的 Event Loop 或Worker 线程)。一个 Context
对应一个 Event Loop 线程(或 Worker 线程),但一个 Event Loop 可能对应多个 Context
。)
您可以通过 getOrCreateContext
方法获取 Context
实例:
Context context = vertx.getOrCreateContext();
若已经有一个context和当前线程关联,那么它直接重用这个context对象, 如果没有则创建一个新的。您可以检查获取的context的 类型 :
Context context = vertx.getOrCreateContext();
if (context.isEventLoopContext()) {
System.out.println("Context attached to Event Loop");
} else if (context.isWorkerContext()) {
System.out.println("Context attached to Worker Thread");
} else if (! Context.isOnVertxThread()) {
System.out.println("Context not attached to a thread managed by vert.x");
}
当您获取了这个context对象,您就可以在context中异步执行代码了。换句话说, 您提交的任务将会在同一个context中运行:
vertx.getOrCreateContext().runOnContext( (v) -> {
System.out.println("This will be executed asynchronously in the same context");
});
当在同一个context中运行了多个处理函数时,可能需要在它们之间共享数据。
context对象提供了存储和读取共享数据的方法。举例来说,它允许您将数据传递到
runOnContext
方法运行的某些操作中:
final Context context = vertx.getOrCreateContext();
context.put("data", "hello");
context.runOnContext((v) -> {
String hello = context.get("data");
});
您还可以通过 config
方法访问 Verticle 的配置信息。查看 向 Verticle 传入配置 章节了解更多配置信息。
执行周期性/延迟性操作
在 Vert.x 中,延迟执行或定期执行操作很常见。
在 Standard Verticle 中您不能直接让线程休眠以引入延迟,因为它会阻塞 Event Loop 线程。
取而代之是使用 Vert.x 定时器。定时器可以是 一次性 或 周期性 的,两者我们都会讨论到。
一次性计时器
一次性计时器会在一定延迟后调用一个 Event Handler,以毫秒为单位计时。
您可以通过 setTimer
方法传递延迟时间和一个处理器来设置计时器的触发。
long timerID = vertx.setTimer(1000, id -> {
System.out.println("And one second later this is printed");
});
System.out.println("First this is printed");
返回值是一个唯一的计时器id,该id可用于之后取消该计时器,这个计时器id会传入给处理器。
周期性计时器
您同样可以使用 setPeriodic
方法设置一个周期性触发的计时器。
第一次触发之前同样会有一段设置的延时时间。
setPeriodic
方法的返回值也是一个唯一的计时器id,若之后该计时器需要取消则使用该id。
传给处理器的参数也是这个唯一的计时器id。
请记住这个计时器将会定期触发。如果您的定时任务会花费大量的时间,则您的计时器事件可能会连续执行, 甚至发生更坏的情况:重叠。
这种情况,您应考虑使用 setTimer
方法,
当任务执行完成时设置下一个计时器。
long timerID = vertx.setPeriodic(1000, id -> {
System.out.println("And every second this is printed");
});
System.out.println("First this is printed");
Verticle worker pool
Verticle 使用 Vert.x 中的 Worker Pool 来执行阻塞式行为,例如 executeBlocking
或
Worker Verticle。
可以在部署配置项中指定不同的 Worker 线程池:
vertx.deployVerticle("the-verticle", new DeploymentOptions().setWorkerPoolName("the-specific-pool"));
Event Bus
event bus
是 Vert.x 的 神经系统 。
每一个 Vert.x 实例都有一个单独的 Event Bus 实例。您可以通过 Vertx
实例的 eventBus
方法来获得对应的 EventBus
实例。
应用中的不同组成部分可以通过 Event Bus 相互通信,您无需关心它们由哪一种语言实现, 也无需关心它们是否在同一个 Vert.x 实例中。
您甚至可以通过桥接的方式让浏览器中运行的多个JavaScript客户端在同一个 Event Bus 上相互通信。
Event Bus构建了一个跨越多个服务器节点和多个浏览器的分布式点对点消息系统。
Event Bus支持发布/订阅、点对点、请求-响应的消息传递方式。
Event Bus的API很简单。基本上只涉及注册处理器、 注销处理器以及发送和发布(publish)消息。
先来看一些基本概念和理论。
基本概念
寻址
消息的发送目标被称作 地址(address) 。
Vert.x的地址格式并不花哨。Vert.x中的地址就是一个简单的字符串,任何字符串都合法。
不过还是建议使用某种规范来进行地址的命名。 例如 使用点号(.
)来划分命名空间。
一些合法的地址形如:europe.news.feed1、acme.games.pacman、sausages 以及 X 。
点对点消息传递 与 请求-响应消息传递
Event Bus也支持 点对点 消息模式。
消息将被发送到一个地址上,Vert.x仅会把消息发给注册在该地址上的处理器中的其中一个。
若这个地址上注册有不止一个处理器,那么Vert.x将使用 不严格的轮询算法 选择其中一个。
点对点消息传递模式下,可在消息发送的时候指定一个应答处理器(可选)。
当接收者收到消息并且处理完成时,它可以选择性地回复该消息。 若回复,则关联的应答处理器将会被调用。
当发送者收到应答消息时,发送者还可以继续回复这个“应答”,这个过程可以 不断 重复。 通过这种方式可以在两个不同的 Verticle 之间建立一个对话窗口。
这也是一个常见的消息传递模式:请求-响应 模式。
Event Bus API
下面我们来看一下 API。
获取Event Bus
您可以通过下面的代码获取 Event Bus 的引用:
EventBus eb = vertx.eventBus();
每一个 Vertx.x 实例仅有一个 Event Bus 实例。
注册处理器
最简单的注册处理器的方式是使用 consumer
方法,
这儿有个例子:
EventBus eb = vertx.eventBus();
eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});
当消息达到您的处理器时,该消息会被放入 message
参数进行处理器的调用。
调用 consumer() 方法会返回一个 MessageConsumer
对象。
该对象后续可用于注销处理器,或者流式地处理该对象。
您也可以使用 consumer
方法直接返回一个不带处理器的 MessageConsumer,
之后再在这个返回的对象上设置处理器。如:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
});
在向集群模式下的 Event Bus 注册处理器时, 注册信息会花费一些时间才能传播到集群中的所有节点。
若您希望在完成注册后收到通知,您可以在 MessageConsumer
对象上注册
一个 completion handler
。
consumer.completionHandler(res -> {
if (res.succeeded()) {
System.out.println("The handler registration has reached all nodes");
} else {
System.out.println("Registration failed!");
}
});
注销处理器
您可以通过 unregister
方法来注销处理器。
若您在使用集群模式的 Event Bus,注销处理器的动作会花费一些时间在节点中传播。若您想
在完成后收到通知,可以使用 unregister
方法注册回调:
consumer.unregister(res -> {
if (res.succeeded()) {
System.out.println("The handler un-registration has reached all nodes");
} else {
System.out.println("Un-registration failed!");
}
});
发布消息
发布消息很简单,只需使用 publish
方法指定一个地址去发布即可。
eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");
这个消息将会传递给所有在地址 news.uk.sport 上注册过的处理器。
发送消息
在对应地址上注册过的所有处理器中,仅一个处理器能够接收到发送的消息。 这是一种点对点消息传递模式。Vert.x 使用不严格的轮询算法来选择绑定的处理器。
您可以使用 send
方法来发送消息:
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");
设置消息头
在 Event Bus 上发送的消息可包含头信息。您可以在发送或发布(publish)时提供一个
DeliveryOptions
来指定头信息。例如:
DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);
应答消息/发送回复
当使用 send
方法发送消息时,
Event Bus会尝试将消息传递到注册在Event Bus上的 MessageConsumer
中。
某些情况下,发送者可以通过 请求/响应+ 模式来得知消费者已经收到并"处理"了该消息。
消费者可以通过调用
reply
方法来应答这个消息,确认该消息已被处理。
此时,它会将一个应答消息返回给发送者并调用发送者的应答处理器。
看这个例子会更清楚:
接收者:
MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
message.reply("how interesting!");
});
发送者:
eventBus.request("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
if (ar.succeeded()) {
System.out.println("Received reply: " + ar.result().body());
}
});
在应答的消息体中可以包含一些有用的信息。
“处理中”的实际含义应当由应用程序来定义。 这完全取决于消费者如何执行,Event Bus 对此并不关心。
一些例子:
-
一个简单地实现了返回当天时间的服务, 在应答的消息里会包含当天时间信息。
-
一个实现了持久化队列的消息消费者,可以回复
true
来表示消息已成功持久化到存储设备中,或回复false
表示失败。 -
一个处理订单的消息消费者可以使用
true
确认这个订单已经成功处理, 并且可以从数据库中删除。
带超时的发送
当发送带有应答处理器的消息时,可以在 DeliveryOptions
中指定一个超时时间。
如果在这个时间之内没有收到应答,则会以“失败的结果”为参数调用应答处理器。
默认超时是 30 秒。
消息编解码器
您可以在 Event Bus 中发送任何对象,只需为这个对象类型注册一个编解码器 message codec
即可。
每个消息编解码器都有一个名称,您需要在发送或发布消息时通过 DeliveryOptions
来指定:
eventBus.registerCodec(myCodec);
DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());
eventBus.send("orders", new MyPOJO(), options);
若您希望某个类总是使用特定的编解码器,那么您可以为这个类注册默认编解码器。 这样您就不需要在每次发送的时候指定了:
eventBus.registerDefaultCodec(MyPOJO.class, myCodec);
eventBus.send("orders", new MyPOJO());
您可以通过 unregisterCodec
方法注销某个消息编解码器。
消息编解码器的编码输入和解码输出不一定使用同一个类型。 例如您可以编写一个编解码器来发送 MyPOJO 类的对象,但是当消息发送给处理器后解码成 MyOtherPOJO 对象。
集群模式的 Event Bus
Event Bus 不仅仅只存在于单个 Vert.x 实例中。将网络上不同的 Vert.x 实例组合成集群, 就可以在这些实例间形成一个单一的、分布式的Event Bus。
通过编写代码启用集群模式
若您用编程的方式创建 Vert.x 实例(Vertx),则可以通过将 Vert.x 实例配置成集群模式来获取集群模式的Event Bus:
VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});
您需要确保在您的 classpath 中(或构建工具的依赖中)包含 ClusterManager
的实现类,如默认的 HazelcastClusterManager
。
配置 Event Bus
Event Bus 是可配置的,这对于以集群模式运行的 Event Bus 来说非常有用。
Event Bus 使用 TCP 连接发送和接收消息,因此可以通过 EventBusOptions
对TCP连接进行全面的配置。
由于 Event Bus 既可以用作客户端又可以用作服务端,因此这些配置近似于 NetClientOptions
和 NetServerOptions
。
VertxOptions options = new VertxOptions()
.setEventBusOptions(new EventBusOptions()
.setSsl(true)
.setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setClientAuth(ClientAuth.REQUIRED)
);
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});
上边代码段描述了如何在Event Bus中使用SSL连接替换明文的TCP连接。
警告 :若要在集群模式下保证安全性,您 必须 将集群管理器配置成加密的或者加强安全规则。 参考集群管理器的文档获取更多细节。
Event Bus 的配置需要在集群的所有节点中保持一致。
EventBusOptions
还允许您指定 Event Bus 是否运行在集群模式下,以及它的端口和主机信息(译者注:host,这里指网络socket绑定的地址)。
在容器中使用时,您还可以配置公共主机和端口号:(译者注:setClusterPublicHost 和 setClusterPublicPort 的功能在原文档上描述得不清晰,但是API文档上有详细描述。 在某些容器、云环境等场景下,本节点监听的地址,和其他节点连接本节点时使用的地址,是不同的。这种情况下则可以利用上面两个配置区分监听地址和公开暴露的地址。 )
VertxOptions options = new VertxOptions()
.setEventBusOptions(new EventBusOptions()
.setClusterPublicHost("whatever")
.setClusterPublicPort(1234)
);
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});
JSON
和其他一些语言不同,Java 没有对 JSON 做原生支持(first class support), 因此我们提供了两个类,以便在 Vert.x 应用中更方便地处理 JSON。
JSON objects
JsonObject
类用来描述JSON对象。
一个JSON 对象基本上只是一个 Map 结构。它具有字符串的键,值可以是任意一种JSON 支持的类型 (如 string, number, boolean)。
JSON 对象也支持 null 值。
创建 JSON 对象
可以使用默认构造函数创建空的JSON对象。
您可以通过一个 JSON 格式的字符串创建JSON对象:
String jsonString = "{\"foo\":\"bar\"}";
JsonObject object = new JsonObject(jsonString);
您可以根据Map创建JSON对象:
Map<String, Object> map = new HashMap<>();
map.put("foo", "bar");
map.put("xyz", 3);
JsonObject object = new JsonObject(map);
将键值对放入 JSON 对象
使用 put
方法可以将值放入到JSON对象里。
这个API是流式的,因此这个方法可以被链式地调用。
JsonObject object = new JsonObject();
object.put("foo", "bar").put("num", 123).put("mybool", true);
从 JSON 对象获取值
您可使用 getXXX
方法从JSON对象中获取值。例如:
String val = jsonObject.getString("some-key");
int intVal = jsonObject.getInteger("some-other-key");
JSON 对象和 Java 对象间的映射
您可以根据 Java 对象的字段创建一个JSON 对象,如下所示:
你可以根据一个 JSON 对象来实例化一个Java 对象并填充字段值。如下所示:
request.bodyHandler(buff -> {
JsonObject jsonObject = buff.toJsonObject();
User javaObject = jsonObject.mapTo(User.class);
});
请注意上述代码直接使用了 Jackson 的 ObjectMapper#convertValue()
来执行映射。
关于字段和构造函数的可见性的影响、对象引用的序列化和反序列化等等的问题,
可参考 Jackson 的文档获取更多信息。
在最简单的情况下,如果 Java 类中所有的字段都是
public
(或者有 public
的 getter/setter)时,并且有一个 public
的默认构造函数(或不定义构造函数),mapFrom
和 mapTo
都应该成功。
只要不存在对象的循环引用,嵌套的 Java 对象就可以和嵌套的 JSON 对象相互序列化/反序列化。
将 JSON 对象编码成字符串
您可使用 encode
方法将一个对象编码成字符串格式。(译者注:如要得到更优美、格式化的字符串,可以使用 encodePrettily
方法。)
JSON 数组
JsonArray
类用来描述 JSON数组。
一个JSON 数组是一个值的序列(值的类型可以是 string、number、boolean 等)。
JSON 数组同样可以包含 null
值。
创建 JSON 数组
可以使用默认构造函数创建空的JSON数组。
您可以根据JSON格式的字符串创建一个JSON数组:
String jsonString = "[\"foo\",\"bar\"]";
JsonArray array = new JsonArray(jsonString);
将数组项添加到JSON数组
您可以使用 add
方法添加数组项到JSON数组中:
JsonArray array = new JsonArray();
array.add("foo").add(123).add(false);
从 JSON 数组中获取值
您可使用 getXXX
方法从JSON 数组中获取值。例如:
String val = array.getString(0);
Integer intVal = array.getInteger(1);
Boolean boolVal = array.getBoolean(2);
将 JSON 数组编码成字符串
您可以使用 encode
编码成字符串格式。
创建任意类型的 JSON
创建 JSON 对象或数组的前提是,你需要事先已知其输入是合法的字符串。(译者注:这里说的“合法”指的是,你在使用 JsonObject
时,需要事先知道构造函数输入的字符串是一个json object {…}
,同理,使用 JsonArray
时,字符串需要是一个json array […]
,否则即使输入了一个规范的Json字符串,也没有办法成功解析。)
当你不确定字符串是否合法时,你应当转而使用 Json.decodeValue
方法。
Object object = Json.decodeValue(arbitraryJson);
if (object instanceof JsonObject) {
// That's a valid json object
} else if (object instanceof JsonArray) {
// That's a valid json array
} else if (object instanceof String) {
// That's valid string
} else {
// etc...
}
Json 指针(Json Pointers)
Vert.x 提供了一个 Json指针 RFC6901 的实现。
无论是查询还是写入,你都可以使用Json指针来完成。你可以基于字符串、URI,或者通过手动追加路径(path)的方式,
来构建 JsonPointer
对象:
JsonPointer pointer1 = JsonPointer.from("/hello/world");
// Build a pointer manually
JsonPointer pointer2 = JsonPointer.create()
.append("hello")
.append("world");
Object result1 = objectPointer.queryJson(jsonObject);
// Query a JsonArray
Object result2 = arrayPointer.queryJson(jsonArray);
// Write starting from a JsonObject
objectPointer.writeJson(jsonObject, "new element");
// Write starting from a JsonObject
arrayPointer.writeJson(jsonArray, "new element");
你可以将Vert.x的Json指针功能应用在任何类型的对象上,只需实现一个自定义的 JsonPointerIterator
即可。
Buffers
在 Vert.x 内部,大部分数据被重新组织(shuffle,表意为洗牌)成 Buffer
格式。
Buffer
是一个可以被读取或写入的,包含0个或多个字节的序列,并且能够根据写入的字节自动扩容。
您也可以将 Buffer
想象成一个智能的字节数组。
创建 Buffer
可以使用静态方法 Buffer.buffer
来创建 Buffer。
Buffer可以从字符串或字节数组初始化,或者直接创建空的Buffer。
这儿有一些创建Buffer的例子。
创建一个空的Buffer:
Buffer buff = Buffer.buffer();
从字符串创建一个Buffer,这个Buffer中的字符会以 UTF-8 格式编码:
Buffer buff = Buffer.buffer("some string");
从字符串创建一个Buffer,这个字符串会以指定的编码方式编码,例如:
Buffer buff = Buffer.buffer("some string", "UTF-16");
从字节数组 byte[]
创建Buffer:
byte[] bytes = new byte[] {1, 3, 5};
Buffer buff = Buffer.buffer(bytes);
创建一个指定初始大小的Buffer。若您知道您的 Buffer会写入一定量的数据, 您可以在创建Buffer时指定它的大小,使这个Buffer在初始化时就分配了更多的内存, 比数据写入时重新调整大小效率更高。
注意以这种方式创建的Buffer是 空的。它不会创建一个填满了0的Buffer。代码如下:(译者注:这里说的“空的”、“不会填满0”,指的是buffer内部的游标会从头开始,并不是在说内存布局。这种实现方式和使用直觉是一致的,只不过明确通过文档进行描述有点奇怪。)
Buffer buff = Buffer.buffer(10000);
向Buffer写入数据
向Buffer写入数据的方式有两种:追加和随机访问。
任何一种情况下 Buffer都会自动进行扩容,
所以你不会在使用Buffer时遇到 IndexOutOfBoundsException
。
从Buffer中读取
可使用 getXXX
方法从 Buffer 中读取数据,getXXX
为各种不同数据类型提供了对应的方法,
这些方法的第一个参数是Buffer中待获取的数据的索引位置。
Buffer buff = Buffer.buffer();
for (int i = 0; i < buff.length(); i += 4) {
System.out.println("int value at " + i + " is " + buff.getInt(i));
}
使用无符号数
可使用 getUnsignedXXX
、
appendUnsignedXXX
和 setUnsignedXXX
方法将无符号数从Buffer中读取或追加/设置到Buffer里。
这对于实现一个致力于优化带宽占用的网络协议的编解码器是非常有用的。
下边例子中,值 200 被设置到了仅占用一个字节的特定位置:
Buffer buff = Buffer.buffer(128);
int pos = 15;
buff.setUnsignedByte(pos, (short) 200);
System.out.println(buff.getUnsignedByte(pos));
控制台中显示 "200"。
Buffer长度
可使用 length
方法获取Buffer长度,
Buffer的长度值是Buffer中包含的字节的最大索引 + 1。
拷贝Buffer
可使用 copy
方法创建一个Buffer的副本。
裁剪Buffer
裁剪得到的Buffer是完全依赖于原始Buffer的一个新的Buffer,换句话说,它不会对Buffer中的数据做拷贝。
使用 slice
方法裁剪一个Buffer。
编写 TCP 服务端和客户端
Vert.x让您轻松编写非阻塞的TCP客户端和服务器。
配置 TCP 服务端
若您不想使用默认配置,可以在创建时通过传入一个 NetServerOptions
实例来配置服务器:
NetServerOptions options = new NetServerOptions().setPort(4321);
NetServer server = vertx.createNetServer(options);
启动服务端监听
要告诉服务端监听传入的请求,您可以使用其中一个 listen
方法。
让服务器监听配置项指定的主机和端口:
NetServer server = vertx.createNetServer();
server.listen();
或在调用 listen
方法时指定主机和端口号,忽略配置项中的配置:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost");
默认主机名是 0.0.0.0
,它表示:监听所有可用地址。默认端口号是 0
,
这也是一个特殊值,它告诉服务器随机选择并监听一个本地没有被占用的端口。
实际的绑定也是异步的,因此服务器在调用了 listen
方法的一段时间 之后
才会实际开始监听。
若您希望在服务器实际监听时收到通知,您可以在调用 listen
方法时提供一个处理器。
例如:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
监听随机端口
若设置监听端口为 0
,服务器将随机寻找一个没有使用的端口来监听。
可以调用 actualPort
方法来获得服务器实际监听的端口:
NetServer server = vertx.createNetServer();
server.listen(0, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening on actual port: " + server.actualPort());
} else {
System.out.println("Failed to bind!");
}
});
接收传入连接的通知
若您想要在连接创建完时收到通知,则需要设置一个 connectHandler
:
NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
// Handle the connection in here
});
当连接成功时,您可以在回调函数中处理得到的 NetSocket
实例。
这是一个代表了实际连接的套接字接口,它允许您读取和写入数据、以及执行各种其他操作, 如关闭 Socket。
从Socket读取数据
您可以在Socket上调用 handler
方法来设置用于读取数据的处理器。
每次 Socket 接收到数据时,会以 Buffer
对象为参数调用处理器。
NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
socket.handler(buffer -> {
System.out.println("I received some bytes: " + buffer.length());
});
});
向Socket中写入数据
您可使用 write
方法写入数据到Socket:
Buffer buffer = Buffer.buffer().appendFloat(12.34f).appendInt(123);
socket.write(buffer);
// 以UTF-8的编码方式写入一个字符串
socket.write("some data");
// 以指定的编码方式写入一个字符串
socket.write("some data", "UTF-16");
写入操作是异步的,可能调用 write
方法返回过后一段时间才会执行。
关闭处理器
若您想要在 Socket 关闭时收到通知,可以设置一个 closeHandler
处理器:
socket.closeHandler(v -> {
System.out.println("The socket has been closed");
});
处理异常
您可以设置一个 exceptionHandler
来接收所有socket上发生的异常。
exceptionHandler
所设置的异常处理器还可以接收在
connectHandler
接受到连接对象前发生的所有异常,
比如在TLS握手期间。
Event Bus 写处理器
每个 Socket 会自动在Event Bus中注册一个处理器,当这个处理器中收到任意Buffer时, 它会将数据写入到 Socket。
这意味着您可以通过向这个地址发送Buffer的方式,从不同的 Verticle 甚至是不同的 Vert.x 实例中向指定的 Socket 发送数据。
处理器的地址由 writeHandlerID
方法提供。
发送文件或 Classpath 中的资源
您可以直接通过 sendFile
方法将文件和 classpath 中的资源写入Socket。
这种做法是非常高效的,它可以被操作系统内核直接处理。
请阅读 从 Classpath 访问文件 章节了解类路径的限制或禁用它。
socket.sendFile("myfile.dat");
升级到 SSL/TLS 连接
一个非SSL/TLS连接可以通过 upgradeToSsl
方法升级到SSL/TLS连接。
必须为服务器或客户端配置SSL/TLS才能正常工作。请参阅 chapter on SSL/TLS 章节获取详细信息。
关闭 TCP 服务端
您可以调用 close
方法关闭服务端。
关闭操作将关闭所有打开的连接并释放所有服务端资源。
关闭操作也是异步的,可能直到方法调用返回过后一段时间才会实际关闭。 若您想在实际关闭完成时收到通知,那么您可以传递一个处理器。
当关闭操作完成后,绑定的处理器将被调用:
server.close(res -> {
if (res.succeeded()) {
System.out.println("Server is now closed");
} else {
System.out.println("close failed");
}
});
扩展 - 共享 TCP 服务端
任意一个TCP服务端中的处理器总是在相同的Event-Loop线程上执行。
这意味着如果您在多核的服务器上运行,并且只部署了一个实例, 那么您的服务器上最多只能使用一个核。
为了利用更多的服务器核,您将需要部署更多的服务器实例。
您可以在代码中以编程方式实例化更多(Server的)实例:
for (int i = 0; i < 10; i++) {
NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
socket.handler(buffer -> {
// Just echo back the data
socket.write(buffer);
});
});
server.listen(1234, "localhost");
}
如果您使用的是 Verticle,您可以通过在命令行上使用 -instances
选项来简单部署更多的服务器实例:
vertx run com.mycompany.MyVerticle -instances 10
或者使用编程方式部署您的 Verticle 时:
DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);
一旦您这样做,您将发现echo服务器在功能上与之前相同, 但是服务器上的所有核都可以被利用,并且可以处理更多的工作。
在这一点上,您可能会问自己:”如何让多台服务器在同一主机和端口上侦听? 尝试部署一个以上的实例时真的不会遇到端口冲突吗?“
Vert.x施加了一点魔法。
当您在与现有服务器相同的主机和端口上部署另一个服务器实例时, 实际上它并不会尝试创建在同一主机/端口上侦听的新服务器实例。
相反,它内部仅仅维护一个服务器实例。当传入新的连接时, 它以轮询的方式将其分发给任意一个连接处理器处理。
因此,Vert.x TCP 服务端可以水平扩展到多个核,并且每个实例保持单线程环境不变。
配置 TCP 客户端
如果您不想使用默认值,则可以在创建实例时传入 NetClientOptions
给客户端:
NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
NetClient client = vertx.createNetClient(options);
创建连接
NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
NetClient client = vertx.createNetClient(options);
client.connect(4321, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Connected!");
NetSocket socket = res.result();
} else {
System.out.println("Failed to connect: " + res.cause().getMessage());
}
});
配置连接重试
可以将客户端配置为在无法连接的情况下自动重试。
这是通过 setReconnectInterval
和
setReconnectAttempts
方法配置的。
注意
|
目前如果连接失效,Vert.x将不尝试重新连接。 重新连接尝试和时间间隔仅适用于创建初始连接。 |
NetClientOptions options = new NetClientOptions().
setReconnectAttempts(10).
setReconnectInterval(500);
NetClient client = vertx.createNetClient(options);
默认情况下,多个连接尝试是被禁用的。
记录网络活动
网络活动可以被记录下来,用于调试:
NetServerOptions options = new NetServerOptions().setLogActivity(true);
NetServer server = vertx.createNetServer(options);
对于客户端:
NetClientOptions options = new NetClientOptions().setLogActivity(true);
NetClient client = vertx.createNetClient(options);
Netty 使用 DEBUG
级别和 io.netty.handler.logging.LoggingHandler
名称来记录网络活动。
使用网络活动记录时,需要注意以下几点:
-
日志的记录是由Netty而不是Vert.x的日志来执行
-
这个功能 不能 用于生产环境
您应该阅读 Netty 日志记录 章节来了解详细信息。
配置服务端和客户端以使用SSL/TLS
TCP 客户端和服务端可以通过配置来使用 [TLS(传输层安全性协议)]Transport Layer Security ——早期版本的TLS被称为SSL。
无论是否使用SSL/TLS,服务器和客户端的API都是相同的。通过创建客户端/服务器时使用的
NetClientOptions
或 / NetServerOptions
来启用TLS/SSL。
指定服务端的密钥/证书
SSL/TLS 服务端通常向客户端提供证书,以便验证服务端的身份。
可以通过以下几种方式为服务端配置证书/密钥:
第一种方法是指定包含证书和私钥的Java密钥库位置。
可以使用 JDK 附带的 keytool 实用程序来管理Java密钥存储。
还应提供密钥存储的密码:
NetServerOptions options = new NetServerOptions().setSsl(true).setKeyStoreOptions(
new JksOptions().
setPath("/path/to/your/server-keystore.jks").
setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);
或者,您可以自己读取密钥库到一个Buffer,并将它直接提供给 JksOptions
:
Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.jks");
JksOptions jksOptions = new JksOptions().
setValue(myKeyStoreAsABuffer).
setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(jksOptions);
NetServer server = vertx.createNetServer(options);
PKCS#12格式的密钥/证书( http://en.wikipedia.org/wiki/PKCS_12 ,通常为 .pfx
或 .p12
扩展名),
也可以用与JKS密钥存储相似的方式加载:
NetServerOptions options = new NetServerOptions().setSsl(true).setPfxKeyCertOptions(
new PfxOptions().
setPath("/path/to/your/server-keystore.pfx").
setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);
也支持通过Buffer来配置:
Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
setValue(myKeyStoreAsABuffer).
setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
setSsl(true).
setPfxKeyCertOptions(pfxOptions);
NetServer server = vertx.createNetServer(options);
另外一种分别提供服务器私钥和证书的方法是使用 .pem
文件。
NetServerOptions options = new NetServerOptions().setSsl(true).setPemKeyCertOptions(
new PemKeyCertOptions().
setKeyPath("/path/to/your/server-key.pem").
setCertPath("/path/to/your/server-cert.pem")
);
NetServer server = vertx.createNetServer(options);
也支持通过 Buffer
来配置:
Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
setKeyValue(myKeyAsABuffer).
setCertValue(myCertAsABuffer);
NetServerOptions options = new NetServerOptions().
setSsl(true).
setPemKeyCertOptions(pemOptions);
NetServer server = vertx.createNetServer(options);
Vert.x支持从PKCS8 PEM文件中读取未加密的基于RSA和/或ECC的私钥。 也可以从PKCS1 PEM文件中读取基于RSA的私钥。 若PEM文件包含由 RFC 7468, Section 5 定义的文本编码证书, 可以从该PEM文件中读取X.509证书。
警告
|
请记住,未加密的PKCS8或PKCS1 PEM文件中包含的密钥, 可以被有这些文件读取权限的人提取出来。因此,请确保对此类PEM文件设置适当的访问限制, 以防止滥用。 |
最后,您还可以加载通用Java密钥库,使用其他密钥库实现时非常有用, 比如使用Bouncy Castle时:
NetServerOptions options = new NetServerOptions().setSsl(true).setKeyCertOptions(
new KeyStoreOptions().
setType("BKS").
setPath("/path/to/your/server-keystore.bks").
setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);
指定服务器信任
SSL/TLS 服务端可以使用证书颁发机构来验证客户端的身份。
证书颁发机构可通过多种方式为服务端配置。
可使用JDK随附的 keytool 实用程序来管理Java 受信存储。
还应提供受信存储的密码:
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setTrustStoreOptions(
new JksOptions().
setPath("/path/to/your/truststore.jks").
setPassword("password-of-your-truststore")
);
NetServer server = vertx.createNetServer(options);
或者您可以自己读取受信存储到Buffer,并将它直接提供:
Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setTrustStoreOptions(
new JksOptions().
setValue(myTrustStoreAsABuffer).
setPassword("password-of-your-truststore")
);
NetServer server = vertx.createNetServer(options);
PKCS#12格式的密钥/证书( http://en.wikipedia.org/wiki/PKCS_12 ,通常为 .pfx
或 .p12
扩展名),
也可以用与JKS密钥存储相似的方式加载:
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setPfxTrustOptions(
new PfxOptions().
setPath("/path/to/your/truststore.pfx").
setPassword("password-of-your-truststore")
);
NetServer server = vertx.createNetServer(options);
也支持通过 Buffer
来配置:
Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setPfxTrustOptions(
new PfxOptions().
setValue(myTrustStoreAsABuffer).
setPassword("password-of-your-truststore")
);
NetServer server = vertx.createNetServer(options);
另一种提供服务器证书颁发机构的方法是使用一个 .pem
文件列表。
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setPemTrustOptions(
new PemTrustOptions().
addCertPath("/path/to/your/server-ca.pem")
);
NetServer server = vertx.createNetServer(options);
也支持通过 Buffer
来配置:
Buffer myCaAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-ca.pfx");
NetServerOptions options = new NetServerOptions().
setSsl(true).
setClientAuth(ClientAuth.REQUIRED).
setPemTrustOptions(
new PemTrustOptions().
addCertValue(myCaAsABuffer)
);
NetServer server = vertx.createNetServer(options);
客户端受信配置
若客户端将 trustALl
设置为 true
,
则客户端将信任所有服务端证书。连接仍然会被加密,但这种模式很容易受到中间人攻击。
即您无法确定您正连接到谁,请谨慎使用。默认值为 false
。
NetClientOptions options = new NetClientOptions().
setSsl(true).
setTrustAll(true);
NetClient client = vertx.createNetClient(options);
若客户端没有设置 trustAll
,则必须配置客户端受信存储,
并且受信客户端应该包含服务器的证书。
默认情况下,客户端禁用主机验证。 要启用主机验证,请在客户端上设置使用的算法(目前仅支持HTTPS和LDAPS):
NetClientOptions options = new NetClientOptions().
setSsl(true).
setHostnameVerificationAlgorithm("HTTPS");
NetClient client = vertx.createNetClient(options);
和服务器配置相同,也可通过以下几种方式配置受信客户端:
第一种方法是指定包含证书颁发机构的Java受信库的位置。
它只是一个标准的Java密钥存储,与服务器端的密钥存储相同。
通过在 jks options
上使用 path
设置客户端受信存储位置。
如果服务器在连接期间提供不在客户端受信存储中的证书,则尝试连接将不会成功。
NetClientOptions options = new NetClientOptions().
setSsl(true).
setTrustStoreOptions(
new JksOptions().
setPath("/path/to/your/truststore.jks").
setPassword("password-of-your-truststore")
);
NetClient client = vertx.createNetClient(options);
它也支持 Buffer
的配置:
Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setTrustStoreOptions(
new JksOptions().
setValue(myTrustStoreAsABuffer).
setPassword("password-of-your-truststore")
);
NetClient client = vertx.createNetClient(options);
PKCS#12格式的密钥/证书( http://en.wikipedia.org/wiki/PKCS_12 ,通常为 .pfx
或 .p12
扩展名),
也可以用与JKS密钥存储相似的方式加载:
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPfxTrustOptions(
new PfxOptions().
setPath("/path/to/your/truststore.pfx").
setPassword("password-of-your-truststore")
);
NetClient client = vertx.createNetClient(options);
它也支持 Buffer
的配置:
Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPfxTrustOptions(
new PfxOptions().
setValue(myTrustStoreAsABuffer).
setPassword("password-of-your-truststore")
);
NetClient client = vertx.createNetClient(options);
另一种提供服务器证书颁发机构的方法是使用一个 .pem
文件列表。
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPemTrustOptions(
new PemTrustOptions().
addCertPath("/path/to/your/ca-cert.pem")
);
NetClient client = vertx.createNetClient(options);
它也支持 Buffer
的配置:
Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/ca-cert.pem");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPemTrustOptions(
new PemTrustOptions().
addCertValue(myTrustStoreAsABuffer)
);
NetClient client = vertx.createNetClient(options);
指定客户端的密钥/证书
如果服务器需要客户端认证,那么当连接时,客户端必须向服务器提供自己的证书。 可通过以下几种方式配置客户端:
第一种方法是指定包含密钥和证书的Java密钥库的位置,它只是一个常规的Java密钥存储。
使用 jks options
上的
path
方法设置客户端密钥库位置。
NetClientOptions options = new NetClientOptions().setSsl(true).setKeyStoreOptions(
new JksOptions().
setPath("/path/to/your/client-keystore.jks").
setPassword("password-of-your-keystore")
);
NetClient client = vertx.createNetClient(options);
也支持通过 Buffer
来配置:
Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.jks");
JksOptions jksOptions = new JksOptions().
setValue(myKeyStoreAsABuffer).
setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setKeyStoreOptions(jksOptions);
NetClient client = vertx.createNetClient(options);
PKCS#12格式的密钥/证书( http://en.wikipedia.org/wiki/PKCS_12 ,通常为 .pfx
或 .p12
扩展名),
也可以用与JKS密钥存储相似的方式加载:
NetClientOptions options = new NetClientOptions().setSsl(true).setPfxKeyCertOptions(
new PfxOptions().
setPath("/path/to/your/client-keystore.pfx").
setPassword("password-of-your-keystore")
);
NetClient client = vertx.createNetClient(options);
也支持通过 Buffer
来配置:
Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
setValue(myKeyStoreAsABuffer).
setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPfxKeyCertOptions(pfxOptions);
NetClient client = vertx.createNetClient(options);
另一种单独提供服务器私钥和证书的方法是使用 .pem
文件。
NetClientOptions options = new NetClientOptions().setSsl(true).setPemKeyCertOptions(
new PemKeyCertOptions().
setKeyPath("/path/to/your/client-key.pem").
setCertPath("/path/to/your/client-cert.pem")
);
NetClient client = vertx.createNetClient(options);
也支持通过 Buffer
来配置:
Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
setKeyValue(myKeyAsABuffer).
setCertValue(myCertAsABuffer);
NetClientOptions options = new NetClientOptions().
setSsl(true).
setPemKeyCertOptions(pemOptions);
NetClient client = vertx.createNetClient(options);
请记住 pem
的配置中,私钥是不加密的。
用于测试和开发目的的自签名证书
小心
|
不要在生产设置中使用,这里生成的密钥非常不安全。 |
在运行单元/集成测试或是运行开发版的应用程序时, 都经常需要自签名证书。
SelfSignedCertificate
可用于提供自签名PEM证书,并可以提供
KeyCertOptions
和 TrustOptions
配置:
SelfSignedCertificate certificate = SelfSignedCertificate.create();
NetServerOptions serverOptions = new NetServerOptions()
.setSsl(true)
.setKeyCertOptions(certificate.keyCertOptions())
.setTrustOptions(certificate.trustOptions());
vertx.createNetServer(serverOptions)
.connectHandler(socket -> socket.end(Buffer.buffer("Hello!")))
.listen(1234, "localhost");
NetClientOptions clientOptions = new NetClientOptions()
.setSsl(true)
.setKeyCertOptions(certificate.keyCertOptions())
.setTrustOptions(certificate.trustOptions());
NetClient client = vertx.createNetClient(clientOptions);
client.connect(1234, "localhost", ar -> {
if (ar.succeeded()) {
ar.result().handler(buffer -> System.out.println(buffer));
} else {
System.err.println("Woops: " + ar.cause().getMessage());
}
});
客户端也可配置为信任所有证书:
NetClientOptions clientOptions = new NetClientOptions()
.setSsl(true)
.setTrustAll(true);
自签名证书也适用于其他基于TCP的协议,如HTTPS:
SelfSignedCertificate certificate = SelfSignedCertificate.create();
vertx.createHttpServer(new HttpServerOptions()
.setSsl(true)
.setKeyCertOptions(certificate.keyCertOptions())
.setTrustOptions(certificate.trustOptions()))
.requestHandler(req -> req.response().end("Hello!"))
.listen(8080);
待撤销证书颁发机构
可以通过配置证书吊销列表(CRL)来吊销不再被信任的证书机构。
crlPath
配置了使用的CRL:
NetClientOptions options = new NetClientOptions().
setSsl(true).
setTrustStoreOptions(trustOptions).
addCrlPath("/path/to/your/crl.pem");
NetClient client = vertx.createNetClient(options);
也支持通过 Buffer
来配置:
Buffer myCrlAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/crl.pem");
NetClientOptions options = new NetClientOptions().
setSsl(true).
setTrustStoreOptions(trustOptions).
addCrlValue(myCrlAsABuffer);
NetClient client = vertx.createNetClient(options);
配置密码套件
默认情况下,TLS配置将使用运行Vert.x的JVM 密码套件,该密码套件可以 配置一套启用的密码:
NetServerOptions options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(keyStoreOptions).
addEnabledCipherSuite("ECDHE-RSA-AES128-GCM-SHA256").
addEnabledCipherSuite("ECDHE-ECDSA-AES128-GCM-SHA256").
addEnabledCipherSuite("ECDHE-RSA-AES256-GCM-SHA384").
addEnabledCipherSuite("CDHE-ECDSA-AES256-GCM-SHA384");
NetServer server = vertx.createNetServer(options);
密码套件可在 NetServerOptions
或 NetClientOptions
配置项中指定。
配置TLS协议版本
默认情况下,TLS配置将使用以下协议版本:SSLv2Hello、TLSv1、TLSv1.1 和 TLSv1.2。 协议版本可以通过显式添加启用协议进行配置:
NetServerOptions options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(keyStoreOptions).
removeEnabledSecureTransportProtocol("TLSv1").
addEnabledSecureTransportProtocol("TLSv1.3");
NetServer server = vertx.createNetServer(options);
协议版本可在 NetServerOptions
或 NetClientOptions
配置项中指定。
SSL引擎
引擎实现可以配置为使用 OpenSSL 而不是JDK实现(来支持SSL)。 OpenSSL提供比JDK引擎更好的性能和CPU使用率、以及JDK版本独立性。
引擎选项可使用:
-
设置了
getSslEngineOptions
时,使用该选项 -
否则使用
JdkSSLEngineOptions
NetServerOptions options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(keyStoreOptions);
// Use JDK SSL engine explicitly
options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(keyStoreOptions).
setJdkSslEngineOptions(new JdkSSLEngineOptions());
// Use OpenSSL engine
options = new NetServerOptions().
setSsl(true).
setKeyStoreOptions(keyStoreOptions).
setOpenSslEngineOptions(new OpenSSLEngineOptions());
服务器名称指示(SNI)
服务器名称指示(英語:Server Name Indication,缩写:SNI)是TLS的一个扩展协议,需要指定尝试连接的主机名: TLS握手开始时客户端提供要连接的服务器名称,服务端根据该服务器名称,返回主机名对应的证书, 而非默认部署的证书。 如果服务器要求客户端身份验证,则服务器可以根据指定的服务器名称, 使用特定的受信任CA证书。
当SNI可用时,服务器使用以下规则匹配证书:
-
服务器名称完全匹配的证书CN(Common Name,通用名称)或SAN DNS(带有DNS的主题备用名称),例如
www.example.com
-
带匹配通配符的服务器名称匹配的证书CN或SAN DNS证书,例如
* .example.com
-
否则,当客户端不提供服务器名称或提供的服务器名称无法匹配时,返回第一个证书
当服务器另外要求客户端身份验证时:
*如果使用 JksOptions
设置信任选项
( options
),
则将与信任库别名完全匹配
*否则,将使用无SNI的方式处理可用的CA证书
通过设置 setSni
为 true
,可以在服务器上启用SNI,
并为服务器配置多个密钥/证书对。
Java KeyStore文件,或PKCS12文件可以开箱即用地存储多个密钥/证书对。
JksOptions keyCertOptions = new JksOptions().setPath("keystore.jks").setPassword("wibble");
NetServer netServer = vertx.createNetServer(new NetServerOptions()
.setKeyStoreOptions(keyCertOptions)
.setSsl(true)
.setSni(true)
);
PemKeyCertOptions
can be configured to hold multiple entries:
PemKeyCertOptions keyCertOptions = new PemKeyCertOptions()
.setKeyPaths(Arrays.asList("default-key.pem", "host1-key.pem", "etc..."))
.setCertPaths(Arrays.asList("default-cert.pem", "host2-key.pem", "etc...")
);
NetServer netServer = vertx.createNetServer(new NetServerOptions()
.setPemKeyCertOptions(keyCertOptions)
.setSsl(true)
.setSni(true)
);
客户端默认使用连接的主机名的完全限定域名(Fully Qualified Domain Name,缩写:FQDN)作为SNI服务器名称。
您可以在连接socket时,提供明确的服务器名称。
NetClient client = vertx.createNetClient(new NetClientOptions()
.setTrustStoreOptions(trustOptions)
.setSsl(true)
);
// 连接到 'localhost',并以 'server.name'为服务器名
client.connect(1234, "localhost", "server.name", res -> {
if (res.succeeded()) {
System.out.println("Connected!");
NetSocket socket = res.result();
} else {
System.out.println("Failed to connect: " + res.cause().getMessage());
}
});
可以用于以下目的:
-
指定与服务器主机不一样的服务器名称
-
使用IP进行连接时,指定服务器名称
-
使用短名称进行连接时,强制指定服务器名称
应用层协议协商
ALPN(Application-Layer Protocol Negotiation)是应用层协议协商的TLS扩展,它被HTTP/2使用: 在TLS握手期时,客户端给出其接受的应用协议列表, 之后服务器使用它所支持的协议响应。
Java 9 支持HTTP/2,可以开箱即用,无需其他步骤。
标准的Java 8不支持ALPN,所以ALPN应该通过其他方式启用:
-
OpenSSL 支持
-
Jetty-ALPN 支持
引擎选项可使用:
-
设置了
getSslEngineOptions
时,使用该选项 -
JDK中ALPN可用时,使用
JdkSSLEngineOptions
-
OpenSSL中ALPN可用时,使用
OpenSSLEngineOptions
-
否则失败
OpenSSL ALPN支持
OpenSSL提供了原生的ALPN支持。
OpenSSL需要配置 setOpenSslEngineOptions
并在类路径上使用 netty-tcnative 的jar库。
依赖于tcnative的实现,需要OpenSSL安装在您的操作系统中。
Jetty-ALPN支持
Jetty-ALPN是一个小型的jar,它覆盖了几种Java 8发行版用以支持ALPN。
JVM必须将 alpn-boot-${version}.jar 放在它的 bootclasspath
中启动:
-Xbootclasspath/p:/path/to/alpn-boot${version}.jar
其中 ${version} 取决于JVM的版本,如 OpenJDK 1.8.0u74 中的 8.1.7.v20160121。 这个完整列表可以在 Jetty-ALPN page 页面上找到。
这种方法主要缺点是ALPN的实现版本依赖于JVM的版本。
为了解决这个问题,可以使用 Jetty ALPN agent 。agent是一个JVM代理, 它会为运行它的JVM选择正确的ALPN版本:
-javaagent:/path/to/alpn/agent
客户端连接使用代理
NetClient
支持HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 代理。
代理可以在 NetClientOptions
内设置
ProxyOptions
来配置代理类型、主机名、端口、可选的用户名和密码。
以下是一个例子:
NetClientOptions options = new NetClientOptions()
.setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
.setHost("localhost").setPort(1080)
.setUsername("username").setPassword("secret"));
NetClient client = vertx.createNetClient(options);
DNS 解析总是在代理服务器上完成解析,为了实现 SOCKS4 客户端的功能, 需要先在本地解析 DNS 地址。
使用HA PROXY协议
HA PROXY 协议 提供了一种便捷的安全传输连接信息 (例如客户端的地址)的方式, 可以跨多层NAT或TCP代理传输。
HA PROXY 协议通过 setUseProxyProtocol
方法设置启用,
同时需要在classpath中增加以下依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<!--<version>该版本号需要和Vert.x使用的netty版本对齐</version>-->
</dependency>
NetServerOptions options = new NetServerOptions().setUseProxyProtocol(true);
NetServer server = vertx.createNetServer(options);
server.connectHandler(so -> {
// 打印HA Proxy协议提供的真实客户端地址,而非代理的地址
System.out.println(so.remoteAddress());
// 打印代理的地址
System.out.println(so.localAddress());
});
编写 HTTP 服务端和客户端
Vert.x 允许您轻松编写非阻塞的 HTTP 客户端和服务端。
Vert.x 支持 HTTP/1.0、HTTP/1.1 和 HTTP/2 协议。
用于 HTTP 的基本 API 对 HTTP/1.x 和 HTTP/2 是相同的,特定的API功能也可用于处理 HTTP/2 协议。
配置 HTTP 服务端
若您不想用默认值,可以在创建服务器时传递一个 HttpServerOptions
实例给它:
HttpServerOptions options = new HttpServerOptions().setMaxWebSocketFrameSize(1000000);
HttpServer server = vertx.createHttpServer(options);
配置 HTTP/2 服务端
Vert.x 通过 TLS 之上的 h2
和 TCP 之上的 h2c
来支持 HTTP/2 协议。
-
h2
表示使用了TLS的 应用层协议协商 (ALPN)协议来协商的 HTTP/2 协议 -
h2c
表示在TCP层上使用明文形式的 HTTP/2 协议, 这样的连接可以使用 HTTP/1.1 升级请求建立,也可以直接建立。
要处理 h2
请求,你必须调用 setUseAlpn
方法来启用TLS:
HttpServerOptions options = new HttpServerOptions()
.setUseAlpn(true)
.setSsl(true)
.setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore"));
HttpServer server = vertx.createHttpServer(options);
ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议。
不支持ALPN的客户端仍然可以执行 经典的 SSL握手。
通常情况,ALPN会对 h2
协议达成一致,不过服务器或客户端仍然可以决定使用
HTTP/1.1
协议。
要处理 h2c
请求,TLS必须被禁用,服务器将升级到 HTTP/2 以满足任何希望升级到 HTTP/2 的 HTTP/1.1 请求。
它还将接受以 PRI * HTTP/2.0\r\nSM\r\n
开始的 h2c
直接连接。
警告
|
大多数浏览器不支持 h2c ,所以在建站时,您应该使用 h2 而不是 h2c 。
|
当服务器接受 HTTP/2 连接时,它会向客户端发送其 初始设置
。
定义客户端如何使用连接,服务器的默认初始设置为:
-
getMaxConcurrentStreams
:按照 HTTP/2 RFC建议推荐值为100
-
其他默认的 HTTP/2 的设置
服务端网络活动日志
网络活动可以被记录下来,用于调试。
HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
HttpServer server = vertx.createHttpServer(options);
详细说明请参阅 网络活动日志 章节。
开启服务端监听
要告诉服务器监听传入的请求,您可以使用其中一个 listen
方法。
在配置项中让服务器监听指定的主机和端口:
HttpServer server = vertx.createHttpServer(); // 译者注:配置服务器监听指定的主机和端口的例子:vertx.createHttpServer(new HttpServerOptions().setPort(8080).setHost("0.0.0.0"))
server.listen();
或在调用listen方法时指定主机和端口号,这样就忽略了配置项(中的主机和端口):
HttpServer server = vertx.createHttpServer();
server.listen(8080, "myhost.com");
默认主机名是 0.0.0.0
,它表示:监听所有可用地址;默认端口号是 80
。
实际的绑定也是异步的,因此服务器也许并没有在调用listen方法返回时监听,而是在一段时间 过后 才监听。
若您希望在服务器实际监听时收到通知,您可以向 listen
提供一个处理器。
例如:
HttpServer server = vertx.createHttpServer();
server.listen(8080, "myhost.com", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
收到传入请求的通知
若您需要在收到请求时收到通知,则需要设置一个 requestHandler
:
HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
// Handle the request in here
});
处理请求
当请求到达时,Vert.x 会像对应的处理函数传入一个 HttpServerRequest
实例并调用请求处理函数。
此对象表示服务端 HTTP 请求。
当请求的头信息被完全读取时会调用该请求处理器。
如果请求包含请求体,那么该请求体将在请求处理器被调用后的某个时间到达服务器。
每一个服务请求对象和一个服务响应对象绑定,您可以用
response
方法获取一个 HttpServerResponse
对象的引用。
这是服务器处理请求并回复 “hello world” 的简单示例。
vertx.createHttpServer().requestHandler(request -> {
request.response().end("Hello world");
}).listen(8080);
请求版本
在请求中指定的 HTTP 版本可通过 version
方法获取。
请求方法
使用 method
方法读取请求中的 HTTP 方法。
(即GET、POST、PUT、DELETE、HEAD、OPTIONS等)。
请求URI
使用 uri
方法读取请求中的URI路径。
请注意,这是在HTTP请求中传递的实际URI,它总是一个相对的URI。
这个URI是在 Section 5.1.2 of the HTTP specification - Request-URI 中定义的。
请求路径
使用 path
方法读取URI中的路径部分。
例如,请求的URI为:
a/b/c/page.html?param1=abc¶m2=xyz
请求路径是:
/a/b/c/page.html
请求查询
使用 query
读取URI中的查询部分。
例如,请求的URI为:
a/b/c/page.html?param1=abc¶m2=xyz
请求查询是:
param1=abc¶m2=xyz
请求头部
使用 headers
方法获取HTTP请求中的请求头部信息。
这个方法返回一个 MultiMap
实例。它像一个普通的Map或哈希表,并且它还允许同个键对应多个值
—— 因为HTTP允许同一个键对应多个请求头的值。
它的键不区分大小写,这意味着您可以执行以下操作:
MultiMap headers = request.headers();
// 读取User-Agent
System.out.println("User agent is " + headers.get("user-agent"));
// 这样做可以得到和上边相同的结果
System.out.println("User agent is " + headers.get("User-Agent"));
请求参数
您可以使用 params
方法返回HTTP请求中的参数信息。
请求参数在请求URI的 path 部分之后,例如URI是:
/page.html?param1=abc¶m2=xyz
那么参数将包含以下内容:
param1: 'abc' param2: 'xyz
请注意,这些请求参数是从请求的 URI 中解析读取的,
若您已经将表单属性存放在请求体中发送出去,并且该请求为 multi-part/form-data
类型请求,那么它们将不会显示在此处的参数中。
远程地址
可以使用 remoteAddress
方法读取请求发送者的地址。
绝对URI
HTTP 请求中传递的URI通常是相对的,若您想要读取请求中和相对URI对应的绝对URI,
可调用 absoluteURI
方法。
结束处理器
当整个请求(包括所有请求体)已经被完全读取时,请求中的 endHandler
方法会被调用。
从请求体中读取数据
HTTP请求通常包含我们需要读取的请求体。如前所述,当请求头部达到时, 请求处理器会被调用,因此请求对象在此时没有请求体。
这是因为请求体可能非常大(如文件上传),并且我们不会在内容发送给您之前将其全部缓冲存储在内存中, 这可能会导致服务器耗尽可用内存。
要接收请求体,您可在请求中调用 handler
方法设置一个处理器,
每次请求体的一小块数据收到时,该处理器都会被调用。以下是一个例子:
request.handler(buffer -> {
System.out.println("I have received a chunk of the body of length " + buffer.length());
});
传递给处理器的对象是一个 Buffer
,
当数据从网络到达时,处理器可以多次被调用,这取决于请求体的大小。
在某些情况下(例:若请求体很小),您将需要将这个请求体聚合到内存中, 您可以按照下边的方式进行聚合:
Buffer totalBuffer = Buffer.buffer();
request.handler(buffer -> {
System.out.println("I have received a chunk of the body of length " + buffer.length());
totalBuffer.appendBuffer(buffer);
});
request.endHandler(v -> {
System.out.println("Full body received, length = " + totalBuffer.length());
});
这是一个常见的情况,Vert.x为您提供了一个 bodyHandler
方法来执行此操作。
当所有请求体被收到时,bodyHandler
绑定的处理器会被调用一次:
request.bodyHandler(totalBuffer -> {
System.out.println("Full body received, length = " + totalBuffer.length());
});
处理 HTML 表单
您可使用 application/x-www-form-urlencoded
或 multipart/form-data
这两种 content-type 来提交 HTML 表单。
对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。
对于 multipart 类型的表单,它会被编码在请求体中,而且在整个请求体被 完全读取之前它是不可用的。
Multipart 表单还可以包含文件上传。
若您想要读取 multipart 表单的属性,您应该告诉 Vert.x 您会在读取任何请求体
之前 调用 setExpectMultipart
方法,
然后在整个请求体都被读取后,您可以使用 formAttributes
方法来读取实际的表单属性。
server.requestHandler(request -> {
request.setExpectMultipart(true);
request.endHandler(v -> {
// The body has now been fully read, so retrieve the form attributes
MultiMap formAttributes = request.formAttributes();
});
});
处理文件上传
Vert.x 可以处理以 multipart 编码形式上传的的文件。
要接收文件,您可以告诉 Vert.x 使用 multipart 表单,并对请求设置
uploadHandler
。
当服务器每次接收到上传请求时, 该处理器将被调用一次。
传递给处理器的对象是一个 HttpServerFileUpload
实例。
server.requestHandler(request -> {
request.setExpectMultipart(true);
request.uploadHandler(upload -> {
System.out.println("Got a file upload " + upload.name());
});
});
上传的文件可能很大,我们不会在单个缓冲区中包含整个上传的数据,因为这样会导致内存耗尽。 相反,上传数据是以块的形式被接收的:
request.uploadHandler(upload -> {
upload.handler(chunk -> {
System.out.println("Received a chunk of the upload of length " + chunk.length());
});
});
上传对象实现了 ReadStream
接口,因此您可以将请求体读取到任何
WriteStream
实例中。详细说明请参阅 流
章节。
若您只是想将文件上传到服务器磁盘的某个地方,可以使用 streamToFileSystem
方法:
request.uploadHandler(upload -> {
upload.streamToFileSystem("myuploads_directory/" + upload.filename());
});
警告
|
确保您检查了生产系统的文件名,以避免恶意客户将文件上传到文件系统中的任意位置。 有关详细信息,参阅 安全说明。 |
处理cookies
使用 removeCookie
删除Cookie。
使用 addCookie
增加Cookie。
增加的Cookie会在响应的时候自动写到响应头,随后浏览器可以获取到设置的 Cookie 并存储起来。
(Vert.x的)cookie是 Cookie
的实例。
可以从中获取cookie的名字、取值、域名、路径以及其他cookie的常规属性。
设置了SameSite的Cookie禁止服务器在发送跨域请求时带上发送
(站点是否跨域,取决于可注册域),从而为伪造跨域请求攻击提供了一些保护。
这种Cookie可以通过 setSameSite
设置。
Cookie的SameSite属性接受三个取值:
-
None - 允许在跨域请求和非跨域请求中发送
-
Strict - 只能在同站点的请求中发送(请求到设置该Cookie的站点)。 如果设置Cookie的站点与当前请求的站点不一致, 则不会发送SameSite设置为Strict的Cookie
-
Lax - 在跨域的子请求(例如调用加载图像或iframe)不发送这种SameSite(设为Lax的)Cookie, 但当用户从外部站点导航到URL时将发送该Cookie, 例如通过链接打开。
下面是一个查询并增加Cookie的例子:
Cookie someCookie = request.getCookie("mycookie");
String cookieValue = someCookie.getValue();
// 处理Cookie的逻辑
// 增加Cookie - 会自动写入响应头
request.response().addCookie(Cookie.cookie("othercookie", "somevalue"));
处理压缩体
Vert.x 可以处理在客户端通过 deflate 或 gzip 算法压缩过的请求体信息。
若要启用解压缩功能则您要在创建服务器时调用 setDecompressionSupported
方法设置配置项。
默认情况下解压缩是并未被启用的。
接收自定义 HTTP/2 帧
HTTP/2 是用于 HTTP 请求/响应模型的包含各种帧的一种帧协议, 该协议允许发送和接收其他类型的帧。
若要接收自定义帧(frame),您可以在请求中使用 customFrameHandler
,
每次当自定义的帧数据到达时,这个处理器会被调用。这而是一个例子:
request.customFrameHandler(frame -> {
System.out.println("Received a frame type=" + frame.type() +
" payload" + frame.payload().toString());
});
HTTP/2 帧不受流量控制限制 —— 当接收到自定义帧时,不论请求是否暂停, 自定义帧处理器都将立即被调用。
返回响应
服务器响应对象是一个 HttpServerResponse
实例,
它可以从request对应的 response
方法中读取。
您可以使用响应对象回写一个响应到 HTTP客户端。
设置状态码和消息
默认的 HTTP 状态响应码为 200
,表示 OK
。
可使用 setStatusCode
方法设置不同状态代码。
您还可用 setStatusMessage
方法指定自定义状态消息。
若您不指定状态信息,将会使用默认的状态码响应。
注意
|
在 HTTP/2 中,状态码的描述信息不会在响应中出现 —— 因为协议不传递该信息。 |
向 HTTP 响应写入数据
想要将数据写入 HTTP Response,您可使用任意一个 write
方法。
它们可以在响应结束之前被多次调用,它们可以通过以下几种方式调用:
对用单个缓冲区:
HttpServerResponse response = request.response();
response.write(buffer);
写入字符串,这种请求字符串将使用 UTF-8 进行编码,并将结果写入到报文中。
HttpServerResponse response = request.response();
response.write("hello world!");
写入带编码方式的字符串,这种情况字符串将使用指定的编码方式编码, 并将结果写入到报文中。
HttpServerResponse response = request.response();
response.write("hello world!", "UTF-16");
响应写入是异步的,并且在写操作进入队列之后会立即返回。
若您只需要将单个字符串或Buffer写入到HTTP 响应,则可使用
end
方法将其直接写入响应中并发回到客户端。
第一次写入操作会触发响应头的写入,因此,
若您不使用 HTTP 分块,那么必须在写入响应之前设置 Content-Length
头,
否则不会生效。若您使用 HTTP 分块则不需要担心这点。
完成 HTTP 响应
一旦您完成了 HTTP 响应,可调用 end
将其发回客户端。
这可以通过几种方式完成:
没有参数,直接结束响应,发回客户端:
HttpServerResponse response = request.response();
response.write("hello world!");
response.end();
您也可以和调用 write
方法一样传String或Buffer给 end
方法。
这种方式类似于先调用带String或Buffer参数的 write
方法,再调用无参 end
方法。例如:
HttpServerResponse response = request.response();
response.end("hello world!");
关闭底层连接
您可以调用 close
方法关闭底层的TCP 连接。
当响应结束时,Vert.x 将自动关闭非 keep-alive 的连接。
默认情况下,Vert.x 不会自动关闭 keep-alive 的连接,
若您想要在一段空闲时间之后让 Vert.x 自动关闭 keep-alive 的连接,则使用 setIdleTimeout
方法进行配置。
HTTP/2 连接在关闭响应之前会发送 {@literal GOAWAY} 帧。
设置响应头
HTTP 响应头可直接添加到 HTTP 响应中,通常直接操作
headers
:
HttpServerResponse response = request.response();
MultiMap headers = response.headers();
headers.set("content-type", "text/html");
headers.set("other-header", "wibble");
或您可使用 putHeader
方法:
HttpServerResponse response = request.response();
response.putHeader("content-type", "text/html").putHeader("other-header", "wibble");
响应头必须在写入响应体之前进行设置。
分块 HTTP 响应和附加尾部
Vert.x 支持 分块传输编码(HTTP Chunked Transfer Encoding) 。
这允许HTTP 响应体以块的形式写入,通常在响应体预先不知道尺寸、 需要将很大响应正文以流式传输到客户端时使用。
您可以通过如下方式开启分块模式:
HttpServerResponse response = request.response();
response.setChunked(true);
默认是不分块的,当处于分块模式时,每次调用任意一个 write
方法将导致新的 HTTP 块被写出。
在分块模式下,您还可以将响应的 HTTP 响应附加尾部(trailers)写入响应, 这种方式实际上是在写入响应的最后一块。
注意
|
分块响应在 HTTP/2 流中无效。 |
若要向响应添加尾部,则直接添加到 trailers
里。
HttpServerResponse response = request.response();
response.setChunked(true);
MultiMap trailers = response.trailers();
trailers.set("X-wibble", "woobble").set("X-quux", "flooble");
或者调用 putTrailer
方法。
HttpServerResponse response = request.response();
response.setChunked(true);
response.putTrailer("X-wibble", "woobble").putTrailer("X-quux", "flooble");
直接从磁盘或 Classpath 读文件
若您正在编写一个Web 服务端,一种从磁盘中读取并提供文件的方法是将文件作为 AsyncFile
打开并将其传送到HTTP 响应中。
或您可以使用 readFile
方法一次性加载它,并直接将其写入响应。
此外,Vert.x 提供了一种方法,允许您只执行一次操作, 即可直接将文件从磁盘或文件系统写入 HTTP 响应。 若底层操作系统支持,操作系统可以不拷贝到用户态, 而直接把数据从文件传输到Socket。
这是使用 sendFile
方法完成的,对于大文件处理通常更有效,
而这个方法对于小文件可能很慢。
这儿是一个非常简单的 Web 服务器,它使用 sendFile
方法从文件系统中读取并提供文件:
vertx.createHttpServer().requestHandler(request -> {
String file = "";
if (request.path().equals("/")) {
file = "index.html";
} else if (!request.path().contains("..")) {
file = request.path();
}
request.response().sendFile("web/" + file);
}).listen(8080);
发送文件是异步的,可能在调用返回一段时间后才能完成。如果要在
文件写入时收到通知,可以在 sendFile
方法中设置一个处理器。
请阅读 从 Classpath 访问文件 章节了解类路径的限制或禁用它。
注意
|
若在 HTTPS 协议中使用 sendFile 方法,它将会通过用户空间进行复制,因为若内核将数据
直接从磁盘复制到 Socket,则不会给我们任何加密的机会。
|
警告
|
若您要直接使用 Vert.x 编写 Web 服务器,请注意,不要允许用户滥用请求路径, 以此访问服务目录之外的、或者 classpath 之外的路径。 更安全的做法是使用Vert.x Web替代。 |
当需要提供文件的一部分,从给定的字节开始,您可以像下边这样做:
vertx.createHttpServer().requestHandler(request -> {
long offset = 0;
try {
offset = Long.parseLong(request.getParam("start"));
} catch (NumberFormatException e) {
// error handling...
}
long end = Long.MAX_VALUE;
try {
end = Long.parseLong(request.getParam("end"));
} catch (NumberFormatException e) {
// error handling...
}
request.response().sendFile("web/mybigfile.txt", offset, end);
}).listen(8080);
若您想要从偏移量开始发送文件直到尾部,则不需要提供长度信息, 这种情况下,您可以执行以下操作:
vertx.createHttpServer().requestHandler(request -> {
long offset = 0;
try {
offset = Long.parseLong(request.getParam("start"));
} catch (NumberFormatException e) {
// error handling...
}
request.response().sendFile("web/mybigfile.txt", offset);
}).listen(8080);
管道式响应
服务端响应 HttpServerResponse
也是一个 WriteStream
实例,因此您可以从任何
ReadStream
向其传送数据,如 AsyncFile
, NetSocket
,
WebSocket
或者 HttpServerRequest
。
这儿有一个例子,它回应了任何 PUT 方法的响应中的请求体。 它为请求体使用了管道,所以即使 HTTP 请求体远远超过内存容量, 它依旧可以正常工作。:
vertx.createHttpServer().requestHandler(request -> {
HttpServerResponse response = request.response();
if (request.method() == HttpMethod.PUT) {
response.setChunked(true);
request.pipeTo(response);
} else {
response.setStatusCode(400).end();
}
}).listen(8080);
还可以使用 send
方法发送 ReadStream
。
发送流是一个管道操作,但由于这方法是 HttpServerResponse
的,
当 content-length
响应头未设置时,此方法可以处理分块响应。
vertx.createHttpServer().requestHandler(request -> {
HttpServerResponse response = request.response();
if (request.method() == HttpMethod.PUT) {
response.send(request);
} else {
response.setStatusCode(400).end();
}
}).listen(8080);
写入 HTTP/2 帧
HTTP/2 是用于 HTTP 请求/响应模型的包含各种帧的一种帧协议, 该协议允许发送和接收其他类型的帧。
要发送这样的帧,您可以在响应中使用 writeCustomFrame
方法。
以下是一个例子:
int frameType = 40;
int frameStatus = 10;
Buffer payload = Buffer.buffer("some data");
// 向客户端发送一帧
response.writeCustomFrame(frameType, frameStatus, payload);
这些帧被立即发送,并且不受流控的影响 —— 当这样的帧被发送到对端时, 可以在其他的 {@literal DATA} 帧之前完成。
流重置
HTTP/1.x 不允许请求或响应流执行清除重置, 如当客户端上传的资源已经存在于服务器上,服务器就需要接受整个响应。
HTTP/2 在请求/响应期间随时支持流重置:
request.response().reset();
默认会发送 NO_ERROR
(0)错误代码,您也可以发送另外一个错误代码:
request.response().reset(8);
HTTP/2 规范中定义了可用的 错误码 列表:
若使用了 request handler
和
response handler
两个处理器过后,在流重置完成时您将会收到通知:
request.response().exceptionHandler(err -> {
if (err instanceof StreamResetException) {
StreamResetException reset = (StreamResetException) err;
System.out.println("Stream reset " + reset.getCode());
}
});
服务器推送
服务器推送(Server Push)是 HTTP/2 支持的一个新功能,可以为单个客户端请求并行发送多个响应。
当服务器处理请求时,它可以向客户端推送一次请求/响应(译者注:server push会先推送一条“请求”,然后再推送对应的“响应”):
HttpServerResponse response = request.response();
// 推送main.js到客户端
response.push(HttpMethod.GET, "/main.js", ar -> {
if (ar.succeeded()) {
// 服务器准备推送响应
HttpServerResponse pushedResponse = ar.result();
// 发送main.js响应
pushedResponse.
putHeader("content-type", "application/json").
end("alert(\"Push response hello\")");
} else {
System.out.println("Could not push client resource " + ar.cause());
}
});
// 发送请求的资源内容
response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");
当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应。
推送响应处理器可能会接收到失败,如:客户端可能取消推送,因为在缓存中已经包含了 main.js
,
不再需要它。
您必须在响应结束之前调用 push
方法,但是在推送响应过后依然可以写响应。
处理异常
调用 exceptionHandler
可以设置一个处理器,用于接收
连接传递给 requestHandler
之前发生的异常,
或者是传递给 webSocketHandler
之前发生的异常,如TLS握手期间发生的异常。
HTTP 压缩
Vert.x 支持开箱即用的HTTP压缩。
这意味着在响应发送回客户端之前,您可以将响应体自动压缩。
若客户端不支持HTTP 压缩,则它可以发回没有压缩过的请求。
这允许它同时处理支持HTTP 压缩的客户端和不支持的客户端。
要启用压缩,可以使用 setCompressionSupported
方法进行配置。
默认情况下,未启用压缩。
当启用HTTP 压缩时,服务器将检查客户端请求头中是否包含了 Accept-Encoding
并支持常用的 deflate 和 gzip 压缩算法。
Vert.x 两者都支持。
若找到这样的请求头,服务器将使用所支持的压缩算法之一自动压缩响应正文, 并发送回客户端。
可以通过将响应头 content-encoding
设置为 identity
,来关闭响应内容的压缩:
request.response()
.putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)
.sendFile("/path/to/image.jpg");
注意:压缩可以减少网络流量,但是CPU密集度会更高。
为了解决后边一个问题,Vert.x也允许您调整原始的 gzip/deflate 压缩算法的 “压缩级别” 参数。
压缩级别允许根据所得数据的压缩比和压缩/解压的计算成本来配置 gzip/deflate 算法。
压缩级别是从 “1” 到 “9” 的整数值,其中 “1” 表示更低的压缩比但是最快的算法,“9” 表示可用的最大压缩比但比较慢的算法。
使用高于 1-2 的压缩级别通常允许仅仅节约一些字节大小 —— 它的增益不是线性的,并取决于要压缩的特定数据 —— 但它可以满足服务器所要求的CPU周期的不可控的成本 (注意现在Vert.x不支持任何缓存形式的响应数据,如静态文件, 因此压缩是在每个请求体生成时进行的),它可生成压缩过的响应数据、并对接收的响应解码(inflating)—— 和客户端使用的方式一致, 这种操作随着压缩级别的增长会变得更加倾向于CPU密集型。
默认情况下 —— 如果通过 setCompressionSupported
方法启用压缩,Vert.x 将使用 “6” 作为压缩级别,
但是该参数可通过 setCompressionLevel
方法来更改。
创建 HTTP 客户端
您可通过以下方式创建一个具有默认配置的 HttpClient
实例:
HttpClient client = vertx.createHttpClient();
若您想要配置客户端选项,可按以下方式创建:
HttpClientOptions options = new HttpClientOptions().setKeepAlive(false);
HttpClient client = vertx.createHttpClient(options);
Vert.x 支持基于 TLS h2
和 TCP h2c
的 HTTP/2 协议。
默认情况下,HTTP 客户端会发送 HTTP/1.1 请求。若要执行 HTTP/2 请求,则必须调用 setProtocolVersion
方法将版本设置成 HTTP_2
。
对于 h2
请求,必须使用 应用层协议协商(ALPN) 启用TLS:
HttpClientOptions options = new HttpClientOptions().
setProtocolVersion(HttpVersion.HTTP_2).
setSsl(true).
setUseAlpn(true).
setTrustAll(true);
HttpClient client = vertx.createHttpClient(options);
对于 h2c
请求,TLS必须禁用,客户端将执行 HTTP/1.1 请求并尝试升级到 HTTP/2:
HttpClientOptions options = new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2);
HttpClient client = vertx.createHttpClient(options);
h2c
连接也可以直接建立,如连接可以使用前文提到的方式创建,当
setHttp2ClearTextUpgrade
选项设置为 false
时:
建立连接后,客户端将发送 HTTP/2 连接前缀,
并期望从服务端接收相同的连接偏好。
HTTP 服务端可能不支持 HTTP/2,当响应到达时,可以使用
version
方法检查实际HTTP版本。。
当客户端连接到 HTTP/2 服务端时,它将向服务端发送其 初始设置
。
设置定义服务器如何使用连接、
客户端的默认初始设置是由 HTTP/2 RFC定义的。
客户端网络活动日志
网络活动可以被记录下来,用于调试。
HttpClientOptions options = new HttpClientOptions().setLogActivity(true);
HttpClient client = vertx.createHttpClient(options);
详情请参阅 网络活动日志 章节。
发送请求
HTTP 客户端是很灵活的,您可以通过各种方式发出请求。
发送请求的第一步是获取远程服务器的HTTP连接:
client.request(HttpMethod.GET,8080, "myserver.mycompany.com", "/some-uri", ar1 -> {
if (ar1.succeeded()) {
// 已连接到服务器
}
});
HTTP客户端会连接到远程服务器,也可能复用连接池里可用的连接。
默认主机和端口
通常您希望使用 HTTP 客户端向同一个主机/端口发送很多请求。为避免每次发送请求时重复设主机/端口, 您可以为客户端配置默认主机/端口:
HttpClientOptions options = new HttpClientOptions().setDefaultHost("wibble.com");
// 若您需要,可设置默认端口
HttpClient client = vertx.createHttpClient(options);
client.request(HttpMethod.GET, "/some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
}
});
}
});
设置请求头
可以使用 HttpHeaders
设置请求头,比如:
HttpClient client = vertx.createHttpClient();
// 使用MultiMap设置请求头
MultiMap headers = HttpHeaders.set("content-type", "application/json").set("other-header", "foo");
client.request(HttpMethod.GET, "some-uri", ar1 -> {
if (ar1.succeeded()) {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.headers().addAll(headers);
request.send(ar2 -> {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
});
}
}
});
这个headers是 MultiMap
的实例,它提供了添加、设置、删除条目的操作。
HTTP请求头允许一个特定的键包含多个值。
您也可以使用 putHeader
方法设置请求头:
request.putHeader("content-type", "application/json")
.putHeader("other-header", "foo");
若您想写入请求头,则您必须在写入任何请求体之前这样做来设置请求头。
写请求并处理响应
HttpClientRequest
的 request
方法会连接到远程服务器,
或复用一个已有连接。获得的请求实例已预先填充了一些数据,
例如主机或请求URI,但您需要将此请求发送到服务器。
调用 send
方法可以发送HTTP请求,
如 GET
请求,并异步处理 HttpClientResponse
响应。
client.request(HttpMethod.GET,8080, "myserver.mycompany.com", "/some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
// 发送请求并处理响应
request.send(ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
System.out.println("Received response with status code " + response.statusCode());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}
});
你也可以发送带请求体的请求。
使用 send
方法可以发送String类型的请求体,
如果 Content-Length
请求头没有预先设置,则会自动设置。
client.request(HttpMethod.GET,8080, "myserver.mycompany.com", "/some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
// 发送请求并处理响应
request.send("Hello World", ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
System.out.println("Received response with status code " + response.statusCode());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}
});
使用 send
方法可以发送Buffer类型的请求体,
如果 Content-Length
请求头没有预先设置,则会自动设置。
request.send(Buffer.buffer("Hello World"), ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
System.out.println("Received response with status code " + response.statusCode());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
使用 send
方法可以发送Stream类型的请求体,
如果 Content-Length
请求头没有预先设置,则会设置分块传输的 Content-Encoding
请求头。
request
.putHeader(HttpHeaders.CONTENT_LENGTH, "1000")
.send(stream, ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
System.out.println("Received response with status code " + response.statusCode());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
发送流的请求体
HttpClient的 send
方法在调用后马上发起请求。
但有时候需要对请求体的写入做底层控制。
HttpClientRequest
可用于写请求体.
下面是发起带请求体的POST请求的例子:
HttpClient client = vertx.createHttpClient();
client.request(HttpMethod.POST, "some-uri")
.onSuccess(request -> {
request.response().onSuccess(response -> {
System.out.println("Received response with status code " + response.statusCode());
});
// 现在可以对请求做各种配置
request.putHeader("content-length", "1000");
request.putHeader("content-type", "text/plain");
request.write(body);
// 确认请求可以结束
request.end();
});
// 或使用链式调用风格:
client.request(HttpMethod.POST, "some-uri")
.onSuccess(request -> {
request
.response(ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
System.out.println("Received response with status code " + response.statusCode());
}
})
.putHeader("content-length", "1000")
.putHeader("content-type", "text/plain")
.end(body);
});
也有一些方法可用于写入UTF-8编码的字符串,或以其他特定编码写入buffer:
request.write("some data");
// 指定字符串编码
request.write("some other data", "UTF-16");
// 通过buffer写入
Buffer buffer = Buffer.buffer();
buffer.appendInt(123).appendLong(245l);
request.write(buffer);
如果你的http请求只需要写入一个字符串或buffer,
可以直接调用 end
方法。
request.end("some simple data");
// 通过调用一次end方法,写入一个buffer并结束请求
Buffer buffer = Buffer.buffer().appendDouble(12.34d).appendLong(432l);
request.end(buffer);
当您写入请求时,第一次调用 write
方法将先将请求头写入到请求报文中。
实际写入操作是异步的,它可能在调用返回一段时间后才发生。
带请求体的非分块 HTTP 请求需要提供 Content-Length
头。
因此,若您不使用 HTTP 分块,则必须在写入请求之前设置 Content-Length
头,
否则会出错。
若您在调用其中一个 end
方法处理 String 或 Buffer,在写入请求体之前,Vert.x 将自动计算并设置
Content-Length
。
若您在使用 HTTP 分块模式,则不需要 Content-Length
头,
因此您不必先计算大小。
结束HTTP请求流
一旦完成了 HTTP 请求的准备工作,您必须调用其中一个 end
方法来
发送该请求(结束请求)。
结束一个请求时,若请求头尚未被写入,会导致它们被写入,并且请求被标记 成完成的。
请求可以通过多种方式结束。无参简单结束请求的方式如:
request.end();
或可以在调用 end
方法时提供 String 或 Buffer,这个和先调用带 String/Buffer 参数的 write
方法之后再调用无参 end
方法一样:
request.end("some-data");
// 使用buffer结束
Buffer buffer = Buffer.buffer().appendFloat(12.3f).appendInt(321);
request.end(buffer);
使用流式请求
HttpClientRequest
实例实现了 WriteStream
接口。
这意味着您可以从任何 ReadStream
实例将数据pipe到请求中。
例如,您可以将磁盘上的文件直接管送到HTTP 请求体中,如下所示:
request.setChunked(true);
file.pipeTo(request);
分块 HTTP 请求
Vert.x 支持 HTTP Chunked Transfer Encoding 请求。
这允许使用块方式写入HTTP 请求体,这个在请求体比较大需要流式发送到服务器, 或预先不知道大小时很常用。
您可使用 setChunked
将 HTTP 请求设置成分块模式。
在分块模式下,每次调用 write
方法将导致新的块被写入到报文。
这种模式中,无需先设置请求头中的 Content-Length
。
request.setChunked(true);
// 写一些块
for (int i = 0; i < 10; i++) {
request.write("this-is-chunk-" + i);
}
request.end();
请求超时
您可使用 setTimeout
或
setTimeout
设置一个特定 HTTP 请求的超时时间。
若请求在超时期限内未返回任何数据,则异常将会被传给异常处理器 (若已提供),并且请求将会被关闭。
写 HTTP/2 帧
HTTP/2 是用于 HTTP 请求/响应模型的具有各种帧的一个帧协议, 该协议允许发送和接收其他类型的帧。
要发送这样的帧,您可以使用 write
方法写入请求,以下是一个例子:
int frameType = 40;
int frameStatus = 10;
Buffer payload = Buffer.buffer("some data");
// 发送一帧到服务器
request.writeCustomFrame(frameType, frameStatus, payload);
流重置
HTTP/1.x 不允许请求或响应流进行重置,如当客户端上传了服务器上存在的资源时, 服务器依然要接收整个响应。
HTTP/2 在请求/响应期间随时支持流重置:
request.reset();
默认情况,发送 NO_ERROR(0)
错误代码,可发送其他错误代码:
request.reset(8);
HTTP/2规范定义了可使用的 错误码 列表。
若使用了 request handler
和
response handler
两个处理器过后,在流重置完成时您将会收到通知。
request.exceptionHandler(err -> {
if (err instanceof StreamResetException) {
StreamResetException reset = (StreamResetException) err;
System.out.println("Stream reset " + reset.getCode());
}
});
处理 HTTP 响应
您可以在请求方法中指定处理器或通过 HttpClientRequest
对象直接设置处理器来接收 HttpClientResponse
的实例。
您可以通过 statusCode
和 statusMessage
方法从响应中查询响应的状态码和状态消息:
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
// 状态代码,如:200、404
System.out.println("Status code is " + response.statusCode());
// 状态消息,如:OK、Not Found
System.out.println("Status message is " + response.statusMessage());
}
});
// 与上面类似,设置一个请求发送完成的handler并结束请求
request
.response(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
// 状态代码,如:200、404
System.out.println("Status code is " + response.statusCode());
// 状态消息,如:OK、Not Found
System.out.println("Status message is " + response.statusMessage());
}
})
.end();
使用流式响应
HttpClientResponse
实现了 ReadStream
,
这意味着您可以pipe数据到任何 WriteStream
实例。
响应头和尾
HTTP 响应可包含头信息。您可以使用 headers
方法来读取响应头。
该方法返回的对象是一个 MultiMap
实例,因为 HTTP 响应头中单个键可以关联多个值。
String contentType = response.headers().get("content-type");
String contentLength = response.headers().get("content-lengh");
分块 HTTP 响应还可以包含响应尾(trailer) —— 这实际上是在发送响应体的最后一个(数据)块。
读取请求体
当从报文中读取到响应头时,响应处理器就会被调用。
如果响应中包含响应体,那么响应体可能会在读取完header后,以多个分片的形式到达。 我们不会等待所有响应到达才调用响应处理器,因为响应可能会非常大, 我们可能会等待很长一段时间,或者因为巨大的响应体而耗尽内存。
client.request(HttpMethod.GET, "some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.send(ar2 -> {
HttpClientResponse response = ar2.result();
response.handler(buffer -> {
System.out.println("Received a part of the response body: " + buffer);
});
});
}
});
若您知道响应体不是很大,并想在处理之前在内存中聚合所有响应体数据, 那么您可以自行聚合:
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
// 创建空的缓冲区
Buffer totalBuffer = Buffer.buffer();
response.handler(buffer -> {
System.out.println("Received a part of the response body: " + buffer.length());
totalBuffer.appendBuffer(buffer);
});
response.endHandler(v -> {
// 现在所有的响应体都读取了
System.out.println("Total response body length is " + totalBuffer.length());
});
}
});
或者当响应已被完全读取时,您可以使用 body
方法以便读取整个响应体:
request.send(ar1 -> {
if (ar1.succeeded()) {
HttpClientResponse response = ar1.result();
response.body(ar2 -> {
if (ar2.succeeded()) {
Buffer body = ar2.result();
// 现在所有的响应体都读取了
System.out.println("Total response body length is " + body.length());
}
});
}
});
响应完成处理器
当整个响应体被完全读取或者无响应体的响应头被完全读取时,响应的 endHandler
就会被调用。
请求和响应组合使用
http客户端接口可以按下面的模式使用,非常简单:
-
调用
request
打开连接 -
调用
send
或write
/end
发送请求到服务器 -
处理
HttpClientResponse
响应的开始 -
处理响应事件
您可以使用Vert.x的Future组合的方式来简化代码,但是API是事件驱动的, 因此您需要充分了解它的工作过程,否则可能会遇到数据争夺 (即丢失事件导致数据损坏)的情况。
注意
|
Vert.x Web Client 是比HttpClient更高级别的替代品 (实际上它也是基于HttpClient构建的),如果HttpClient对于你的使用场景来说太底层,可以考虑WebClient。 |
HttpClient客户端有意地避免返回 Future<HttpClientResponse>
,
因为如果在event-loop之外设置Future的完成处理器可能会导致线程竞争。。
Future<HttpClientResponse> get = client.get("some-uri");
// 假设客户端返回的响应是Future
//(假设此事件 *不* 在event-loop中)
// 在这个例子里,会引入潜在的数据竞争
Thread.sleep(100);
get.onSuccess(response -> {
// 响应事件此时可能已经发生
response.body(ar -> {
});
});
将 HttpClientRequest
的使用限制在一个verticle的范围内是最简单的解决方案,
因为Verticle为避免数据竞争,会确保按顺序处理事件。
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() {
HttpClient client = vertx.createHttpClient();
Future<HttpClientRequest> future = client.request(HttpMethod.GET, "some-uri");
}
}, new DeploymentOptions());
在verticle外使用HttpClient进行交互时,可以安全地使用“组合”(compose), 只要不延迟响应事件即可。例如,直接在event-loop上处理响应。
Future<JsonObject> future = client
.request(HttpMethod.GET, "some-uri")
.compose(request -> request
.send()
.compose(response -> {
// Process the response on the event-loop which guarantees no races
if (response.statusCode() == 200 &&
response.getHeader(HttpHeaders.CONTENT_TYPE).equals("application/json")) {
return response
.body()
.map(buffer -> buffer.toJsonObject());
} else {
return Future.failedFuture("Incorrect HTTP response");
}
}));
// Listen to the composed final json result
future.onSuccess(json -> {
System.out.println("Received json result " + json);
}).onFailure(err -> {
System.out.println("Something went wrong " + err.getMessage());
});
如果需要延迟响应处理,则需要 pause
(暂停)响应或使用 pipe
,
当涉及另一个异步操作时,这可能是必需的。
Future<Void> future = client
.request(HttpMethod.GET, "some-uri")
.compose(request -> request
.send()
.compose(response -> {
// 在event-loop上处理响应,从而确保没有数据竞争
if (response.statusCode() == 200) {
// 创建一个管道,会暂停响应
Pipe<Buffer> pipe = response.pipe();
// 把文件写入磁盘
return fileSystem
.open("/some/large/file", new OpenOptions().setWrite(true))
.onFailure(err -> pipe.close())
.compose(file -> pipe.to(file));
} else {
return Future.failedFuture("Incorrect HTTP response");
}
}));
30x 重定向处理器
客户端可配置成根据 Location
响应头遵循HTTP 重定向规则:
-
GET或HEAD请求的HTTP响应码:
301
、302
、307
或308
-
GET请求的HTTP响应码
303
这有个例子:
client.request(HttpMethod.GET, "some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.setFollowRedirects(true);
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
}
});
}
});
默认情况最大的重定向数为 16
,您可使用 setMaxRedirects
方法设置。
HttpClient client = vertx.createHttpClient(
new HttpClientOptions()
.setMaxRedirects(32));
client.request(HttpMethod.GET, "some-uri", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.setFollowRedirects(true);
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
}
});
}
});
没有放之四海而皆准的策略,缺省的重定向策略可能不能满足您的需要。
默认重定向策略可使用自定义实现更改:
client.redirectHandler(response -> {
// 仅仅遵循301状态代码
if (response.statusCode() == 301 && response.getHeader("Location") != null) {
// 计算重定向URI
String absoluteURI = resolveURI(response.request().absoluteURI(), response.getHeader("Location"));
// 创建客户端将使用的新的可用请求
return Future.succeededFuture(new RequestOptions().setAbsoluteURI(absoluteURI));
}
// (其他情况)不需要重定向
return null;
});
这个策略将会处理接收到的原始 HttpClientResponse
,并返回 null
或 Future<HttpClientRequest>
。
-
当返回的是
null
时,处理原始响应 -
当返回的是
Future
时,请求将在它成功完成后发送 -
当返回的是
Future
时,请求失败时将调用设置的异常处理器
返回的请求必须是未发送的,这样原始请求处理器才会被发送而且客户端之后才能发送请求。
大多数原始请求设置将会传播(拷贝)到新请求中:
-
请求头,除非您已经设置了一些头
-
请求体,除非返回的请求使用了
GET
方法 -
响应处理器
-
请求异常处理器
-
请求超时
100-Continue 处理
根据 HTTP/1.1 规范 ,
客户端可以设置请求头 Expect: 100-Continue
,并且在发送剩余请求体之前先发送请求头。
然后服务器可以通过回复临时响应状态 Status: 100 (Continue)
来告诉客户端可以发送请求的剩余部分。
这里的想法是在发送大量数据之前允许服务器授权、接受/拒绝请求, 若请求不能被接收,则发送大量数据信息会浪费带宽, 并且会让服务器持续读取即将被丢弃的无用数据。
Vert.x 允许您在客户端请求对象中设置一个 continueHandler
。
它将在服务器发回一个状态 Status: 100 (Continue)
时被调用, 同时也表示(客户端)可以
发送请求的剩余部分。
通常将其与 sendHead
结合起来发送请求的头信息。
以下是一个例子:
client.request(HttpMethod.PUT, "some-uri")
.onSuccess(request -> {
request.response().onSuccess(response -> {
System.out.println("Received response with status code " + response.statusCode());
});
request.putHeader("Expect", "100-Continue");
request.continueHandler(v -> {
// 可发送请求体剩余部分
request.write("Some data");
request.write("Some more data");
request.end();
});
request.sendHead();
});
在服务端,Vert.x HTTP Server可配置成接收到 Expect: 100-Continue
头时
自动发回 100 Continue
临时响应信息。
这个可通过 setHandle100ContinueAutomatically
方法来设置。
若您想要决定是否手动发送持续响应,那么此属性可设置成
false
(默认值),然后您可以通过检查头信息并且调用 writeContinue
方法让客户端持续发送请求体:
httpServer.requestHandler(request -> {
if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {
// 发送100 Continue持续响应
request.response().writeContinue();
// 当客户端收到100响应代码则可以发送剩余请求体
request.bodyHandler(body -> {
// 处理请求体
});
request.endHandler(v -> {
request.response().end();
});
}
});
您也可以通过直接发送故障状态代码来拒绝该请求:这种情况下,
请求体应该被忽略或连接应该被关闭( 100-Continue
是一个性能提示,
并不是逻辑协议约束):
httpServer.requestHandler(request -> {
if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {
//
boolean rejectAndClose = true;
if (rejectAndClose) {
// 使用失败码拒绝并关闭这个连接,
// 长连接情况下最好加上(指的是Connection: close)
request.response()
.setStatusCode(405)
.putHeader("Connection", "close")
.end();
} else {
// 使用失败码拒绝忽略请求体,
// 若体积很小,这是适用的
request.response()
.setStatusCode(405)
.end();
}
}
});
创建HTTP隧道
可以使用 connect
创建HTTP隧道:
client.request(HttpMethod.CONNECT, "some-uri")
.onSuccess(request -> {
// 连接到服务器
request.connect(ar -> {
if (ar.succeeded()) {
HttpClientResponse response = ar.result();
if (response.statusCode() != 200) {
// 某些原因连接失败
} else {
// HTTP隧道创建成功,原始数据将传输到缓冲区
NetSocket socket = response.netSocket();
}
}
});
});
收到HTTP响应头时,会调用上面传入的handler,socket也会准备好隧道传输, 并将会发送、接收数据到缓冲区。
connect
方法的作用类似 send
方法,
区别在于前者重新配置传输交换原始数据缓冲区。
客户端推送
服务器推送(Server Push)是 HTTP/2 的一个新功能,它可以为单个客户端并行发送多个响应。
可以在接收服务器推送的请求/响应的请求上设置一个推送处理器:
client.request(HttpMethod.GET, "/index.html")
.onSuccess(request -> {
request
.response().onComplete(response -> {
// 处理index.html响应
});
// 设置一个推送处理器来感知服务器推送的任何资源
request.pushHandler(pushedRequest -> {
// 为当前请求推送资源
System.out.println("Server pushed " + pushedRequest.path());
// 为响应设置处理器
pushedRequest.response().onComplete(pushedResponse -> {
System.out.println("The response for the pushed request");
});
});
// 结束请求
request.end();
});
若客户端不想收到推送请求,它可重置流:
request.pushHandler(pushedRequest -> {
if (pushedRequest.path().equals("/main.js")) {
pushedRequest.reset();
} else {
// 处理逻辑
}
});
若没有设置任何处理器时,任何被推送的流将被客户端自动重置
(错误代码 8
)。
客户端启用压缩
HTTP 客户端支持开箱即用的 HTTP 压缩功能。
这意味着客户端可以让远程服务器知道它支持压缩,并且能处理 压缩过的响应体(数据)。
HTTP 服务端可以自由地使用自己支持的压缩算法之一进行压缩,也可以在 不压缩的情况下将响应体发回。所以这仅仅是 HTTP 服务端的一个可能被随意忽略的提示。
要告诉服务器当前客户端支持哪种压缩,请求头将包含一个 Accept-Encoding
头,
其值为可支持的压缩算法,(该值可)支持多种压缩算法。
Vert.x 会添加以下头:
Accept-Encoding: gzip, deflate
服务器将从其中选择一个算法,您可以通过服务器返回的响应中的响应头
Content-Encoding
来检测服务器是否适应这个正文。
若响应体通过 gzip
压缩,它将包含例如下边的头:
Content-Encoding: gzip
创建客户端时可使用 setTryUseCompression
设置配置项启用压缩。
默认情况压缩被禁用。
HTTP/1.x pooling 和 keep alive
HTTP 的 Keep Alive 允许单个 HTTP 连接用于多个请求。当您向同一台服务器发送多个请求时, 可以更加有效使用连接。
对于 HTTP/1.x 版本,HTTP 客户端支持连接池,它允许您重用请求之间的连接。
为了连接池(能)工作,配置客户端时,keep alive 必须通过 setKeepAlive
方法设置成 true
。默认值为 true
。
当 keep alive 启用时,Vert.x 将为每一个发送的 HTTP/1.0 请求添加一个 Connection: Keep-Alive
头。
当 keep alive 禁用时,Vert.x 将为每一个 HTTP/1.1 请求添加一个 Connection: Close
头 ——
表示在响应完成后连接将被关闭。
可使用 setMaxPoolSize
方法 为每个服务器 配置连接池的最大连接数。
当启用连接池创建请求时,若存在少于已经为服务器创建的最大连接数,Vert.x 将创建一个新连接, 否则直接将请求添加到队列中。
Keep Alive的连接将在闲置一段时间后被客户端自动关闭。这个超时时间可以在服务端通过
keep-alive
请求头设置:
keep-alive: timeout=30
或者,您可使用 setKeepAliveTimeout
设置空闲时间——在设置的时间内然后没使用的连接将被关闭。
请注意空闲超时值以秒为单位而不是毫秒。
HTTP/1.1 pipe-lining
客户端还支持同一条连接上的管道(pipeline)。
管道意味着在返回一个响应之前,在同一个连接上发送另一个请求。 管道不是对所有请求都适用的。
若要启用管道,必须调用 setPipelining
方法。
默认是禁用管道的。
当启用管道时,请求可以不等待以前的响应返回而写入到连接。
单个连接的管道请求限制数由 setPipeliningLimit
方法设置。
此选项定义了发送到服务器的等待响应的最大请求数。
这个限制可以保证客户端请求数量在同一服务端的多条连接之间保持平衡。
HTTP/2 多路复用
HTTP/2 提倡使用服务器的单一连接,默认情况下,HTTP 客户端针对每个服务器都使用单一连接, 同样服务器上的所有流都会复用到对应连接中。
当客户端需要使用连接池并使用超过一个连接时,则可使用 setHttp2MaxPoolSize
设置。
当您使用连接池(而不是单个连接),并希望限制每个连接的多路复用流数量时,
可使用 setHttp2MultiplexingLimit
设置。
HttpClientOptions clientOptions = new HttpClientOptions().
setHttp2MultiplexingLimit(10).
setHttp2MaxPoolSize(3);
// Uses up to 3 connections and up to 10 streams per connection
HttpClient client = vertx.createHttpClient(clientOptions);
连接的多路复用数量限制是在客户端上设置的流数量限制。
如果服务器使用 SETTINGS_MAX_CONCURRENT_STREAMS
设置的值比该值更低,则有效值会更低。
HTTP/2 连接不会被客户端自动关闭,若要关闭它们,可以调用 close
来关闭客户端实例。
或者,您可以使用 setIdleTimeout
设置空闲时间
——这个时间内没有被使用过的连接将被关闭,注意,空闲时间以秒为单位,不是毫秒。
HTTP 连接
HttpConnection
接口提供了处理 HTTP 连接事件、生命周期、
设置(settings)的API。
HTTP/2 实现了完整的 HttpConnection
API。
HTTP/1.x 实现了 HttpConnection
中的部分API:
仅实现了关闭操作、关闭处理器和异常处理器。
该协议并不提供其他操作的语义。
服务端连接
connection
方法会返回服务器上的请求连接:
HttpConnection connection = request.connection();
可以在服务器上设置连接处理器,任意连接传入时可得到通知:
HttpServer server = vertx.createHttpServer(http2Options);
server.connectionHandler(connection -> {
System.out.println("A client connected");
});
客户端连接
connection
方法会返回客户端上的连接请求:
HttpConnection connection = request.connection();
可以在请求上设置连接处理器在连接发生时通知:
client.connectionHandler(connection -> {
System.out.println("Connected to the server");
});
连接配置
HTTP/2 由 Http2Settings
数据对象来配置。
每个 Endpoint 都必须遵守连接另一端的发送设置。
当建立连接时,客户端和服务器交换初始配置,初始设置由客户端上的
setInitialSettings
和
服务器上的 setInitialSettings
方法配置。
连接建立后可随时更改设置:
connection.updateSettings(new Http2Settings().setMaxConcurrentStreams(100));
由于远端可能会发送配置更新的确认, 所以可能会在回调中收到通知:
connection.updateSettings(new Http2Settings().setMaxConcurrentStreams(100), ar -> {
if (ar.succeeded()) {
System.out.println("The settings update has been acknowledged ");
}
});
相反,在收到新的远程设置时会通知
remoteSettingsHandler
。
connection.remoteSettingsHandler(settings -> {
System.out.println("Received new settings");
});
注意
|
此功能仅适用于 HTTP/2 协议。 |
连接 Ping
HTTP/2 连接 ping 对于确定连接往返时间或检查连接有效性很有用:
ping
发送 {@literal PING}
帧到远端:
Buffer data = Buffer.buffer();
for (byte i = 0;i < 8;i++) {
data.appendByte(i);
}
connection.ping(data, pong -> {
System.out.println("Remote side replied");
});
当接收到 {@literal PING} 帧时,Vert.x 将自动发送确认, 可设置处理器当收到 ping 帧时发送通知调用处理器:
connection.pingHandler(ping -> {
System.out.println("Got pinged by remote side");
});
处理器只是接到通知,确认无论如何都会自动发送。 这个功能是为基于 HTTP/2 实现的其他协议提供的。
注意
|
此功能仅适用于 HTTP/2 协议。 |
连接关闭/GOAWAY
调用 shutdown
方法将发送 {@literal GOAWAY} 帧到
远程的连接,要求其停止创建流:客户端将停止发送新请求,
并且服务器将停止推送响应。发送 {@literal GOAWAY} 帧后,连接
将等待一段时间(默认为30秒),直到当前所有流关闭,然后关闭连接。
connection.shutdown();
shutdownHandler
通知何时关闭所有流,
并且连接尚未关闭。
有可能只需发送 {@literal GOAWAY} 帧,和关闭主要的区别在于 它将只是告诉远程连接停止创建新流,而没有计划 关闭连接:
connection.goAway(0);
相反,也可以在收到 {@literal GOAWAY}
时收到通知:
connection.goAwayHandler(goAway -> {
System.out.println("Received a go away frame");
});
当所有当前流已经关闭并且可关闭连接时,shutdownHandler
将被调用:
connection.goAway(0);
connection.shutdownHandler(v -> {
// 所有流被关闭时,关闭连接
connection.close();
});
当接收到 {@literal GOAWAY} 时也适用。
注意
|
此功能仅适用于HTTP/2协议。 |
连接关闭
您可以通过 close
方法关闭连接:
-
对于 HTTP/1.x 来说,它会关闭底层的 Socket
-
对于 HTTP/2 来说,它将执行无延迟关闭, {@literal GOAWAY} 帧将会在连接关闭之前被发送
连接关闭时 closeHandler
将发出通知。
HttpClient 使用说明
HttpClient可以在一个 Verticle 中使用或者嵌入使用。
在 Verticle 中使用时,Verticle 应该使用自己的客户端实例 。
一般来说,不应该在不同的 Vert.x 上下文环境之间共享客户端,因为它可能导致不可预知的意外。
例如:keep-alive的连接将在打开连接的请求上下文环境调用客户端处理器,后续请求将使用 相同上下文环境。
当这种情况发生时,Vert.x会检测到并记录以下警告:
Reusing a connection with a different context: an HttpClient is probably shared between different Verticles
HttpClient可以嵌套在非 Vert.x 线程中,如单元测试或纯Java的 main
线程中:
客户端处理器将被不同的 Vert.x 线程和上下文调用,这样的上下文会根据需要创建。
对于生产环境,不推荐这样使用。
水平扩展 - 服务端共享
当多个 HTTP 服务端在同一个端口上监听时,Vert.x 会使用轮询策略来管理请求处理。
我们用 Verticle 来创建 HTTP 服务端,如:
vertx.createHttpServer().requestHandler(request -> {
request.response().end("Hello from server " + this);
}).listen(8080);
这个服务监听8080端口。那么当verticle被实例化多次,如:
vertx run io.vertx.examples.http.sharing.HttpServerVerticle -instances 2
,将会发生什么?
如果两个Verticle 都绑定到同一个端口,您将收到一个 Socket 异常。
幸运的是,Vert.x 可以为您处理这种情况。在与现有服务端相同的主机和端口上部署另一个服务器时,
实际上并不会尝试创建在同一主机/端口上监听的新服务端,它只绑定一次到Socket,
当接收到请求时,会按照轮询策略调用服务端的请求处理函数。
我们现在想象一个客户端,如下:
vertx.setPeriodic(100, (l) -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/", ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse resp = ar2.result();
resp.bodyHandler(body -> {
System.out.println(body.toString("ISO-8859-1"));
});
}
});
}
});
});
Vert.x 将请求顺序委托给其中一个服务器:
Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2
Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2
...
因此,服务器可直接扩展可用的核,而每个 Vert.x 中的 Verticle 实例仍然严格使用单线程, 您不需要像编写负载均衡器那样使用任何特殊技巧去编写, 就可以在多核机器上扩展服务器。
使用 HTTPS
Vert.x 的 HTTP 服务端和客户端可以配置成和网络服务器完全相同的方式使用 HTTPS。
有关详细信息,请参阅 配置网络服务器以使用 SSL 章节。
SSL可以通过每个请求的 RequestOptions
来启用/禁用,
或在指定模式时调用 setAbsoluteURI
方法。
client.request(new RequestOptions()
.setHost("localhost")
.setPort(8080)
.setURI("/")
.setSsl(true), ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest request = ar1.result();
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
}
});
}
});
setSsl
设置将用作客户端默认配置。
setSsl
将覆盖默认客户端设置。
-
即使客户端配置成使用 SSL/TLS,该值设置成
false
将禁用SSL/TLS。 -
即使客户端配置成不使用 SSL/TLS,该值设置成
true
将启用SSL/TLS, 实际的客户端SSL/TLS(如受信、密钥/证书、密码、ALPN 等)将被重用。
同样, setAbsoluteURI
方法也会
(在调用时)覆盖默认客户端设置。
WebSockets
WebSockets 是一种Web技术, 可以在 HTTP 服务端和 HTTP 客户端(通常是浏览器)之间实现全双工 Socket 连接。
Vert.x HTTP 客户端和服务端都支持 WebSocket。
服务端 WebSocket
在服务端处理 WebSocket 有两种方法。
WebSocket handler
第一种方法需要在服务端实例上提供一个
webSocketHandler
。
当对服务端创建 WebSocket 连接时,Vert.x 将向 Handler
传入一个
ServerWebSocket
实例,在其中去处理它。
server.webSocketHandler(webSocket -> {
System.out.println("Connected!");
});
您可以调用 reject
方法来拒绝一个 WebSocket。
server.webSocketHandler(webSocket -> {
if (webSocket.path().equals("/myapi")) {
webSocket.reject();
} else {
// 处理逻辑
}
});
调用 setHandshake
并传入 Future
, 可以实现异步处理WebSocket握手:
server.webSocketHandler(webSocket -> {
Promise<Integer> promise = Promise.promise();
webSocket.setHandshake(promise.future());
authenticate(webSocket.headers(), ar -> {
if (ar.succeeded()) {
// 用101状态码(协议切换)结束握手
// 或用401状态码(未鉴权)拒绝握手
promise.complete(ar.succeeded() ? 101 : 401);
} else {
// 发送500错误状态码
promise.fail(ar.cause());
}
});
});
注意
|
除非手动设置了WebSocket握手处理器,否则调用(webSocketHandler传入的)处理器后,将自动接受WebSocket握手。 |
协议切换为 WebSocket
处理 WebSocket 的第二种方法是处理从客户端发送的HTTP升级请求,调用服务器请求对象的
toWebSocket
方法。
server.requestHandler(request -> {
if (request.path().equals("/myapi")) {
Future<ServerWebSocket> fut = request.toWebSocket();
fut.onSuccess(ws -> {
// 处理逻辑
});
} else {
// 拒绝切换WebSocket
request.response().setStatusCode(400).end();
}
});
服务端 WebSocket
您可以通过 ServerWebSocket
实例读取在 WebSocket 握手中HTTP请求的 headers
,
path
, query
和
URI
。
客户端 WebSocket
Vert.x的 HttpClient
支持 WebSocket。
您可以调用其中任意一个 webSocket
方法创建 WebSocket 连接到服务端,
并提供回调函数。
当连接建立时,处理器将被调用并且传入 WebSocket
实例:
client.webSocket("/some-uri", res -> {
if (res.succeeded()) {
WebSocket ws = res.result();
System.out.println("Connected!");
}
});
向 WebSocket 写入消息
若您想将一个 WebSocket 消息写入 WebSocket,可使用
writeBinaryMessage
方法或
writeTextMessage
方法来执行该操作:
Buffer buffer = Buffer.buffer().appendInt(123).appendFloat(1.23f);
webSocket.writeBinaryMessage(buffer);
// 写一个简单文本消息
String message = "hello";
webSocket.writeTextMessage(message);
若WebSocket 消息大于使用
setMaxWebSocketFrameSize
设置的 WebSocket 的帧的最大值,则Vert.x在将其发送到报文之前将其拆分为多个 WebSocket 帧。
向 WebSocket 写入帧
WebSocket 消息可以由多个帧组成,在这种情况下,第一帧要么是 二进制(binary) 帧,要么是 文本(text) 帧帧, 后边跟着零个或多个 continuation 帧。
消息中的最后一帧标记成 final 。
要发送多个帧组成的消息,请使用
WebSocketFrame.binaryFrame
, WebSocketFrame.textFrame
或
WebSocketFrame.continuationFrame
方法创建帧,
并使用 writeFrame
方法将其写入WebSocket。
以下是二进制帧的示例:
WebSocketFrame frame1 = WebSocketFrame.binaryFrame(buffer1, false);
webSocket.writeFrame(frame1);
WebSocketFrame frame2 = WebSocketFrame.continuationFrame(buffer2, false);
webSocket.writeFrame(frame2);
// 写最终帧
WebSocketFrame frame3 = WebSocketFrame.continuationFrame(buffer2, true);
webSocket.writeFrame(frame3);
许多情况下,您只需要发送一个包含了单个最终帧的 WebSocket 消息,因此我们提供了
writeFinalBinaryFrame
和 writeFinalTextFrame
这两个快捷方法。
下边是示例:
webSocket.writeFinalTextFrame("Geronimo!");
// 发送由单个最终二进制帧组成的websocket消息:
Buffer buff = Buffer.buffer().appendInt(12).appendString("foo");
webSocket.writeFinalBinaryFrame(buff);
从 WebSocket 读取帧
要 从WebSocket 读取帧,您可以使用 frameHandler
方法。
当帧到达时,会传入一个 WebSocketFrame
实例给帧处理器,并调用它,
例如:
webSocket.frameHandler(frame -> {
System.out.println("Received a frame of size!");
});
关闭 WebSocket
处理完成之后,请使用 close
方法关闭 WebSocket 连接。
管道式 WebSocket
WebSocket
也是 ReadStream
和
WriteStream
的实现类,因此可以和管道一起使用。
当使用 WebSocket 作为可写流或可读流时, 它只能用于 WebSocket 连接上连续传输的一组二进制帧(译者注:即二进制帧之间不能被其他帧分割)。
Event bus 处理器
每个WebSocket都会在事件总线上自动注册两个处理器,当此处理器中接收到任何数据时, 它会将数据写入WebSocket。这两个处理器是本地订阅,不会路由到集群上。
基于这个特性,你可以将数据写入WebSocket(可能在完全不同的verticle中), 只要将数据发送到这两个处理器监听的地址即可。
这两个处理器的监听地址由 binaryHandlerID
和
textHandlerID
给出。
使用 HTTP/HTTPS 连接代理
HTTP 客户端支持通过 HTTP 代理(如Squid)或 SOCKS4a 或 SOCKS5 代理访问 HTTP/HTTPS 的 URL。 CONNECT 协议使用 HTTP/1.x,但可以连接到 HTTP/1.x 和 HTTP/2 服务器。
到 h2c
(未加密HTTP/2服务器)的连接可能不受 HTTP 代理支持,
因为代理仅支持 HTTP/1.1。
您可以通过 HttpClientOptions
中的
ProxyOptions
对象配置来配置代理(包括代理类型、主机名、端口和可选用户名和密码)。
以下是使用 HTTP 代理的例子:
HttpClientOptions options = new HttpClientOptions()
.setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP)
.setHost("localhost").setPort(3128)
.setUsername("username").setPassword("secret"));
HttpClient client = vertx.createHttpClient(options);
当客户端连接到HTTP URL时,它会连接到代理服务器,并在HTTP请求中提供完整URL ( "GET http://www.somehost.com/path/file.html HTTP/1.1" )。
当客户端连接到HTTPS URL时,它要求代理使用 CONNECT 方法创建到远程主机的通道。
对于 SOCKS5 代理:
HttpClientOptions options = new HttpClientOptions()
.setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
.setHost("localhost").setPort(1080)
.setUsername("username").setPassword("secret"));
HttpClient client = vertx.createHttpClient(options);
DNS 解析会一直在代理服务器上执行。为了实现 SOCKS4 客户端的功能, 需要先在本地解析 DNS 地址。
处理其他协议
如果代理支持,HTTP代理的实现支持获取 ftp:// 协议的url。
当HTTP请求URI包含完整URL时,HttpClient不会计算完整的HTTP URL, 而是直接使用请求URI中指定的完整URL:
HttpClientOptions options = new HttpClientOptions()
.setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP));
HttpClient client = vertx.createHttpClient(options);
client.request(HttpMethod.GET, "ftp://ftp.gnu.org/gnu/", ar -> {
if (ar.succeeded()) {
HttpClientRequest request = ar.result();
request.send(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse response = ar2.result();
System.out.println("Received response with status code " + response.statusCode());
}
});
}
});
使用HA代理协议
HA PROXY 协议 提供了一种便捷的安全传输连接信息(例如客户端的地址)的方式, 可以跨多层 NAT 或 TCP 代理传输。
HA PROXY 协议通过 setUseProxyProtocol
方法设置启用,
同时需要在classpath中增加以下依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<!--<version>必须和 Vert.x 所使用的 netty 的版本一致</version>-->
</dependency>
HttpServerOptions options = new HttpServerOptions()
.setUseProxyProtocol(true);
HttpServer server = vertx.createHttpServer(options);
server.requestHandler(request -> {
// 打印HA代理协议提供的真实地址,而不是代理的地址
System.out.println(request.remoteAddress());
// 打印代理的地址
System.out.println(request.localAddress());
});
使用共享数据的API
顾名思义,共享数据(SharedData)
API允许您在如下组件中安全地共享数据:
-
应用程序的不同部分之间,或者
-
同一 Vert.x 实例中的不同应用程序之间,或者
-
Vert.x 集群中的不同实例之间
在实践中, 它提供了:
-
synchronous maps (local-only)
-
asynchronous maps
-
asynchronous locks
-
asynchronous counters
重要
|
分布式数据结构的行为取决于您使用的集群管理器。 网络分区面临的备份(复制)以及当出现网络分区时的行为,由集群管理器和其配置来决定。 请参阅集群管理器文档以及底层框架手册。 |
Local maps
Local maps
允许您在同一个 Vert.x 实例中的不同事件循环(如不同的 verticle)之间安全地共享数据。
仅允许将某些数据类型作为键值和值:
-
不可变的类型 (如 String、boolean,等等),或
-
实现了
Shareable
接口的类型 (比如Buffer,JSON数组,JSON对象,或您编写的Shareable实现类)。
在后一种情况中,键/值将被复制,然后再放到Map中。
这样,我们可以确保在Vert.x应用程序不同线程之间没有 共享访问可变状态 。 因此您不必担心需要通过同步访问来保护该状态。
以下是使用一个共享的 local map 的示例:
SharedData sharedData = vertx.sharedData();
LocalMap<String, String> map1 = sharedData.getLocalMap("mymap1");
map1.put("foo", "bar"); // String是不可变的,所以不需要复制
LocalMap<String, Buffer> map2 = sharedData.getLocalMap("mymap2");
map2.put("eek", Buffer.buffer().appendInt(123)); // Buffer将会在添加到Map之前拷贝
// 之后... 在您应用的另外一部分
map1 = sharedData.getLocalMap("mymap1");
String val = map1.get("foo");
map2 = sharedData.getLocalMap("mymap2");
Buffer buff = map2.get("eek");
异步共享的 maps
异步共享的 maps
允许数据被放到 map 中,并从本地或任何其他节点读取。
这使得它们对于托管Vert.x Web应用程序的服务器场中的会话状态存储非常有用。
获取Map的过程是异步的,返回结果可以传递给您指定的处理器。。以下是一个例子:
SharedData sharedData = vertx.sharedData();
sharedData.<String, String>getAsyncMap("mymap", res -> {
if (res.succeeded()) {
AsyncMap<String, String> map = res.result();
} else {
// 发生错误
}
});
当 Vert.x 是集群模式时, 你放进map的数据,从本地以及从集群中的其他成员那里都可以访问到。
重要
|
在集群模式中, 异步共享的 maps 依靠于集群管理器提供的分布式数据结构。 请注意,异步共享map操作的延迟,在集群模式下可能比在本地模式下高很多。 |
如果你的应用不需要和其它任何节点共享数据,那么你可以获取一个仅限本地的 map:
SharedData sharedData = vertx.sharedData();
sharedData.<String, String>getLocalAsyncMap("mymap", res -> {
if (res.succeeded()) {
// 仅限本地的异步map
AsyncMap<String, String> map = res.result();
} else {
// 发生错误
}
});
将数据放入map
您可以使用 put
方法将数据放入map。
put 方法是异步的,一旦完成它会通知处理器:
map.put("foo", "bar", resPut -> {
if (resPut.succeeded()) {
// 成功放入值
} else {
// 发生错误
}
});
异步锁
异步锁
允许您在集群中获取独占锁。
异步锁适用于:同一时刻仅在一个节点上执行某些操作或访问某个资源。
集群范围锁具有异步API,它和大多数等待锁释放的阻塞调用线程的API锁不相同。
若您拥有的锁没有其他调用者,集群上的任何地方都可以获得该锁。
当您用完锁后,您可以调用 release
方法来释放它,以便另一个调用者可获得它。
SharedData sharedData = vertx.sharedData();
sharedData.getLock("mylock", res -> {
if (res.succeeded()) {
// 获得锁
Lock lock = res.result();
// 5秒后我们释放该锁以便其他人可以得到它
vertx.setTimer(5000, tid -> lock.release());
} else {
// 发生错误
}
});
您可以为锁设置一个超时时间,若获取锁超时,则会通知处理器获取锁失败:
SharedData sharedData = vertx.sharedData();
sharedData.getLockWithTimeout("mylock", 10000, res -> {
if (res.succeeded()) {
// 获得锁
Lock lock = res.result();
} else {
// 获取锁失败
}
});
有更多信息,请参阅 API文档
。
重要
|
在集群模式中, 异步锁依靠于集群管理器提供的分布式数据结构。 请注意,异步共享锁的操作的延迟,在集群模式下可能比在本地模式下高很多。 |
如果你的应用不需要和其它任何节点共享锁,你可以获取一个仅限本地的锁:
SharedData sharedData = vertx.sharedData();
sharedData.getLocalLock("mylock", res -> {
if (res.succeeded()) {
// 仅限本地的计数器
Lock lock = res.result();
// 5秒后我们释放该锁以便其他人可以得到它
vertx.setTimer(5000, tid -> lock.release());
} else {
// 发生错误
}
});
异步计数器
有时你会需要在本地或者在应用节点之间维护一个原子计数器。
您可以用 Counter
来做到这一点。
您可以通过 getCounter
方法获取一个实例:
SharedData sharedData = vertx.sharedData();
sharedData.getCounter("mycounter", res -> {
if (res.succeeded()) {
Counter counter = res.result();
} else {
// 发生错误
}
});
在获取了一个实例后,您可以用多种方式获取当前的计数、原子地+1、-1、 加某个特定值。
有更多信息,请参阅 API文档
。
重要
|
在集群模式中, 异步计数器依靠于集群管理器提供的分布式数据结构。 请注意,异步共享计数器操作的延迟,在集群模式下可能比在本地模式下高很多。 |
如果你的应用不需要和其它任何节点共享计数器, 你可以获取一个仅限本地的计数器:
SharedData sharedData = vertx.sharedData();
sharedData.getLocalCounter("mycounter", res -> {
if (res.succeeded()) {
// 仅限本地的计数器
Counter counter = res.result();
} else {
// 发生错误
}
});
使用 Vert.x 访问文件系统
Vert.x的 FileSystem
对象提供了许多操作文件系统的方法。
每个Vert.x 实例有一个文件系统对象,您可以使用 fileSystem
方法获取它。
每个操作都提供了阻塞和非阻塞版本,其中非阻塞版本接受一个处理器(Handler), 当操作完成或发生错误时调用该处理器。
以下是文件异步拷贝的示例:
FileSystem fs = vertx.fileSystem();
// 从foo.txt拷贝到bar.txt
fs.copy("foo.txt", "bar.txt", res -> {
if (res.succeeded()) {
// 拷贝完成
} else {
// 发生错误
}
});
阻塞版本的方法名为 xxxBlocking
,它要么返回结果,要么直接抛出异常。
很多情况下,一些潜在的阻塞操作可以快速返回(这取决于操作系统和文件系统),
这就是我们为什么提供它。但是强烈建议您在event-loop中使用它之前测试使用它们究竟需要耗费多长时间,
以避免打破黄金法则。
以下是使用阻塞 API的拷贝示例:
FileSystem fs = vertx.fileSystem();
// 同步拷贝从foo.txt到bar.txt
fs.copyBlocking("foo.txt", "bar.txt");
Vert.x 文件系统支持 copy、move、truncate、chmod 等等许多其他文件操作。
我们不会在这里列出所有内容,请参考 API文档
获取完整列表。
让我们看看使用异步方法的几个例子:
vertx.fileSystem().readFile("target/classes/readme.txt", result -> {
if (result.succeeded()) {
System.out.println(result.result());
} else {
System.err.println("Oh oh ..." + result.cause());
}
});
// 拷贝文件
vertx.fileSystem().copy("target/classes/readme.txt", "target/classes/readme2.txt", result -> {
if (result.succeeded()) {
System.out.println("File copied");
} else {
System.err.println("Oh oh ..." + result.cause());
}
});
// 写文件
vertx.fileSystem().writeFile("target/classes/hello.txt", Buffer.buffer("Hello"), result -> {
if (result.succeeded()) {
System.out.println("File written");
} else {
System.err.println("Oh oh ..." + result.cause());
}
});
// 检测是否已经存在以及删除
vertx.fileSystem().exists("target/classes/junk.txt", result -> {
if (result.succeeded() && result.result()) {
vertx.fileSystem().delete("target/classes/junk.txt", r -> {
System.out.println("File deleted");
});
} else {
System.err.println("Oh oh ... - cannot delete the file: " + result.cause());
}
});
异步文件访问
Vert.x提供了异步文件访问的抽象,允许您操作文件系统上的文件。
您可以像下边代码打开一个 AsyncFile
:
OpenOptions options = new OpenOptions();
fileSystem.open("myfile.txt", options, res -> {
if (res.succeeded()) {
AsyncFile file = res.result();
} else {
// 发生错误
}
});
AsyncFile
实现了 ReadStream
和 WriteStream
接口,因此您可以将文件和其他流对象配合 管道 工作,
如NetSocket、HTTP请求和响应和WebSocket等。
它们还允许您直接读写。
随机访问写
要使用 AsyncFile
进行随机访问写,请使用
write
方法。
这个方法的参数有:
-
buffer
:要写入的缓冲 -
position
:一个整数,指定在文件中写入缓冲的位置,若位置大于或等于文件大小, 文件将被扩展以适应偏移的位置。 -
handler
:结果处理器
这是随机访问写的示例:
vertx.fileSystem().open("target/classes/hello.txt", new OpenOptions(), result -> {
if (result.succeeded()) {
AsyncFile file = result.result();
Buffer buff = Buffer.buffer("foo");
for (int i = 0; i < 5; i++) {
file.write(buff, buff.length() * i, ar -> {
if (ar.succeeded()) {
System.out.println("Written ok!");
// 等等
} else {
System.err.println("Failed to write: " + ar.cause());
}
});
}
} else {
System.err.println("Cannot open file " + result.cause());
}
});
随机访问读
要使用 AsyncFile
进行随机访问读,请使用
read
方法。
该方法的参数有:
-
buffer
:读取数据的 Buffer -
offset
:读取数据将被放到 Buffer 中的偏移量 -
position
:从文件中读取数据的位置 -
length
:要读取的数据的字节数 -
handler
:结果处理器
以下是随机访问读的示例:
vertx.fileSystem().open("target/classes/les_miserables.txt", new OpenOptions(), result -> {
if (result.succeeded()) {
AsyncFile file = result.result();
Buffer buff = Buffer.buffer(1000);
for (int i = 0; i < 10; i++) {
file.read(buff, i * 100, i * 100, 100, ar -> {
if (ar.succeeded()) {
System.out.println("Read ok!");
} else {
System.err.println("Failed to write: " + ar.cause());
}
});
}
} else {
System.err.println("Cannot open file " + result.cause());
}
});
打开选项
打开 AsyncFile
时,您可以传递一个 OpenOptions
实例,
这些选项描述了访问文件的行为。例如:您可使用
setRead
,setWrite
和 setPerms
方法配置文件访问权限。
若打开的文件已经存在,则可以使用
setCreateNew
和
setTruncateExisting
配置对应行为。
您可以使用
setDeleteOnClose
标记在关闭时或JVM停止时要删除的文件。
将 AsyncFile 作为 ReadStream 和 WriteStream
AsyncFile
实现了 ReadStream
和 WriteStream
接口。
您可以使用 管道 将数据与其他读取和写入流进行数据管送。
例如,下面的例子会将内容复制到另外一个 AsyncFile
:
final AsyncFile output = vertx.fileSystem().openBlocking("target/classes/plagiary.txt", new OpenOptions());
vertx.fileSystem().open("target/classes/les_miserables.txt", new OpenOptions(), result -> {
if (result.succeeded()) {
AsyncFile file = result.result();
file.pipeTo(output)
.onComplete(v -> {
System.out.println("Copy done");
});
} else {
System.err.println("Cannot open file " + result.cause());
}
});
您还可以使用 管道 将文件内容写入到HTTP 响应中,或者写入任意
WriteStream
。
从 Classpath 访问文件
当Vert.x找不到文件系统上的文件时,它尝试从类路径中解析该文件。
请注意,类路径的资源路径不以
/
开头。
由于Java不提供对类路径资源的异步方法, 所以当类路径资源第一次被访问时, 该文件将复制到工作线程中的文件系统。 当第二次访问相同资源时,访问的文件直接从 (工作线程的)文件系统提供。 即使类路径资源发生变化(例如开发系统中), 也会提供之前的内容。
此(文件)缓存行为可以通过 setFileCachingEnabled
方法进行设定。如果系统属性中没有预先设置 vertx.disableFileCaching
,则其默认值为 true
。
文件缓存的路径默认为 .vertx
,它可以通过设置系统属性
vertx.cacheDirBase
进行自定义。
如果想在系统级禁用整个classpath解析功能,可以将系统属性
vertx.disableFileCPResolving
设置为 true
。
注意
|
当加载 io.vertx.core.file.FileSystemOptions 类时,这些系统属性将被加载一次。
因此,在加载此类之前应该设置这些属性,或者在启动它时作为JVM系统属性来设置。
|
如果要禁用特定应用程序的类路径解析,但默认情况下在系统范围内将其保持启用状态,
则可以通过 setClassPathResolvingEnabled
选项设置。
关闭 AsyncFile
您可调用 close
方法来关闭 AsyncFile
。
关闭是异步的,如果希望在关闭过后收到通知,您可指定一个处理器作为函数 close
的参数。。
数据报套接字(UDP)
在Vert.x中使用用户数据报协议(UDP)就是小菜一碟。
UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。
所以,您发送和接收的数据包都要包含有远程的地址。
除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。
唯一可以保证的是,数据包要么被完整地接收,要么完全收不到,
因为每一个数据包将会作为一个包发送,所以在通常情况下您不能发送大于网络接口的最大传输单元(MTU)的数据包。
(译者注:实际上大于MTU的包是可以发送成功的,只不过数据包会在IP层做分片。由于分片不携带4层信息,所以有些NAT为了性能等因素会丢弃分片报文。而接收分片报文时,需要所有分片在规定时间内全部接收到,才算作收到一个完整的UDP包,所以分片报文的传输失败率会更高。一般不要发送超过 MTU - len(IP.header) - len(UDP.header)
长度的包。)
但是要注意,即使数据包尺寸小于MTU,它仍然可能会发送失败。
它失败的尺寸取决于操作系统等(其他原因),所以按照经验法则就是尝试发送小数据包。
依照UDP的本质,它最适合一些允许丢弃数据包的应用 (如监视应用程序)。
其优点是与TCP相比具有更少的开销, 而且可以由NetServer和NetClient处理(参考前文)。
创建 DatagramSocket
要使用UDP,您首先要创建一个 DatagramSocket
实例,
无论您是要仅仅发送数据或者收发数据,这都是一样的。
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
返回的 DatagramSocket
实例不会绑定到特定端口。
如果您只想发送数据(如作为客户端)的话,这是没问题的,但更多详细的内容在下一节。
发送数据报包
如上所述,用户数据报协议(UDP)将数据分组发送给远程对等体, 但是以不持续的方式来传送到它们。
这意味着每个数据包都可以发送到不同的远程对等体。
发送数据包很容易,如下所示:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// 发送Buffer
socket.send(buffer, 1234, "10.0.0.1", asyncResult -> {
System.out.println("Send succeeded? " + asyncResult.succeeded());
});
// 发送一个字符串
socket.send("A string used as content", 1234, "10.0.0.1", asyncResult -> {
System.out.println("Send succeeded? " + asyncResult.succeeded());
});
接收数据报包
若您想要接收数据包,则您需要调用 listen(…)
方法绑定
DatagramSocket
。
这样您就可以接收到被发送至 DatagramPacket
所监听的地址和端口的
DatagramSocket
。
除此之外,您还要设置一个 Handler
,每接收到一个 DatagramPacket
时它都会被调用。
DatagramPacket
有以下方法:
当您需要监听一个特定地址和端口时,您可以像下边这样:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
if (asyncResult.succeeded()) {
socket.handler(packet -> {
// 对包进行处理
});
} else {
System.out.println("Listen failed" + asyncResult.cause());
}
});
注意,即使 AsyncResult
成功,它只意味着它可能已经写入了网络堆栈,
但不保证它已经到达或者将到达远端。
若您需要这样的保证,您可在TCP之上建立一些握手逻辑。
多播
发送多播数据包
多播允许多个Socket接收相同的数据包, 该目标可以通过加入到同一个可发送数据包的多播组来实现。
我们将在下一节中介绍如何加入多播组,从而接收数据包。
现在让我们专注于如何发送多播报文,发送多播报文与发送普通数据报报文没什么不同。 唯一的区别是您可以将多播组的地址传递给send方法发送出去。
如下所示:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// 发送Buffer到多播地址
socket.send(buffer, 1234, "230.0.0.1", asyncResult -> {
System.out.println("Send succeeded? " + asyncResult.succeeded());
});
所有已经加入多播组 230.0.0.1 的Socket都将收到该报文。
接收多播数据包
若要接收特定多播组的数据包,您需要通过调用 DatagramSocket
的 listen(…)
方法来绑定一个地址并且加入多播组。
这样,您将能够接收到被发送到 DatagramSocket
所监听的地址和端口的数据报,
同时也可以接收被发送到该多播组的数据报。
除此之外,您还可设置一个处理器,它在每次接收到DatagramPacket时会被调用。
DatagramPacket
有以下方法:
-
sender()
: 表示数据报发送方的InetSocketAddress -
data()
: 保存接收数据的Buffer
因此,要监听指定的地址和端口、并且接收多播组230.0.0.1的数据报, 您需要执行如下操作:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
if (asyncResult.succeeded()) {
socket.handler(packet -> {
// 对数据包进行处理
});
// 加入多播组
socket.listenMulticastGroup("230.0.0.1", asyncResult2 -> {
System.out.println("Listen succeeded? " + asyncResult2.succeeded());
});
} else {
System.out.println("Listen failed" + asyncResult.cause());
}
});
取消订阅/离开多播组
有时候您想只在特定时间内接收多播组的数据包。
这种情况下,您可以先监听他们,之后再取消监听。
如下所示:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
if (asyncResult.succeeded()) {
socket.handler(packet -> {
// 对数据包进行处理
});
// 加入多播组
socket.listenMulticastGroup("230.0.0.1", asyncResult2 -> {
if (asyncResult2.succeeded()) {
// 现在将接收组的数据包
// 做一些事情
socket.unlistenMulticastGroup("230.0.0.1", asyncResult3 -> {
System.out.println("Unlisten succeeded? " + asyncResult3.succeeded());
});
} else {
System.out.println("Listen failed" + asyncResult2.cause());
}
});
} else {
System.out.println("Listen failed" + asyncResult.cause());
}
});
屏蔽多播
除了取消监听一个多播地址以外,也可以做到屏蔽指定发送者地址的多播。
请注意这仅适用于某些操作系统和内核版本, 所以请检查操作系统文档看是它是否支持。
这是专家级别的技巧。
要屏蔽来自特定地址的多播,您可以在DatagramSocket上调用 blockMulticastGroup(…)
,
如下所示:
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
// 一些代码
// 这将拒收从10.0.0.2发送的数据包
socket.blockMulticastGroup("230.0.0.1", "10.0.0.2", asyncResult -> {
System.out.println("block succeeded? " + asyncResult.succeeded());
});
DatagramSocket 属性
当创建 DatagramSocket
时,您可以通过
DatagramSocketOptions
对象来设置多个属性以更改它的功能。这些(属性)如下:
-
setSendBufferSize
以字节为单位设置发送缓冲区的大小。 -
setReceiveBufferSize
设置TCP接收缓冲区大小 (以字节为单位)。 -
setReuseAddress
若为true, 则TIME_WAIT状态中的地址在关闭后可重用。 -
setBroadcast
设置或清除SO_BROADCAST套接字选项。 设置此选项时,数据报(UDP)数据包可能会发送到本地接口的广播地址。 -
setMulticastNetworkInterface
设置或清除IP_MULTICAST_LOOP套接字选项, 设置此选项时,多播数据包也将在 本地接口上接收。 -
setMulticastTimeToLive
设置IP_MULTICAST_TTL套接字选项。 TTL表示“活动时间”,单这种情况下,它指定允许数据包经过的IP跳数,特别是用于多播流量。 转发数据包的每个路由器或网关会递减TTL, 如果路由器将TTL递减为0,则不会再转发。
DatagramSocket本地地址
您可以通过调用 localAddress
来查找套接字的本地地址(即UDP Socket这边的地址)。
若您在调用 listen(…)
之前已经绑定了 DatagramSocket
,则它将返回一个InetSocketAddress,否则返回null。
关闭DatagramSocket
您可以通过调用 close
方法来关闭Socket,它将关闭
Socket并释放所有资源。
DNS 客户端
通常情况下,您需要以异步方式来获取DNS信息。 但不幸的是,Java 虚拟机本身附带的API是不可能的, 因此Vert.x提供了它自己的完全异步解析DNS的API。
若要获取DnsClient实例,您可以通过Vertx实例来创建一个。
DnsClient client = vertx.createDnsClient(53, "10.0.0.1");
创建DnsClient时亦可通过传入配置设定查询的过期时间。
DnsClient client = vertx.createDnsClient(new DnsClientOptions()
.setPort(53)
.setHost("10.0.0.1")
.setQueryTimeout(10000)
);
创建DnsClient的时候,不指定参数或者不指定服务器地址的话,DnsClient则会使用服务器内部地址, 来进行非阻塞的域名解析。
DnsClient client1 = vertx.createDnsClient();
// 指定超时时间
DnsClient client2 = vertx.createDnsClient(new DnsClientOptions().setQueryTimeout(10000));
lookup
尝试为一个指定名称元素获取A(ipv4)或 AAAA(ipv6)记录时,第一条被返回的(记录)将会被使用。 它的操作方式和操作系统上使用 "nslookup" 类似。
要为 vertx.io
获取 A/AAAA 记录,您需要像下面那样做:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup("vertx.io", ar -> {
if (ar.succeeded()) {
System.out.println(ar.result());
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
lookup4
尝试查找给定名称的A(ipv4)记录。第一个返回的(记录)将会被使用, 因此它的操作方式与操作系统上使用 "nslookup" 类似。
要查找 "vertx.io" 的A记录,您需要像下面那样做:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup4("vertx.io", ar -> {
if (ar.succeeded()) {
System.out.println(ar.result());
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
lookup6
尝试查找给定名称的 AAAA(ipv6)记录。第一个返回的(记录)将会被使用, 因此它的操作方式与在操作系统上使用 "nslookup" 类似。
要查找 "vertx.io" 的 AAAA记录,您需要像下面那样做:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup6("vertx.io", ar -> {
if (ar.succeeded()) {
System.out.println(ar.result());
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveA
尝试解析给定名称的所有A(ipv4)记录, 这与在类unix操作系统上使用 "dig" 类似。
要查找 "vertx.io" 的所有A记录,您通常会执行以下操作:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveA("vertx.io", ar -> {
if (ar.succeeded()) {
List<String> records = ar.result();
for (String record : records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveAAAA
尝试解析给定名称的所有AAAA(ipv6)记录, 这与在类unix操作系统上使用 "dig" 类似。
要查找 "vertx.io" 的所有AAAA记录,您通常会执行以下操作:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveAAAA("vertx.io", ar -> {
if (ar.succeeded()) {
List<String> records = ar.result();
for (String record : records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveCNAME
尝试解析给定名称的所有CNAME记录, 这与在类unix操作系统上使用 "dig" 类似。
要查找 "vertx.io" 的所有CNAME记录,您通常会执行以下操作:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveCNAME("vertx.io", ar -> {
if (ar.succeeded()) {
List<String> records = ar.result();
for (String record : records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveMX
尝试解析给定名称的所有MX记录, MX记录用于定义哪个邮件服务器去接受指定域的电子邮件。
要查找 "vertx.io" 的所有MX记录,您通常会执行以下操作:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveMX("vertx.io", ar -> {
if (ar.succeeded()) {
List<MxRecord> records = ar.result();
for (MxRecord record: records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
请注意,列表将包含按照它们优先级排序的 MxRecord
,这意味着列表中优先级低的MX记录会第一个优先出现在列表中。
MxRecord
允许您通过下边提供的方法访问MX记录的优先级和名称:
record.priority();
record.name();
resolveTXT
尝试解析给定名称的所有TXT记录,TXT记录通常用于定义域的额外信息。
要解析 "vertx.io" 的所有TXT记录,您可以使用下边几行代码:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveTXT("vertx.io", ar -> {
if (ar.succeeded()) {
List<String> records = ar.result();
for (String record: records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveNS
尝试解析给定名称的所有NS记录,NS记录指定一个DNS服务器, 这个服务器管理指定域的DNS信息。
要解析 "vertx.io" 的所有NS记录,您可以使用下边几行:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveNS("vertx.io", ar -> {
if (ar.succeeded()) {
List<String> records = ar.result();
for (String record: records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
resolveSRV
尝试解析给定名称的所有SRV记录,SRV记录用于定义服务端口和主机名等额外信息。 一些协议需要这些额外信息。
要查找 "vertx.io" 的所有SRV记录,您通常会执行以下操作:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveSRV("vertx.io", ar -> {
if (ar.succeeded()) {
List<SrvRecord> records = ar.result();
for (SrvRecord record: records) {
System.out.println(record);
}
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
请注意,列表将包含按照它们优先级排序的 SrvRecord
,这意味着优先级低的记录会第一个优先出现在列表中。
SrvRecord
允许您访问SRV记录本身中包含的所有信息:
record.priority();
record.name();
record.weight();
record.port();
record.protocol();
record.service();
record.target();
详细信息请参阅API文档
resolvePTR
尝试解析给定名称的PTR记录,PTR记录将ip地址映射到名称。
要解析IP地址 10.0.0.1 的PTR记录,您将使用 "1.0.0.10.in-addr.arpa"¸的PTR概念。
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolvePTR("1.0.0.10.in-addr.arpa", ar -> {
if (ar.succeeded()) {
String record = ar.result();
System.out.println(record);
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
reverseLookup
尝试对ipaddress进行反向查找,这与解析PTR记录类似。 但是允许您传递非有效PTR查询字符串的ip地址。
按照类似于下面这种方式来进行ip地址 10.0.0.1 的反向查找:
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.reverseLookup("10.0.0.1", ar -> {
if (ar.succeeded()) {
String record = ar.result();
System.out.println(record);
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
});
错误处理
如前边部分所述,DnsClient允许您传递一个Handler,
查询完成后会向其传入一个 AsyncResult。在出现错误的情况下,
通知中将包含一个 DnsException
,该异常会包含一个说明为何失败的 DnsResponseCode
。
此DnsResponseCode可帮助你更详细地检查原因。
DnsResponseCode的可能取值包含:
所有这些错误都由DNS服务器本身“生成”。
您可以从 DnsException 中获取 DnsResponseCode,如:
DnsClient client = vertx.createDnsClient(53, "10.0.0.1");
client.lookup("nonexisting.vert.xio", ar -> {
if (ar.succeeded()) {
String record = ar.result();
System.out.println(record);
} else {
Throwable cause = ar.cause();
if (cause instanceof DnsException) {
DnsException exception = (DnsException) cause;
DnsResponseCode code = exception.code();
// ...
} else {
System.out.println("Failed to resolve entry" + ar.cause());
}
}
});
流
在Vert.x中,有许多对象可以用于读取和写入。
在 Vert.x 中,写调用是立即返回的,而写操作的实际是在内部队列中排队写入。
不难看出,若写入对象的速度比实际写入底层数据资源速度快, 那么写入队列就会无限增长, 最终导致内存耗尽。
为了解决这个问题,Vert.x API中的一些对象提供了简单的流程控制( 回压 back-pressure )功能。
任何可控制的 写入 流对象都实现了 WriteStream
接口,
相应的,任何可控制的 读取 流对象都实现了 ReadStream
接口。
让我们举个例子,我们要从 ReadStream
中读取数据,然后将数据写入 WriteStream
。
一个非常简单的例子是从 NetSocket
读取然后写回到同一个 NetSocket
—— 因为 NetSocket
既实现了 ReadStream
也实现了 WriteStream
接口。
请注意,这些操作适用于任何实现了 ReadStream
和 WriteStream
接口的对象,
包括HTTP 请求、HTTP 响应、异步文件 I/O 和 WebSocket等。
一个最简单的方法是直接获取已经读取的数据,并立即将其写入
NetSocket
:
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
// 直接把数据写回
sock.write(buffer);
});
}).listen();
上面的例子有一个问题:如果从Socket读取数据的速度比写回Socket的速度快,
那么它将在 NetSocket
的写队列中不断堆积,
最终耗尽内存。这是有可能会发生的,例如,若Socket另一端的客户端读取速度不够快,
无法快速地向连接的另一端回压。
由于 NetSocket
实现了 WriteStream
接口,我们可以在写入之前检查 WriteStream
是否已满:
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
if (!sock.writeQueueFull()) {
sock.write(buffer);
}
});
}).listen();
这个例子不会耗尽内存,但如果写入队列已满,我们最终会丢失数据。
我们真正想要做的是在写入队列已满时暂停读取 NetSocket
:
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
}
});
}).listen();
我们已经快达到我们的目标,但还没有完全实现。现在 NetSocket
在文件已满时会暂停,
但是当写队列处理完成时,我们需要取消暂停:
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
sock.drainHandler(done -> {
sock.resume();
});
}
});
}).listen();
至此,我们的目标实现了。当写队列准备好接收更多的数据时,drainHandler
事件处理器将被调用,它会恢复 NetSocket
的状态,
允许读取更多的数据。
在编写Vert.x 应用程序时,这样做是很常见的,因此我们提供了一个名为
pipeTo
的方法替你完成这些繁杂的工作。
您只需要把 WriteStream
传给它并调用:
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.pipeTo(sock);
}).listen();
以上和下面更详细的例子完全一样,额外加上stream对于失败和结束的处理:
当pipe最终成功或失败时, WriteStream
就会停止。
当读写操作结束时会发起通知:
server.connectHandler(sock -> {
// pipe和socket传输数据时,提供一个处理通知结果的handler
sock.pipeTo(sock, ar -> {
if (ar.succeeded()) {
System.out.println("Pipe succeeded");
} else {
System.out.println("Pipe failed");
}
});
}).listen();
当你处理异步目标时,你可以创建一个 Pipe
对象,
这个对象会暂停源流,并在源流通过pipe传输到目标时恢复源流:
server.connectHandler(sock -> {
// 创建异步操作管道
Pipe<Buffer> pipe = sock.pipe();
// 打开目标文件
fs.open("/path/to/file", new OpenOptions(), ar -> {
if (ar.succeeded()) {
AsyncFile file = ar.result();
// 用管道传输socket当中的信息到文件中,并最终关闭文件
pipe.to(file);
} else {
sock.close();
}
});
}).listen();
取消传输操作需要关闭pipe:
vertx.createHttpServer()
.requestHandler(request -> {
// 创建异步操作管道
Pipe<Buffer> pipe = request.pipe();
// 打开目标文件
fs.open("/path/to/file", new OpenOptions(), ar -> {
if (ar.succeeded()) {
AsyncFile file = ar.result();
// 用管道传输socket当中的信息到文件中,并最终关闭文件
pipe.to(file);
} else {
// 关闭管道,恢复请求,body当中的缓冲数据被丢弃
pipe.close();
// 返回错误
request.response().setStatusCode(500).end();
}
});
}).listen(8080);
当pipe关闭,steams的handler会被重置,ReadStream
恢复工作。
从上面可以看出,默认情况下,stream传输完毕之后,目标流都会停止。你可以 用pipe对象控制这些行为:
-
endOnFailure
控制失败时的操作 -
endOnSuccess
控制stream结束时的操作 -
endOnComplete
控制所有情况下的操作
下面是一个简单例子:
src.pipe()
.endOnSuccess(false)
.to(dst, rs -> {
// 追加文本并关闭关闭文件
dst.end(Buffer.buffer("done"));
});
让我们更进一步,看看 ReadStream
和 WriteStream
的细节。
ReadStream
ReadStream
(可读流) 接口的实现类包括: HttpClientResponse
, DatagramSocket
,
HttpClientRequest
, HttpServerFileUpload
,
HttpServerRequest
, MessageConsumer
,
NetSocket
, WebSocket
, TimeoutStream
,
AsyncFile
。
-
handler
: 设置一个处理器,它将从ReadStream
读取对象 -
pause
: 暂停处理器,暂停时,处理器中将不会收到任何对象 -
fetch
: 从stream中抓取指定数量的对象,任意对象抵达stream时,都会触发handler, fetch操作是累积的。 -
resume
: 恢复处理器,若任何对象到达目的地则handler将被触发;等价于fetch(Long.MAX_VALUE)
-
exceptionHandler
: 若ReadStream发生异常,将被调用 -
endHandler
: 当流的数据读取完毕时将被调用。触发原因是读取到了EOF
,可能分别来自如下: 与ReadStream
关联的文件、HTTP请求、或TCP Socket的连接被关闭
可读流有 flowing 和 fetch 两个模式:
-
最初 stream 是 <i>flowing</i> 模式
-
当 stream 处于 flowing 模式,stream中的元素被传输到handler
-
当 stream 处于 fetch 模式,只会将指定数量的元素传输到handler
-
resume()
设置ReadStream 为 flowing 模式 -
pause()
设置ReadStream 为 fetch 模式 并设置demand值为0 -
fetch(long)
请求指定数量的stream元素并将该数量加到目前的demand值当中
WriteStream
WriteStream
(可写流)接口的实现类包括:HttpClientRequest
,HttpServerResponse
WebSocket
,NetSocket
和 AsyncFile
。
函数:
-
write
: 往WriteStream写入一个对象,该方法将永远不会阻塞, 内部是排队写入并且底层资源是异步写入。 -
setWriteQueueMaxSize
: 设置写入队列容量——writeQueueFull
在队列 写满 时返回true
。 注意,当写队列已满时,调用写(操作)时 数据依然会被接收和排队。 实际数量取决于流的实现,对于Buffer
, size代表实际写入的字节数,而并非缓冲区的数量。 -
writeQueueFull
: 若写队列被认为已满,则返回true
。 -
exceptionHandler
:WriteStream
发生异常时调用。 -
drainHandler
: 判定WriteStream
有剩余空间时调用。
记录解析器(Record Parser)
记录解析器(Record Parser)允许您轻松解析由字节序列分割的协议,或者固定长度的协议。 它将输入缓冲区序列转换为按照配置重组后的缓冲区序列 (固定大小或带分隔符的记录)。
例如,若您使用 \n
分割的简单ASCII文本协议,并输入如下:
buffer1:HELLO\nHOW ARE Y
buffer2:OU?\nI AM
buffer3: DOING OK
buffer4:\n
记录解析器将生成下结果:
buffer1:HELLO
buffer2:HOW ARE YOU?
buffer3:I AM DOING OK
我们来看看相关代码:
final RecordParser parser = RecordParser.newDelimited("\n", h -> {
System.out.println(h.toString());
});
parser.handle(Buffer.buffer("HELLO\nHOW ARE Y"));
parser.handle(Buffer.buffer("OU?\nI AM"));
parser.handle(Buffer.buffer("DOING OK"));
parser.handle(Buffer.buffer("\n"));
我们还可以生成固定长度的块,如下:
RecordParser.newFixed(4, h -> {
System.out.println(h.toString());
});
有关更多详细信息,请查看 RecordParser
类。
Json 解析器
解析JSON结构很容易,但这要求你一次性提供完整的JSON, 而对于非常大的JSON结构,则并不是特别适合使用JSON解析器来处理。
非阻塞JSON解析器 则是一个事件驱动的解析器, 它可以处理体积非常大的JSON。 它会将一系列输入的buffer转换为一系列的JSON解析器事件。
JsonParser parser = JsonParser.newParser();
// 设置不同事件的handler
parser.handler(event -> {
switch (event.type()) {
case START_OBJECT:
// Json对象的开始
break;
case END_OBJECT:
// Json对象的结束
break;
case START_ARRAY:
// Json数组的开始
break;
case END_ARRAY:
// Json数组的结束
break;
case VALUE:
// 处理一个取值
String field = event.fieldName();
if (field != null) {
// 当前处于Json对象内
} else {
// 当前处于Json数组内,或Json最顶层
if (event.isString()) {
} else {
// ...
}
}
break;
}
});
该解析器是非阻塞的,并且事件由输入的buffer来驱动触发。
JsonParser parser = JsonParser.newParser();
// start array event
// start object event
// "firstName":"Bob" event
parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\","));
// "lastName":"Morane" event
// end object event
parser.handle(Buffer.buffer("\"lastName\":\"Morane\"},"));
// start object event
// "firstName":"Luke" event
// "lastName":"Lucky" event
// end object event
parser.handle(Buffer.buffer("{\"firstName\":\"Luke\",\"lastName\":\"Lucky\"}"));
// end array event
parser.handle(Buffer.buffer("]"));
// Always call end
parser.end();
事件驱动的解析过程提供了更多的控制能力,同时带来了“需要处理细粒度事件”的缺点,有时候不太方便。 当你需要的时候,JSON解析器允许你将JSON结构作为值处理。
JsonParser parser = JsonParser.newParser();
parser.objectValueMode();
parser.handler(event -> {
switch (event.type()) {
case START_ARRAY:
// Start the array
break;
case END_ARRAY:
// End the array
break;
case VALUE:
// Handle each object
break;
}
});
parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...]"));
parser.end();
value-mode
可以在解析过程中启用或停用,
允许你在细粒度事件与JSON对象事件之间自由切换。
JsonParser parser = JsonParser.newParser();
parser.handler(event -> {
// Start the object
switch (event.type()) {
case START_OBJECT:
// 设置为 value-mode,自此开始,解析器则不会触发start-object事件
parser.objectValueMode();
break;
case VALUE:
// 处理每一个对象
// 获得从对象中解析出来的字段
String id = event.fieldName();
System.out.println("User with id " + id + " : " + event.value());
break;
case END_OBJECT:
// 设置为 event mode,所以解析器重新触发 start/end 事件
parser.objectEventMode();
break;
}
});
parser.handle(Buffer.buffer("{\"39877483847\":{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...}"));
parser.end();
你也可以对数组做同样的事情
JsonParser parser = JsonParser.newParser();
parser.handler(event -> {
// Json对象的开始
switch (event.type()) {
case START_OBJECT:
// 设置为value mode来处理每个元素,自此开始,解析器不会触发 start-array 事件
parser.arrayValueMode();
break;
case VALUE:
// 处理每一个数组
// 获取对象中的字段
System.out.println("Value : " + event.value());
break;
case END_OBJECT:
// 设置为 event mode,从而解析器会重新触发 start/end 事件
parser.arrayEventMode();
break;
}
});
parser.handle(Buffer.buffer("[0,1,2,3,4,...]"));
parser.end();
你也可以解码为POJO。
parser.handler(event -> {
// 获取每个对象
// 获取对象中的字段
String id = event.fieldName();
User user = event.mapTo(User.class);
System.out.println("User with id " + id + " : " + user.firstName + " " + user.lastName);
});
解析器解析buffer失败之后,会抛出异常;也可以通过设置 exception handler
处理异常:
JsonParser parser = JsonParser.newParser();
parser.exceptionHandler(err -> {
// 捕捉所有的解析/解码异常
});
解析器也可以解析JSON流:
-
连续的JSON流:
{"temperature":30}{"temperature":50}
-
行分割的JSON流:
{"an":"object"}\r\n3\r\n"a string"\r\nnull
更多细节,详见 JsonParser
。
运行阻塞式代码
在一个完美的世界中,不存在战争和饥饿,所有的API都将使用异步方式编写, 兔兔和小羊羔将会在阳光明媚的绿色草地上手牵手地跳舞。
但是……真实世界并非如此(您最近看新闻了吧?)
事实是很多库含有同步的API(如果不是多数的库的话,尤其JVM生态中的库),这些API中许多方法都是阻塞式的。 一个很好的例子就是 JDBC API,它本质上是同步的,无论多么努力地去尝试,Vert.x 都不能像魔法小精灵撒尘变法一样将它转换成异步API。
我们不会将所有的内容重写成异步方式,所以我们为您提供一种在 Vert.x 应用中安全调用"传统"阻塞API的方法。
如之前讨论,您不能在 Event Loop 中直接调用阻塞式操作,因为这样做会阻止 Event Loop 执行其他有用的任务。那您该怎么做?
可以通过调用 executeBlocking
方法来指定阻塞式代码的执行以及阻塞式代码执行后处理结果的异步回调。
vertx.executeBlocking(promise -> {
// 调用阻塞的、需要消耗显著执行时间的API
String result = someAPI.blockingMethod("hello");
promise.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
警告
|
阻塞式代码应该仅仅在合理的时间内阻塞(例如不超过几秒钟)。
长时间阻塞的操作或者轮询操作(例如一个线程以阻塞的方式不断的循环轮询事件)都应该避免。
当一个阻塞的操作持续超过10秒,blocked thread checker将会在控制台上打印一条消息。
长时间阻塞的操作应该由程序使用一个专用的线程管理,
他需要能够使用event-bus 或 runOnContext 与verticles交互
|
默认情况下,如果 executeBlocking
在同一个上下文环境中(如:同一个 Verticle 实例)被调用了多次,
那么这些不同的 executeBlocking
代码块会 顺序执行(一个接一个)。
若您不关心您调用 executeBlocking
的顺序,
可以将 ordered
参数的值设为 false
。这样任何 executeBlocking
都会在 Worker Pool 中并行执行。
另外一种运行阻塞式代码的方式是使用 worker verticle。
一个 Worker Verticle 始终会使用 Worker Pool 中的某个线程来执行。
默认情况下,阻塞式代码会在 Vert.x 的 Worker Pool 中执行,通过 setWorkerPoolSize
配置。
可以为不同的用途创建不同的池(pool):
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(promise -> {
// 调用阻塞的、需要消耗显著执行时间的API
String result = someAPI.blockingMethod("hello");
promise.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
Worker Executor 在不需要的时候必须被关闭:
executor.close();
当使用同一个名字创建了许多 worker 时,它们将共享同一个 pool。
当所有的 worker executor 调用了 close
方法被关闭过后,对应的 worker pool 会被销毁。
如果 Worker Executor 在 Verticle 中创建,那么 Verticle 实例销毁的同时 Vert.x 将会自动关闭这个 Worker Executor。
Worker Executor 可以在创建的时候配置:
int poolSize = 10;
// 2分钟
long maxExecuteTime = 2;
TimeUnit maxExecuteTimeUnit = TimeUnit.MINUTES;
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime, maxExecuteTimeUnit);
注意
|
这些配置信息在 worker pool 创建的时候设置。 |
Metrics SPI
默认情况下,Vert.x不会记录任何指标。相反,它提供了一个SPI,其他人可以将它的实现类添加到类路径中。
指标SPI是一项高级特性,允许实现类可以从Vert.x捕获事件以收集指标。
有关详细信息,请参阅
API 文档
。
若使用 setFactory
嵌入了Vert.x实例,
也可以用编程方式指定一个指标工厂。
"vertx" 命令行
vertx
命令用于在命令行中与 Vert.x 进行交互。主要用于运行 Vert.x Verticle。
为此,您需要下载并安装Vert.x 发行版,并将安装位置的 bin
目录添加
到 PATH
环境变量中,还要确保您的 PATH
上已配置了Java 8的JDK的路径。
注意
|
PATH 中的JDK是用于支持Java代码的快速编译。
|
运行 Verticles
您可以使用 vertx run
从命令行直接运行Vert.x 的 Verticle,以下是
run
命令 的几个示例:
vertx run my-verticle.js (1)
vertx run my-verticle.groovy (2)
vertx run my-verticle.rb (3)
vertx run io.vertx.example.MyVerticle (4)
vertx run io.vertx.example.MVerticle -cp my-verticle.jar (5)
vertx run MyVerticle.java (6)
-
部署一个JavaScript的Verticle
-
部署一个Groovy的Verticle
-
部署一个Ruby的Verticle
-
部署一个已经编译好的Java的Verticle,类的根路径是当前目录
-
部署一个已经打包成jar的Verticle,这个jar需要在类路径中
-
编译Java源代码并进行部署
正如您在Java中可看到的,该Verticle的名称可以是class文件的全限定类名, 也可以指定Java 源文件,Vert.x会为你编译它。
您可以在Verticle的名称前添加其他语言名称作为前缀来进行部署。例如:
若某个Verticle是Groovy编译的类,您可以使用语言前缀 groovy:
,让Vert.x 知道它是一个Groovy 类而不是Java 类。
vertx run groovy:io.vertx.example.MyGroovyVerticle
vertx run
命令可以使用几个可选参数,它们是:
-
-options <options>
- 提供Vert.x选项。options
是一个包含描述Vert.x选项的json文件的名称。该参数是可选的。 -
-conf <config>
- 提供了Verticle的一些配置,config
是一个包含描述Verticle配置的JSON文件的名称。该参数是可选的。 -
-cp <path>
- 搜索Verticle和它使用的其他任何资源的路径, 默认为.
(当前目录)。若您的Verticle引用了其他脚本、类或其他资源 (例如jar文件),请确保这些资源存在此路径上。该路径可以包含多个路径条目, 由:
(冒号)或;
(分号)进行分割——这取决于操作系统。每个路径条目可以是包含脚本的目录的绝对路径或相对路径, 也可以是jar或zip文件的绝对或相对文件名。 一个示例路径可能是-cp classes:lib/otherscripts:jars/myjar.jar:jars/otherjar.jar
。 始终使用路径引用您的Verticle需要的任何资源,不要 将它们放在系统类路径上, 因为这会导致部署的Verticle之间的隔离问题。 -
-instances <instances>
- 要实例化的Verticle实例的数目,每个Verticle实例都是严格单线程(运行)的, 因此为了在可用的cpu核心上扩展您的应用程序,您可能需要部署多个实例。 若省略,则部署单个实例。 -
-worker
- 此选项可确定一个Verticle是否为Worker Verticle。 -
-cluster
- 此选项确定Vert.x实例是否尝试与网络上的其他Vert.x实例形成集群, 集群Vert.x实例允许Vert.x与其他节点构建一个分布式Event Bus。 默认为false(非集群模式)。 -
-cluster-port
- 若指定了cluster
选项, 则可以确定哪个端口将用于与其他Vert.x实例进行集群通信。默认为0
——这意味着“ 选择一个空闲的随机端口 ”。 您通常不需要指定此参数,除非您需要绑定特定端口。 -
-cluster-host
- 若指定了cluster
选项,则可以确定哪个主机地址将用于与其他Vert.x实例进行集群通信。 若没有设置,集群的eventbus会尝试绑定到同一个host作为底层集群管理。 作为最后的手段, 将会在可用网路接口中选取其中一个。 -
-cluster-public-port
- 若指定了cluster
选项,则可以确定哪个端口将被公布用于与其他Vert.x实例进行集群通信。 默认值是-1
,表示与cluster-port
保持一致。 -
-cluster-public-host
- 若指定了cluster
选项,则可以确定哪个主机地址将被公布用于与其他Vert.x实例进行集群通信。 如果没有指定,则默认使用cluster-host
的值 -
-ha
- 若指定,该Verticle将部署为(支持)高可用性(HA)。 有关详细信息,请参阅相关章节。 -
-quorum
- 该参数需要和-ha
一起使用,它指定集群中所有 HA deploymentIDs 处于活动状态的最小节点数,默认为0。 -
-hagroup
- 该参数需要和-ha
一起使用,它指定此节点将加入的HA组。 集群中可以有多个HA组,节点只会故障转移到同一组中的其他节点。默认为__DEFAULT__
。
您还可以使用下边方式设置系统属性:-Dkey=value
。
下面有更多的例子:
使用默认设置运行JavaScript的Verticle:server.js:
vertx run server.js
运行指定类路径的预编译好的10个Java Verticle实例
vertx run com.acme.MyVerticle -cp "classes:lib/myjar.jar" -instances 10
通过 源文件 运行10个Java Verticle的实例
vertx run MyVerticle.java -instances 10
运行20个Ruby语言的Worker Verticle实例
vertx run order_worker.rb -instances 20 -worker
在同一台计算机上运行两个JavaScript Verticle, 并让它们彼此以及在网络上的其他任何服务器构建一个集群:
vertx run handler.js -cluster
vertx run sender.js -cluster
运行一个Ruby Verticle并传入一些配置:
vertx run my_verticle.rb -conf my_verticle.conf
其中 my_verticle.conf
也许会包含以下配置:
{
"name": "foo",
"num_widgets": 46
}
该配置可通过Core API在Verticle内部可用。
当使用Vert.x的高可用功能时,您可能需要创建一个Vert.x的 裸 实例。 此实例在启动时不会部署任何Verticle,但如果集群中的另一个节点消失,则会在此节点运行之前消失的实例。 如需要创建一个 裸 实例,执行以下命令:
vertx bare
根据您的集群配置,您可能需要添加 cluster-host
和 cluster-port
参数。
执行打包成 fat-jar 的Vert.x 应用
fat jar 是一个嵌入了所有依赖的可执行的jar,这意味着您不必在执行jar的机器上预先安装Vert.x。 它像任何可执行的Java jar一样可直接执行:
java -jar my-application-fat.jar
对于这点,Vert.x 没什么特别的,您可以使用任何Java应用程序。
您可以创建自己的主类并在 MANIFEST 中指定,但建议您将代码编写成Verticle,
并使用Vert.x中的 Launcher
类 (io.vertx.core.Launcher
) 作为您的主类。
这也是使用命令行运行Vert.x时使用的主类,因此允许您指定命令行参数,
如 -instances
以便更轻松地扩展应用程序。
要将您的Verticle全部部署在这个 fat-jar 中时,您必须将下边信息写入 manifest :
-
Main-Class
设置为io.vertx.core.Launcher
-
Main-Verticle
指定要运行的Main Verticle(Java完全限定类名或脚本文件名)
您还可以提供您将传递给 vertx run
的常用命令行参数:
java -jar my-verticle-fat.jar -cluster -conf myconf.json
java -jar my-verticle-fat.jar -cluster -conf myconf.json -cp path/to/dir/conf/cluster_xml
注意
|
请参阅官方 Vert.x Examples 仓库中的 Maven/Gradle 相应示例来了解如何将应用打包成fat-jar。 |
通过 fat jar 运行应用时,默认会执行 run
命令。
其他命令
除了 run
和 version
以外,vertx
命令行和 Launcher
还提供了其他 命令 :
您可以使用下边命令创建一个 bare
实例:
vertx bare
# or
java -jar my-verticle-fat.jar bare
您还可以在后台启动应用程序:
java -jar my-verticle-fat.jar start --vertx-id=my-app-name
若 my-app-name
未设置,将生成一个随机的id,并在命令提示符中打印。您可以将 run
选项传递给 start
命令:
java -jar my-verticle-fat.jar start —-vertx-id=my-app-name -cluster
一旦在后台启动,可以使用 stop
命令停止它:
java -jar my-verticle-fat.jar stop my-app-name
您还可以使用以下方式列出后台启动的Vert.x应用程序:
java -jar my-verticle-fat.jar list
vertx
工具也可以使用 start
、 stop
和 list
命令,start
命令支持以下几个选项:
-
vertx-id
:应用程序ID,若未设置,则使用随机UUID -
java-opts
:Java虚拟机选项,若未设置,则使用JAVA_OPTS
环境变量 -
redirect-output
:重定向生成的进程输出和错误流到父进程流
若选项值包含空白,则需使用 ""
(双引号)将选项值括起来。
由于 start
命令产生一个新的进程,传递给JVM的java选项不会被传播,所以您 必须
使用 java-opts
来配置JVM(-X
, -D
…)。若您使用 CLASSPATH
环境变量,
请确保路径下包含所有需要的jar(vertx-core、您的jar和所有依赖项)。
该命令集是可扩展的,请参考 扩展 Vert.x 启动器 章节。
实时重部署
在开发时,可以方便在文件更改时实时重新部署应用程序。vertx
命令行工具和更普遍的 Launcher
类提供了这个功能。
这里有些例子:
vertx run MyVerticle.groovy --redeploy="**/*.groovy" --launcher-class=io.vertx.core.Launcher
vertx run MyVerticle.groovy --redeploy="**/*.groovy,**/*.rb" --launcher-class=io.vertx.core.Launcher
java io.vertx.core.Launcher run org.acme.MyVerticle --redeploy="**/*.class" --launcher-class=io.vertx.core
.Launcher -cp ...
重新部署的过程执行如下。首先,您的应用程序作为后台应用程序启动
(使用 start
命令)。当发现文件更改时,该进程将停止并重新启动该应用。
这样可避免泄露。
要启用实时重新部署,请将 --redeploy
选项传递给 run
命令。--redeploy
表示要
监视 的文件集,这个集合可使用 Ant样式模式(使用 **
,*
和 ?
),
您也可以使用逗号(,
)分隔它们来指定多个集合。文件路径都是相对于当前工作目录。
传递给 run
命令的参数最终会传递给应用程序,可使用 --java-opts
配置JVM虚拟机选项。
例如,如果想传入一个 conf
参数或是系统属性,
您可以使用 --java-opts="-conf=my-conf.json -Dkey=value"
。
--launcher-class
选项确定应用程序的 主类 启动器。它通常是
Launcher
,但您也可以使用您自己的 主类 。
也可以在IDE中使用重部署功能:
-
Eclipse - 创建一个 Run 配置,使用
io.vertx.core.Launcher
类作为 主类 。在 Program Arguments 区域( Arguments 选项卡中),写入run your-verticle-fully-qualified-name --redeploy=/.java --launcher-class=io.vertx.core.Launcher
,您还可以添加其他参数。随着 Eclipse 在保存时增量编译您的文件,重部署工作会顺利进行。 -
IntelliJ - 创建一个 Run 配置(Application),将主类设置为
io.vertx.core.Launcher
。在 程序参数中写:run your-verticle-fully-qualified-name --redeploy=/.class --launcher-class=io.vertx.core.Launcher
。要触发重新部署,您需要显式 构造 项目或模块(Build → Make project)。
要调试应用程序,请将运行配置创建为远程应用程序,
并使用 --java-opts
配置调试器。每次重新部署后,请勿忘记重新插入(re-plug)调试器,
因为它每次都会创建一个新进程。
您还可以在重新部署周期中挂接(hook)构建过程:
java -jar target/my-fat-jar.jar --redeploy="**/*.java" --on-redeploy="mvn package"
java -jar build/libs/my-fat-jar.jar --redeploy="src/**/*.java" --on-redeploy='./gradlew shadowJar'
"on-redeploy"选项指定在应用程序关闭后和重新启动之前调用的命令。
因此,如果构建工具更新了某些运行时构件,则可以将其挂接。例如,您可以启动 gulp
或 grunt
来更新您的资源。如果需要传递参数到你的应用程序中,不要忘记将
--java-opts
添加到命令参数里:
java -jar target/my-fat-jar.jar --redeploy="**/*.java" --on-redeploy="mvn package" --java-opts="-Dkey=val"
java -jar build/libs/my-fat-jar.jar --redeploy="src/**/*.java" --on-redeploy='./gradlew shadowJar' --java-opts="-Dkey=val"
重新部署功能还支持以下设置:
-
redeploy-scan-period
:文件系统检查周期(以毫秒为单位),默认为250ms -
redeploy-grace-period
:在2次重新部署之间等待的时间(以毫秒为单位),默认为1000ms -
redeploy-termination-period
:停止应用程序后等待的时间 (在启动用户命令之前)。这个在Windows上非常有用,因为这个进程并没立即被杀死。 时间以毫秒为单位,默认0ms
集群管理器
在 Vert.x 中,集群管理器可用于各种功能,包括:
-
集群中 Vert.x 节点的发现和分组
-
维护集群范围中的主题订阅者列表(所以我们可知道哪些节点对哪个Event Bus地址感兴趣)
-
分布式Map的支持
-
分布式锁
-
分布式计数器
集群管理器 不 处理Event Bus节点之间的传输,这由 Vert.x 直接通过TCP连接完成。
Vert.x发行版中使用的默认集群管理器是使用的 Hazelcast 集群管理器, 但是它可以简单被替换成其他实现类,因为Vert.x集群管理器可插拔的。
集群管理器必须实现 ClusterManager
接口,
Vert.x在运行时使用Java的服务加载器
Service Loader
功能在类路径中查找 ClusterManager
的实例,从而定位集群管理器。
若您在命令行中使用Vert.x并要使用集群,则应确保Vert.x安装路径的 lib
目录包含您的集群管理器的jar包。
若您在 Maven/Gradle 项目使用Vert.x,则只需将集群管理器jar作为依赖添加到你的项目中。
您也可以以编程的方式在嵌入Vert.x 时使用
setClusterManager
指定集群管理器。
日志记录
Vert.x使用内置的日志API进行记录日志,并支持各种日志记录后端。
日志后端选择如下:
-
后端由设置的
vertx.logger-delegate-factory-class-name
系统属性表示,或者是 -
当在类路径下存在
vertx-default-jul-logging.properties
文件时,则使用JDK logging,或者是 -
类路径中存在以下实现,按照以下优先顺序进行选择:
-
SLF4J
-
Log4J
-
Log4J2
-
除此之外,Vert.x默认使用JDK日志记录
通过系统属性配置
设置 系统属性 vertx.logger-delegate-factory-class-name
的值为:
-
io.vertx.core.logging.SLF4JLogDelegateFactory
,则使用SLF4J, -
io.vertx.core.logging.Log4j2LogDelegateFactory
,则使用Log4J2, -
io.vertx.core.logging.JULLogDelegateFactory
,则使用JDK日志记录
自动配置
当没有设置系统属性 vertx.logger-delegate-factory-class-name
的值时,
Vert.x会尝试查找最合适的日志框架:
-
当类路径下有SLF4J实现类时,则使用SLF4J,例如
LoggerFactory.getILoggerFactory()
返回值不是NOPLoggerFactory
实例, -
否则,当classpath有Log4j2实现类时,则使用Log4j2
-
除此之外,使用JUL
配置JUL日志记录
JUL日志配置文件可以使用普通的JUL方式指定 —— 通过设置系统属性 java.util.logging.config.file
的值为您的配置文件。
更多关于此部分以及JUL配置文件结构的内容,请参阅 JUL
日志记录的文档。
Vert.x还提供了一种更方便的方式指定配置文件,无需设置系统属性。
您只需在您的类路径中提供名为 vertx-default-jul-logging.properties
的JUL配置文件(例如在您的fatjar中),Vert.x将使用该配置文件配置JUL。
Netty 日志记录
Netty并不依赖外部的日志配置(例如,系统属性)。 相反,它基于Netty类可见的日志库实现日志记录配置:
-
使用
SLF4J
库,如果它可见, -
否则使用
Log4j
,如果它可见, -
否则使用
Log4j2
,如果它可见, -
否则使用默认的
java.util.logging
注意
|
你们锐利的眼光可能已经注意到Vert.x遵循相同的优先级顺序 |
使用 io.netty.util.internal.logging.InternalLoggerFactory
可以直接强制设置日志实现类。
// 将日志实现强制设为 Log4j 2
InternalLoggerFactory.setDefaultFactory(Log4J2LoggerFactory.INSTANCE);
故障排除
SLF4J启动警告
若您在启动应用程序时看到以下信息:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
这意味着您的类路径中有 SLF4J-API 却没绑定到具体的实例类中。使用SLF4J记录的消息将会丢失。 您应该添加具体的实现到你的类路径下。参考 https://www.slf4j.org/manual.html#swapping 选择具体实现并配置。
请注意,Netty会寻找SLF4-API的jar,并在默认情况下使用它。
主机名解析
Vert.x 使用自带的网络地址解析器来执行主机名解析的工作(将主机名解析为IP地址), 而没有使用JVM内置的阻塞式解析器。
把主机名解析成IP地址的操作将会使用到:
-
操作系统的 hosts 文件
-
DNS查询服务器列表
默认情况下它使用系统环境中设定的DNS服务器地址列表,
如果无法获取该列表,则会使用谷歌的公用DNS服务器地址 "8.8.8.8"
以及 "8.8.4.4"
。
也可以在创建 Vertx
实例的时候配置DNS服务器:
Vertx vertx = Vertx.vertx(new VertxOptions().
setAddressResolverOptions(
new AddressResolverOptions().
addServer("192.168.0.1").
addServer("192.168.0.2:40000"))
);
DNS服务器的默认端口为 53
,当服务器使用不同的端口时,
可以使用半角冒号作为分隔符来指定端口,例如: 192.168.0.2:40000
。
注意
|
如果某些场景之下必须要使用JVM内置的解析器,此时可以通过在启动时设置系统属性 -Dvertx.disableDnsResolver=true 来激活JVM内置的解析器。 |
故障转移
当一个服务器没有及时响应时,解析器会从列表中取出下一个服务器进行查询,
该故障转移操作的次数限制可以通过 setMaxQueries
来设置(默认设置是 4
次)。
如果解析器在 getQueryTimeout
毫秒内未接收到正常响应,则认为DNS查询失败
(默认值是 5
秒)
服务器列表轮询
默认情况下,解析器总是使用服务器列表中的第一个服务器,剩下的服务器用于故障转移。
您可以将 setRotateServers
设置为 true
,
此时解析器将会使用 round-robin 风格的轮询操作,将查询的负担分摊到列表中的每一个服务器上,
从而避免所有的查询负担都落在列表中的第一个服务器上。
此时故障转移机制仍然有效,当某个服务器没有及时响应时,解析器会使用列表中的下一个服务器。
主机映射
操作系统自身的 hosts 文件用于查找主机名对应的IP地址。
除此之外也可以使用另外的 hosts 文件来代替操作系统自身的 hosts 文件:
Vertx vertx = Vertx.vertx(new VertxOptions().
setAddressResolverOptions(
new AddressResolverOptions().
setHostsPath("/path/to/hosts"))
);
DNS搜索域
默认情况下,解析器使用系统环境中设置的DNS搜索域。如果要使用显式指定的搜索域, 可以使用以下方式:
Vertx vertx = Vertx.vertx(new VertxOptions().
setAddressResolverOptions(
new AddressResolverOptions().addSearchDomain("foo.com").addSearchDomain("bar.com"))
);
当使用搜索域列表时, “.” 符号数量的阈值一般为 1
,在Linux操作系统里该阈值由 /etc/resolv.conf
文件来指定,
通过 setNdots
可以人为指定该阈值的大小。
高可用与故障转移
Vert.x 可以支持 verticle 运行于高可用(HA)模式。这种模式之下, 如果一个 vert.x 实例所运行的 verticle 突然宕掉,该 verticle 将会被迁移到其他的 vert.x 实例中 (该 vert.x 实例必须处于同一个集群之中)。
自动故障转移
当运行 vert.x 时开启了高可用(HA)选项,此时如果某个 vert.x 实例中的某个 verticle 运行失败或者宕掉, 该 verticle 将会被自动重新部署于集群中的另一个 vert.x 实例中。我们把这种机制称为 verticle故障转移。
只要在运行 vert.x 的命令行中追加 -ha
参数,就可以开启 高可用 模式:
vertx run my-verticle.js -ha
要让高可用机制起作用,您需要在集群中开启至少2个 Vert.x 实例, 现在假设您已经在集群中运行了一个 Vert.x 实例,例如:
vertx run my-other-verticle.js -ha
此时如果运行 my-verticle.js
的 Vert.x 实例宕掉了(例如您可以使用 kill -9
命令强行杀掉这个进程来模拟此场景),
运行 my-other-verticle.js
的 Vert.x 实例会自动地部署 my-verticle.js
,
此时该 Vert.x 实例同时运行了这两个 verticle (my-other-verticle.js 和 my-verticle.js)。
注意
|
如果要使得这种迁移机制起作用,则必须保证第二个 vert.x 实例可以访问到该 verticle 对应的文件
(在此场景中指的是 my-verticle.js )。
|
重要
|
请注意,通过正常方式退出的 Vert.x 实例不会触发故障转移操作
(例如使用 CTRL-C 组合键或者 kill -SIGINT 命令)。
|
您也可以启动若干个 Vert.x 裸 实例————指的是它们在启动时没有加载任何 verticle,此时, 它们一样可以对集群中的其他节点起到故障转移的作用。启动一个空白的 Vert.x 实例很简单,只需要执行以下命令:
vertx run -ha
当使用 -ha
参数时, 可以不需要再追加 -cluster
参数,
因为高可用模式是假定了您需要运行在集群模式之下的。
注意
|
依据您的集群配置选项,您可能还是需要自定义集群管理器
(默认使用 Hazelcast),以及追加集群主机(cluster-host )和集群端口(cluster-port )等参数。
|
高可用组
当 Vert.x 实例运行于高可用模式时,您还可以对其进行高可用分组,这里称之为 高可用组 。
此处的高可用组指的是一个集群之中的节点的一种逻辑分组,被分配了高可用组的节点只会对同一个高可用组之下的其他节点执行故障转移操作。
如果没有指定高可用组,系统会自动将节点分配到默认的 __DEFAULT__
高可用组。
在运行 verticle 时可以使用 -hagroup
参数指定高可用分组,例如:
vertx run my-verticle.js -ha -hagroup my-group
举个例子:
在第一个终端里运行:
vertx run my-verticle.js -ha -hagroup g1
在第二个终端里,我们以同一个高可用组运行另一个 verticle:
vertx run my-other-verticle.js -ha -hagroup g1
最后,在第三个终端里,我们以不同的高可用组再运行一个其他的 verticle:
vertx run yet-another-verticle.js -ha -hagroup g2
如果我们杀掉第一个终端里的实例,这里面的 verticle 将会通过故障转移机制迁移到第二个终端里的实例中, 而不是第三个终端里的实例中,因为第三个终端里的实例被分配了不同的高可用组。
如果杀掉第三个终端里的实例,则不会发生故障转移操作, 因为此终端里的 vert.x 实例被分配了不同的高可用组。
处理网络分区 - Quora
高可用实现也支持 quora
(一种多数派机制)。在分布式系统中, Quorum 是指一种投票机制,在这种投票机制之下,
某个分布式事务只有获得不少于指定投票数量的票数,才允许执行某个操作。
启动 Vert.x 实例的时候,您可以将其设置成在进行高可用(HA)部署之前需要一个 quorum
。
在这个语境之下, quorum
指的是集群中某个特定的分组内的节点数量的下限。
典型的例如您将 quorum
的数值设置为 1 + N/2
(现在以 Q 指代该数值,其中的 N 代表分组中的节点总数),
那么如果集群中少于 Q
个节点的情况下,该高可用(HA)部署将被取消,待到节点数量达到这个 Q 数值的时候,会再次进行部署。
这种机制可以防止出现网络分区(亦称 脑裂)。
关于 quora
的更多信息请参考 这里 。
要在运行 vert.x 实例的时候启用 quorum
,您只需要在命令行中指定 -quorum
参数,例如
在第一个终端中执行:
vertx run my-verticle.js -ha -quorum 3
此时 Vert.x 实例将会启动,但是并不会部署这个模块,因为现在只有1个节点, 而不是3个。
在第二个终端中执行:
vertx run my-other-verticle.js -ha -quorum 3
此时 Vert.x 实例将会启动,但是并不会部署这个模块,因为现在只有2个节点, 而不是3个。
在第三个终端中,您可以启动另一个 vert.x 实例:
vertx run yet-another-verticle.js -ha -quorum 3
哇!————我们有了3个节点,这正是 quorum
的数值。
此时此刻这些模块将会被自动地部署到所有实例上。
如果我们关闭或者强行杀死其中一个节点,那么这些模块将会被自动卸载,
因为节点数量已经不满足 quorum
数值条件。
Quora 也可以和高可用分组联合使用,此时 quora 仅在指定的分组中起作用。
本地传输
在BSD(OSX)和Linux操作系统中运行 Vert.x 的时候,如果条件允许,可以启用 native transports 这种特性:
Vertx vertx = Vertx.vertx(new VertxOptions().
setPreferNativeTransport(true)
);
// 如果本地传输已启用,则返回 true
boolean usingNative = vertx.isNativeTransportEnabled();
System.out.println("Running with native: " + usingNative);
注意
|
如果倾向于启用本地传输而相关条件却不满足的时候(例如相关JAR包缺失),程序依然可以运行。
如果您要求您的程序必须启用本地传输,您必须首先通过 isNativeTransportEnabled 来确认是否启用了本地传输。
|
Linux 下的本地传输
您需要在classpath中加入以下依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>
Linux下的本地传输可以设置更多的网络选项:
-
SO_REUSEPORT
-
TCP_QUICKACK
-
TCP_CORK
-
TCP_FASTOPEN
vertx.createHttpServer(new HttpServerOptions()
.setTcpFastOpen(fastOpen)
.setTcpCork(cork)
.setTcpQuickAck(quickAck)
.setReusePort(reusePort)
);
BSD 下的本地传输
您需要在classpath中加入以下依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
<!--<version>必须和 Vert.x 所使用的 netty 的版本一致</version>-->
</dependency>
MacOS 中,Sierra及以上的版本支持这种特性。
BSD 下的本地传输可以启用以下额外的网络选项:
-
SO_REUSEPORT
vertx.createHttpServer(new HttpServerOptions().setReusePort(reusePort));
域套接字
通过本地传输,网络服务可以使用域套接字:
vertx.createNetServer().connectHandler(so -> {
// 处理请求
}).listen(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"));
http服务示例:
vertx.createHttpServer().requestHandler(req -> {
// 处理请求
}).listen(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"), ar -> {
if (ar.succeeded()) {
// 绑定到 socket
} else {
ar.cause().printStackTrace();
}
});
也适用于网络客户端:
NetClient netClient = vertx.createNetClient();
// 仅在 Linux 和 BSD 中可以使用
SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");
// 连接到服务器
netClient.connect(addr, ar -> {
if (ar.succeeded()) {
// 连接成功
} else {
ar.cause().printStackTrace();
}
});
http客户端示例:
HttpClient httpClient = vertx.createHttpClient();
// 仅在 Linux 和 BSD 中可以使用
SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");
// 向服务器发送请求
httpClient.request(new RequestOptions()
.setServer(addr)
.setHost("localhost")
.setPort(8080)
.setURI("/"))
.onSuccess(request -> {
request.send().onComplete(response -> {
// 处理响应信息
});
});
安全提示
Vert.x 是一套工具集,而不是一种强迫人们使用指定方式行事的框架,对于开发者而言, 这赋予了你们强大的力量,但也使得你们必须负起不小的责任。
与任何一种工具集一样,写出不安全的程序是难以避免的,所以您在开发程序时需要时刻小心, 特别是这个程序是暴露于毫无保护的公共场合(例如互联网)的情况下。
Web 应用
如果要编写一个 web 应用程序,这里强烈建议您使用 Vert.x-Web 来实现资源服务和文件上传功能,而不是直接使用 Vert.x core 。
Vert.x-Web 会对请求路径进行规整化,这可以阻止那些不怀好意的人利用精心构建的特殊URL, 来访问web应用根目录之外的资源的企图。
在文件上传方面也是如此, Vert.x-Web 不会完全信赖客户所端提供的文件名,因为客户端有可能精心设置一个特殊的文件名, 使得上传的文件被保存到磁盘上某个意料之外的位置上。 Vert.x-Web 可以保证上传的文件是被存放到磁盘上确切可知道位置的。
Vert.x core没有这样的检查,您需要自己实现。
Vert.x 命令行界面(CLI)API
Vert.x Core 提供了一套用于解析传递给程序的命令行参数的API。
这套API也可以用于打印命令行相关参数、选项的详细帮助信息。
即使这些功能远离Vert.x Core主题,该API已在 Launcher
类中使用,
因此您可以在 fat-jar 和 vertx
命令行工具中使用它们。
此外,它支持多语言(可用于任何已支持的语言),并可在 Vert.x Shell 中使用。
Vert.x CLI 不但提供一个编程模型用以描述命令行界面,还提供一个语法解析器。 这个语法解析器支持不同类型的语法:
-
POSIX 风格的选项参数 (例如:
tar -zxvf foo.tar.gz
) -
GNU 的长字符串风格的选项参数 (例如:
du --human-readable --max-depth=1
) -
Java 风格的属性参数 (例如:
java -Djava.awt.headless=true -Djava.net.useSystemProxies=true Foo
) -
附带选项值的简短风格的选项参数 (例如:
gcc -O2 foo.c
) -
包含单个连接符的长字符串风格的选项参数 (例如:
ant -projecthelp
)
使用这个命令行API只需要三个步骤:
-
定义命令行接口
-
解析用户输入的命令行
-
进行查询/问答交互操作
定义阶段
CLI cli = CLI.create("copy")
.setSummary("A command line interface to copy files.")
.addOption(new Option()
.setLongName("directory")
.setShortName("R")
.setDescription("enables directory support")
.setFlag(true))
.addArgument(new Argument()
.setIndex(0)
.setDescription("The source")
.setArgName("source"))
.addArgument(new Argument()
.setIndex(1)
.setDescription("The destination")
.setArgName("target"));
正如您所见到的一样,您可以通过 CLI.create
方法来创建一个新的
CLI
。此处传入的字符串参数就是这个CLI的名称。
创建之后,可以给它设置摘要和描述。一般来说,摘要是指一行简短的文字说明,
描述是指篇幅较长的详细说明。每个选项和参数可以使用
addArgument
和
addOption
方法加入到 CLI
对象中。
选项列表
Option
是指用户输入的命令行中出现的以 键 来标识的命令行参数。
选项必须至少有一个长名称或一个短名称。通常情况下,长名称使用 --
前缀,
短名称使用单个 -
前缀。这些名称都是大小写敏感的;但是,在 查询/问答交互阶段 的环节中,
如果输入的名称无法精确匹配,则会使用大小写不敏感的方式进行匹配。
选项可以在用法说明的部分显示出相关的描述(见下文)。选项可以接收0个,1个或者若干个选项值。
接收0个选项值的选项称作 标识(flag)
,标识必须使用
setFlag
来声明。缺省情况下,选项接收单个选项值,但是您也可以使用
setMultiValued
将其设置成接收多个选项值:
CLI cli = CLI.create("some-name")
.setSummary("A command line interface illustrating the options valuation.")
.addOption(new Option()
.setLongName("flag").setShortName("f").setFlag(true).setDescription("a flag"))
.addOption(new Option()
.setLongName("single").setShortName("s").setDescription("a single-valued option"))
.addOption(new Option()
.setLongName("multiple").setShortName("m").setMultiValued(true)
.setDescription("a multi-valued option"));
选项可以标记必填。用户如果没有输入必填选项, 则会在命令行解析的过程中抛出异常:
CLI cli = CLI.create("some-name")
.addOption(new Option()
.setLongName("mandatory")
.setRequired(true)
.setDescription("a mandatory option"));
非必填选项可以拥有一个 缺省值 ,在用户没有输入对应的选项值时, 则会启用这个默认选项值:
CLI cli = CLI.create("some-name")
.addOption(new Option()
.setLongName("optional")
.setDefaultValue("hello")
.setDescription("an optional option with a default value"));
选项也可以通过 setHidden
方法设置成 隐藏 的。
隐藏的选项不会在使用说明中显示出来,但是仍然可以起作用(提供给高级用户使用)。
如果选项值是一组固定的集合,可以设置允许输入哪些内容:
CLI cli = CLI.create("some-name")
.addOption(new Option()
.setLongName("color")
.setDefaultValue("green")
.addChoice("blue").addChoice("red").addChoice("green")
.setDescription("a color"));
选项列表配置也可以通过其对应格式的 JSON 数据来创建。
参数
和选项不一样,参数不以 键 进行标识而是以其 索引 作为标识。
例如,在 java com.acme.Foo
里, com.acme.Foo
就是一个参数。
参数没有名称,它们以从 0
开始计数的索引为标识。
第一个参数的索引为 0
:
CLI cli = CLI.create("some-name")
.addArgument(new Argument()
.setIndex(0)
.setDescription("the first argument")
.setArgName("arg1"))
.addArgument(new Argument()
.setIndex(1)
.setDescription("the second argument")
.setArgName("arg2"));
如果不设置参数的索引,则基于声明顺序自动计算索引值。
CLI cli = CLI.create("some-name")
// will have the index 0
.addArgument(new Argument()
.setDescription("the first argument")
.setArgName("arg1"))
// will have the index 1
.addArgument(new Argument()
.setDescription("the second argument")
.setArgName("arg2"));
argName
是可选的,并且在说明信息中会使用这个字段。
和选项一样,Argument
也可以:
-
使用
setHidden
设置为隐藏的 -
使用
setRequired
设置为必填的 -
使用
setDefaultValue
设置默认参数值 -
使用
setMultiValued
来接收多个参数值————只有最后一个参数才 允许设置成接收多个参数值。
参数也可以通过其对应格式的 JSON 数据来创建。
生成使用说明信息
当 CLI
实例配置完成之后,您可以用它来生成 使用说明 信息:
CLI cli = CLI.create("copy")
.setSummary("A command line interface to copy files.")
.addOption(new Option()
.setLongName("directory")
.setShortName("R")
.setDescription("enables directory support")
.setFlag(true))
.addArgument(new Argument()
.setIndex(0)
.setDescription("The source")
.setArgName("source"))
.addArgument(new Argument()
.setIndex(0)
.setDescription("The destination")
.setArgName("target"));
StringBuilder builder = new StringBuilder();
cli.usage(builder);
这可以生成诸如此类的使用说明信息:
Usage: copy [-R] source target
A command line interface to copy files.
-R,--directory enables directory support
如果需要调整这个使用说明信息,请参考 UsageMessageFormatter
类。
解析阶段
CLI
配置完成以后,您可以解析用户输入的命令行,
并以此处理每个参数和选项:
CommandLine commandLine = cli.parse(userCommandLineArguments);
parse
方法返回一个包含了这些值的 CommandLine
对象。
默认情况下,它会对用户输入的命令行进行检查校验,并确认哪些必填选项和必填参数有无缺失,
以及每个选项值的数量是否符合要求。您可以将
parse
方法中的第二个参数传入 false
值来禁用这项校验功能。
这可以用来检查某个参数或选项是否存在,无论命令行输入是否合规。
您可以使用
isValid
方法来检查 CommandLine
对象是否合规。
查询/问答交互阶段
命令行解析完成之后,您可以从
parse
方法返回的 CommandLine
对象中获取到选项值和参数值:
CommandLine commandLine = cli.parse(userCommandLineArguments);
String opt = commandLine.getOptionValue("my-option");
boolean flag = commandLine.isFlagEnabled("my-flag");
String arg0 = commandLine.getArgumentValue(0);
其中一个选项可以标记为“帮助”。如果命令行启用了“帮助”选项, 命令行的校验不会失败,而你有机会检查用户是否在寻求帮助:
CLI cli = CLI.create("test")
.addOption(
new Option().setLongName("help").setShortName("h").setFlag(true).setHelp(true))
.addOption(
new Option().setLongName("mandatory").setRequired(true));
CommandLine line = cli.parse(Collections.singletonList("-h"));
// The parsing does not fail and let you do:
if (!line.isValid() && line.isAskingForHelp()) {
StringBuilder builder = new StringBuilder();
cli.usage(builder);
stream.print(builder.toString());
}
类型化的选项和参数
上述的 Option
和 Argument
类是 无类型 的,
意味着只能从中获取到字符串类型的值。
TypedOption
和 TypedArgument
能让您对其赋予一个 类型 ,
这样(字符串类型的)原始值将被转换成对应的类型。
在
CLI
对象的定义中使用 TypedOption
和 TypedArgument
来取代
Option
和 Argument
:
CLI cli = CLI.create("copy")
.setSummary("A command line interface to copy files.")
.addOption(new TypedOption<Boolean>()
.setType(Boolean.class)
.setLongName("directory")
.setShortName("R")
.setDescription("enables directory support")
.setFlag(true))
.addArgument(new TypedArgument<File>()
.setType(File.class)
.setIndex(0)
.setDescription("The source")
.setArgName("source"))
.addArgument(new TypedArgument<File>()
.setType(File.class)
.setIndex(0)
.setDescription("The destination")
.setArgName("target"));
这时您就可以通过如下方式获取转换后的值:
CommandLine commandLine = cli.parse(userCommandLineArguments);
boolean flag = commandLine.getOptionValue("R");
File source = commandLine.getArgumentValue("source");
File target = commandLine.getArgumentValue("target");
Vert.x CLI 可以转换具有如下特征的类:
-
拥有参数签名为一个
String
类型的构造函数, 例如File
或者JsonObject
-
拥有一个名为
from
或者fromString
的静态方法 -
拥有一个静态的
valueOf
方法,例如原始类型和枚举类型
此外您也可以实现自定义的 Converter
并在 CLI
对象使用它:
CLI cli = CLI.create("some-name")
.addOption(new TypedOption<Person>()
.setType(Person.class)
.setConverter(new PersonConverter())
.setLongName("person"));
对于布尔类型而言,这些值将被视为 true
:on
, yes
, 1
, true
。
如果您的命令行选项存在 enum
类型,则会自动计算出一组可选值。
注解的使用
您也可以使用注解来定义CLI对象。可以通过在类和 setter 方法上使用注解来完成定义:
@Name("some-name")
@Summary("some short summary.")
@Description("some long description")
public class AnnotatedCli {
private boolean flag;
private String name;
private String arg;
@Option(shortName = "f", flag = true)
public void setFlag(boolean flag) {
this.flag = flag;
}
@Option(longName = "name")
public void setName(String name) {
this.name = name;
}
@Argument(index = 0)
public void setArg(String arg) {
this.arg = arg;
}
}
加上注解之后,您就可以使用以下方法来定义 CLI
对象并将对应的值注入进去:
CLI cli = CLI.create(AnnotatedCli.class);
CommandLine commandLine = cli.parse(userCommandLineArguments);
AnnotatedCli instance = new AnnotatedCli();
CLIConfigurator.inject(commandLine, instance);
Vert.x 启动器(Launcher)
Vert.x Launcher
在 fat-jar 中作为主类,由 vertx
命令行程序调用。
它可执行一组 命令 ,如 run 、 bare 和 start 等
扩展 Vert.x 启动器(Launcher)
您可以通过实现自己的 Command
类来扩展命令集(仅限于Java):
@Name("my-command")
@Summary("A simple hello command.")
public class MyCommand extends DefaultCommand {
private String name;
@Option(longName = "name", required = true)
public void setName(String n) {
this.name = n;
}
@Override
public void run() throws CLIException {
System.out.println("Hello " + name);
}
}
您还需要实现一个 CommandFactory
:
public class HelloCommandFactory extends DefaultCommandFactory<HelloCommand> {
public HelloCommandFactory() {
super(HelloCommand.class);
}
}
然后创建 src/main/resources/META-INF/services/io.vertx.core.spi.launcher.CommandFactory
并且添加一行表示工厂类的完全限定名称:
io.vertx.core.launcher.example.HelloCommandFactory
构建包含命令的jar。确保包含了SPI文件
(META-INF/services/io.vertx.core.spi.launcher.CommandFactory
)。
然后,将包含该命令的jar放入fat-jar(或包含在其中)的类路径中,或放在Vert.x发行版的 lib
目录中,您将可以执行:
vertx hello vert.x
java -jar my-fat-jar.jar hello vert.x
在 fat-jar 中使用启动器(Launcher)
要在 fat-jar 中使用 Launcher
类,只需要将 MANIFEST 的 Main-Class
设置为
io.vertx.core.Launcher
。另外,将 MANIFEST 中 Main-Verticle
条目设置为您的Main Verticle的名称。
默认情况下,它会执行 run
命令。但是,您可以通过设置 MANIFEST 的 Main-Command
条目来配置默认命令。
若在没有命令的情况下启动 fat-jar 会使用默认命令。
启动器(Launcher)子类
您还可以创建 Launcher
的子类来启动您的应用程序。
这个类设计得易于扩展。
一个启动器 Launcher
的子类可以:
-
在
beforeStartingVertx
中自定义 Vert.x 配置 -
通过重写
afterStartingVertx
来检索由“run”或“bare”命令创建的Vert.x实例 -
使用
getMainVerticle
和getDefaultCommand
方法配置默认的Verticle和命令 -
使用
register
和unregister
方法添加/删除命令
启动器(Launcher)和退出代码
当您使用 Launcher
类作为主类时,它的退出码有以下取值:
-
0
:进程顺利结束,或抛出未捕获的错误; -
1
:用于通用错误; -
11
:Vert.x无法初始化; -
12
:生成的进程无法启动、发现或停止,该错误代码一般由start
和stop
命令使用; -
14
:系统配置不符合系统要求(如找不到java
命令); -
15
:主Verticle不能被部署;
配置 Vert.x 缓存
当 Vert.x 需要从类路径中读取文件(嵌入在 fat-jar 中,在classpath中jar文件或classpath中其他文件)时, 它会把文件复制到缓存目录。背后原因很简单: 从 jar 或从输入流读取文件是阻塞的。所以为了避免每次都付出损耗, Vert.x 会将文件复制到其缓存目录中,并随后读取该文件。也可以配置此行为。
首先,默认情况下,Vert.x 使用 $CWD/.vertx
作为缓存目录,它在此目录创建一个唯一的目录,
以避免冲突。可以使用 vertx.cacheDirBase
系统属性配置该位置。
如,若当前工作目录不可写(例如在不可变容器中),
请使用以下命令启动应用程序:
vertx run my.Verticle -Dvertx.cacheDirBase=/tmp/vertx-cache
# or
java -jar my-fat.jar vertx.cacheDirBase=/tmp/vertx-cache
重要
|
该目录必须是 可写的 。 |
当您编辑资源(如HTML、CSS或JavaScript)时,这种缓存机制可能令人讨厌,因为它仅仅提供文件的第一个版本
(因此,如果您想重新加载页面,不会显示到您的编辑改变)。要避免此情况,
请使用 -Dvertx.disableFileCaching=true
启动应用程序。使用此设置,Vert.x 仍然使用缓存,
但会始终读取原文件然后刷新在缓存中的版本。
因此,如果您编辑从类路径提供的文件并刷新浏览器,Vert.x 会从类路径读取它,将其复制到缓存目录并从中提供。
不要在生产环境使用这个设置,它很有可能影响性能。
最后,您可以使用 -Dvertx.disableFileCPResolving=true
完全禁用缓存。
这个设置的副作用是:Vert.x将无法从类路径中读取任何文件(仅从文件系统中读取)。
使用此设置时要非常小心。