1. 程式人生 > >在Java專案中使用traceId跟蹤請求全流程日誌

在Java專案中使用traceId跟蹤請求全流程日誌

最近在專案開發中遇到了一些問題,專案為多機部署,使用kibana收集日誌,但併發大時使用日誌定位比較麻煩,大量日誌輸出導致很難篩出指定請求的全部相關日誌,以及下游服務呼叫對應的日誌。因此計劃對專案日誌列印進行一些小改造,使用一個traceId跟蹤請求的全部路徑,前提是不修改原有的列印方式。

簡單的解決思路

想要跟蹤請求,第一個想到的就是當請求來時生成一個traceId放在ThreadLocal裡,然後列印時去取就行了。但在不改動原有輸出語句的前提下自然需要日誌框架的支援了,搜尋的一番發現主流日誌框架都提供了MDC功能。

MDC

MDC 介紹
MDC(Mapped Diagnostic Context,對映除錯上下文)是 log4j 和 logback 提供的一種方便在多執行緒條件下記錄日誌的功能。MDC 可以看成是一個與當前執行緒繫結的Map,可以往其中新增鍵值對。MDC 中包含的內容可以被同一執行緒中執行的程式碼所訪問。當前執行緒的子執行緒會繼承其父執行緒中的 MDC 的內容。當需要記錄日誌時,只需要從 MDC 中獲取所需的資訊即可。MDC 的內容則由程式在適當的時候儲存進去。對於一個 Web 應用來說,通常是在請求被處理的最開始儲存這些資料。

簡而言之,MDC就是日誌框架提供的一個InheritableThreadLocal,專案程式碼中可以將鍵值對放入其中,然後使用指定方式取出列印即可。

在 log4j 和 logback 的取值方式為:

%X{traceid}

初步實現

首先建立攔截器,加入攔截列表中,在請求到達時生成traceId。當然你還可以根據需求在此處後或後續流程中放入spanId、訂單流水號等需要列印的資訊。

import org.slf4j.MDC;

public class TraceInterceptor extends HandlerInterceptorAdapter {
    @Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { // "traceId" MDC.put(Constants.LOG_TRACE_ID, TraceLogUtils.getTraceId()); return true; } }

然後在日誌配置xml檔案中新增traceId列印:

<property name="normal-pattern" value="[%p][%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ,Asia/Shanghai}][%X{traceId}][%15.15t][%c:%L] %msg%n"
/>

初步改造完成!是不是感覺還挺簡單的?且慢,僅僅這樣的改造在實際使用過程中會遇到以下問題:

  • 執行緒池中的執行緒會列印錯誤的traceId
  • 呼叫下游服務後會生成新的traceId,無法繼續跟蹤

下面來一一解決這些問題。

支援執行緒池跟蹤

MDC使用的InheritableThreadLocal只是在執行緒被建立時繼承,但是執行緒池中的執行緒是複用的,後續請求使用已有的執行緒將打印出之前請求的traceId。這時候就需要對執行緒池進行一定的包裝,線上程在執行時讀取之前儲存的MDC內容。不僅自身業務會用到執行緒池,spring專案也使用到了很多執行緒池,比如@Async非同步呼叫,zookeeper執行緒池、kafka執行緒池等。不管是哪種執行緒池都大都支援傳入指定的執行緒池實現,拿@Async舉例:

@Bean("SpExecutor")
public Executor getAsyncExecutor() {
    // 對執行緒池進行包裝,使之支援traceId透傳
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            // 傳入執行緒池之前先複製當前執行緒的MDC
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
        @Override
        public void execute(Runnable task) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    };
    executor.setCorePoolSize(config.getPoolCoreSize());
    ... // 其他配置
    executor.initialize();
    return executor;
}

public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
    return new Callable<T>() {
        @Override
        public T call() throws Exception {
            // 實際執行前匯入對應請求的MDC副本
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            if (MDC.get(Constants.LOG_TRACE_ID) == null) {
                MDC.put(Constants.LOG_TRACE_ID, TraceLogUtils.getTraceId());
            }
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        }
    };
}

