Vert.x Camel 桥接(Bridge)
Apache Camel (http://camel.apache.org) 是一个开源的Java框架, 它致力于让开发人员更简单的集成和更方便的使用。此桥接允许 Vert.x 程序与 Camel 端点进行交互:
-
向 Camel 发送消息。
-
接收来自 Camel 的消息。
此桥接依赖于 Vert.x 的事件总线(Event Bus,后面统一称 事件总线
),并将事件总线地址与 Camel 端点进行关联。
小心
|
此组件不是 多语言 的,因为它依赖的 Camel 中的某些类只能在Java中使用。 |
使用 vertx-camel-bridge
使用 Vert.x Camel 桥接请将以下依赖项添加到您的 依赖 中:
-
Maven ( 在
pom.xml
文件中 ):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-camel-bridge</artifactId>
<version>4.3.8</version>
</dependency>
-
Gradle ( 在您的
build.gradle
文件中 ):
compile 'io.vertx:vertx-camel-bridge:4.3.8'
配置桥接
在使用桥接之前,需要对其进行配置:
CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();
此桥接需要一个 CamelContext
上下文,它将从上下文中找到端点。桥接在使用之前,要确保已经启动。 需要注意的是 start
方法是异步的,你可以使用
start
方法来注册桥接启动时的回调。
入站映射
入站映射将 Camel 的端点关联到对应事件总线地址上。 在此端点上接收到的消息将被转换成事件总线的消息。
Endpoint endpoint = camel.getEndpoint("direct:foo");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withoutHeadersCopy())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.usePublish())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withBodyType(String.class))
);
上面的代码展示了配置入站映射的不同方法:
-
您可以使用
Endpoint
对象或对应的 url 来配置 Camel 端点。 -
您可以禁用 header 头的复制( Camel 消息头将会被复制到事件总线的消息中)。
-
您可以使用
publish
代替send
来将消息广播给所有事件总线的消费者。 -
您可以配置事件总线消息体的类型。如果您未配置,则默认使用 Camel消息负载。 如果您配置了,它将在 Camel 上下文中查找 Camel 消息有效负载和所需要类型之间的转换器。
注意: org.fusesource.hawtbuf.Buffer
会自动转换成 Buffer
。
如果您调用了 send
(而不是 publish
)方法,并且 Camel 交换 期望收到回复 ( In Out 交换),
那么 Vert.x 代码应回复接受到的消息。回复的消息会被传播到 Camel 交换:
Endpoint endpoint = camel.getEndpoint("direct:stuff");
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));
vertx.eventBus().consumer("with-reply", message -> {
message.reply("How are you ?");
});
camel.start();
bridge.start();
ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);
您还可以通过 setTimeout
方法来配置回复的 超时
。
出站映射
出站映射将事件总线地址关联到 Camel 的端点上。 在此地址上接收到的消息将被转换成发送到 Camel 对应端点的消息
Endpoint endpoint = camel.getEndpoint("stream:out");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
.withoutHeadersCopy())
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);
上面的示例展示了配置出站映射的不同方法。
您可以将出站映射链接到 Camel 路由上:
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.transform(constant("OK"));
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));
camel.start();
bridge.start();
vertx.eventBus().request("test", "hello", reply -> {
// 来自路由的回复(这里是“OK”)
});
如果您在事件总线上发送消息时注册了回复的处理器,则它将 Camel 交换 配置为期望收到响应 (它使用EIP的请求-响应模式),响应在回复的消息体中。 如果路由失败,您将收到一个失败的回复(收件人失败),并伴随以下消息:
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.to("http://localhost:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));
camel.start();
bridge.start();
vertx.eventBus().request("camel-route", "hello", reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
如果您正在执行阻塞的逻辑,您必须将 blocking 设置为 true
。这样可以避免在
event loop 线程执行相应逻辑。
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 执行阻塞逻辑……
}
})
.to("http://localhost:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));
camel.start();
bridge.start();
vertx.eventBus().request("camel-route", "hello", reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
默认情况下,它使用默认的工作线程池,您也可以通过
setWorkerExecutor
方法来自定义。
停止桥接
别忘记使用 stop
方法来停止桥接。 stop
方法是异步的,你可以使用
stop
方法注册桥接结束时的回调。