Vert.x AMQP Client
Vert.x AMQP 客户端可以与 AMQP 1.0 broker 和 router 互通。他可以做到如下事情:
-
连接一个 AMQP 的 broker 或 router - 支持 SASL 和 TLS 连接
-
消费 queue 或 topic 当中的消息
-
向 queue 或 topic 中发送消息
-
发送消息后检查ACK
AMQP 1.0 协议支持持久订阅、持久化、安全保障、会话、复杂路由…… 更多 该协议的细节详见 AMQP homepage 。
Vert.x AMQP 客户端基于 Vert.x Proton 实现。如果您需要更细粒度的控制,我们建议您 直接使用 Vert.x Proton
使用 Vert.x AMQP 客户端
为了使用 Vert.x AMQP 客户端, 需要将以下依赖添加到您构建描述文件 的依赖项配置中:
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
compile 'io.vertx:vertx-amqp-client:4.4.0'
创建 AMQP 客户端
只要您在 CLASSPATH 中添加该客户端,您就可以按如下方式实例化 AmqpClient
对象
AmqpClientOptions options = new AmqpClientOptions()
.setHost("localhost")
.setPort(5672)
.setUsername("user")
.setPassword("secret");
// 用内部Vert.x实例创建客户端
AmqpClient client1 = AmqpClient.create(options);
// 显式使用Vert.x实例
AmqpClient client2 = AmqpClient.create(vertx, options);
创建 AmqpClient
对象有两个方法。您可以显式的传一个 Vert.x 实例,
如果您正在 Vert.x 应用或 Vert.x Verticle 内,请用此方式;否则您可以省略传递 Vert.x 参数,
当客户端关闭时,有一个内部的 Vert.x 对象会被创建/关闭。
要创建 AmqpClient
,您需要传 AmqpClientOptions
参数。
这个 options 参数包含了 broker 和 router 的位置信息,认证信息等…..
用 options 参数可以配置 AMQP 客户端的许多方面。
注意:您也可以用这些 options 来控制底层的 Proton client 。
也可以从系统参数或者环境变量来配置 host、port、username、password:
-
Host:系统参数:
amqp-client-host
,环境变量:AMQP_CLIENT_HOST
(强制必填项) -
Port:系统参数:
amqp-client-port
,环境变量:AMQP_CLIENT_PORT
(默认为5672) -
Username:系统参数:
amqp-client-username
,环境变量:AMQP_CLIENT_USERNAME
-
Password:系统参数:
amqp-client-password
,环境变量:AMQP_CLIENT_PASSWORD
(默认为5672)
建立连接
一旦您创建了一个客户端,您需要显式地连接远程服务。
这可用 connect
方法实现:
client.connect(ar -> {
if (ar.failed()) {
System.out.println("Unable to connect to the broker");
} else {
System.out.println("Connection succeeded");
AmqpConnection connection = ar.result();
}
});
一旦 连接
建立成功或失败,则相应处理器会被调用。值得注意的是 连接
用于建立 receiver 和 sender。
创建receiver
receiver 用于接收消息。AMQP receiver 可以用如下两种方式获取:
connection.createReceiver("my-queue",
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// 每次接收到消息就被调用
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
connection.createReceiver("my-queue",
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver
.exceptionHandler(t -> {
// 抛出异常
})
.handler(msg -> {
// 关联消息处理器
});
}
}
);
这两种方式的主要区别在于 何时 关联消息处理器。第一种方式 直接设置处理器,并且立即开始接收消息;第二种方式中, 处理器在连接创建完成之后被人工关联。这样,您可以获得更多控制权并添加其他的处理器。
在 completion handler 中传入的 receiver 可以作为Stream来使用。所以您可以暂停、恢复 消息的接收。背压协议(back-pressure protocol)由 AMQP credits 实现。
接收到的消息是 AmqpMessage
实例。这些实例是不可变(immutable)的,
并且支持访问大多数 AMQP 元数据。请查看
properties 列表作参考。注意:
要从消息体中获取 JSON object 或 JSON array ,那么作为 AMQP 数据 的值则是必须的。
您也可以用客户端直接创建 receiver :
client.createReceiver("my-queue"
,
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// 每次接收消息时都被调用
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
这个示例中,连接是自动创建的。您可以用
connection
方法获取它。
默认情况下,消息自动发送 ACK 响应,您可以用
setAutoAcknowledgement
来禁用这个此操作。然后您则需要用如下方法
显式的发送 ACK:
* accepted
* rejected
* released
创建 sender
sender 可以将消息发送到 queue 和 topic 当中。您可以通过如下方式获取到 sender:
connection.createSender("my-queue", done -> {
if (done.failed()) {
System.out.println("Unable to create a sender");
} else {
AmqpSender result = done.result();
}
});
一旦您获取了 AMQP sender,您就可以创建消息。
因为 AmqpMessage
是不可变(immutable)的,所以要用 AmqpMessageBuilder
类来执行创建操作。
以下是一些例子:
AmqpMessageBuilder builder = AmqpMessage.create();
// 一条普通的消息
AmqpMessage m1 = builder.withBody("hello").build();
// 指定了地址的消息
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();
// 带有JSON消息体、元数据、TTL的消息
AmqpMessage m3 = builder
.withJsonObjectAsBody(new JsonObject().put("message", "hello"))
.subject("subject")
.ttl(10000)
.applicationProperties(new JsonObject().put("prop1", "value1"))
.build();
在您创建 sender 和消息之后,您可以用如下方法发送消息:
-
send
- 发送消息 -
sendWithAck
- 发送消息并监控其ACK
以下是最简单的发消息方式:
sender.send(AmqpMessage.create().withBody("hello").build());
发送消息时,您可以监控其 ACK
sender.sendWithAck(AmqpMessage.create().withBody("hello").build(), acked -> {
if (acked.succeeded()) {
System.out.println("Message accepted");
} else {
System.out.println("Message not accepted");
}
});
注意:如果传输状态为 ACCEPTED
,那么就视为该消息已收到 ACK。
其他情况则视为未收到 ACK(详细的信息可以从回传的 cause 中获得)。
AmqpSender
可以用作 write stream。流的控制是用 AMQP credits 实现的
您也可以用客户端直接生成 sender:
client.createSender("my-queue", maybeSender -> {
//...
});
这个示例中,连接是自动建立的。您可以用
connection
获取它。
实现 request-reply
要实现 request-reply ,您可以用动态 receiver 和匿名 sender。动态 receiver 不关联于用户创建的 address,而是由 broker 提供这个 address。匿名 sender 也不和指定的 address 关联, 它要求所有的消息都包含一个address。
以下便展示了如何实现 request-reply:
connection.createAnonymousSender(responseSender -> {
// 获取匿名sender用于响应消息
// 注册 main receiver:
connection.createReceiver("my-queue", done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// 获取到了消息,响应之
responseSender.result().send(AmqpMessage.create()
.address(msg.replyTo())
.correlationId(msg.id()) // 发送消息id作为关联的id
.withBody("my response to your request")
.build()
);
});
}
});
});
// sender端(发送请求并等待接收响应)
connection.createDynamicReceiver(replyReceiver -> {
// 获取receiver,address由broker提供
String replyToAddress = replyReceiver.result().address();
// 关联处理器用于接收响应
replyReceiver.result().handler(msg -> {
System.out.println("Got the reply! " + msg.bodyAsString());
});
// 创建sender并发送消息:
connection.createSender("my-queue", sender -> {
sender.result().send(AmqpMessage.create()
.replyTo(replyToAddress)
.id("my-message-id")
.withBody("This is my request").build());
});
});
要响应一个消息,就要将它回应到指定的 address。另外,用 message id
作为 correlation id
是一个好的做法,
这样响应的接收者可以将响应与请求相关联。