Vert.x 断路器

Circuit Breaker 是Vert.x 断路 模式 的实现。 它用来追踪故障,当失败次数达到阈值时 触发断路 ,并提供可选择的失败回退。

支持以下故障:

  • Future 内记录的失败

  • 代码里主动抛出异常

  • 没有完成的 Future(即超时)。

断路器要旨是保障其操作是非阻塞且异步的, 以受益于Vert.x 执行模型。

准备工作

使用Vert.x Circuit Breaker之前, 您必须在您的项目中添加如下 依赖

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-circuit-breaker</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

compile 'io.vertx:vertx-circuit-breaker:4.4.0'

使用断路器

使用断路器需要按以下步骤进行:

  1. 创建一个断路器,并配置您所需要的超时,最大故障次数等参数

  2. 使用断路器执行代码

重要!!! 断路器应该是稳定的单例,而不是每次使用就重新创建它。推荐将该单例存放在某个字段中。

例子:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions()
        .setMaxFailures(5) // 最大失败数
        .setTimeout(2000) // 超时时间
        .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
        .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
);

// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---

breaker.execute(promise -> {
  // 在断路器中执行的代码
  // 这里的代码可以成功或者失败,
  // 如果该 promise 在这里被标记为失败,断路器将自增失败数
}).onComplete(ar -> {
  // 处理结果.
});

execute 代码块接收 Future 对象参数,以标识该操作以及结果的失败或成功。 例如:在下面的例子中, 对应的结果就是 REST 调用的输出:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// 将断路器存放在某个字段中并像如下方式使用
// ---

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      })).onComplete(promise);
}).onComplete(ar -> {
  // 处理结果
});

操作的结果以下面的方式提供:

  • 调用 execute 方法时,返回 Future

  • 调用 executeAndReport 方法时,提供 Future 参数

也可以增加一个可选参数,用于断路时进行失败回退(fallback):

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// 断路器会临时存储该次运行结果,用于断路判断
// ---

breaker.executeWithFallback(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
      }, v -> {
      // 当断路器断路时,返回Hello
      return "Hello";
    })
    .onComplete(ar -> {
        // 处理结果
    });

每当断路器断路的时候,都会调用失败回退(fallback),也可以调用 isFallbackOnFailure 方法开启失败回退。 当回退函数被成功设置之后,回退函数将会接收 Throwable 对象为参数并返回预期类型。

通过 CircuitBreaker 直接设置失败回退方法:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
  //  当断路器断路时将调用此处代码
  return "hello";
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

重试

您还可以通过 setMaxRetries 方法设置重试频率。 如果您设置大于0的数值,失败的情况下会重试,直到重试次数等于该数值, 如果其中一次重试成功,那么会跳过剩下的重试。 仅当断路器关闭时才会支持重试。

注意
如您设置最大重试次数 maxRetries 为 2,那么您的代码在失败的情况将会执行3次:分别为初次请求,以及 2 次重试。

在默认情况下,多次重试之间的超时时间为0,这意味着重试将没有延时的一个接一个地执行, 然而,这会导致调用服务负载增加,并导致服务恢复时间延长。 所以为了减少这种情况,建议设置延时后再重试。

方法 retryPolicy 用于设置重试策略。 重试策略是一个以接收操作失败和重试计数为参数的函数,并在重试前以毫秒为单位返回超时时间。

它也允许用户定制更加复杂的延时策略,例如:使用Retry-After不可用服务发送的标头值。 同时提供了一些通用策略:RetryPolicy.constantDelayRetryPolicy.linearDelayRetryPolicy.exponentialDelayWithJitter

下面是一个带有抖动的指数延迟示例:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
}).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500));

breaker.<String>execute(
  promise -> {
    vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
      .compose(req -> req
        .send()
        .compose(resp -> {
          if (resp.statusCode() != 200) {
            return Future.failedFuture("HTTP error");
          } else {
            return resp.body().map(Buffer::toString);
          }
        })).onComplete(promise);
  });

回调

您可以配置断路开启或关闭时的回调函数:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

