Vertx OpenTracing

Vert.x 借助 Jaeger 的支持集成了 OpenTracing 。

您可以使用 Jaeger 客户端的下列配置项来配置 Vert.x 环境

Vertx vertx = Vertx.vertx(new VertxOptions()
  .setTracingOptions(
    new OpenTracingOptions()
  )
);

您也可以传递一个自定义的 Tracer 来更好地调控 配置项

Vertx vertx = Vertx.vertx(new VertxOptions()
  .setTracingOptions(
    new OpenTracingOptions(tracer)
  )
);

追踪策略

当追踪功能启用时, 一个组件的行为由追踪策略来定义:

  • PROPAGATE: 该组件上报活跃trace中的span

  • ALWAYS: 该组件上报活跃trace中的span或者创建一个新的活跃trace

  • IGNORE: 该组件不会参与任何的trace

tracing 策略通常可在组件选项里配置

HTTP tracing

Vert.x HTTP 服务端和客户端根据 HTTP 请求上报跨度(span):

  • operationName: 当前HTTP方法

  • tags

  • http.method: HTTP 请求方法

  • http.url: 请求 URL

  • http.status_code: HTTP 状态码

HTTP服务端默认的追踪策略是 ALWAYS, 您可以使用 setTracingPolicy 来调配该策略。

HttpServer server = vertx.createHttpServer(new HttpServerOptions()
  .setTracingPolicy(TracingPolicy.IGNORE)
);

HTTP 客户端默认的追踪策略是 PROPAGATE , 您可以使用 setTracingPolicy 来调配该策略。

HttpClient client = vertx.createHttpClient(new HttpClientOptions()
  .setTracingPolicy(TracingPolicy.IGNORE)
);

要启动客户端调用的追踪, 您需要先创建它并使得 Vert.x 通过使用 OpenTracingUtil.setSpan 留意到这一点:

Span span = tracer.buildSpan("my-operation")
  .withTag("some-key", "some-value")
  .start();
OpenTracingUtil.setSpan(span);
// 做一些事,例如客户端发送请求
span.finish();

在包含有两个 Vert.x 服务的 HTTP 场景中,将会在客户端创建一个跨度, 而后追踪上下文将在服务器端传播,另一个跨度将添加到追踪中。

事件总线追踪

Vert.x 事件总线围绕消息交换报告跨度。

默认的追踪策略是 PROPAGATE , 您可以使用 setTracingPolicy 来调配该策略.

DeliveryOptions options = new DeliveryOptions().setTracingPolicy(TracingPolicy.ALWAYS);
vertx.eventBus().send("the-address", "foo", options);

获取当前的 Span

Vert.x 使用本地上下文来保存当前的 Span 对象。 为了获取当前的 Span 实例,请使用 OpenTracingUtil.getSpan() 方法

这个方法只能在 Vert.x 的线程上使用 (线程必须是 VertxThread 的实例)。 在一个不是 Vert.x 的线程上调用该方法是不符合设计预期的,因此该方法会返回 null。

协程支持

现在并没有提供对协程的直接支持,但是通过极小的改动就可以做到。

为了支持协程,您需要做以下几步。

  1. 使用 CoroutineVerticle

  2. 将您的 每个路由处理器 转换为协程。

  3. 使用 CoroutineContext 来保存 Tracer 和当前的 Span 对象

示例代码:

class TracedVerticle(private val tracer: Tracer): CoroutineVerticle() {
   override suspend fun start() {
       val router = Router.router(vertx)

       router.route("/hello1")
           .method(HttpMethod.GET)
           .coroutineHandler { ctx ->                          // (1)
               launch { println("Hello to Console") }
               ctx.end("Hello from coroutine handler")
           }

       router.route("/hello2")
           .method(HttpMethod.GET)
           .coroutineHandler(::nonSuspendHandler)              // (2)

       vertx.createHttpServer()
           .requestHandler(router)
           .listen(8080)
           .await()
   }

