Vert.x MQTT

使用 Vert.x MQTT

这个组件已经在 Vertx 栈中正式发布了,现在只需要在构建描述符中添加如下内容即可使用:

  • Maven (在 pom.xml 文件中):

<dependency>
   <groupId>io.vertx</groupId>
   <artifactId>vertx-mqtt</artifactId>
   <version>4.1.8</version>
</dependency>
  • Gradle (in your build.gradle file):

compile io.vertx:vertx-mqtt:4.1.8

Vert.x MQTT 服务端

这个组件提供了一个服务,它能处理远程 MQTT 连接,通信和信息交换。 它的API提供了一系列接收客户端原生协议消息的事件,并且提供了一些发送信息到客户端的功能。

它不是一个功能齐全的 MQTT broker,但可以用来建立类似的东西或者进行协议转换

警告
这个模块还处于技术预览阶段,这意味着它的API在接下来的版本中可能会改变

开始

处理客户端连接/断开

这个例子展示了如何处理一个来自远程 MQTT 客户端的请求,首先,它会创建一个 MqttServer 实例, 然后使用 endpointHandler 方法指定一个处理器来处理远程客户端发送的CONNECT信息,一个 MqttEndpoint 实例会作为 handler 的参数传入,它携带了所有与CONNECT消息相关联的主要信息,例如客户端标识符,用户名/密码,"will"信息,session 清除标志,协议版本和保活超时等。 在 handler 内, endpoint 实例提供 accept 方法以相应的 CONNACK 消息响应远程客户端,通过这种方式,成功建立连接。 最后,通过 listen 方法启动一个默认的服务端(运行在 localhost 上并且默认 MQTT 端口为 1883), 这个方法同样允许指定一个 handler 来检查是否服务器是否已经正常启动。

MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

当远程客户端发送一个 DISCONNECT 消息来主动断开与服务端的连接,这个 endpoint 实例提供了一个 disconnectHandler 方法来指定 handler 进行处理, 该 handler 没有参数

endpoint.disconnectHandler(v -> {

  System.out.println("Received disconnect from client");
});

支持使用SSL / TLS 处理客户端连接/断开连接

服务端支持通过 SSL/TLS 方式来授权和加密客户端的连接请求,为了做到这一点,MqttServerOptions 类提供了 setSsl 方法来启用 SSL/TLS(值设为 true),以及一些其他有用的方法来配置服务端证书和相关私钥(作为 java 键存储引用,PEM 或 PFX 格式),在下面的例子中,setKeyCertOptions 方法可以用来配置PEM格式的证书, 这个方法需要一个 KeyCertOptions 接口的实例作为参数传入。与此同时,PemKeyCertOptions 类提供了 setCertPathsetKeyPath 来分别设置 服务端的证书和私钥的路径。 MQTT 服务端需要一个 Vert.x 实例和一个上面提到的 MQTT 配置实例作为参数来启动。

MqttServerOptions options = new MqttServerOptions()
  .setPort(8883)
  .setKeyCertOptions(new PemKeyCertOptions()
    .setKeyPath("./src/test/resources/tls/server-key.pem")
    .setCertPath("./src/test/resources/tls/server-cert.pem"))
  .setSsl(true);

MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

通过 WebSocket 处理客户端的连接

如果您想要通过 WebSocket 来进行连接,可以通过 MqttServerOptions 将其启用, 调用 setUseWebSocket 方法并设置参数为 true , 它将会监听 /mqtt 路径上所有的 websocket 连接。

与其他连接的配置方式一样,这种方式下 endpoint 的连接创建以及连接断开与常规的连接管理方式相同。

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

处理客户端 订阅/退订 请求

在客户端和服务端的连接建立后,客户端可以发送 SUBSCRIBE 消息以订阅主题。 MqttEndpoint 允许使用 subscribeHandler 方法来指定一个 handler 处理到来的订阅请求,这个 handler 接收一个 MqttSubscribeMessage 类型的实例,该实例携带了主题列表以及客户端指定的 QoS 等级。 最后,这个 endpoint 实例提供了 subscribeAcknowledge 方法来回复一个包含相关许可 QoS 等级的 SUBACK 消息给客户端。

endpoint.subscribeHandler(subscribe -> {

  List<MqttQoS> grantedQosLevels = new ArrayList<>();
  for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
    System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
    grantedQosLevels.add(s.qualityOfService());
  }
  // ack the subscriptions request
  endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

});

相应的,也可以使用 endpoint 上的 unsubscribeHandler 方法来指定一个 handler 来处理客户端的UNSUBSCRIBE消息, 这个 handler 接收一个携带退订主题列表的 MqttUnsubscribeMessage 类型实例作为参数。 最后,这个 endpoint 实例提供了 unsubscribeAcknowledge 方法来回复客户端相关的UNSUBACK消息。

endpoint.unsubscribeHandler(unsubscribe -> {

  for (String t: unsubscribe.topics()) {
    System.out.println("Unsubscription for " + t);
  }
  // ack the subscriptions request
  endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});

