Vert.x-Stomp

STOMP 即简单(流)文本定向消息。 STOMP 提供了一种可交互的消息管线,因此 STOMP 客户端可以与任意的 STOMP 消息中介(broker) 通信,并 基于各种不同的语言,平台与消息中介实现信息的交互和操作。 参阅 https://stomp.github.io/index.html 了解更多关于 STOMP 的信息。

Vertx-Stomp 提供了 STOMP 客户端和服务端的实现。 您可以使用其他客户端连接 Vert.x 的 STOMP 服务端,也可以 使用 Vert.x 的 STOMP 客户端连接其他服务端实现。 Vert.x 提供的服务端和客户端都支持 STOMP 的 1.0,1.1 和 1.2 版本 (参见 https://stomp.github.io/stomp-specification-1.2.html)。 该 STOMP 服务端也可以用于跟 Vert.x 的事件总线桥接, 或者直接与 websocket 客户端建立连接 (使用 StompJS)

使用 vertx-stomp

为了使用 Vert.x Stomp 服务端和客户端, 需要将以下依赖项添加到您的项目构建描述文件 的 依赖配置 中:

  • Maven (在您的 pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-stomp</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

compile 'io.vertx:vertx-stomp:4.4.0'

STOMP 服务器

创建 STOMP 服务器

以下是使用默认配置去创建 STOMP 服务器的最简单的方法:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen();

这会创建一个符合 STOMP 标准的服务器, 它会监听 localhost:61613 地址

您可以使用 listen 方法 去配置服务器监听的主机地址和端口:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen(1234, "0.0.0.0");

如果您设置端口号为 -1 , TCP 服务器将无法启动。 如果您使用到了 websocket 桥接 这会很有用。 使用以下方式设置回调处理器,以便在服务器就绪时被调用:

StompServer server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen(ar -> {
      if (ar.failed()) {
        System.out.println("Failing to start the STOMP server : " + ar.cause().getMessage());
      } else {
        System.out.println("Ready to receive STOMP frames");
      }
    });

该回调处理器接收一个类型是 StompServer 的引用参数

您也可以使用 StompServerOptions 配置服务器监听的主机地址和端口:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setPort(1234).setHost("0.0.0.0"))
    .handler(StompServerHandler.create(vertx))
    .listen();

关闭 STOMP 服务器

使用如下方式关闭 STOMP 服务器:

server.close(ar -> {
  if (ar.succeeded()) {
    System.out.println("The STOMP server has been closed");
  } else {
    System.out.println("The STOMP server failed to close : " + ar.cause().getMessage());
  }
});

配置服务端

使用 StompServerOptions 配置 STOMP 服务器的多项参数

首先,STOMP 服务器是基于 NetServer 的, 因此您可以在 StompServerOptions 中配置 底层使用的 NetServer。或者您也可以将 您想要使用的 NetServer 实例作为参数传入:

Future<StompServer> server = StompServer.create(vertx, netServer)
    .handler(StompServerHandler.create(vertx))
    .listen();

该配置类 StompServerOptions 可以让您设置以下选项:

  • STOMP 服务器默认监听的主机地址和端口 - 默认为 0.0.0.0:61613.

  • STOMP 服务器是否使用安全验证 - 默认为 false

  • STOMP 协议消息体的最大尺寸限制 - 默认为 10 Mb

  • STOMP 协议消息体的最大首部限制 - 默认为 1000

  • STOMP 协议消息体中首部行长度的最大限制 - 默认为 10240

  • STOMP 协议的心跳时间 - 默认为 1000, 1000

  • 支持的 STOMP 协议版本 (默认为 1.0, 1.1 和 1.2)

  • STOMP 协议中事务所支持的最大消息数量 (默认为 1000)

  • 分块传输的最大尺寸 - 默认为 1000 (参见 setTransactionChunkSize )

  • 一个客户端可以使用的最大订阅数量 - 默认为 1000

如下所示,使用 JsonObject 配置 STOMP 协议的心跳时间:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setHeartbeat(
    new JsonObject().put("x", 1000).put("y", 1000)))
    .handler(StompServerHandler.create(vertx))
    .listen();

