RabbitMQ Client for Vert.x

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>4.0.3</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:4.0.3'
}

Create a client

You can create a client instance as follows using a full amqp uri:

RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

Or you can also specify individual parameters manually:

RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

You can set multiples addresses to connect to a cluster;

RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");

config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client.start(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

Recovery and Reconnections

There are two, separate and incompatible, mechanisms for handling reconnections in the RabbitMQClient:

  • Java RabbitMQ client library auto recovery;

  • RabbitMQClient restarts.

Neither mechanism is enabled by default.

The reconnections provided by the Java RabbitMQ client library do not work in all situations (if the connection to the server disconnects nicely the client will shut down and not recover).

[source, java]
In order to use the Java RabbitMQ client library auto recovery it is necessary to both enable it and disable the RabbitMQClient library reconnect attempts:
RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(true);
options.setReconnectAttempts(0);

The client library will also attempt topology recovery as detailed in its documentation (this is enabled by default in the library and is not exposed in the RabbitMQClientOptions).

Alternatively the RabbitMQClient may be configured to retry connecting to the RabbitMQ server whenever there is a connection problem. The failure of a connection could be caused by a transient network failure (where the client would probably connect back to the same RabbitMQ server) or it could be caused by a failover scenario. This approach is more brutal than that followed by the client library - the RabbitMQClient restarts work by closing the connections when the client library reports a problem and then repeatedly trying to reconnect from scratch.

The reconnection policy can be configured by setting the setReconnectInterval and setReconnectAttempts properties in the configuration:

RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(false);
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(500);

The RabbitMQClient reconnections do not feature any form of automatic topology recovery. This can lead to a race condition where messages are sent before the topology on the server is ready (i.e. before exchanges and queues have been created/bound). To provide an opportunity to create these objects before the connection is considered ready the RabbitMQClient provides the ConnectionEstablishedCallback. The ConnectionEstablishedCallback can be used to carry out any operations on the RabbitMQClient before other users (including the RabbitMQConsumer and RabbitMQPublisher) are able to access it.

RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.addConnectionEstablishedCallback(promise -> {
            client.exchangeDeclare("exchange", "fanout", true, false)
                .compose(v -> {
                  return client.queueDeclare("queue", false, true, true);
                })
                .compose(declareOk -> {
                  return client.queueBind(declareOk.getQueue(), "exchange", "");
                })
                .onComplete(promise);
});

// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue", rabbitMQConsumerAsyncResult -> {
});

If a RabbitMQConsumer is listening for messages on an auto-delete server-named queue and the broker restarts the queue will have been removed by the time the client reconnects. In this instance it is necessary to both recreate the queue and set the new queue name on the RabbitMQConsumer.

RabbitMQClient client = RabbitMQClient.create(vertx, config);
AtomicReference<RabbitMQConsumer> consumer = new AtomicReference<>();
AtomicReference<String> queueName = new AtomicReference<>();
client.addConnectionEstablishedCallback(promise -> {
      client.exchangeDeclare("exchange", "fanout", true, false)
              .compose(v -> client.queueDeclare("", false, true, true))
              .compose(dok -> {
                  queueName.set(dok.getQueue());
                  // The first time this runs there will be no existing consumer
                  // on subsequent connections the consumer needs to be update with the new queue name
                  RabbitMQConsumer currentConsumer = consumer.get();
                  if (currentConsumer != null) {
                    currentConsumer.setQueueName(queueName.get());
                  }
                  return client.queueBind(queueName.get(), "exchange", "");
              })
              .onComplete(promise);
});

client.start()
        .onSuccess(v -> {
            // At this point the exchange, queue and binding will have been declared even if the client connects to a new server
            client.basicConsumer(queueName.get(), rabbitMQConsumerAsyncResult -> {
                if (rabbitMQConsumerAsyncResult.succeeded()) {
                    consumer.set(rabbitMQConsumerAsyncResult.result());
                }
            });
        })
        .onFailure(ex -> {
            System.out.println("It went wrong: " + ex.getMessage());
        });

Enabling SSL/TLS on the client

The RabbitMQClient can easily configured to use SSL.

RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true);
Client trust configuration

If trustAll is set to true, the client will trust all server certificates. The connection will still be encrypted but is then vulnerable to 'man in the middle' attacks. Greatbadness, Do not use this option in production! Default value is false.

RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setTrustAll(true));

If trustAll is set to false, proper server authentication will takes place. Three main options are available.

  • Your default truststore already "trusts" the server, in which case all is fine

  • You start the java process with -Djavax.net.ssl.trustStore=xxx.jks specifying the custom trust store

  • You supply a custom trust store via RabbitMQOptions

JKS trust store option
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setTrustOptions(new JksOptions()
   .setPath("/path/myKeyStore.jks")
   .setPassword("myKeyStorePassword"));
p12/pfx trust store option
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setPfxTrustOptions(
   new PfxOptions().
     setPath("/path/myKeyStore.p12").
     setPassword("myKeyStorePassword"));