当断路器决定尝试复位的时候( half-open 状态),我们也可以注册 halfOpenHandler 的回调从而得到回调通知。

事件总线通知

每次断路器状态发生变化,都会在事件总线上发布一个事件。

要启用此功能,请将 notification address 设置为非 null 值:

options.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);

该事件中包含了断路器的指标,其计算需要您将以下依赖项添加到构建描述符中的 依赖管理 部分:

  • Maven (在您的 pom.xml 文件中):

<dependency>
 <groupId>org.hdrhistogram</groupId>
 <artifactId>HdrHistogram</artifactId>
 <version>2.1.12</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

compile 'org.hdrhistogram:HdrHistogram:2.1.12'
注意

启用后,默认情况下,通知仅传递给本地消费者。 如果必须将通知发送给集群中的所有消费者,您可通过设置 setNotificationLocalOnly 以更改此行为。

每个总线通知都会包含一个 Json Object 对象,该对象包括以下字段:

  • state: 断路器最新的状态(OPEN, CLOSED, HALF_OPEN

  • name: 断路器的名称

  • failures: 错误次数

  • node: 节点标识 (如果事件总线并非运行在集群模式中,那么该值为:local

  • 指标信息

半开状态

当断路器处于开路状态时,对其调用会立即失败,不会执行实际操作。经过适当的时间 (通过 setResetTimeout 配置), 断路器决定是否恢复状态,此时进入半开启状态(half-open state)。在这种状态下, 允许下一次断路器的调用实际调用如果成功,断路器将复位并返回到关闭状态, 回归正常的模式;但是如果这次调用失败,则断路器返回到断路状态,直到下次半开状态。

异常

回退函数将会接收到:

将断路器指标推送到Hystrix看板(Dashboard)

Netflix Hystrix 带有一个看板(dashboard),用于显示断路器的当前状态。 Vert.x 断路器可以发布其指标(metric),以供 Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的 SSE 流, 此流由 HystrixMetricHandler 该 Vert.x Web 处理器所提供:

CircuitBreakerOptions options = new CircuitBreakerOptions()
  .setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions(options));
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx, new CircuitBreakerOptions(options));

// 创建 Vert.x Web 路由
Router router = Router.router(vertx);
// 注册指标Handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));

// / 创建HTTP服务器,并分配路由
vertx.createHttpServer()
  .requestHandler(router)
  .listen(8080);

在 Hystrix 看板, 配置 stream url 地址,例如: http://localhost:8080/metrics. 现在就可以获取 Vert.x 的断路器指标了。

重要
这些指标量是由 Vert.x Web Handler 使用 事件总线(Event Bus)通知 收集。 您必须启用该功能并且,同时如您不想使用默认的通知地址,请在创建的时候设置。

使用 Netflix Hystrix

Hystrix 提供了断路器模式的实现。可以在 Vert.x 中使用 Hystrix 提供的断路器或组合使用。 本节介绍在 Vert.x 应用程序中使用 Hystrix 的技巧。

首先,您需要将 Hystrix 添加到您的依赖中。 详细信息请参阅 Hystrix 页面。然后,您需要使用 Command 隔离“受保护的”调用。 您可以这样执行之:

HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();

但是,代码执行是阻塞的,所以需要使用 executeBlocking 方法执行, 或在 Worker Verticle 中调用:

HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute()),
ar -> {
// 回到Event Loop线程中
String result = ar.result();
}
);

如果您使用了 Hystrix 异步方法, 对应回调函数是不会在 Vert.x 的线程中执行的,因此我们必须在执行前保持上下的引用, (使用 getOrCreateContext 方法), 执行 runOnContext 方法将当前线程切换回Event Loop线程。 不这样做的话,您将失去Vert.x异步模型的优势,并且必须自行管理线程同步和执行顺序:

vertx.runOnContext(v -> {
    Context context = vertx.getOrCreateContext();
    HystrixCommand<String> command = getSomeCommandInstance();
    command.observe().subscribe(result -> {
        context.runOnContext(v2 -> {
            // 回到 Vert.x Context 下(Event Loop线程或Worker线程)
            String r = result;
        });
    });
});