RabbitMQ Client for Vert.x

您的 Vert.x 应用可使用 Vert.x RabbitMQ Client(以下简称客户端)与 RabbitMQ 服务实例(基于 AMQP 0.9.1 版协议)互动

此服务是实验性的,API可能会在解决之前发生变化。

入门

Maven

在您的 maven 项目中,需要添加下列依赖:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>4.1.8</version>
</dependency>

Gradle

在您的 gradle 项目中,需要添加下列依赖:

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:4.1.8'
}

创建客户端

您还可以像下面这样,使用完整的 amqp uri 来创建一个客户端实例: (译者注:amqp uri的使用规范可参考官网 https://www.rabbitmq.com/uri-spec.html

RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

或者您也可以手动指定一些特定的参数:

RabbitMQOptions config = new RabbitMQOptions();
// 每个参数都是可选的
// 如果参数没有被设置,将会使用默认的参数值
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

您可以设置多个地址以连接到集群;

RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");

config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

恢复连接和重新连接

在 RabbitMQClient 中,存在两种单独且互不兼容的机制来处理重新连接:

  • Java RabbitMQ 客户端程序库提供的自动恢复连接机制;

  • 重新启动 RabbitMQClient.

默认情况下均未启用任何一种机制。

Java RabbitMQ 客户端库提供的重新连接机制并非会在所有情况下都生效(如果和服务端的连接很好的断连,客户端将会关闭并且不会再恢复连接)

[source, java]
为了使用 Java RabbitMQ 客户端程序库的自动恢复连接功能,您必须启用它并且同时禁用 `RabbitMQClient` 程序库的重连尝试功能:
RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(true);
options.setReconnectAttempts(0);

客户端程序库同样会如其文档所述,尽可能的尝试拓扑恢复(该功能在客户端程序库里面是默认启用的,并且在 RabbitMQClientOptions 中没有公开)

或者 RabbitMQClient 可以配置为,每当存在连接问题就重连 RabbitMQ 服务端。 这种连接失败可能是由于一个短时间的网络异常(客户端可能会连接回相同的 RabbitMQ 服务端),也可能是因为故障转移方案引起。 这种方法比客户端程序库采用的方法更直接 —— 当客户端程序库报告了一个问题,不断尝试从头开始重连时, RabbitMQClient 会通过关闭连接重启。

您可以通过在配置中设置 setReconnectInterval 以及 setReconnectAttempts 属性来配置重连策略。

RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(false);
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(500);

RabbitMQClient 重连不具有任何形式的拓扑恢复功能。 当服务端的拓扑结构准备好之前(换句话说,即在创建/绑定交换机和队列之前)发送信息,可能会导致竞争状况。 为了在连接准备就绪之前提供创建这些对象的机会, RabbitMQClient 提供了 ConnectionEstablishedCallback 方法。 ConnectionEstablishedCallback 方法可用于在其他使用者(包括 RabbitMQConsumerRabbitMQPublisher )访问 `RabbitMQClient`前,执行任意操作。

RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.addConnectionEstablishedCallback(promise -> {
            client.exchangeDeclare("exchange", "fanout", true, false)
                .compose(v -> {
                  return client.queueDeclare("queue", false, true, true);
                })
                .compose(declareOk -> {
                  return client.queueBind(declareOk.getQueue(), "exchange", "");
                })
                .onComplete(promise);
});

// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue", rabbitMQConsumerAsyncResult -> {
});

如果 RabbitMQConsumer 在一个自动删除且服务端命名的队列上监听消息时,服务端重启了,那么直到客户端重连的时候,该队列将被移除。 在这种情况下,需要在 RabbitMQConsumer 上重新创建队列并且设置新队列的名称。