处理客户端发布的消息

为了处理远程客户端发布的消息,MqttEndpoint 接口提供了 publishHandler 方法来指定一个 handler, 这个handler接收一个 MqttPublishMessage 类型的实例作为参数,该实例 包含了载荷信息,QoS 等级以及复制和保留标识。

如果 QoS 等级是 0(AT_MOST_ONCE),endpoint 就没有必要回复客户端了。

如果 QoS 等级是 1(AT_LEAST_ONCE),endpoint 需要使用 publishAcknowledge 方法回复一个 PUBACK 消息给客户端

如果 QoS 等级是 2(EXACTLY_ONCE),endpoint 需要使用 publishReceived 方法回复一个PUBREC消息给客户端。 在这种情况下,这个 endpoint 同时也要通过 publishReleaseHandler 指定一个 handler 来处理来自客户端的PUBREL(远程客户端接收到 endpoint 发送的 PUBREC 后发送的)消息 为了结束 QoS 等级为2的消息的传递,endpoint 可以使用 publishComplete 方法发送一个 PUBCOMP 消息给客户端。

endpoint.publishHandler(message -> {

  System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");

  if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
    endpoint.publishAcknowledge(message.messageId());
  } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
    endpoint.publishReceived(message.messageId());
  }

}).publishReleaseHandler(messageId -> {

  endpoint.publishComplete(messageId);
});

发布消息到客户端

可以使用 publish 方法发布一个消息到远程客户端,该方法需要补充一下参数: 发布主题,消息载荷,QoS 等级,复制和保留标识。

如果 QoS 等级是 0(AT_MOST_ONCE),endpoint 就不会收到任何客户端的响应

如果 QoS 等级是 1(AT_LEAST_ONCE),endpoint 需要处理客户端的PUBACK消息,为了收到最后的确认消息,需要使用 publishAcknowledgeHandler 指定一个handler来接收。

如果 QoS 等级是 2(EXACTLY_ONCE),endpoint 需要处理客户端的PUBREC消息,可以通过 publishReceivedHandler 方法指定一个handler来实现。 在这个handler内,endpoint 可以使用 publishRelease 方法回复客户端 PUBREL 消息。最后一步是处理来自客户端的PUBCOMP消息作为已发布消息的最终确认。 这可以使用 publishCompletionHandler 方法指定一个handler来处理最终接收到的 PUBCOMP 消息。

endpoint.publish("my_topic",
  Buffer.buffer("Hello from the Vert.x MQTT server"),
  MqttQoS.EXACTLY_ONCE,
  false,
  false);

// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);

}).publishReceivedHandler(messageId -> {

  endpoint.publishRelease(messageId);

}).publishCompletionHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);
});

客户端保活通知

MQTT 底层的保活机制是由服务端内部处理的。当接收到CONNECT消息,服务端解析消息中指定的保活超时时间以便于检查客户端在这段时间内是否有发送消息, 与此同时,没收到一个 PINGREQ 消息,服务端都会回复一个相关的 PINGRESP 消息。 尽管上层应用不需要处理这些,MqttEndpoint 依然提供了 pingHandler 方法来选定一个handler 来自客户端的 PINGREQ 消息。对于应用程序来说这只是一个通知,客户端只会发送一个用于检测保活的没有任何意义的 ping 消息。无论如何,PINGRESP 都会被服务端内部自动发送。

endpoint.pingHandler(v -> {

  System.out.println("Ping received from client");
});

关闭服务端

MqttServer 提供了 close 方法来关闭服务。 他会停止监听到来的连接以及关闭所有已经建立的连接,该方法是一个异步方法,并且可以指定一个成功回调 handler,这个 handler 会在服务端完全关闭后被调用

mqttServer.close(v -> {

  System.out.println("MQTT server closed");
});

在 verticles 中自动清理

如果您是在 verticles 内部创建的 MQTT 服务端,当 verticle 卸载时这些服务端会被自动关闭。

扩展:共享 MQTT 服务器

与MQTT服务器相关的 handler 总是在同一个 event loop 线程中执行。这意味着在一个多核系统中,仅有一个实例被部署,一个核被使用。 为了使用更多的核,可以部署更多的 MQTT 服务端实例 可以通过编程方式实现:

for (int i = 0; i < 10; i++) {

  MqttServer mqttServer = MqttServer.create(vertx);
  mqttServer.endpointHandler(endpoint -> {
    // handling endpoint
  })
    .listen(ar -> {

      // handling start listening
    });

}

或者使用一个 verticle 指定实例的数量:

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

实际上,尽管仅有一个MQTT服务器被部署, 但是当传入的连接到达时,会被 Vert.x 使用轮转算法分发到不同的核上运行的处理器(handlers)上。

Vert.x MQTT 客户端

这个组件提供了一个符合3.1.1版本规范的 MQTT 客户端,它的 API 提供了一系列方法来处理连接建立/断开,发布消息(完整支持3种不同等级的 QoS)以及主题订阅

警告
这个模块还处于技术预览阶段,这意味着它的API在接下来的版本中可能会改变