   private fun nonSuspendHandler(ctx: RoutingContext) {
       ctx.end("Hello from usual handler")
   }

   private fun Route.coroutineHandler(handler: Handler<RoutingContext>): Route = // (3)
       this.coroutineHandler(handler::handle)

   private fun Route.coroutineHandler(                                           // (4)
       handler: suspend (RoutingContext) -> (Unit)
   ): Route = handler { ctx ->
       val span: Span = OpenTracingUtil.getSpan()                                // (5)
       launch(ctx.vertx().dispatcher() + SpanElement(tracer, span)) {            // (6)
           val spanElem = coroutineContext[SpanElement]                          // (7)
           if (spanElem == null) {
               handler(ctx)
           } else {
               val span = spanElem.span
               val tracer = spanElem.tracer
               val childSpan = span                                                // (8)
               try {
                   withContext(SpanElement(tracer, childSpan)) { handler(ctx) }    // (9)
               } finally {
                   // childSpan.finish()                                           // (10)
               }
           }
           // 或为了复用代码而创建一个帮助方法
           withContextTraced(coroutineContext) {
               try {
                   handler(ctx)
               } catch (t: Throwable) {
                   ctx.fail(t)
               }
           }
       }
   }
}
  1. 使用 coroutineHandler 扩展方法来创建一个协程处理器。

  2. 创建一个普通的异步处理器,并将其包装在一个协程中。

  3. 使用扩展方法来讲 Handler<RoutingContext> 转化为一个可挂起的函数。

  4. 声明一个用来在 Vert.x 的事件循环上创建并执行协程的扩展方法。

  5. 自动从当前的 Vert.x 上下文中获取 Span 实例(该方法会自动这么做)。

  6. 创建一个包装协程,并将当前的 Span 添加到 CoroutineContext 中。

  7. 从协程上下文中恢复 Span

  8. 复用 span 或使用 tracer.buildSpan("").asChildOf(span).start() 方法创建一个新的 Span

  9. 将这个 Span 放入上下文中

  10. 如果您创建了一个新的话,请终结掉之前的 childSpan

以下是一些有用的代码,您的实现可以参考:

/**
* 保存一个 tracer 和当前协程上下文中的 Span 的引用
*/
class SpanElement(val tracer: Tracer, val span: Span) :
   ThreadContextElement<Scope>,
   AbstractCoroutineContextElement(SpanElement) {

   companion object Key : CoroutineContext.Key<SpanElement>

   /**
   *  在协程挂起后,关闭 [Scope]
   */
   override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
       oldState.close()
   }

   /**
   * 当协程恢复后,创建新的 [Scope] , scope 激活后可以提供 [span] 的实例
   */
   override fun updateThreadContext(context: CoroutineContext): Scope {
       return tracer.activateSpan(span)
   }
}

/**
* 更高级的帮助代码,包含一些选项,并且展示了怎么使用 MDCContext 来将 Span 传递给 logger。
*/
suspend fun <T> withContextTraced(
   context: CoroutineContext,
   reuseParentSpan: Boolean = true,
   block: suspend CoroutineScope.() -> T
): T {
   return coroutineScope {
       val spanElem = this.coroutineContext[SpanElement]

       if (spanElem == null) {
           logger.warn { "Calling 'withTracer', but no span found in context" }
           withContext(context, block)
       } else {
           val childSpan = if (reuseParentSpan) spanElem.span
           else spanElem.tracer.buildSpan("").asChildOf(spanElem.span).start()

           try {
               val mdcSpan = mapOf(MDC_SPAN_KEY to childSpan.toString())
               withContext(context + SpanElement(spanElem.tracer, childSpan) + MDCContext(mdcSpan), block)
           } finally {
               if (!reuseParentSpan) childSpan.finish()
           }
       }
   }
}
private const val MDC_SPAN_KEY = "request.span.id"