Vert.x AMQP Client

Vert.x AMQP 客户端可以与 AMQP 1.0 broker 和 router 互通。他可以做到如下事情:

  • 连接一个 AMQP 的 broker 或 router - 支持 SASL 和 TLS 连接

  • 消费 queue 或 topic 当中的消息

  • 向 queue 或 topic 中发送消息

  • 发送消息后检查ACK

AMQP 1.0 协议支持持久订阅、持久化、安全保障、会话、复杂路由…​…​ 更多 该协议的细节详见 AMQP homepage

Vert.x AMQP 客户端基于 Vert.x Proton 实现。如果您需要更细粒度的控制,我们建议您 直接使用 Vert.x Proton

使用 Vert.x AMQP 客户端

为了使用 Vert.x AMQP 客户端, 需要将以下依赖添加到您构建描述文件 的依赖项配置中:

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-amqp-client</artifactId>
 <version>4.2.7</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

compile 'io.vertx:vertx-amqp-client:4.2.7'

创建 AMQP 客户端

只要您在 CLASSPATH 中添加该客户端,您就可以按如下方式实例化 AmqpClient 对象

AmqpClientOptions options = new AmqpClientOptions()
  .setHost("localhost")
  .setPort(5672)
  .setUsername("user")
  .setPassword("secret");
// 用内部Vert.x实例创建客户端
AmqpClient client1 = AmqpClient.create(options);

// 显式使用Vert.x实例
AmqpClient client2 = AmqpClient.create(vertx, options);

创建 AmqpClient 对象有两个方法。您可以显式的传一个 Vert.x 实例, 如果您正在 Vert.x 应用或 Vert.x Verticle 内,请用此方式;否则您可以省略传递 Vert.x 参数, 当客户端关闭时,有一个内部的 Vert.x 对象会被创建/关闭。

要创建 AmqpClient ,您需要传 AmqpClientOptions 参数。 这个 options 参数包含了 broker 和 router 的位置信息,认证信息等…​.. 用 options 参数可以配置 AMQP 客户端的许多方面。 注意:您也可以用这些 options 来控制底层的 Proton client 。

也可以从系统参数或者环境变量来配置 host、port、username、password:

  • Host:系统参数: amqp-client-host ,环境变量: AMQP_CLIENT_HOST (强制必填项)

  • Port:系统参数: amqp-client-port ,环境变量: AMQP_CLIENT_PORT (默认为5672)

  • Username:系统参数: amqp-client-username ,环境变量: AMQP_CLIENT_USERNAME

  • Password:系统参数: amqp-client-password ,环境变量: AMQP_CLIENT_PASSWORD (默认为5672)

建立连接

一旦您创建了一个客户端,您需要显式地连接远程服务。 这可用 connect 方法实现:

client.connect(ar -> {
  if (ar.failed()) {
    System.out.println("Unable to connect to the broker");
  } else {
    System.out.println("Connection succeeded");
    AmqpConnection connection = ar.result();
  }
});

一旦 连接 建立成功或失败,则相应处理器会被调用。值得注意的是 连接 用于建立 receiver 和 sender。

创建receiver

receiver 用于接收消息。AMQP receiver 可以用如下两种方式获取:

connection.createReceiver("my-queue",
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // 每次接收到消息就被调用
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

connection.createReceiver("my-queue",
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver
        .exceptionHandler(t -> {
          // 抛出异常
        })
        .handler(msg -> {
          // 关联消息处理器
        });
    }
  }
);

这两种方式的主要区别在于 何时 关联消息处理器。第一种方式 直接设置处理器,并且立即开始接收消息;第二种方式中, 处理器在连接创建完成之后被人工关联。这样,您可以获得更多控制权并添加其他的处理器。

在 completion handler 中传入的 receiver 可以作为Stream来使用。所以您可以暂停、恢复 消息的接收。背压协议(back-pressure protocol)由 AMQP credits 实现。

接收到的消息是 AmqpMessage 实例。这些实例是不可变(immutable)的, 并且支持访问大多数 AMQP 元数据。请查看 properties 列表作参考。注意: 要从消息体中获取 JSON object 或 JSON array ,那么作为 AMQP 数据 的值则是必须的。