RabbitMQClient client = RabbitMQClient.create(vertx, config);
AtomicReference<RabbitMQConsumer> consumer = new AtomicReference<>();
AtomicReference<String> queueName = new AtomicReference<>();
client.addConnectionEstablishedCallback(promise -> {
      client.exchangeDeclare("exchange", "fanout", true, false)
              .compose(v -> client.queueDeclare("", false, true, true))
              .compose(dok -> {
                  queueName.set(dok.getQueue());
                  // The first time this runs there will be no existing consumer
                  // on subsequent connections the consumer needs to be update with the new queue name
                  RabbitMQConsumer currentConsumer = consumer.get();
                  if (currentConsumer != null) {
                    currentConsumer.setQueueName(queueName.get());
                  }
                  return client.queueBind(queueName.get(), "exchange", "");
              })
              .onComplete(promise);
});

client.start()
        .onSuccess(v -> {
            // At this point the exchange, queue and binding will have been declared even if the client connects to a new server
            client.basicConsumer(queueName.get(), rabbitMQConsumerAsyncResult -> {
                if (rabbitMQConsumerAsyncResult.succeeded()) {
                    consumer.set(rabbitMQConsumerAsyncResult.result());
                }
            });
        })
        .onFailure(ex -> {
            System.out.println("It went wrong: " + ex.getMessage());
        });

客户端启用SSL/TLS

您可以很轻松配置 RabbitMQClient 来使用 SSL

RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true);
客户端证书认证配置

如果您将 trustAll 设置为 true ,那么客户端将信任所有服务端的证书。 虽然连接仍然会被加密,但是很容易受到 '中间人' 的攻击。 后果不堪设想, 不要在生产环境中使用该选项! 该配置的默认值是 false

RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setTrustAll(true));

如果您将 trustAll 设置为 false ,客户端将进行妥当的服务端验证。这里有三个主要的可选策略。

  • 如果您默认的 truststore 已经信任了服务端,那么在这种情况下一切都没问题

  • 启动java进程的时候,携带 -Djavax.net.ssl.trustStore=xxx.jks ,自定义客户端信任证书仓库

  • 通过 RabbitMQOptions 给客户端提供一个自定义的客户端信任证书仓库。

配置JKS格式证书信任仓库
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setTrustOptions(new JksOptions()
   .setPath("/path/myKeyStore.jks")
   .setPassword("myKeyStorePassword"));
配置p12/pfx格式证书信任仓库
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setPfxTrustOptions(
   new PfxOptions().
     setPath("/path/myKeyStore.p12").
     setPassword("myKeyStorePassword"));
配置PEM格式证书
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setPemTrustOptions(
   new PemTrustOptions().
     addCertPath("/path/ca-cert.pem"));

声明交换机并携带额外配置

您可以向 RabbitMQexchangeDeclare 方法传入额外的配置参数。

JsonObject config = new JsonObject();

config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});

声明队列并携带额外配置

您可以向 RabbitMQqueueDeclare 方法传入额外的配置参数。

JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);

client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});

各种操作

下面是一些 RabbitMQService API 支持的操作示例。 关于所有 API 方法的详细信息,请参阅 API 文档。

发布消息

将消息发布到队列

Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});

发布消息并进行确认

将消息发布到队列,并确认服务端已收到消息。

Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if(confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if(waitResult.succeeded())
            System.out.println("Message published !");
          else
            waitResult.cause().printStackTrace();
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});

可靠的消息发布

为了可靠的将消息发布到 RabbitMQ,有必要去确认每条消息是否都已被服务端接受。 最简单的确认方法是使用上面的 basicPublishWithConfirm 方法,该方法是在发送每条消息的时候,同步进行确认操作 —— 阻塞发布通道,直到确认消息已被接受。

RabbitMQ 为了达成更大的吞吐量,提供了异步的确认方法。 异步确认方法可以一次性确认多条消息,因此客户端有必要按照发布的顺序,追踪所有的消息。 此外,直到服务端确认消息前,可能有必要重新发送它们,因此这些消息必须被客户端继续保留。

RabbitMQPublisher 类实现了一个处理异步确认的标准方法,这避免了大量的样版代码。

RabbitMQPublisher 运作的方式如下: * 将所有需要发送的消息添加到一个内部的队列中。 * 从队列发送消息时,追踪这些在单独队列中等待确认的消息。 * 处理 RabbitMQ 异步确认结果时,一但消息被确认,就将这些消息从等待确认的队列中移除。 * 每条被确认的消息都会通知调用者(一次通知一条消息,不同于 RabbitMQ 使用的批量消息确认机制)