要使用安全认证功能,需要提供 AuthenticationProvider 以处理 认证请求:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setSecured(true))
    .handler(StompServerHandler.create(vertx).authProvider(provider))
    .listen();

更多关于 AuthenticationProvider 的信息请参考 此文档.

如果一个消息体的大小超过了限制, 那么它会被拒绝接收,并且客户端会收到一个 ERROR 消息。 按照协议 要求,在发送这种错误消息后,客户端的连接需要立即关闭。 如果发送的消息不符合其他 要求的限制,客户端也需要立即关闭。

订阅

默认的 STOMP 服务器将订阅的目的地点作为普通的字符串处理。 因此它不会对其进行解析 和分级。 默认的 STOMP 服务器使用 topic 模式处理订阅 (因此消息会被分发给所有 对应的订阅客户端)

消息目的地点类型

默认情况下, STOMP 服务器将消息发送的 目的地点 作为 topic 类型处理。 因此消息会被发送给所有的订阅者。 您可以 配置 STOMP 服务器将消息发送的目的地作为 queue 类型处理, 或两种模式都支持:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

在以上代码中, 所有以 /queue 开头的目的地点均被作为 queue 类型处理,而其他的目的地点被当作 topic 类型。 目的地点是在 STOMP 服务器 第一次接收到对应的订阅消息时被创建的

STOMP 服务器通过返回 null 以拒绝目的地点的创建:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/forbidden")) {
            return null;
          } else if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

在这种情况下, 订阅客户端会收到一个 ERROR 消息

queue 类型的目的地点使用轮询调度策略向订阅者分发消息

提供您自定义的目的地点类型

Vert.x 提供的 STOMP 有意没有实现任何高级特性。 如果您需要更高级的消息分发策略, 可以提供自定义的 DestinationFactory 实现 以返回您自定义的 Destination 实例。

确认消息

默认情况下, STOMP 服务器如果没有收到一个消息的确认消息不会做任何处理。 您可以通过提供您自定义的 Destination 以实现对确认消息的处理

自定义的目的地点类型应该实现

onAckonNack 方法以供 StompServerHandler 实现特定行为:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .onAckHandler(acknowledgement -> {
          // Action to execute when the frames (one in `client-individual` mode, several
          // in `client` mode are acknowledged.
        })
        .onNackHandler(acknowledgement -> {
          // Action to execute when the frames (1 in `client-individual` mode, several in
          // `client` mode are not acknowledged.
        }))
    .listen();

自定义 STOMP 服务器配置

除了上文所述的回调, 您还可以配置 STOMP 服务器的几乎所有配置, 例如收到 指定消息类型的响应动作, 发送给客户端的 ping 消息 (为了实现 心跳)。以下是示例

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .closeHandler(connection -> {
              // 客户端关闭的回调
            })
            .beginHandler(frame -> {
              // 开始传输事务的回调
            })
            .commitHandler(frame -> {
                  // 传输事务完成的回调
                }
            )
        //...
    ).listen();

注意,改变默认的实现可能会导致对 STOMP 标准的破坏。 所以请参考 默认的实现。

STOMP 客户端

STOMP 客户端可以连接到 STOMP 服务器并且接受和发送数据。

创建 STOMP 客户端

通过以下方式使用默认配置创建 StompClient 实例:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // 使用连接
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

以上的代码片段创建了一个客户端连接到 "0.0.0.0:61613"。 一旦连接成功, 您就可以获得一个 StompClientConnection 实例用于和服务端交互。 您可以 通过以下方式配置客户端的连接地址和端口:

StompClient.create(vertx)
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // 使用连接
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

为了捕获因为安全认证原因导致的异常, 或其他因为交互时 产生的异常而返回的错误信息, 您可以提供一个 异常处理器 给 Stomp 客户端。 所有 被该 STOMP 客户端创建的连接都会默认继承该异常处理器 (但他们也可以各自单独设置异常处理器):

StompClient.create(vertx)
  .errorFrameHandler(frame -> {
    // 接受错误消息
  })
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // 使用连接
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

您也可以使用 StompClientOptions 来配置 STOMP 客户端的连接地址和端口:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // 使用连接
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

