Vert.x Camel 桥接(Bridge)

Apache Camel (http://camel.apache.org) 是一个开源的Java框架, 它致力于让开发人员更简单的集成和更方便的使用。此桥接允许 Vert.x 程序与 Camel 端点进行交互:

  • 向 Camel 发送消息。

  • 接收来自 Camel 的消息。

此桥接依赖于 Vert.x 的事件总线(Event Bus,后面统一称 事件总线),并将事件总线地址与 Camel 端点进行关联。

小心
此组件不是 多语言 的,因为它依赖的 Camel 中的某些类只能在Java中使用。

使用 vertx-camel-bridge

使用 Vert.x Camel 桥接请将以下依赖项添加到您的 依赖 中:

  • Maven ( 在 pom.xml 文件中 ):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-camel-bridge</artifactId>
 <version>4.1.8</version>
</dependency>
  • Gradle ( 在您的 build.gradle 文件中 ):

compile 'io.vertx:vertx-camel-bridge:4.1.8'

配置桥接

在使用桥接之前,需要对其进行配置:

CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();

此桥接需要一个 CamelContext 上下文,它将从上下文中找到端点。桥接在使用之前,要确保已经启动。 需要注意的是 start 方法是异步的,你可以使用 start 方法来注册桥接启动时的回调。

入站映射

入站映射将 Camel 的端点关联到对应事件总线地址上。 在此端点上接收到的消息将被转换成事件总线的消息。

Endpoint endpoint = camel.getEndpoint("direct:foo");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withoutHeadersCopy())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .usePublish())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withBodyType(String.class))
);

上面的代码展示了配置入站映射的不同方法:

  • 您可以使用 Endpoint 对象或对应的 url 来配置 Camel 端点。

  • 您可以禁用 header 头的复制( Camel 消息头将会被复制到事件总线的消息中)。

  • 您可以使用 publish 代替 send 来将消息广播给所有事件总线的消费者。

  • 您可以配置事件总线消息体的类型。如果您未配置,则默认使用 Camel消息负载。 如果您配置了,它将在 Camel 上下文中查找 Camel 消息有效负载和所需要类型之间的转换器。

注意: org.fusesource.hawtbuf.Buffer 会自动转换成 Buffer

如果您调用了 send (而不是 publish )方法,并且 Camel 交换 期望收到回复 ( In Out 交换), 那么 Vert.x 代码应回复接受到的消息。回复的消息会被传播到 Camel 交换:

Endpoint endpoint = camel.getEndpoint("direct:stuff");

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));

vertx.eventBus().consumer("with-reply", message -> {
  message.reply("How are you ?");
});

camel.start();
bridge.start();

ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);

您还可以通过 setTimeout 方法来配置回复的 超时

出站映射

出站映射将事件总线地址关联到 Camel 的端点上。 在此地址上接收到的消息将被转换成发送到 Camel 对应端点的消息

Endpoint endpoint = camel.getEndpoint("stream:out");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
            .withoutHeadersCopy())
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);

上面的示例展示了配置出站映射的不同方法。

您可以将出站映射链接到 Camel 路由上:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:start")
        .transform(constant("OK"));
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));

camel.start();
bridge.start();


vertx.eventBus().request("test", "hello", reply -> {
  // 来自路由的回复(这里是“OK”)
});

如果您在事件总线上发送消息时注册了回复的处理器,则它将 Camel 交换 配置为期望收到响应 (它使用EIP的请求-响应模式),响应在回复的消息体中。 如果路由失败,您将收到一个失败的回复(收件人失败),并伴随以下消息:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello", reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

如果您正在执行阻塞的逻辑,您必须blocking 设置为 true 。这样可以避免在 event loop 线程执行相应逻辑。

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
      .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
          // 执行阻塞逻辑……
        }
      })
      .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
  .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello", reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

默认情况下,它使用默认的工作线程池,您也可以通过 setWorkerExecutor 方法来自定义。

停止桥接

别忘记使用 stop 方法来停止桥接。 stop 方法是异步的,你可以使用 stop 方法注册桥接结束时的回调。

交换自定义对象

如果您需要发送或者接收自定义的对象,您需要在事件总线上注册编码/解码器。

vertx.eventBus().registerDefaultCodec(Person.class, codec);