RabbitMQPublisher publisher = RabbitMQPublisher.create(vertx, client, options);

messages.forEach((k,v) -> {
  com.rabbitmq.client.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .messageId(k)
          .build();
  publisher.publish("exchange", "routingKey", properties, v.toBuffer());
});

publisher.getConfirmationStream().handler(conf -> {
  if (conf.isSucceeded()) {
    messages.remove(conf.getMessageId());
  }
});

投递标签

对于任何想实现他们自己的 RabbitMQPublisher 的人来说,本节的实现细节会很有用。

要使 RabbitMQPublisher 工作,必须了解投递标签。RabbitMQ 会对每一条已发布的消息使用投递标签。 RabbitMQ 的确认信息,可在完成 basicPublish 方法的调用前,就到达客户端。因此您在使用异步确认的时候,是不可能通过任何 basicPublish 返回的东西来识别投递标签。 出于这个原因,RabbitMQClient 有必要通过单独的回调告诉 RabbitMQPublisher 每一条消息的投递标签。而这个回调发生在消息发送之前的 RabbitMQClient::basicPublish 调用过程中。 另外,单个消息的投递标签也有可能会变化(投递标签使用的是单通道,因此如果一条消息在重新连接之后被重新发送,那么这条消息会有一条新的投递标签)—— 这意味着,我们无法用 Future 把投递标签通知给客户端。 针对一条给定的消息,进行多次 deliveryTagHandler 方法调用时,忽略旧的投递标签是安全的 —— 因为无论什么时候,一条消息只存在一条有效的投递标签。

要想捕获投递标签,可使用下面 RabbitMqClient::basicPublishWithDeliveryTag 方法中的一个。

 void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
 Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);

这是 RabbitMqClient::basicPublishWithDeliveryTag 的列表。

消费消息

从队列中消费消息。

// 您可以从队列创建一个消息 stream
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
    mqConsumer.handler(message -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});

任何时候您都可以暂停、或者继续 stream。当 stream 暂停时,您将不会接收到任何消息。

consumer.pause();
consumer.resume();

当您要创建一个消费 stream 时,有一组选项可供选择。

QueueOptions 允许您进行定制化:

  • 可以用 setMaxInternalQueueSize 来设置内部队列的长度

  • 使用 setKeepMostRecent 可以设置 stream 是否保留更多的最近消息

QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true);

client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});

当您想要停止从队列中消费消息,那么您可以参照下面的例子:

rabbitMQConsumer.cancel(cancelResult -> {
  if (cancelResult.succeeded()) {
    System.out.println("Consumption successfully stopped");
  } else {
    System.out.println("Tired in attempt to stop consumption");
    cancelResult.cause().printStackTrace();
  }
});

当队列不再处理任何消息时,您会收到结束处理程序的通知:

rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});

您可以设置专门处理异常的 handler,在程序运行出错时用它收到通知。

consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});

最后,您可能想要查找消费者标签:(译者注:费者标签可以由客户端或者服务器来生成,用于消费者的身份识别。详见官方文档:http://rabbitmq.mr-ping.com/ClientDocumentation/java-api-guide.html)

String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);

获取消息

从队列中获取消息

client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    RabbitMQMessage msg = getResult.result();
    System.out.println("Got message: " + msg.body());
  } else {
    getResult.cause().printStackTrace();
  }
});

手动确认消费的消息

(译者注:设置autoAck = false时,需要手动对投递到 Consumer 的消息进行确认)

client.basicConsumer("my.queue", new QueueOptions().setAutoAck(false), consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer consumer = consumeResult.result();

    // Set the handler which messages will be sent to
    consumer.handler(msg -> {
      JsonObject json = (JsonObject) msg.body();
      System.out.println("Got message: " + json.getString("body"));
      // ack
      client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
      });
    });
  } else {
    consumeResult.cause().printStackTrace();
  }
});

运行测试

为此,您需要安装完 RabbitMQ,并且在本地使用默认端口运行它