Vert.x 断路器
Circuit Breaker 是Vert.x 断路 模式 的实现。 它用来追踪故障,当失败次数达到阈值时 触发断路 ,并提供可选择的失败回退。
支持以下故障:
-
在
Future
内记录的失败 -
代码里主动抛出异常
-
没有完成的
Future
(即超时)。
断路器要旨是保障其操作是非阻塞且异步的, 以受益于Vert.x 执行模型。
准备工作
使用Vert.x Circuit Breaker之前, 你必须在你的项目中添加如下 依赖 :
-
Maven(在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
<version>4.1.8</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中):
compile 'io.vertx:vertx-circuit-breaker:4.1.8'
使用断路器
使用断路器需要按以下步骤进行:
-
创建一个断路器,并配置成你所需要的超时,最大故障次数等参数
-
使用断路器执行代码
重要!!! 断路器应该是稳定的单例,而不是每次使用就重新创建它。推荐将该单例存放在某个领域中。
例子:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions()
.setMaxFailures(5) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
.setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
);
// ---
// 将断路器存放在某个领域中并像如下方式使用
// ---
breaker.execute(promise -> {
// 在断路器中执行的代码
// 这里的代码可以成功或者失败,
// 如果该 promise 在这里被标记为失败,断路器将自增失败数
}).onComplete(ar -> {
// 处理结果.
});
execute代码块接收 Future
对象参数,以标识该操作以及结果的失败或成功。
例如:在下面的例子中,
对应的结果就是REST调用的输出:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// Store the circuit breaker in a field and access it as follows
// ---
breaker.<String>execute(promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}).onComplete(ar -> {
// 处理结果
});
操作的结果以下面的方式提供:
也可以增加一个可选参数,用于断路时进行失败回退(fallback):
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// 断路器会临时存储该次运行结果,用于断路判断
// ---
breaker.executeWithFallback(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}, v -> {
// 当断路器断路时,返回Hello
return "Hello";
})
.onComplete(ar -> {
// 处理结果
});
每当断路器断路的时候,都会调用失败回退(fallback),也可以调用
isFallbackOnFailure
方法开启失败回退。
当回退函数被成功设置之后,回退函数将会接收 Throwable
对象为参数并返回预期类型。
通过 CircuitBreaker
直接设置失败回退方法:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
// 当断路器断路时将调用此处代码
return "hello";
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
重试
还可以通过 setMaxRetries
.
设置重试次数,如设置大于0的数值,失败的情况下会重试,直到重试次数等于该数值,如果其中一次重试成功,
那么会跳过剩下的重试。
注意 如您设置最大重试次数 maxRetries
为 2,那么您的代码在失败的情况将会执行3次,分别为初次请求,
以及 2 次重试。
在默认情况下超时时间(timeout)和重试次数(retries)为0,那么将会无延时地一直请求下去,这会导致调用服务负载增加
并导致服务恢复时间延长。所以为了减少这种情况,建议设置延时以及重试次数。
方法 retryPolicy
用于设置重试策略。该方法接收一个Function<Integer,Long>的函数体(传入参数为重试次数,返回具体超时时间,单位:毫秒),
允许用户定制更加复杂的延时策略,例如:带抖动的延时补偿。
下面是设置了重试策略的例子,重试超时时间与重试时间呈线指数增长。
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(retryCount -> retryCount * 100L);
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
回调
您可以配置断路开启或关闭时的回调函数:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
当断路器决定尝试复位的时候( half-open 状态),我们也可以注册 halfOpenHandler
的回调从而得到回调通知。
事件总线通知
每当断路器发生状态改变的时候,断路器都会在事件总线上推送通知,总线通默认地址为:vertx.circuit-breaker
。
当然这个也是可以配置的,调用方法
setNotificationAddress
. If null
is
你可以设置总线通知地址。如果设置为 null
那么总线通知将被禁用。
每个总线通知都会包含一个 Json Object对象,该对象包括以下字段:
-
state
: 断路器最新的状态(OPEN
,CLOSED
,HALF_OPEN
) -
name
: 断路器的名称 -
failures
: 错误次数 -
node
: 节点标识 (如果事件总线并非运行在集群模式中,那么该值为:local
)
半开状态
当断路器处于开路状态时,对其调用会立即失败,不会执行实际操作。经过适当的时间 (通过
setResetTimeout
配置),
断路器决定是否恢复状态,此时进入半开启状态(half-open state)。在这种状态下,
允许下一次断路器的调用实际调用如果成功,断路器将复位并返回到关闭状态,
回归正常的模式;但是如果这次调用失败,则断路器返回到断路状态,直到下次半开状态。
异常
回退函数将会接收到:
-
当断路器开启时,会抛出
OpenCircuitException
-
当操作超时,会抛出
TimeoutException
将断路器指标推送到Hystrix看板(Dashboard)
Netflix Hystrix带有一个看板(dashboard),用于显示断路器的当前状态。 Vert.x 断路器可以发布其指标(metric),以供Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的SSE流,
此流由 HystrixMetricHandler
该 Vert.x Web 处理器所提供:
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx);
// 创建 Vert.x Web 路由
Router router = Router.router(vertx);
// 注册指标Handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));
// / 创建HTTP服务器,并分配路由
vertx.createHttpServer()
.requestHandler(router)
.listen(8080);
在Hystrix 看板, 配置 stream url 地址,例如: http://localhost:8080/metrics
. 现在就可以获取Vert.x的断路器指标了。
注意:这些指标量是由 Vert.x Web Handler 使用 Event Bus 事件通知收集。 如您不想使用默认的通知地址,请在创建的时候设置。
使用 Netflix Hystrix
Hystrix 提供了断路器模式的实现。可以在Vert.x中使用Hystrix提供的断路器或组合使用。 本节介绍在Vert.x应用程序中使用Hystrix的技巧。
首先,您需要将Hystrix添加到你的依赖中。 详细信息请参阅Hystrix页面。然后,您需要使用 Command 隔离“受保护的”调用。 您可以这样执行之:
HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();
但是,代码执行是阻塞的,所以需要使用 executeBlocking
方法执行,
或在Worker Verticle中调用:
HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute()),
ar -> {
// 回到Event Loop线程中
String result = ar.result();
}
);
如果你使用了Hystrix异步方法, 对应回调函数是不会在Vert.x的线程中执行的,因此我们必须在执行前保持上下的引用,
(使用 getOrCreateContext
方法),
执行
runOnContext
方法将当前线程切换回Event Loop线程。
不这样做的话,您将失去Vert.x异步模型的优势,并且必须自行管理线程同步和执行顺序:
vertx.runOnContext(v -> {
Context context = vertx.getOrCreateContext();
HystrixCommand<String> command = getSomeCommandInstance();
command.observe().subscribe(result -> {
context.runOnContext(v2 -> {
// 回到Vert.x Context下(Event Loop线程或Worker线程)
String r = result;
});
});
});