Vert.x MQTT

使用 Vert.x MQTT

这个组件已经在Vertx栈中正式发布了,现在只需要在构建描述符中添加如下内容即可使用:

  • Maven (在 pom.xml 文件中):

<dependency>
   <groupId>io.vertx</groupId>
   <artifactId>vertx-mqtt</artifactId>
   <version>4.0.3</version>
</dependency>
  • Gradle (in your build.gradle file):

compile io.vertx:vertx-mqtt:4.0.3

Vert.x MQTT 服务端

这个组件提供了一个服务,它能处理远程 MQTT 连接,通信和信息交换。 它的API提供了一系列接收客户端原生协议消息的事件,并且提供了一些发送信息到客户端的功能。

它不是一个功能齐全的MQTT broker,但可以用来建立类似的东西或者进行协议转换

警告
这个模块还处于技术预览阶段,这意味着它的API在接下来的版本中可能会改变

开始

处理客户端连接/断开

这个例子展示了如何处理一个来自远程MQTT客户端的请求,首先,它会创建一个 MqttServer 实例, 然后使用endpointHandler方法指定一个处理器来处理远程客户端发送的CONNECT信息,一个 MqttEndpoint 实例会作为handler的参数传入,它携带了所有与CONNECT消息相关联的主要信息,例如客户端标识符,用户名/密码,"will"信息,session清除标志,协议版本和保活超时等。 在handler内,endpoint 实例提供accept方法以相应的CONNACK消息响应远程客户端,通过这种方式,成功建立连接。 最后,通过 listen 方法启动一个默认的服务端(运行在localhost上并且默认MQTT端口为1883), 这个方法同样允许指定一个 handler 来检查是否服务器是否已经正常启动。

MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

当远程客户端发送一个 DISCONNECT 消息来主动断开与服务端的连接,这个 endpoint 实例提供了一个 disconnectHandler 方法来指定 handler 进行处理, 该handler没有参数

endpoint.disconnectHandler(v -> {

  System.out.println("Received disconnect from client");
});

支持使用SSL / TLS 处理客户端连接/断开连接

服务端支持通过SSL/TLS方式来授权和加密客户端的连接请求,为了做到这一点,MqttServerOptions 类提供了 setSsl 方法来启用 SSL/TLS(值设为true),以及一些其他有用的方法来配置服务端证书和相关私钥(作为java键存储引用,PEM或PFX格式),在下面的例子中,setKeyCertOptions 方法可以用来配置PEM格式的证书, 这个方法需要一个 KeyCertOptions 接口的实例作为参数传入。与此同时,PemKeyCertOptions 类提供了 setCertPathsetKeyPath 来分别设置 服务端的证书和私钥的路径。 MQTT服务端需要一个 Vert.x实例和一个上面提到的MQTT配置实例作为参数来启动。

MqttServerOptions options = new MqttServerOptions()
  .setPort(8883)
  .setKeyCertOptions(new PemKeyCertOptions()
    .setKeyPath("./src/test/resources/tls/server-key.pem")
    .setCertPath("./src/test/resources/tls/server-cert.pem"))
  .setSsl(true);

MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

通过 WebSocket 处理客户端的连接

如果你想要通过 WebSocket 来进行连接,可以通过 MqttServerOptions 来启用它, 调用 setUseWebSocket 方法并设置参数为 true , 它将会监听 /mqtt 路径上所有的 websocket 连接。

与其他连接的配置方式一样,这种方式下 endpoint 的连接创建以及连接断开与常规的连接管理方式相同。

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

处理客户端 订阅/退订 请求

在客户端和服务端的连接建立后,客户端可以发送 SUBSCRIBE 消息以订阅主题。 MqttEndpoint 允许使用 subscribeHandler 方法来指定一个 handler 处理到来的订阅请求,这个 handler 接收一个 MqttSubscribeMessage 类型的实例,该实例携带了主题列表以及客户端指定的 QoS 等级。 最后,这个 endpoint 实例提供了 subscribeAcknowledge 方法来回复一个包含相关许可 QoS 等级的 SUBACK 消息给客户端。

endpoint.subscribeHandler(subscribe -> {

  List<MqttQoS> grantedQosLevels = new ArrayList<>();
  for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
    System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
    grantedQosLevels.add(s.qualityOfService());
  }
  // ack the subscriptions request
  endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

});

相应的,也可以使用endpoint上的 unsubscribeHandler 方法来指定一个handler来处理客户端的UNSUBSCRIBE消息, 这个handler接收一个携带退订主题列表的 MqttUnsubscribeMessage 类型实例作为参数。 最后,这个endpoint实例提供了 unsubscribeAcknowledge 方法来回复客户端相关的UNSUBACK消息。

