Vert.x 断路器
Circuit Breaker 是Vert.x 断路 模式 的实现。 它用来追踪故障,当失败次数达到阈值时 触发断路 ,并提供可选择的失败回退。
支持以下故障:
-
在
Future
内记录的失败 -
代码里主动抛出异常
-
没有完成的
Future
(即超时)。
断路器要旨是保障其操作是非阻塞且异步的, 以受益于Vert.x 执行模型。
准备工作
使用Vert.x Circuit Breaker之前, 您必须在您的项目中添加如下 依赖 :
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
<version>4.2.7</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
compile 'io.vertx:vertx-circuit-breaker:4.2.7'
使用断路器
使用断路器需要按以下步骤进行:
-
创建一个断路器,并配置您所需要的超时,最大故障次数等参数
-
使用断路器执行代码
重要!!! 断路器应该是稳定的单例,而不是每次使用就重新创建它。推荐将该单例存放在某个字段中。
例子:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions()
.setMaxFailures(5) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
.setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
);
// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---
breaker.execute(promise -> {
// 在断路器中执行的代码
// 这里的代码可以成功或者失败,
// 如果该 promise 在这里被标记为失败,断路器将自增失败数
}).onComplete(ar -> {
// 处理结果.
});
execute 代码块接收 Future
对象参数,以标识该操作以及结果的失败或成功。
例如:在下面的例子中,
对应的结果就是 REST 调用的输出:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---
breaker.<String>execute(promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}).onComplete(ar -> {
// 处理结果
});
操作的结果以下面的方式提供:
也可以增加一个可选参数,用于断路时进行失败回退(fallback):
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// 断路器会临时存储该次运行结果,用于断路判断
// ---
breaker.executeWithFallback(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}, v -> {
// 当断路器断路时,返回Hello
return "Hello";
})
.onComplete(ar -> {
// 处理结果
});
每当断路器断路的时候,都会调用失败回退(fallback),也可以调用
isFallbackOnFailure
方法开启失败回退。
当回退函数被成功设置之后,回退函数将会接收 Throwable
对象为参数并返回预期类型。
通过 CircuitBreaker
直接设置失败回退方法:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
// 当断路器断路时将调用此处代码
return "hello";
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
重试
还可以通过 setMaxRetries
.
设置重试次数,如设置大于0的数值,失败的情况下会重试,直到重试次数等于该数值,如果其中一次重试成功,
那么会跳过剩下的重试。
注意 如您设置最大重试次数 maxRetries
为 2,那么您的代码在失败的情况将会执行3次,分别为初次请求,
以及 2 次重试。
在默认情况下超时时间(timeout)和重试次数(retries)为0,那么将会无延时地一直请求下去,这会导致调用服务负载增加
并导致服务恢复时间延长。所以为了减少这种情况,建议设置延时以及重试次数。
方法 retryPolicy
用于设置重试策略。该方法接收一个Function<Integer,Long>的函数体(传入参数为重试次数,返回具体超时时间,单位:毫秒),
允许用户定制更加复杂的延时策略,例如:带抖动的延时补偿。
下面是设置了重试策略的例子,重试超时时间与重试时间呈线指数增长。
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(retryCount -> retryCount * 100L);
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
回调
您可以配置断路开启或关闭时的回调函数:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
当断路器决定尝试复位的时候( half-open 状态),我们也可以注册 halfOpenHandler
的回调从而得到回调通知。
事件总线通知
每当断路器发生状态改变的时候,断路器都会在事件总线上推送通知,总线通默认地址为:vertx.circuit-breaker
。
当然这个也是可以配置的,您可以调用方法
setNotificationAddress
来设置总线通知地址。如果您设置为 null
,那么总线通知将被禁用。
每个总线通知都会包含一个 Json Object 对象,该对象包括以下字段:
-
state
: 断路器最新的状态(OPEN
,CLOSED
,HALF_OPEN
) -
name
: 断路器的名称 -
failures
: 错误次数 -
node
: 节点标识 (如果事件总线并非运行在集群模式中,那么该值为:local
)
半开状态
当断路器处于开路状态时,对其调用会立即失败,不会执行实际操作。经过适当的时间 (通过
setResetTimeout
配置),
断路器决定是否恢复状态,此时进入半开启状态(half-open state)。在这种状态下,
允许下一次断路器的调用实际调用如果成功,断路器将复位并返回到关闭状态,
回归正常的模式;但是如果这次调用失败,则断路器返回到断路状态,直到下次半开状态。
异常
回退函数将会接收到:
-
当断路器开启时,会抛出
OpenCircuitException
-
当操作超时,会抛出
TimeoutException
将断路器指标推送到Hystrix看板(Dashboard)
Netflix Hystrix 带有一个看板(dashboard),用于显示断路器的当前状态。 Vert.x 断路器可以发布其指标(metric),以供 Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的 SSE 流,
此流由 HystrixMetricHandler
该 Vert.x Web 处理器所提供:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx);
// 创建 Vert.x Web 路由
Router router = Router.router(vertx);
// 注册指标Handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));
// / 创建HTTP服务器,并分配路由
vertx.createHttpServer()
.requestHandler(router)
.listen(8080);
在 Hystrix 看板, 配置 stream url 地址,例如: http://localhost:8080/metrics
. 现在就可以获取 Vert.x 的断路器指标了。
注意:这些指标量是由 Vert.x Web Handler 使用 Event Bus 事件通知收集。 如您不想使用默认的通知地址,请在创建的时候设置。
使用 Netflix Hystrix
Hystrix 提供了断路器模式的实现。可以在 Vert.x 中使用 Hystrix 提供的断路器或组合使用。 本节介绍在 Vert.x 应用程序中使用 Hystrix 的技巧。
首先,您需要将 Hystrix 添加到您的依赖中。 详细信息请参阅 Hystrix 页面。然后,您需要使用 Command 隔离“受保护的”调用。 您可以这样执行之:
HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();
但是,代码执行是阻塞的,所以需要使用 executeBlocking
方法执行,
或在 Worker Verticle 中调用:
HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute()),
ar -> {
// 回到Event Loop线程中
String result = ar.result();
}
);
如果您使用了 Hystrix 异步方法, 对应回调函数是不会在 Vert.x 的线程中执行的,因此我们必须在执行前保持上下的引用,
(使用 getOrCreateContext
方法),
执行
runOnContext
方法将当前线程切换回Event Loop线程。
不这样做的话,您将失去Vert.x异步模型的优势,并且必须自行管理线程同步和执行顺序:
vertx.runOnContext(v -> {
Context context = vertx.getOrCreateContext();
HystrixCommand<String> command = getSomeCommandInstance();
command.observe().subscribe(result -> {
context.runOnContext(v2 -> {
// 回到 Vert.x Context 下(Event Loop线程或Worker线程)
String r = result;
});
});
});