ThreadPoolExecutor的包裝也類似,注意為了嚴謹考慮,需要對連線池中的所有呼叫方法進行封裝。

ThreadPoolExecutor中有:

  • public void execute(Runnable command)
  • public <T> Future<T> submit(Callable<T> task)
  • public Future<?> submit(Runnable task)
  • public <T> Future<T> submit(Runnable task, T result)

ThreadPoolTaskExecutor中有:

  • public void execute(Runnable command)
  • public void execute(Runnable task, long startTimeout)
  • public Future<?> submit(Runnable task)
  • public <T> Future<T> submit(Runnable task, T result)
  • public <T> ListenableFuture<T> submitListenable(Callable<T> task)
  • public ListenableFuture<?> submitListenable(Runnable task)

方式與上述的實現類似,不做贅述。
提供一下我的工具類:

public class ThreadMdcUtil {

    public static void setTraceIdIfAbsent() {
        if (MDC.get(Constants.LOG_TRACE_ID) == null) {
            MDC.put(Constants.LOG_TRACE_ID, TraceLogUtils.getTraceId());
        }
    }

    public static void setTraceId() {
        MDC.put(Constants.LOG_TRACE_ID, TraceLogUtils.getTraceId());
    }

    public static void setTraceId(String traceId) {
        MDC.put(Constants.LOG_TRACE_ID, traceId);
    }

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    public static class ThreadPoolTaskExecutorMdcWrapper extends ThreadPoolTaskExecutor {
        @Override
        public void execute(Runnable task) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public void execute(Runnable task, long startTimeout) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public ListenableFuture<?> submitListenable(Runnable task) {
            return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
            return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    }

    public static class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
        public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                            RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        @Override
        public void execute(Runnable task) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    }

    public static class ForkJoinPoolMdcWrapper extends ForkJoinPool {
        public ForkJoinPoolMdcWrapper() {
            super();
        }

        public ForkJoinPoolMdcWrapper(int parallelism) {
            super(parallelism);
        }

        public ForkJoinPoolMdcWrapper(int parallelism, ForkJoinWorkerThreadFactory factory,
                                      Thread.UncaughtExceptionHandler handler, boolean asyncMode) {
            super(parallelism, factory, handler, asyncMode);
        }

        @Override
        public void execute(Runnable task) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }

        @Override
        public <T> ForkJoinTask<T> submit(Runnable task, T result) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
        }

        @Override
        public <T> ForkJoinTask<T> submit(Callable<T> task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    }
}

下游服務使用相同traceId

以上方式在多級服務呼叫中每個服務都會生成新的traceId,導致無法銜接跟蹤。這時就需要對http呼叫工具進行相應的改造了,在傳送http請求時自動將traceId新增到header中,以RestTemplate為例,註冊攔截器:

// 以下省略其他相關配置
RestTemplate restTemplate = new RestTemplate();
// 使用攔截器包裝http header
restTemplate.setInterceptors(new ArrayList<ClientHttpRequestInterceptor>() {
    {
        add((request, body, execution) -> {
            String traceId = MDC.get(Constants.LOG_TRACE_ID);
            if (StringUtils.isNotEmpty(traceId)) {
                request.getHeaders().add(Constants.HTTP_HEADER_TRACE_ID, traceId);
            }
            return execution.execute(request, body);
        });
    }
});

HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
// 注意此處需開啟快取,否則會報getBodyInternal方法“getBody not supported”錯誤
factory.setBufferRequestBody(true);
restTemplate.setRequestFactory(factory);

下游服務的攔截器改為:

public class TraceInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String traceId = request.getHeader(Constants.HTTP_HEADER_TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            traceId = TraceLogUtils.getTraceId();
        }
        MDC.put(Constants.LOG_TRACE_ID, traceId);
        return true;
    }
}

若使用自定義的http客戶端,則直接修改其工具類即可。

針對其他協議的呼叫暫無實踐經驗,可以借鑑上面的思路,通過攔截器插入特定欄位,再在下游讀取指定欄位加入MDC中。

總結

實現日誌跟蹤的基本方案沒有太大難度,重在實踐中發現問題並一層一層解決問題的思路。