RabbitMQ Client for Vert.x
您的 Vert.x 应用可使用 Vert.x RabbitMQ Client(以下简称客户端)与 RabbitMQ 服务实例(基于 AMQP 0.9.1 版协议)互动
此服务是实验性的,API可能会在解决之前发生变化。
入门
Maven
在您的 maven 项目中,需要添加下列依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>4.1.8</version>
</dependency>
创建客户端
您还可以像下面这样,使用完整的 amqp uri 来创建一个客户端实例: (译者注:amqp uri的使用规范可参考官网 https://www.rabbitmq.com/uri-spec.html )
RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client.start(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
或者您也可以手动指定一些特定的参数:
RabbitMQOptions config = new RabbitMQOptions();
// 每个参数都是可选的
// 如果参数没有被设置,将会使用默认的参数值
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client.start(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
您可以设置多个地址以连接到集群;
RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");
config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client.start(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
恢复连接和重新连接
在 RabbitMQClient 中,存在两种单独且互不兼容的机制来处理重新连接:
-
Java RabbitMQ 客户端程序库提供的自动恢复连接机制;
-
重新启动 RabbitMQClient.
默认情况下均未启用任何一种机制。
Java RabbitMQ 客户端库提供的重新连接机制并非会在所有情况下都生效(如果和服务端的连接很好的断连,客户端将会关闭并且不会再恢复连接)
[source, java] 为了使用 Java RabbitMQ 客户端程序库的自动恢复连接功能,您必须启用它并且同时禁用 `RabbitMQClient` 程序库的重连尝试功能: RabbitMQOptions options = new RabbitMQOptions(); options.setAutomaticRecoveryEnabled(true); options.setReconnectAttempts(0);
客户端程序库同样会如其文档所述,尽可能的尝试拓扑恢复(该功能在客户端程序库里面是默认启用的,并且在 RabbitMQClientOptions
中没有公开)
或者 RabbitMQClient
可以配置为,每当存在连接问题就重连 RabbitMQ
服务端。
这种连接失败可能是由于一个短时间的网络异常(客户端可能会连接回相同的 RabbitMQ
服务端),也可能是因为故障转移方案引起。
这种方法比客户端程序库采用的方法更直接 —— 当客户端程序库报告了一个问题,不断尝试从头开始重连时, RabbitMQClient
会通过关闭连接重启。
您可以通过在配置中设置 setReconnectInterval
以及 setReconnectAttempts
属性来配置重连策略。
RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(false);
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(500);
RabbitMQClient
重连不具有任何形式的拓扑恢复功能。
当服务端的拓扑结构准备好之前(换句话说,即在创建/绑定交换机和队列之前)发送信息,可能会导致竞争状况。
为了在连接准备就绪之前提供创建这些对象的机会, RabbitMQClient
提供了 ConnectionEstablishedCallback
方法。
ConnectionEstablishedCallback
方法可用于在其他使用者(包括 RabbitMQConsumer
和 RabbitMQPublisher
)访问 `RabbitMQClient`前,执行任意操作。
RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.addConnectionEstablishedCallback(promise -> {
client.exchangeDeclare("exchange", "fanout", true, false)
.compose(v -> {
return client.queueDeclare("queue", false, true, true);
})
.compose(declareOk -> {
return client.queueBind(declareOk.getQueue(), "exchange", "");
})
.onComplete(promise);
});
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue", rabbitMQConsumerAsyncResult -> {
});
如果 RabbitMQConsumer
在一个自动删除且服务端命名的队列上监听消息时,服务端重启了,那么直到客户端重连的时候,该队列将被移除。
在这种情况下,需要在 RabbitMQConsumer
上重新创建队列并且设置新队列的名称。
RabbitMQClient client = RabbitMQClient.create(vertx, config);
AtomicReference<RabbitMQConsumer> consumer = new AtomicReference<>();
AtomicReference<String> queueName = new AtomicReference<>();
client.addConnectionEstablishedCallback(promise -> {
client.exchangeDeclare("exchange", "fanout", true, false)
.compose(v -> client.queueDeclare("", false, true, true))
.compose(dok -> {
queueName.set(dok.getQueue());
// The first time this runs there will be no existing consumer
// on subsequent connections the consumer needs to be update with the new queue name
RabbitMQConsumer currentConsumer = consumer.get();
if (currentConsumer != null) {
currentConsumer.setQueueName(queueName.get());
}
return client.queueBind(queueName.get(), "exchange", "");
})
.onComplete(promise);
});
client.start()
.onSuccess(v -> {
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer(queueName.get(), rabbitMQConsumerAsyncResult -> {
if (rabbitMQConsumerAsyncResult.succeeded()) {
consumer.set(rabbitMQConsumerAsyncResult.result());
}
});
})
.onFailure(ex -> {
System.out.println("It went wrong: " + ex.getMessage());
});
客户端启用SSL/TLS
您可以很轻松配置 RabbitMQClient
来使用 SSL
。
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true);
客户端证书认证配置
如果您将 trustAll
设置为 true
,那么客户端将信任所有服务端的证书。
虽然连接仍然会被加密,但是很容易受到 '中间人' 的攻击。
后果不堪设想, 不要在生产环境中使用该选项! 该配置的默认值是 false
。
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setTrustAll(true));
如果您将 trustAll
设置为 false
,客户端将进行妥当的服务端验证。这里有三个主要的可选策略。
-
如果您默认的
truststore
已经信任了服务端,那么在这种情况下一切都没问题 -
启动java进程的时候,携带 -Djavax.net.ssl.trustStore=xxx.jks ,自定义客户端信任证书仓库
-
通过
RabbitMQOptions
给客户端提供一个自定义的客户端信任证书仓库。
配置JKS格式证书信任仓库
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setTrustOptions(new JksOptions()
.setPath("/path/myKeyStore.jks")
.setPassword("myKeyStorePassword"));
声明交换机并携带额外配置
您可以向 RabbitMQ
的 exchangeDeclare
方法传入额外的配置参数。
JsonObject config = new JsonObject();
config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
if (onResult.succeeded()) {
System.out.println("Exchange successfully declared with config");
} else {
onResult.cause().printStackTrace();
}
});
声明队列并携带额外配置
您可以向 RabbitMQ
的 queueDeclare
方法传入额外的配置参数。
JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);
client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
if (queueResult.succeeded()) {
System.out.println("Queue declared!");
} else {
System.err.println("Queue failed to be declared!");
queueResult.cause().printStackTrace();
}
});
各种操作
下面是一些 RabbitMQService API
支持的操作示例。
关于所有 API
方法的详细信息,请参阅 API
文档。
发布消息
将消息发布到队列
Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
if (pubResult.succeeded()) {
System.out.println("Message published !");
} else {
pubResult.cause().printStackTrace();
}
});
发布消息并进行确认
将消息发布到队列,并确认服务端已收到消息。
Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
if(confirmResult.succeeded()) {
client.basicPublish("", "my.queue", message, pubResult -> {
if (pubResult.succeeded()) {
// Check the message got confirmed by the broker.
client.waitForConfirms(waitResult -> {
if(waitResult.succeeded())
System.out.println("Message published !");
else
waitResult.cause().printStackTrace();
});
} else {
pubResult.cause().printStackTrace();
}
});
} else {
confirmResult.cause().printStackTrace();
}
});
可靠的消息发布
为了可靠的将消息发布到 RabbitMQ
,有必要去确认每条消息是否都已被服务端接受。
最简单的确认方法是使用上面的 basicPublishWithConfirm
方法,该方法是在发送每条消息的时候,同步进行确认操作 —— 阻塞发布通道,直到确认消息已被接受。
RabbitMQ
为了达成更大的吞吐量,提供了异步的确认方法。
异步确认方法可以一次性确认多条消息,因此客户端有必要按照发布的顺序,追踪所有的消息。
此外,直到服务端确认消息前,可能有必要重新发送它们,因此这些消息必须被客户端继续保留。
RabbitMQPublisher
类实现了一个处理异步确认的标准方法,这避免了大量的样版代码。
RabbitMQPublisher
运作的方式如下:
* 将所有需要发送的消息添加到一个内部的队列中。
* 从队列发送消息时,追踪这些在单独队列中等待确认的消息。
* 处理 RabbitMQ
异步确认结果时,一但消息被确认,就将这些消息从等待确认的队列中移除。
* 每条被确认的消息都会通知调用者(一次通知一条消息,不同于 RabbitMQ
使用的批量消息确认机制)
RabbitMQPublisher publisher = RabbitMQPublisher.create(vertx, client, options);
messages.forEach((k,v) -> {
com.rabbitmq.client.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(k)
.build();
publisher.publish("exchange", "routingKey", properties, v.toBuffer());
});
publisher.getConfirmationStream().handler(conf -> {
if (conf.isSucceeded()) {
messages.remove(conf.getMessageId());
}
});
投递标签
对于任何想实现他们自己的 RabbitMQPublisher
的人来说,本节的实现细节会很有用。
要使 RabbitMQPublisher
工作,必须了解投递标签。RabbitMQ
会对每一条已发布的消息使用投递标签。
RabbitMQ
的确认信息,可在完成 basicPublish
方法的调用前,就到达客户端。因此您在使用异步确认的时候,是不可能通过任何 basicPublish
返回的东西来识别投递标签。
出于这个原因,RabbitMQClient
有必要通过单独的回调告诉 RabbitMQPublisher
每一条消息的投递标签。而这个回调发生在消息发送之前的 RabbitMQClient::basicPublish
调用过程中。
另外,单个消息的投递标签也有可能会变化(投递标签使用的是单通道,因此如果一条消息在重新连接之后被重新发送,那么这条消息会有一条新的投递标签)—— 这意味着,我们无法用 Future
把投递标签通知给客户端。
针对一条给定的消息,进行多次 deliveryTagHandler
方法调用时,忽略旧的投递标签是安全的 —— 因为无论什么时候,一条消息只存在一条有效的投递标签。
要想捕获投递标签,可使用下面 RabbitMqClient::basicPublishWithDeliveryTag
方法中的一个。
void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);
这是 RabbitMqClient::basicPublishWithDeliveryTag
的列表。
消费消息
从队列中消费消息。
// 您可以从队列创建一个消息 stream
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
if (rabbitMQConsumerAsyncResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
mqConsumer.handler(message -> {
System.out.println("Got message: " + message.body().toString());
});
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace();
}
});
任何时候您都可以暂停、或者继续 stream
。当 stream
暂停时,您将不会接收到任何消息。
consumer.pause();
consumer.resume();
当您要创建一个消费 stream
时,有一组选项可供选择。
QueueOptions
允许您进行定制化:
-
可以用
setMaxInternalQueueSize
来设置内部队列的长度 -
使用
setKeepMostRecent
可以设置stream
是否保留更多的最近消息
QueueOptions options = new QueueOptions()
.setMaxInternalQueueSize(1000)
.setKeepMostRecent(true);
client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
if (rabbitMQConsumerAsyncResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace();
}
});
当您想要停止从队列中消费消息,那么您可以参照下面的例子:
rabbitMQConsumer.cancel(cancelResult -> {
if (cancelResult.succeeded()) {
System.out.println("Consumption successfully stopped");
} else {
System.out.println("Tired in attempt to stop consumption");
cancelResult.cause().printStackTrace();
}
});
当队列不再处理任何消息时,您会收到结束处理程序的通知:
rabbitMQConsumer.endHandler(v -> {
System.out.println("It is the end of the stream");
});
您可以设置专门处理异常的 handler
,在程序运行出错时用它收到通知。
consumer.exceptionHandler(e -> {
System.out.println("An exception occurred in the process of message handling");
e.printStackTrace();
});
最后,您可能想要查找消费者标签:(译者注:费者标签可以由客户端或者服务器来生成,用于消费者的身份识别。详见官方文档:http://rabbitmq.mr-ping.com/ClientDocumentation/java-api-guide.html)
String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);
获取消息
从队列中获取消息
client.basicGet("my.queue", true, getResult -> {
if (getResult.succeeded()) {
RabbitMQMessage msg = getResult.result();
System.out.println("Got message: " + msg.body());
} else {
getResult.cause().printStackTrace();
}
});
手动确认消费的消息
(译者注:设置autoAck = false时,需要手动对投递到 Consumer
的消息进行确认)
client.basicConsumer("my.queue", new QueueOptions().setAutoAck(false), consumeResult -> {
if (consumeResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer consumer = consumeResult.result();
// Set the handler which messages will be sent to
consumer.handler(msg -> {
JsonObject json = (JsonObject) msg.body();
System.out.println("Got message: " + json.getString("body"));
// ack
client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
});
});
} else {
consumeResult.cause().printStackTrace();
}
});