Vert.x Circuit Breaker

    • Circuit Breaker:熔断器
    • metric:指标

    组件介绍

    熔断器用来追踪故障次数,当失败次数达到阈值时触发熔断,并且可选择性提供失败回调。

    熔断器支持以下的故障:

    • 使用 时失败
    • 运行时抛出异常
    • 没有完成的 Future(超时)

    熔断器要旨是保护 Vert.x 的 非阻塞异步 的行为,以便受益于Vert.x 执行模型。

    使用 Vert.x 熔断器

    要使用 Vert.x 熔断器,只需要在依赖中增加以下代码片段:

    • Maven (在 pom.xml 文件中):
    • Gradle (在 build.gradle 文件中):
    1. compile 'io.vertx:vertx-circuit-breaker:3.4.1'

    为了使用熔断器我们需要以下的步骤:

    • 创建一个熔断器,并配置成你所需要的(超时,最大故障次数)
    • 使用熔断器执行代码

    以下是例子:

    1. CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    2. new CircuitBreakerOptions()
    3. .setMaxFailures(5) // 最大故障次数
    4. .setTimeout(2000) // 超时时间
    5. .setFallbackOnFailure(true) // 设置是否失败回调
    6. .setResetTimeout(10000) // 重置状态超时
    7. );
    8. breaker.execute(future -> {
    9. // 在熔断器中执行的代码
    10. // 这里的代码可以成功或者失败
    11. // 如果future在这里被标记为失败,熔断器将自增失败数
    12. }).setHandler(ar -> {
    13. // 处理结果

    执行块中接收 Future 作为参数,以表示操作和结果的成功或失败。 例如在下面的例子中,对应的结果就是REST调用的输出:

    • 调用 execute 方式返回 Future
    • 调用 executeAndReport 时作为参数提供的

    也可以提供一个失败时回调方法(fallback):

    1. CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    2. new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
    3. );
    4. breaker.executeWithFallback(
    5. future -> {
    6. vertx.createHttpClient().getNow(8080, "localhost", "/", response -> {
    7. if (response.statusCode() != 200) {
    8. future.fail("HTTP error");
    9. } else {
    10. response
    11. .exceptionHandler(future::fail)
    12. .bodyHandler(buffer -> {
    13. future.complete(buffer.toString());
    14. }
    15. });
    16. }, v -> {
    17. // 当熔断器熔断时将调用此处代码
    18. return "Hello";
    19. })
    20. .setHandler(ar -> {
    21. // 处理结果
    22. });

    熔断状态中都会调用失败回调(fallback),或者设置 isFallbackOnFailure,其结果是失败回调函数的输出。失败回调函数将 对象作为参数,并返回预期类型的​​对象。

    失败回调可以直接设置在 CircuitBreaker 上:

    1. CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    2. new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
    3. ).fallback(v -> {
    4. // 当熔断器熔断时将调用此处代码
    5. return "hello";
    6. });
    7. breaker.execute(
    8. future -> {
    9. vertx.createHttpClient().getNow(8080, "localhost", "/", response -> {
    10. future.fail("HTTP error");
    11. } else {
    12. response
    13. .exceptionHandler(future::fail)
    14. .bodyHandler(buffer -> {
    15. future.complete(buffer.toString());
    16. });
    17. }
    18. });
    19. });

    可以指定熔断器在生效之前的尝试次数,使用 。如果将其设置为高于0的值,则您的代码在最终失败之前进行尝试多次执行。如果代码在其中一个重试中成功,则处理程序将得到通知,并且跳过剩余的重试。此配置仅当熔断器未生效时工作。

    回调

    你能够配置熔断生效/关闭时回调。

    当熔断器决定尝试复位的时候( half-open 状态),我们也可以注册 的回调从而得到回调通知。

    Event Bus 通知

    每次熔断器状态发生变化时,会在Event Bus上发布事件。事件发送的地址可以使用 进行配置。如果将 null 传递给此方法,则通知将被禁用。默认情况下,使用的地址是 vertx.circuit-breaker
    每次事件信息包含以下:

    • : 熔断器的新状态 (OPENCLOSEDHALF_OPEN
    • name: 熔断器的名字
    • failures: 故障的数量
    • node: 节点的标志符(如果运行在单节点模式是 local

    将熔断器指标推送到Hystrix Dashboard

    带有一个仪表板(dashboard),用于显示熔断器的当前状态。 Vert.x 熔断器可以发布其指标(metric),以供Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的SSE流,此流由 HystrixMetricHandler 这个 Vert.x Web Handler 提供:

    1. CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
    2. CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx);
    3. // 创建 Vert.x Web 路由
    4. Router router = Router.router(vertx);
    5. // 注册指标Handler
    6. router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));
    7. // 创建HTTP服务器,并分配路由
    8. vertx.createHttpServer()
    9. .requestHandler(router::accept)
    10. .listen(8080);

    在Hystrix 仪表板中,配置流网址(stream url),如:。仪表板将使用Vert.x熔断器的指标。

    请注意,这些指标量是由 Vert.x Web Handler 使用 Event Bus 事件通知收集的。如果您不使用默认通知地址,则需要在创建时指定。

    使用 Netflix Hystrix

    提供了熔断器模式的实现。可以在Vert.x中使用Hystrix提供的熔断器或组合使用。本节介绍在Vert.x应用程序中使用Hystrix的技巧。

    首先,您需要将Hystrix添加到你的依赖中。详细信息请参阅Hystrix页面。然后,您需要使用 Command 隔离“受保护的”调用。你可以这样执行它:

    1. HystrixCommand<String> someCommand = getSomeCommandInstance();
    2. String result = someCommand.execute();

    但是,命令执行是阻塞的,必须结合 executeBlocking 方法执行,或者在Worker Verticle中调用:

    如果您使用Hystrix的异步支持,请注意,对应的回调函数不会在Vert.x线程中执行,并且必须在执行前保留对上下文的引用(使用 getOrCreateContext ),并且在回调中,执行 runOnContext 函数将当前线程切换回Event Loop线程。如果不这样做的话,您将失去Vert.x并发模型的优势,并且必须自行管理线程同步和执行顺序:

    1. vertx.runOnContext(v -> {
    2. Context context = vertx.getOrCreateContext();
    3. HystrixCommand<String> command = getSomeCommandInstance();
    4. command.observe().subscribe(result -> {
    5. context.runOnContext(v2 -> {
    6. // 回到Vert.x Context下(Event Loop线程或Worker线程)
    7. String r = result;
    8. });
    9. });