Vert.x 断路器
Circuit Breaker 是Vert.x 断路 模式 的实现。 它用来追踪故障,当失败次数达到阈值时 触发断路 ,并提供可选择的失败回退。
支持以下故障:
-
在
Future
内记录的失败 -
代码里主动抛出异常
-
没有完成的
Future
(即超时)。
断路器要旨是保障其操作是非阻塞且异步的, 以受益于Vert.x 执行模型。
准备工作
使用Vert.x Circuit Breaker之前, 您必须在您的项目中添加如下 依赖 :
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
<version>4.3.8</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
compile 'io.vertx:vertx-circuit-breaker:4.3.8'
使用断路器
使用断路器需要按以下步骤进行:
-
创建一个断路器,并配置您所需要的超时,最大故障次数等参数
-
使用断路器执行代码
重要!!! 断路器应该是稳定的单例,而不是每次使用就重新创建它。推荐将该单例存放在某个字段中。
例子:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions()
.setMaxFailures(5) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
.setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
);
// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---
breaker.execute(promise -> {
// 在断路器中执行的代码
// 这里的代码可以成功或者失败,
// 如果该 promise 在这里被标记为失败,断路器将自增失败数
}).onComplete(ar -> {
// 处理结果.
});
execute 代码块接收 Future
对象参数,以标识该操作以及结果的失败或成功。
例如:在下面的例子中,
对应的结果就是 REST 调用的输出:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---
breaker.<String>execute(promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}).onComplete(ar -> {
// 处理结果
});
操作的结果以下面的方式提供:
也可以增加一个可选参数,用于断路时进行失败回退(fallback):
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// 断路器会临时存储该次运行结果,用于断路判断
// ---
breaker.executeWithFallback(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}, v -> {
// 当断路器断路时,返回Hello
return "Hello";
})
.onComplete(ar -> {
// 处理结果
});
每当断路器断路的时候,都会调用失败回退(fallback),也可以调用
isFallbackOnFailure
方法开启失败回退。
当回退函数被成功设置之后,回退函数将会接收 Throwable
对象为参数并返回预期类型。
通过 CircuitBreaker
直接设置失败回退方法:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
// 当断路器断路时将调用此处代码
return "hello";
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
重试
您还可以通过 setMaxRetries
方法设置重试频率。
如果您设置大于0的数值,失败的情况下会重试,直到重试次数等于该数值,
如果其中一次重试成功,那么会跳过剩下的重试。
仅当断路器关闭时才会支持重试。
注意
|
如您设置最大重试次数 maxRetries 为 2,那么您的代码在失败的情况将会执行3次:分别为初次请求,以及 2 次重试。
|
在默认情况下,多次重试之间的超时时间为0,这意味着重试将没有延时的一个接一个地执行, 然而,这会导致调用服务负载增加,并导致服务恢复时间延长。 所以为了减少这种情况,建议设置延时后再重试。
方法 retryPolicy
用于设置重试策略。
重试策略是一个以接收操作失败和重试计数为参数的函数,并在重试前以毫秒为单位返回超时时间。
它也允许用户定制更加复杂的延时策略,例如:使用Retry-After不可用服务发送的标头值。
同时提供了一些通用策略:RetryPolicy.constantDelay
, RetryPolicy.linearDelay
和 RetryPolicy.exponentialDelayWithJitter
。
下面是一个带有抖动的指数延迟示例:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500));
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
回调
您可以配置断路开启或关闭时的回调函数:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
当断路器决定尝试复位的时候( half-open 状态),我们也可以注册 halfOpenHandler
的回调从而得到回调通知。
事件总线通知
每次断路器状态发生变化,都会在事件总线上发布一个事件。
要启用此功能,请将 notification address
设置为非 null
值:
options.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
该事件中包含了断路器的指标,其计算需要您将以下依赖项添加到构建描述符中的 依赖管理 部分:
-
Maven (在您的
pom.xml
文件中):
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.12</version>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
compile 'org.hdrhistogram:HdrHistogram:2.1.12'
注意
|
启用后,默认情况下,通知仅传递给本地消费者。
如果必须将通知发送给集群中的所有消费者,您可通过设置 |
每个总线通知都会包含一个 Json Object 对象,该对象包括以下字段:
-
state
: 断路器最新的状态(OPEN
,CLOSED
,HALF_OPEN
) -
name
: 断路器的名称 -
failures
: 错误次数 -
node
: 节点标识 (如果事件总线并非运行在集群模式中,那么该值为:local
) -
指标信息
半开状态
当断路器处于开路状态时,对其调用会立即失败,不会执行实际操作。经过适当的时间 (通过
setResetTimeout
配置),
断路器决定是否恢复状态,此时进入半开启状态(half-open state)。在这种状态下,
允许下一次断路器的调用实际调用如果成功,断路器将复位并返回到关闭状态,
回归正常的模式;但是如果这次调用失败,则断路器返回到断路状态,直到下次半开状态。
异常
回退函数将会接收到:
-
当断路器开启时,会抛出
OpenCircuitException
-
当操作超时,会抛出
TimeoutException
将断路器指标推送到Hystrix看板(Dashboard)
Netflix Hystrix 带有一个看板(dashboard),用于显示断路器的当前状态。 Vert.x 断路器可以发布其指标(metric),以供 Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的 SSE 流,
此流由 HystrixMetricHandler
该 Vert.x Web 处理器所提供:
CircuitBreakerOptions options = new CircuitBreakerOptions()
.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions(options));
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx, new CircuitBreakerOptions(options));
// 创建 Vert.x Web 路由
Router router = Router.router(vertx);
// 注册指标Handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));
// / 创建HTTP服务器,并分配路由
vertx.createHttpServer()
.requestHandler(router)
.listen(8080);
在 Hystrix 看板, 配置 stream url 地址,例如: http://localhost:8080/metrics
. 现在就可以获取 Vert.x 的断路器指标了。
重要
|
这些指标量是由 Vert.x Web Handler 使用 事件总线(Event Bus)通知 收集。 您必须启用该功能并且,同时如您不想使用默认的通知地址,请在创建的时候设置。 |
使用 Netflix Hystrix
Hystrix 提供了断路器模式的实现。可以在 Vert.x 中使用 Hystrix 提供的断路器或组合使用。 本节介绍在 Vert.x 应用程序中使用 Hystrix 的技巧。
首先,您需要将 Hystrix 添加到您的依赖中。 详细信息请参阅 Hystrix 页面。然后,您需要使用 Command 隔离“受保护的”调用。 您可以这样执行之:
HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();
但是,代码执行是阻塞的,所以需要使用 executeBlocking
方法执行,
或在 Worker Verticle 中调用:
HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute()),
ar -> {
// 回到Event Loop线程中
String result = ar.result();
}
);
如果您使用了 Hystrix 异步方法, 对应回调函数是不会在 Vert.x 的线程中执行的,因此我们必须在执行前保持上下的引用,
(使用 getOrCreateContext
方法),
执行
runOnContext
方法将当前线程切换回Event Loop线程。
不这样做的话,您将失去Vert.x异步模型的优势,并且必须自行管理线程同步和执行顺序:
vertx.runOnContext(v -> {
Context context = vertx.getOrCreateContext();
HystrixCommand<String> command = getSomeCommandInstance();
command.observe().subscribe(result -> {
context.runOnContext(v2 -> {
// 回到 Vert.x Context 下(Event Loop线程或Worker线程)
String r = result;
});
});
});