关闭 STOMP 客户端

您可以关闭 STOMP 客户端:

StompClient client = StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234));

client
  .connect()
  .onSuccess(connection -> {
    // 使用连接
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

client.close();

然而, 使用以上方式关闭客户端不会告知 STOMP 服务器断开连接。 为了显式的断开连接, 您应当 使用 disconnect 方法:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // 使用连接
    connection.disconnect();
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

如果您使用了心跳并且 STOMP 客户端在配置的心跳时间内没有检测到服务端的活动, 连接 会自动关闭

错误处理

对于 StompClientConnection 的实例, 您可以注册一个错误处理器用于接收 服务端发送的 ERROR 消息。 注意服务端会在发送该错误消息后关闭连接:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // 使用连接
    connection
      .errorHandler(frame ->
        System.out.println("ERROR frame received : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

如果客户端发现当前的连接被丢弃时会收到通知。 STOMP 客户端通过心跳机制 监视连接是否失效。 当 STOMP 服务器在心跳时间窗口内没有发送心跳, 那么连接会被 关闭并且 connectionDroppedHandler 会被调用 (如果设置了的话)。 为了设置 connectionDroppedHandler, 您需要调用 connectionDroppedHandler 方法。该处理器可以起到 让该 STOMP 客户端实例重新连接服务器的作用。

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {

    connection.connectionDroppedHandler(con -> {
      // 该连接已被丢失
      // 您需要重新建立连接或切换使用另一个连接
    });

    connection.send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame -> System.out.println("Message processed by the server")
      );
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

配置客户端

您可以通过在创建 StompClient 时传入 StompClientOptions 以自定义多项配置。 由于 STOMP 客户端底层依赖了 NetClient, 因此您可以在 StompClientOptions 中配置 底层的 NetClient。 或者您也可以在 connect 方法中 传入一个 您想使用的 NetClient

StompClient.create(vertx)
  .connect(netClient)
  .onSuccess(connection -> {
    // 使用连接
    connection
      .errorHandler(frame ->
        System.out.println("ERROR frame received : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

使用 StompClientOptions 可以配置:

  • 连接 STOMP 服务器的地址和端口

  • 连接 STOMP 服务器的登录和密码

  • 如果没有显式设置 content-length 首部,是否自动添加 (默认开启)

  • 是否用 STOMP 指令来替换 CONNECT 指令 (默认关闭)

  • CONNECT 消息中 host 首部是否被忽略 (默认关闭)

  • 心跳时间配置 (默认为 1000, 1000)

订阅

为了订阅消息的目的地点,使用

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // 使用连接
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

为了取消订阅,使用:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // 使用连接
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));

    // ....

    connection.unsubscribe("/queue");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

发送消息

为了发送消息,使用:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("header1", "value1");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    //没有首部:
    connection.send("/queue", Buffer.buffer("World"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

在 Java 和 Groovy 语言中, 您可以使用 Headers 类用于简化首部的创建

确认消息

STOMP 客户端可以发送 ACKNACK 消息:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection.subscribe("/queue", frame -> {
      connection.ack(frame.getAck());
      // 或者
      connection.nack(frame.getAck());
    });
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

事务

STOMP 客户端也可以创建事务。 ACK, NACKSEND 消息只有在事务被提交时才会 发送。

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("transaction", "my-transaction");
    connection.beginTX("my-transaction");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    connection.send("/queue", headers, Buffer.buffer("World"));
    connection.send("/queue", headers, Buffer.buffer("!!!"));
    connection.commit("my-transaction");
    // 或者
    connection.abort("my-transaction");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

确认消息

每次发送指令可以设置一个 确认 处理器, 该处理器会在服务端确认处理完消息后被调用:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection
      .send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame ->
        System.out.println("Message processed by the server"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

使用 STOMP 服务器桥接 Vert.x 事件总线

STOMP 服务器可以桥接 Vert.x 的事件总线。 这种桥接是双向的,这意味着 STOMP 消息 可以被转换为事件总线中的消息,并且事件总线中的消息可以被转化为 STOMP 消息。

为了使用桥接, 您需要配置入站和出站地址。 入站地址是指需要转入事件总线的 STOMP 消息的目的地点。 这些 STOMP 消息的目的地点会被作为事件总线的地址。 出站地址 是指需要被转化为 STOMP 消息的事件总线地址。

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
        )
    )
    .listen();

默认情况下, 桥接使用 发布/订阅 模式发送消息 (topic 模式)。 您也可以配置它使用点对点模式去发送 消息, 这样就只会有一个 STOMP 客户端或事件总线的消费者被调用:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .bridge(new BridgeOptions()
                    .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
                    .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
                    .setPointToPoint(true)
            )
    )
    .listen();

出入站的许可字符串可以作为一个 "正则字符串" 或者一个 匹配项匹配项 是指 消息体需要满足的特定结构。 以下代码展示了消息体必须拥有字段 "foo" ,并且其 值为 "bar"。 结构匹配当前只支持 Json 数据类型。

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus")
                .setMatch(new JsonObject().put("foo", "bar")))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
            .setPointToPoint(true)
        )
    )
    .listen();

使用基于 websocket 的 STOMP 服务器

如果您想使用一个 Javascript 客户端 (在 node.js 或浏览器中) 直接连接 STOMP 服务器, 您可以使用 websocket。 STOMP 协议目前完成了适配,可以基于 websocket 工作,参见 StompJS 。 使用 JavaScript 客户端直接连接到 STOMP 服务器并且 使用 websocket 协议发送消息。 这样也可以使用 websocket 接收 STOMP 消息。

为了配置服务器使用 StompJS, 您需要:

  1. 使用 websocket 桥接并且配置 websocket 需要监听的路径 (默认为 /stomp)

  2. 在您的应用中引入 StompJS (您的应用可以是 HTML 页面上的脚本, 或者一个 node 模块 (https://www.npmjs.com/package/stompjs)

  3. 连接到服务器

为了完成第一步, 您首先需要创建一个 HTTP 服务器, 并且将 webSocketHandler 的结果传递给 webSocketHandler 方法:

StompServer server = StompServer.create(vertx, new StompServerOptions()
    .setPort(-1) // 禁用 tcp 端口,这一项是可选的
    .setWebsocketBridge(true) // 开启 websocket 支持
    .setWebsocketPath("/stomp")) // 配置 websocket 路径,默认是 /stomp
    .handler(StompServerHandler.create(vertx));

Future<HttpServer> http = vertx.createHttpServer(
    new HttpServerOptions().setWebSocketSubProtocols(Arrays.asList("v10.stomp", "v11.stomp"))
)
    .webSocketHandler(server.webSocketHandler())
    .listen(8080);

不要忘了声明要支持的子协议, 否则连接会被拒绝。

以下示例代码来自 the StompJS documentation ,展示了 STOMP 客户端如何连接 到服务器:

var url = "ws://localhost:8080/stomp";
var client = Stomp.client(url);
var callback = function(frame) {
  console.log(frame);
};

client.connect({}, function() {
var subscription = client.subscribe("foo", callback);
});

注册接收消息处理器和写入消息处理器

STOMP 客户端, 客户端连接和服务端都支持注册一个接收 Frame 的处理器,该处理器会在每次接收到消息时被调用。 您可以通过 它去打印数据包的日志或实现其他自定义的行为。 该处理器也会在接收 PING 消息, 和其他 非法 / 未知 类型消息时被调用:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx).receivedFrameHandler(sf -> {
      System.out.println(sf.frame());
    }))
    .listen();

StompClient client = StompClient.create(vertx).receivedFrameHandler(frame -> System.out.println(frame));

该处理器是在消息体被处理前调用的,因此可以使用它来 修改 消息体内容

使用了不合法的指令的消息会被当做 UNKNOWN 指令类型来处理。 原本的指令值会被写入 首部,其字段为 Frame.STOMP_FRAME_COMMAND

您也可以设置一个处理器,该处理器会在数据包被发送前调用 (写入到网络前):

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .writingFrameHandler(sf -> {
      System.out.println(sf.frame());
    })
    .listen();

StompClient client = StompClient.create(vertx).writingFrameHandler(frame -> {
  System.out.println(frame);
});