endpoint.unsubscribeHandler(unsubscribe -> {

  for (String t: unsubscribe.topics()) {
    System.out.println("Unsubscription for " + t);
  }
  // ack the subscriptions request
  endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});

处理客户端发布的消息

为了处理远程客户端发布的消息,MqttEndpoint 接口提供了 publishHandler 方法来指定一个handler, 这个handler接收一个 MqttPublishMessage 类型的实例作为参数,该实例 包含了载荷信息,QoS 等级以及复制和保留标识。

如果 QoS 等级是 0(AT_MOST_ONCE),endpoint就没有必要回复客户端了。

如果 QoS 等级是 1(AT_LEAST_ONCE),endpoint 需要使用 publishAcknowledge 方法回复一个PUBACK消息给客户端

如果 QoS 等级是 2(EXACTLY_ONCE),endpoint 需要使用 publishReceived 方法回复一个PUBREC消息给客户端。 在这种情况下,这个 endpoint 同时也要通过 publishReleaseHandler 指定一个 handler 来处理来自客户端的PUBREL(远程客户端接收到 endpoint 发送的 PUBREC 后发送的)消息 为了结束 QoS 等级为2的消息的传递,endpoint 可以使用 publishComplete 方法发送一个 PUBCOMP 消息给客户端。

endpoint.publishHandler(message -> {

  System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");

  if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
    endpoint.publishAcknowledge(message.messageId());
  } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
    endpoint.publishReceived(message.messageId());
  }

}).publishReleaseHandler(messageId -> {

  endpoint.publishComplete(messageId);
});

发布消息到客户端

可以使用 publish 方法发布一个消息到远程客户端,该方法需要补充一下参数: 发布主题,消息载荷,QoS 等级,复制和保留标识。

如果 QoS 等级是 0(AT_MOST_ONCE),endpoint 就不会收到任何客户端的响应

如果 QoS 等级是 1(AT_LEAST_ONCE),endpoint需要处理客户端的PUBACK消息,为了收到最后的确认消息,需要使用 publishAcknowledgeHandler 指定一个handler来接收。

如果 QoS 等级是 2(EXACTLY_ONCE),endpoint 需要处理客户端的PUBREC消息,可以通过 publishReceivedHandler 方法指定一个handler来实现。 在这个handler内,endpoint 可以使用 publishRelease 方法回复客户端 PUBREL 消息。最后一步是处理来自客户端的PUBCOMP消息作为已发布消息的最终确认。 这可以使用 publishCompletionHandler 方法指定一个handler来处理最终接收到的 PUBCOMP 消息。

endpoint.publish("my_topic",
  Buffer.buffer("Hello from the Vert.x MQTT server"),
  MqttQoS.EXACTLY_ONCE,
  false,
  false);

// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);

}).publishReceivedHandler(messageId -> {

  endpoint.publishRelease(messageId);

}).publishCompletionHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);
});

客户端保活通知

MQTT底层的保活机制是由服务端内部处理的。当接收到CONNECT消息,服务端解析消息中指定的保活超时时间以便于检查客户端在这段时间内是否有发送消息, 与此同时,没收到一个PINGREQ消息,服务端都会回复一个相关的PINGRESP消息。 尽管上层应用不需要处理这些,MqttEndpoint 依然提供了 pingHandler 方法来选定一个handler 来自客户端的PINGREQ消息。对于应用程序来说这只是一个通知,客户端只会发送一个用于检测保活的没有任何意义的ping消息。无论如何,PINGRESP都会被服务端内部自动发送。

endpoint.pingHandler(v -> {

  System.out.println("Ping received from client");
});

关闭服务端

MqttServer 提供了 close 方法来关闭服务。 他会停止监听到来的连接以及关闭所有已经建立的连接,该方法是一个异步方法,并且可以指定一个成功回调handler,这个handler会在服务端完全关闭后被调用

mqttServer.close(v -> {

  System.out.println("MQTT server closed");
});

在verticles中自动清理

如果你是在verticles内部创建的MQTT服务端,当verticle卸载时这些服务端会被自动关闭。

扩展:共享MQTT服务器

与MQTT服务器相关的handler总是在同一个event loop线程中执行。这意味着在一个多核系统中,仅有一个实例被部署,一个核被使用。 为了使用更多的核,可以部署更多的MQTT服务端实例 可以通过编程方式实现:

for (int i = 0; i < 10; i++) {

  MqttServer mqttServer = MqttServer.create(vertx);
  mqttServer.endpointHandler(endpoint -> {
    // handling endpoint
  })
    .listen(ar -> {

      // handling start listening
    });

}

