Vert.x 断路器

Circuit Breaker 是Vert.x 断路 模式 的实现。 它用来追踪故障,当失败次数达到阈值时 触发断路 ,并提供可选择的失败回退。

支持以下故障:

  • Future 内记录的失败

  • 代码里主动抛出异常

  • 没有完成的 Future(即超时)。

断路器要旨是保障其操作是非阻塞且异步的, 以受益于Vert.x 执行模型。

准备工作

使用Vert.x Circuit Breaker之前, 你必须在你的项目中添加如下 依赖

  • Maven(在您的 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-circuit-breaker</artifactId>
 <version>4.1.8</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中):

compile 'io.vertx:vertx-circuit-breaker:4.1.8'

使用断路器

使用断路器需要按以下步骤进行:

  1. 创建一个断路器,并配置成你所需要的超时,最大故障次数等参数

  2. 使用断路器执行代码

重要!!! 断路器应该是稳定的单例,而不是每次使用就重新创建它。推荐将该单例存放在某个领域中。

例子:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions()
        .setMaxFailures(5) // 最大失败数
        .setTimeout(2000) // 超时时间
        .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
        .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
);

// ---
// 将断路器存放在某个领域中并像如下方式使用
// ---

breaker.execute(promise -> {
  // 在断路器中执行的代码
  // 这里的代码可以成功或者失败,
  // 如果该 promise 在这里被标记为失败,断路器将自增失败数
}).onComplete(ar -> {
  // 处理结果.
});

execute代码块接收 Future 对象参数,以标识该操作以及结果的失败或成功。 例如:在下面的例子中, 对应的结果就是REST调用的输出:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      })).onComplete(promise);
}).onComplete(ar -> {
  // 处理结果
});

操作的结果以下面的方式提供:

  • 调用 execute 方法时,返回 Future

  • 调用 executeAndReport 方法时,提供 Future 参数

也可以增加一个可选参数,用于断路时进行失败回退(fallback):

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// 断路器会临时存储该次运行结果,用于断路判断
// ---

breaker.executeWithFallback(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
      }, v -> {
      // 当断路器断路时,返回Hello
      return "Hello";
    })
    .onComplete(ar -> {
        // 处理结果
    });

每当断路器断路的时候,都会调用失败回退(fallback),也可以调用 isFallbackOnFailure 方法开启失败回退。 当回退函数被成功设置之后,回退函数将会接收 Throwable 对象为参数并返回预期类型。

通过 CircuitBreaker 直接设置失败回退方法:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
  //  当断路器断路时将调用此处代码
  return "hello";
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

重试

还可以通过 setMaxRetries. 设置重试次数,如设置大于0的数值,失败的情况下会重试,直到重试次数等于该数值,如果其中一次重试成功, 那么会跳过剩下的重试。

注意 如您设置最大重试次数 maxRetries 为 2,那么您的代码在失败的情况将会执行3次,分别为初次请求, 以及 2 次重试。

在默认情况下超时时间(timeout)和重试次数(retries)为0,那么将会无延时地一直请求下去,这会导致调用服务负载增加 并导致服务恢复时间延长。所以为了减少这种情况,建议设置延时以及重试次数。 方法 retryPolicy 用于设置重试策略。该方法接收一个Function<Integer,Long>的函数体(传入参数为重试次数,返回具体超时时间,单位:毫秒), 允许用户定制更加复杂的延时策略,例如:带抖动的延时补偿。

下面是设置了重试策略的例子,重试超时时间与重试时间呈线指数增长。

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
}).retryPolicy(retryCount -> retryCount * 100L);

breaker.<String>execute(
  promise -> {
    vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
      .compose(req -> req
        .send()
        .compose(resp -> {
          if (resp.statusCode() != 200) {
            return Future.failedFuture("HTTP error");
          } else {
            return resp.body().map(Buffer::toString);
          }
        })).onComplete(promise);
  });

回调

您可以配置断路开启或关闭时的回调函数:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

当断路器决定尝试复位的时候( half-open 状态),我们也可以注册 halfOpenHandler 的回调从而得到回调通知。

事件总线通知

每当断路器发生状态改变的时候,断路器都会在事件总线上推送通知,总线通默认地址为:vertx.circuit-breaker。 当然这个也是可以配置的,调用方法 setNotificationAddress. If null is 你可以设置总线通知地址。如果设置为 null 那么总线通知将被禁用。

每个总线通知都会包含一个 Json Object对象,该对象包括以下字段:

  • state : 断路器最新的状态(OPEN, CLOSED, HALF_OPEN

  • name : 断路器的名称

  • failures : 错误次数

  • node : 节点标识 (如果事件总线并非运行在集群模式中,那么该值为:local)

半开状态

当断路器处于开路状态时,对其调用会立即失败,不会执行实际操作。经过适当的时间 (通过 setResetTimeout 配置), 断路器决定是否恢复状态,此时进入半开启状态(half-open state)。在这种状态下, 允许下一次断路器的调用实际调用如果成功,断路器将复位并返回到关闭状态, 回归正常的模式;但是如果这次调用失败,则断路器返回到断路状态,直到下次半开状态。

异常

回退函数将会接收到:

将断路器指标推送到Hystrix看板(Dashboard)

Netflix Hystrix带有一个看板(dashboard),用于显示断路器的当前状态。 Vert.x 断路器可以发布其指标(metric),以供Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的SSE流, 此流由 HystrixMetricHandler 该 Vert.x Web 处理器所提供:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx);

// 创建 Vert.x Web 路由
Router router = Router.router(vertx);
// 注册指标Handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));

// / 创建HTTP服务器,并分配路由
vertx.createHttpServer()
  .requestHandler(router)
  .listen(8080);

在Hystrix 看板, 配置 stream url 地址,例如: http://localhost:8080/metrics. 现在就可以获取Vert.x的断路器指标了。

注意:这些指标量是由 Vert.x Web Handler 使用 Event Bus 事件通知收集。 如您不想使用默认的通知地址,请在创建的时候设置。

使用 Netflix Hystrix

Hystrix 提供了断路器模式的实现。可以在Vert.x中使用Hystrix提供的断路器或组合使用。 本节介绍在Vert.x应用程序中使用Hystrix的技巧。

首先,您需要将Hystrix添加到你的依赖中。 详细信息请参阅Hystrix页面。然后,您需要使用 Command 隔离“受保护的”调用。 您可以这样执行之:

HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();

但是,代码执行是阻塞的,所以需要使用 executeBlocking 方法执行, 或在Worker Verticle中调用:

HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute()),
ar -> {
// 回到Event Loop线程中
String result = ar.result();
}
);

如果你使用了Hystrix异步方法, 对应回调函数是不会在Vert.x的线程中执行的,因此我们必须在执行前保持上下的引用, (使用 getOrCreateContext 方法), 执行 runOnContext 方法将当前线程切换回Event Loop线程。 不这样做的话,您将失去Vert.x异步模型的优势,并且必须自行管理线程同步和执行顺序:

vertx.runOnContext(v -> {
    Context context = vertx.getOrCreateContext();
    HystrixCommand<String> command = getSomeCommandInstance();
    command.observe().subscribe(result -> {
        context.runOnContext(v2 -> {
            // 回到Vert.x Context下(Event Loop线程或Worker线程)
            String r = result;
        });
    });
});