PEM trust option
RabbitMQOptions options = new RabbitMQOptions()
 .setSsl(true)
 .setPemTrustOptions(
   new PemTrustOptions().
     addCertPath("/path/ca-cert.pem"));

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

JsonObject config = new JsonObject();

config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});

Declare queue with additional config

You can pass additional config parameters to RabbitMQs queueDeclare method

JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);

client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});

Operations

The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

Buffer message = Buffer.buffer("body", "Hello RabbitMQ, from Vert.x !");

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if(confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if(waitResult.succeeded())
            System.out.println("Message published !");
          else
            waitResult.cause().printStackTrace();
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});

Reliable Message Publishing

In order to reliably publish messages to RabbitMQ it is necessary to handle confirmations that each message has been accepted by the server. The simplest approach to confirmations is to use the basicPublishWithConfirm approach, above, which synchronously confirms each message when it is sent - blocking the publishing channel until the confirmation is received.

In order to achieve greater throughput RabbitMQ provides asynchronous confirmations. The asynchronous confirmations can confirm multiple messages in one go, so it is necessary for the client to track all messages in the order that they were published. Also, until messages are confirmed by the server it may be necessary to resend them, so they must be retained by the client.

The RabbitMQPublisher class implements a standard approach to handling asynchronous confirmations, avoiding much of the boiler plate code that would otherwise be required.

The RabbitMQPublisher works by: * Adding all sent messages to an internal queue. * Sending messages from the queue when it is able, keeping track of these messages pending acknowledgement in a separate queue. * Handling asynchronous confirmations from RabbitMQ, removing messages from the pendingAck queue once they are confirmed. * Notifying the caller for each message that is confirmed (this is always a single message at a time, not the bulk confirmation used by RabbitMQ).

RabbitMQPublisher publisher = RabbitMQPublisher.create(vertx, client, options);

messages.forEach((k,v) -> {
  com.rabbitmq.client.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .messageId(k)
          .build();
  publisher.publish("exchange", "routingKey", properties, v.toBuffer());
});

publisher.getConfirmationStream().handler(conf -> {
  if (conf.isSucceeded()) {
    messages.remove(conf.getMessageId());
  }
});

Delivery Tags

This section is an implementation detail that is useful for anyone that wants to implement their own alternative to RabbitMQPublisher.

For the RabbitMQPublisher to work it has to know the delivery tag that RabbitMQ will use for each message published. The confirmations from RabbitMQ can arrive at the client before the call to basicPublish has completed, so it is not possible to identify the delivery tag via anything returned by basicPublish if asynchronous confirmations are being used. For this reason it is necessary for the RabbitMQClient to tell the RabbitMQPublisher the delivery tag of each message via a separate callback that occurs in the call to RabbitMQClient::basicPublish before the message is actually sent on the network. It is also possible for the delivery tag of a single message to change (delivery tags are per-channel, so if the message is resent following a reconnection it will have a new delivery tag) - this means that we cannot use a Future to inform the client of the delivery tag. If the deliveryTagHandler is called more than once for a given message it is always safe to ignore the previous value - there can be only one valid delivery tag for a message at any time.

To capture the delivery tag one of the RabbitMqClient::basicPublishWithDeliveryTag methods should be used.

 void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
 Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);

These methods

Consume

Consume messages from a queue.

// Create a stream of messages from a queue
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
    mqConsumer.handler(message -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});

At any moment of time you can pause or resume the stream. When stream is paused you won’t receive any message.

consumer.pause();
consumer.resume();

There are actually a set of options to specify when creating a consumption stream.

The QueueOptions lets you specify:

  • The size of internal queue with setMaxInternalQueueSize

  • Should the stream keep more recent messages when queue size is exceed with setKeepMostRecent

QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true);

client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});

When you want to stop consuming message from a queue, you can do:

rabbitMQConsumer.cancel(cancelResult -> {
  if (cancelResult.succeeded()) {
    System.out.println("Consumption successfully stopped");
  } else {
    System.out.println("Tired in attempt to stop consumption");
    cancelResult.cause().printStackTrace();
  }
});

You can get notified by the end handler when the queue won’t process any more messages:

rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});

You can set the exception handler to be notified of any error that may occur when a message is processed:

consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});

And finally, you may want to retrive a related to the consumer tag:

String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);

Get

Will get a message from a queue

client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    RabbitMQMessage msg = getResult.result();
    System.out.println("Got message: " + msg.body());
  } else {
    getResult.cause().printStackTrace();
  }
});

Consume messages without auto-ack

client.basicConsumer("my.queue", new QueueOptions().setAutoAck(false), consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer consumer = consumeResult.result();

    // Set the handler which messages will be sent to
    consumer.handler(msg -> {
      JsonObject json = (JsonObject) msg.body();
      System.out.println("Got message: " + json.getString("body"));
      // ack
      client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
      });
    });
  } else {
    consumeResult.cause().printStackTrace();
  }
});

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.