或者使用一个verticle指定实例的数量:

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

实际上,尽管仅有一个MQTT服务器被部署, 但是当传入的连接到达时,会被Vert.x使用轮转算法分发到不同的核上运行的处理器(handlers)上。

Vert.x MQTT 客户端

这个组件提供了一个符合3.1.1版本规范的 MQTT 客户端,它的API提供了一系列方法来处理连接建立/断开,发布消息(完整支持3种不同等级的 QoS)以及主题订阅

警告
这个模块还处于技术预览阶段,这意味着它的API在接下来的版本中可能会改变

开始

连接建立/连接断开

这个客户端让你可以与服务端建立连接或者断开连接。 相应的,你可以通过构造函数的方式传入一个 MqttClientOptions 类型的实例 来指定想要建立连接的服务端的地址和端口号。

正如下面这个例子所展示的,你可以使用Vert.x MQTT客户端实例,分别调用 connectdisconnect 方法 来完成与服务端的连接建立或者断开。

MqttClient client = MqttClient.create(vertx);

client.connect(1883, "mqtt.eclipse.org", s -> {
  client.disconnect();
});
注意
如果你在使用SSL/TSL,服务端 MqttClientOptions 提供的默认的地址是 localhost:1883 和 localhost:8883 。

订阅主题消息

现在,让我们再仔细看一下这个示例:

client.publishHandler(s -> {
  System.out.println("There are new message in topic: " + s.topicName());
  System.out.println("Content(as string) of the message: " + s.payload().toString());
  System.out.println("QoS: " + s.qosLevel());
})
  .subscribe("rpi2/temp", 2);

这里我们有一个使用 subscribe 方法的例子, 为了接收到主题为 rpi2/temp 的消息, 我们调用了 subscribe 方法, 因此,为了能接收到的服务端的消息,你需要提供一个handler,每当你订阅的主题有新的消息传来,这个handler就会被调用。 正如这个实例描述的,你需要通过 publishHandler 方法来指定handler。

发布主题消息

如果你想要发布消息到主题上去就需要使用 publish 方法。 让我们来看下面这个示例:

client.publish("temperature",
  Buffer.buffer("hello"),
  MqttQoS.AT_LEAST_ONCE,
  false,
  false);

在这个示例中我们发布了消息到名称为 “temperature” 的主题上去。

与服务端保持连接

为了保持与服务端的连接,你需要时不时地发送一些数据到服务端,否则服务端可能会断开连接。 使用 ping 方法来保持连接是一个不错的选择。

重要
你的客户端默认情况下会自动保持与服务端的连接,这也意味着你不需要调用 ping 方法来保活,因为 MqttClient 已经帮你做了这些事。

如果你不想要这个特性,你需要调用 setAutoKeepAlive 方法,设置参数为 false 即可。

options.setAutoKeepAlive(false);

通知时机

  • 发布完成

    你需要调用 publishCompletionHandler 来指定一个handler,这个handler每次发布完成都会被调用。 这一步是非常有用的,因为你可以看到 PUBACK 或者 PUBCOMP 数据包的 packetId。

client.publishCompletionHandler(id -> {
  System.out.println("Id of just received PUBACK or PUBCOMP packet is " + id);
});
  // The line of code below will trigger publishCompletionHandler (QoS 2)
client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false);
  // The line of code below will trigger publishCompletionHandler (QoS is 1)
client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
  // The line of code below does not trigger because QoS value is 0
client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
警告
如果设置发布消息的 QoS=0,这个handler就不会被调用。
  • 订阅完成

    client.subscribeCompletionHandler(mqttSubAckMessage -> {
      System.out.println("Id of just received SUBACK packet is " + mqttSubAckMessage.messageId());
      for (int s : mqttSubAckMessage.grantedQoSLevels()) {
        if (s == 0x80) {
          System.out.println("Failure");
        } else {
          System.out.println("Success. Maximum QoS is " + s);
        }
      }
    });
    client.subscribe("temp", 1);
    client.subscribe("temp2", 2);
  • 退订完成

    client
      .unsubscribeCompletionHandler(id -> {
        System.out.println("Id of just received UNSUBACK packet is " + id);
      });
    client.subscribe("temp", 1);
    client.unsubscribe("temp");
  • 退订发布

    client.subscribe("temp", 1);
    client.unsubscribe("temp", id -> {
        System.out.println("Id of just sent UNSUBSCRIBE packet is " + id);
      });
  • 接收PINGRESP

    client.pingResponseHandler(s -> {
      //The handler will be called time to time by default
      System.out.println("We have just received PINGRESP packet");
    });