您也可以用客户端直接创建 receiver :

client.createReceiver("my-queue"
  ,
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // 每次接收消息时都被调用
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

这个示例中,连接是自动创建的。您可以用 connection 方法获取它。

默认情况下,消息自动发送 ACK 响应,您可以用 setAutoAcknowledgement 来禁用这个此操作。然后您则需要用如下方法 显式的发送 ACK: * accepted * rejected * released

创建 sender

sender 可以将消息发送到 queue 和 topic 当中。您可以通过如下方式获取到 sender:

connection.createSender("my-queue", done -> {
  if (done.failed()) {
    System.out.println("Unable to create a sender");
  } else {
    AmqpSender result = done.result();
  }
});

一旦您获取了 AMQP sender,您就可以创建消息。 因为 AmqpMessage 是不可变(immutable)的,所以要用 AmqpMessageBuilder 类来执行创建操作。 以下是一些例子:

AmqpMessageBuilder builder = AmqpMessage.create();

// 一条普通的消息
AmqpMessage m1 = builder.withBody("hello").build();

// 指定了地址的消息
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();

// 带有JSON消息体、元数据、TTL的消息
AmqpMessage m3 = builder
  .withJsonObjectAsBody(new JsonObject().put("message", "hello"))
  .subject("subject")
  .ttl(10000)
  .applicationProperties(new JsonObject().put("prop1", "value1"))
  .build();

在您创建 sender 和消息之后,您可以用如下方法发送消息:

以下是最简单的发消息方式:

sender.send(AmqpMessage.create().withBody("hello").build());

发送消息时,您可以监控其 ACK

sender.sendWithAck(AmqpMessage.create().withBody("hello").build(), acked -> {
  if (acked.succeeded()) {
    System.out.println("Message accepted");
  } else {
    System.out.println("Message not accepted");
  }
});

注意:如果传输状态为 ACCEPTED ,那么就视为该消息已收到 ACK。 其他情况则视为未收到 ACK(详细的信息可以从回传的 cause 中获得)。

AmqpSender 可以用作 write stream。流的控制是用 AMQP credits 实现的

您也可以用客户端直接生成 sender:

client.createSender("my-queue", maybeSender -> {
  //...
});

这个示例中,连接是自动建立的。您可以用 connection 获取它。

实现 request-reply

要实现 request-reply ,您可以用动态 receiver 和匿名 sender。动态 receiver 不关联于用户创建的 address,而是由 broker 提供这个 address。匿名 sender 也不和指定的 address 关联, 它要求所有的消息都包含一个address。

以下便展示了如何实现 request-reply:

connection.createAnonymousSender(responseSender -> {
  // 获取匿名sender用于响应消息
  // 注册 main receiver:
  connection.createReceiver("my-queue", done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // 获取到了消息,响应之
        responseSender.result().send(AmqpMessage.create()
          .address(msg.replyTo())
          .correlationId(msg.id()) // 发送消息id作为关联的id
          .withBody("my response to your request")
          .build()
        );
      });
    }
  });
});

// sender端(发送请求并等待接收响应)
connection.createDynamicReceiver(replyReceiver -> {
  // 获取receiver,address由broker提供
  String replyToAddress = replyReceiver.result().address();

  // 关联处理器用于接收响应
  replyReceiver.result().handler(msg -> {
    System.out.println("Got the reply! " + msg.bodyAsString());
  });

  // 创建sender并发送消息:
  connection.createSender("my-queue", sender -> {
    sender.result().send(AmqpMessage.create()
      .replyTo(replyToAddress)
      .id("my-message-id")
      .withBody("This is my request").build());
  });
});

要响应一个消息,就要将它回应到指定的 address。另外,用 message id 作为 correlation id 是一个好的做法, 这样响应的接收者可以将响应与请求相关联。

关闭客户端

一旦您创建了 recever 或 sender 的连接,那么您需要用 close 方法关闭他们。 即关闭连接以及所有相关的 reciever 和 sender。

一旦客户端不再使用了,您就必须关闭它。这同时会关闭所有连接, 最终关闭 receiver 和 sender。