开始

连接建立/连接断开

这个客户端让您可以与服务端建立连接或者断开连接。 相应的,您可以通过构造函数的方式传入一个 MqttClientOptions 类型的实例 来指定想要建立连接的服务端的地址和端口号。

正如下面这个例子所展示的,您可以使用 Vert.x MQTT 客户端实例,分别调用 connectdisconnect 方法 来完成与服务端的连接建立或者断开。

MqttClient client = MqttClient.create(vertx);

client.connect(1883, "mqtt.eclipse.org", s -> {
  client.disconnect();
});
注意
如果您在使用 SSL/TSL,服务端 MqttClientOptions 提供的默认的地址是 localhost:1883 和 localhost:8883 。

订阅主题消息

现在,让我们再仔细看一下这个示例:

client.publishHandler(s -> {
  System.out.println("There are new message in topic: " + s.topicName());
  System.out.println("Content(as string) of the message: " + s.payload().toString());
  System.out.println("QoS: " + s.qosLevel());
})
  .subscribe("rpi2/temp", 2);

这里我们有一个使用 subscribe 方法的例子, 为了接收到主题为 rpi2/temp 的消息, 我们调用了 subscribe 方法, 因此,为了能接收到的服务端的消息,您需要提供一个 handler,每当您订阅的主题有新的消息传来,这个 handler 就会被调用。 正如这个实例描述的,您需要通过 publishHandler 方法来指定 handler。

发布主题消息

如果您想要发布消息到主题上去就需要使用 publish 方法。 让我们来看下面这个示例:

client.publish("temperature",
  Buffer.buffer("hello"),
  MqttQoS.AT_LEAST_ONCE,
  false,
  false);

在这个示例中我们发布了消息到名称为 “temperature” 的主题上去。

与服务端保持连接

为了保持与服务端的连接,您需要时不时地发送一些数据到服务端,否则服务端可能会断开连接。 使用 ping 方法来保持连接是一个不错的选择。

重要
您的客户端默认情况下会自动保持与服务端的连接,这也意味着您不需要调用 ping 方法来保活,因为 MqttClient 已经帮您做了这些事。

如果您不想要这个特性,您需要调用 setAutoKeepAlive 方法,设置参数为 false 即可。

options.setAutoKeepAlive(false);

通知时机

  • 发布完成

    您需要调用 publishCompletionHandler 来指定一个handler,这个handler每次发布完成都会被调用。 这一步是非常有用的,因为您可以看到 PUBACK 或者 PUBCOMP 数据包的 packetId。

client.publishCompletionHandler(id -> {
  System.out.println("Id of just received PUBACK or PUBCOMP packet is " + id);
});
  // The line of code below will trigger publishCompletionHandler (QoS 2)
client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false);
  // The line of code below will trigger publishCompletionHandler (QoS is 1)
client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
  // The line of code below does not trigger because QoS value is 0
client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
警告
如果设置发布消息的 QoS=0,这个 handler 就不会被调用。
  • 订阅完成

    client.subscribeCompletionHandler(mqttSubAckMessage -> {
      System.out.println("Id of just received SUBACK packet is " + mqttSubAckMessage.messageId());
      for (int s : mqttSubAckMessage.grantedQoSLevels()) {
        if (s == 0x80) {
          System.out.println("Failure");
        } else {
          System.out.println("Success. Maximum QoS is " + s);
        }
      }
    });
    client.subscribe("temp", 1);
    client.subscribe("temp2", 2);
  • 退订完成

    client
      .unsubscribeCompletionHandler(id -> {
        System.out.println("Id of just received UNSUBACK packet is " + id);
      });
    client.subscribe("temp", 1);
    client.unsubscribe("temp");
  • 退订发布

    client.subscribe("temp", 1);
    client.unsubscribe("temp", id -> {
        System.out.println("Id of just sent UNSUBSCRIBE packet is " + id);
      });
  • 接收 PINGRESP

    client.pingResponseHandler(s -> {
      //The handler will be called time to time by default
      System.out.println("We have just received PINGRESP packet");
    });

使用代理协议

MqttServer mqttServer = MqttServer
  .create(vertx, new MqttServerOptions()
    // 设置是否使用代理为 true
    .setUseProxyProtocol(true));
mqttServer.endpointHandler(endpoint -> {
  // 此处设置的远程地址为真实的 MQTT 服务器地址,而不是代理地址
  System.out.println(endpoint.remoteAddress());
  endpoint.accept(false);

})
  .listen(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

如果您的服务器位于 haproxy 或 nginx 之后,并且您希望获取到 MQTT 客户端真实的 ip 和端口,那么您需要将 setUseProxyProtocol 选项设置为 true

重要
为了使用该特性, 您需要添加 netty-codec-haproxy 依赖项。 但是默认情况下该依赖不会被引入,因此您需要手动添加
  • Maven (在您的 pom.xml 中):

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-codec-haproxy</artifactId>
   <version>4.1.8</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

compile io.netty:netty-codec-haproxy:4.1.8