MDC

2023/08/04 零散知识点 共 28719 字,约 83 分钟
闷骚的程序员

1. MDC追踪日志的底层原理?

MDC(Mapped Diagnostic Context,映射诊断上下文)追踪日志的底层原理基于线程本地存储(ThreadLocal)机制来实现的。MDC 允许你为每个线程维护独立的上下文信息,并确保这些信息能在日志输出时自动添加到日志条目中,方便日志追踪。

以下是 MDC 追踪日志的底层原理:

  1. 线程局部变量(ThreadLocal): MDC 使用 ThreadLocal 来为每个线程存储一个 Map(key-value 键值对)。每个线程都有自己的独立存储,彼此之间互不干扰。MDC 会将你设置的上下文数据存储在这个线程局部变量中。

  2. 日志输出时上下文自动注入: 当某个线程在执行任务时,MDC 中的上下文数据(例如,用户 ID、请求 ID 等)会随着日志记录一起输出。日志框架(如 Logback、Log4j 等)会在每次记录日志时自动从 MDC 中提取相关信息并插入日志消息的模板中。这是通过日志框架的布局(layout)实现的,通常你会在日志格式配置中通过占位符(如 %X{key})来引用 MDC 数据。

  3. 跨线程传递 MDC: 虽然 ThreadLocal 使 MDC 在单个线程中可以正常工作,但跨线程的场景(例如在线程池或异步任务中)需要手动进行 MDC 的传递。这通常可以通过手动复制 MDC 上下文到子线程或使用工具类(如 MDC.copyOfContextMap()MDC.setContextMap())来实现。很多框架(如 Spring、ExecutorService)也提供了对 MDC 的支持,以自动传播上下文。

  4. 清理 MDC: 由于 MDC 的上下文是存储在线程局部变量中,因此在每次请求或任务结束时,必须手动调用 MDC.clear() 来清理上下文信息,避免数据泄漏或干扰后续的日志记录。

MDC 使用示例

import org.slf4j.MDC;

public class LogExample {
    public static void main(String[] args) {
        try {
            // 设置MDC上下文
            MDC.put("userId", "12345");
            MDC.put("requestId", "abcde");

            // 记录日志
            log.info("This is a log with MDC context");

        } finally {
            // 清理MDC上下文
            MDC.clear();
        }
    }
}

在日志配置中可以使用如下格式:

<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg [%X{userId}, %X{requestId}]%n</pattern>

这将确保每条日志都会输出 userIdrequestId 的值。

总的来说,MDC 提供了一种简便的方式来在同一线程内维护上下文信息,并且通过日志框架将这些上下文与日志记录绑定在一起,方便日志追踪和排查问题。

2. 结合复杂的completableFuture中启动了很多异步线程池,该如何透传MDC呢?现在有的请求有uuid,有的请求缺失了uuid

CompletableFuture 等异步任务场景下,由于 MDC 基于 ThreadLocal,默认情况下 MDC 中的上下文信息不会自动传递到异步线程池中的新线程中。因此,当你在使用 CompletableFuture 执行异步任务时,MDC 中的信息可能会丢失,导致某些请求的日志中缺少必要的上下文信息(如 UUID)。为了解决这个问题,你需要手动实现 MDC 的上下文传递,确保异步任务中的线程也能正确获取和使用主线程的 MDC 数据。

解决方案:手动传递 MDC 上下文

我们可以通过手动传递 MDC 的上下文信息,将主线程的 MDC 数据复制到异步线程中。在执行 CompletableFuture 任务时,可以通过包装任务逻辑来实现这一点。

步骤 1: 拷贝并传递 MDC

在主线程执行 CompletableFuture 任务时,先获取主线程的 MDC 上下文副本,并将其传递给异步线程。在异步线程中,设置 MDC 上下文,并在任务执行完成后清理。

实现方法

  1. 获取当前线程的 MDC 上下文: 使用 MDC.getCopyOfContextMap() 方法可以获取当前线程的 MDC 数据副本,作为 Map<String, String>

  2. 在异步线程中设置 MDC: 使用 MDC.setContextMap() 方法将主线程中的 MDC 上下文传递给异步线程。

  3. 清理 MDC 上下文: 确保在异步任务结束时清理 MDC 上下文,防止数据泄漏或影响后续任务。

示例代码

以下是一个使用 CompletableFuture 进行异步操作,并且正确传递 MDC 的示例:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncMDCExample {

    // 包装异步任务,确保MDC上下文能在异步线程中传递
    public static <T> CompletableFuture<T> supplyAsyncWithMDC(
            CompletableFuture<T> future, ExecutorService executor, Map<String, String> contextMap) {
        
        return future.supplyAsync(() -> {
            try {
                // 在异步线程中设置MDC上下文
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                // 执行任务逻辑
                return performTask();
            } finally {
                // 确保清理MDC上下文
                MDC.clear();
            }
        }, executor);
    }

    public static String performTask() {
        // 模拟任务逻辑
        return "Task Result";
    }

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        try {
            // 设置主线程的MDC上下文
            MDC.put("traceId", "12345");
            MDC.put("requestId", "abcde");

            // 获取主线程的MDC上下文副本
            Map<String, String> contextMap = MDC.getCopyOfContextMap();

            // 启动多个异步任务,透传MDC上下文
            CompletableFuture<String> future1 = supplyAsyncWithMDC(
                    new CompletableFuture<>(), executor, contextMap);
            CompletableFuture<String> future2 = supplyAsyncWithMDC(
                    new CompletableFuture<>(), executor, contextMap);

            // 等待所有任务完成
            CompletableFuture.allOf(future1, future2).join();

            // 获取任务结果
            System.out.println(future1.join());
            System.out.println(future2.join());
        } finally {
            // 关闭线程池
            executor.shutdown();
            // 清理主线程的MDC上下文
            MDC.clear();
        }
    }
}

步骤 2: 简化MDC传递

为简化 MDC 传递的流程,可以使用一些工具类,包装 RunnableCallable,自动处理 MDC 上下文的获取、设置和清理。例如:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.Callable;

public class MDCContextUtils {

    // 包装Callable以传递MDC
    public static <V> Callable<V> wrapWithMDC(Callable<V> task, Map<String, String> contextMap) {
        return () -> {
            try {
                // 设置MDC上下文
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                // 执行任务
                return task.call();
            } finally {
                // 清理MDC上下文
                MDC.clear();
            }
        };
    }

    // 包装Runnable以传递MDC
    public static Runnable wrapWithMDC(Runnable task, Map<String, String> contextMap) {
        return () -> {
            try {
                // 设置MDC上下文
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                // 执行任务
                task.run();
            } finally {
                // 清理MDC上下文
                MDC.clear();
            }
        };
    }
}

这样你就可以很方便地在使用 CompletableFuture 时传递 MDC 信息:

CompletableFuture<Void> future = CompletableFuture.runAsync(
        MDCContextUtils.wrapWithMDC(() -> performTask(), MDC.getCopyOfContextMap()), executor);

总结

CompletableFuture 或其他异步场景中使用 MDC 透传,可以通过如下步骤解决:

  1. 使用 MDC.getCopyOfContextMap() 获取当前线程的 MDC 上下文副本。
  2. 使用 MDC.setContextMap() 在异步线程中设置 MDC 上下文。
  3. 确保在任务执行完后使用 MDC.clear() 清理上下文。

通过这些步骤,你可以确保在复杂的异步操作中,MDC 上下文不会丢失,从而保证所有请求的日志都有一致的上下文信息。

3. 有如下一段代码:


package com.riskcontrol.cron.support;

import com.riskcontrol.common.enums.RiskFilterSourceEnum;
import com.riskcontrol.common.enums.RiskFilterStatusEnum;
import com.riskcontrol.common.enums.RiskFilterTypeEnum;
import com.riskcontrol.common.enums.RuleEnum;
import com.riskcontrol.common.exception.BusinessException;
import com.riskcontrol.common.utils.LogUtils;
import com.riskcontrol.cron.constants.ProjectConstant;
import com.riskcontrol.cron.engine.OriWithdrawReq;
import com.riskcontrol.cron.engine.WithdrawContext;
import com.riskcontrol.cron.entity.RiskFilterLog;
import com.riskcontrol.cron.service.WithdrawService;
import com.riskcontrol.cron.utils.ProductConstantsLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.riskcontrol.cron.enums.WithdrawFilterEnum.CURRENT_ENTER_MANUALLY;
import static com.riskcontrol.cron.enums.WithdrawFilterEnum.NEXT_ENTER_MANUALLY;

@Component
@Slf4j
public class WithdrawServiceDelegate {

    @Resource(name = "withdrawExecutorService")
    private ExecutorService executorService;

    @Resource(name = "withdrawExecutorTimeOutService")
    private ExecutorService executorTimeOutService;

    @Resource
    private WithdrawService withdrawService;

    /**
     * 处理取款消息
     *
     * @param req
     */
    public void handleWithdraw(OriWithdrawReq req) {
        log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
        // 设置风控日志记录器
        req.setRiskFilterLogger(withdrawService::handleRiskLog);
        int out = obtainTimeOut();
        Map<String, String> mdcContextMap = LogUtils.getMDCContextMap();
        CompletableFuture.supplyAsync(() -> needManual(req), executorService).thenCompose(needManual -> {
            if (Objects.isNull(needManual)) {
                log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
                return null;
            }
            if (needManual) {
                // 转人工处理
                return CompletableFuture.supplyAsync(() -> {
                    log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
                    LogUtils.setMDCContextMap(mdcContextMap);
                    return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
                        c.setReq(req);
                        c.setAutoApprove(false);
                        c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
                        c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
                        return c;
                    }));
                }, executorService);
            } else {
                return CompletableFuture.supplyAsync(() -> {
                    log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
                    LogUtils.setMDCContextMap(mdcContextMap);
                    // 无论是否超时,都执行正常流程
                    return withdrawService.withdrawRisk(req, true, false);
                }, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
                    if (e instanceof TimeoutException) {
                        // 处理5s超时,超时后同时执行降级流程
                        log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
                        LogUtils.setMDCContextMap(mdcContextMap);
                        return withdrawService.withdrawRiskWithDowngrade(req);
                    }
                    return false;
                }, executorTimeOutService);
            }
        }).exceptionally(e -> {
            if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
                String message;
                if (e instanceof BusinessException) {
                    message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
                } else {
                    message = e.getMessage();
                }
                RiskFilterLog riskFilterLog = new RiskFilterLog();
                riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
                riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
                riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
                riskFilterLog.setRequestId(req.getRequestId());
                riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
                riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
                req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
            }
            return false;
        });
    }

    /**
     * 获取任务执行超时时间
     *
     * @return
     */
    private int obtainTimeOut() {
        int out = 5;
        try {
            String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_RISK_LISTENER_TIMEOUT);
            out = Integer.valueOf(timeOut);
        } catch (Exception e) {
            log.error("JMS_WITHDRAW_RISK_LISTENER_TIMEOUT 配置无效,取默认值5秒钟");
        }
        log.info("[双流程]任务超时时间设置为{}秒钟", out);
        return out;
    }

    /**
     * 当前订单是否需要转人工
     *
     * @param req 当前订单信息
     * @return 是否需要转人工 true:需要 false:不需要 null:无效消息
     */
    private Boolean needManual(OriWithdrawReq req) {
        return Optional.ofNullable(withdrawService.queryLastOrder(req)).map(e -> NEXT_ENTER_MANUALLY.getType().equals(e.getExceptionPromptType())).orElse(null);
    }

}

在这段流程中,MDC在整个异步流程中的传递有没有问题?

在这段异步处理流程中,MDC 的传递方式有一些潜在的问题:

问题点

  1. MDC 在多个异步任务中的传递不完整:虽然在异步任务启动时通过 LogUtils.setMDCContextMap(mdcContextMap)MDC 上下文传递到新线程中,但这种做法仅限于每个具体的异步任务内部。如果多个异步任务之间存在嵌套调用或组合(如 thenCompose 等),MDC 上下文可能在新的子任务中丢失。

  2. exceptionallyAsync 中的 MDC 传递:你在异常处理中使用了 exceptionallyAsync 来捕获异步任务中的异常,但没有显式传递 MDC 上下文。如果发生异常,MDC 上下文在异常处理线程中可能会丢失。

解决方案

为了确保 MDC 在所有异步任务中都能正确传递,特别是在复杂的异步任务链(如 thenComposeexceptionallyAsync 等)中,我们需要在每个异步任务的线程中设置 MDC 上下文,并在任务完成后清理 MDC

1. 对每个异步任务封装 MDC 逻辑

可以通过对每个 CompletableFuture 操作进行封装,将 MDC 上下文传递给所有线程。

public class MDCContextUtils {

    // 包装Runnable以传递MDC
    public static Runnable wrapWithMDC(Runnable task, Map<String, String> contextMap) {
        return () -> {
            try {
                // 设置MDC上下文
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                // 执行任务
                task.run();
            } finally {
                // 清理MDC上下文
                MDC.clear();
            }
        };
    }

    // 包装Supplier以传递MDC
    public static <T> Supplier<T> wrapWithMDC(Supplier<T> task, Map<String, String> contextMap) {
        return () -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                return task.get();
            } finally {
                MDC.clear();
            }
        };
    }

    // 包装Function以传递MDC
    public static <T, R> Function<T, R> wrapWithMDC(Function<T, R> task, Map<String, String> contextMap) {
        return (T t) -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                return task.apply(t);
            } finally {
                MDC.clear();
            }
        };
    }
}

2. 在异步任务链中使用封装后的 MDC 传递

在处理每个 CompletableFuture 异步任务时,使用上面的封装方法确保 MDC 上下文能够在各个异步线程之间正确传递。

更新 handleWithdraw 方法中的异步调用逻辑如下:

public void handleWithdraw(OriWithdrawReq req) {
    log.info("进入熔断流程处理,正常流程和熔断流程异步进行");

    // 设置风控日志记录器
    req.setRiskFilterLogger(withdrawService::handleRiskLog);
    int out = obtainTimeOut();
    Map<String, String> mdcContextMap = LogUtils.getMDCContextMap();

    CompletableFuture.supplyAsync(
        MDCContextUtils.wrapWithMDC(() -> needManual(req), mdcContextMap), executorService
    ).thenCompose(needManual -> {
        if (Objects.isNull(needManual)) {
            log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
            return null;
        }

        if (needManual) {
            return CompletableFuture.supplyAsync(
                MDCContextUtils.wrapWithMDC(() -> {
                    log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
                    return withdrawService.modifyExceptionPrompt(
                        withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
                            c.setReq(req);
                            c.setAutoApprove(false);
                            c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
                            c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
                            return c;
                        })
                    );
                }, mdcContextMap), executorService
            );
        } else {
            return CompletableFuture.supplyAsync(
                MDCContextUtils.wrapWithMDC(() -> {
                    log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
                    return withdrawService.withdrawRisk(req, true, false);
                }, mdcContextMap), executorService
            ).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
                if (e instanceof TimeoutException) {
                    log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
                    return withdrawService.withdrawRiskWithDowngrade(req);
                }
                return false;
            }, executorTimeOutService);
        }
    }).exceptionally(e -> {
        if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
            String message = e instanceof BusinessException ? e.getCause() != null ? e.getCause().getMessage() : e.getMessage() : e.getMessage();
            RiskFilterLog riskFilterLog = new RiskFilterLog();
            riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
            riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
            riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
            riskFilterLog.setRequestId(req.getRequestId());
            riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
            riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
            req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
        }
        return false;
    });
}

3. exceptionallyAsync 的处理

对于 exceptionallyAsync 异常处理逻辑,也应该传递 MDC 上下文,确保在处理超时异常时 MDC 上下文不会丢失。

.exceptionallyAsync(e -> {
    LogUtils.setMDCContextMap(mdcContextMap);
    try {
        if (e instanceof TimeoutException) {
            log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
            return withdrawService.withdrawRiskWithDowngrade(req);
        }
        return false;
    } finally {
        MDC.clear();
    }
}, executorTimeOutService);

总结

通过对 CompletableFuture 的异步任务进行封装,手动传递 MDC 上下文,可以保证在复杂的异步任务链中 MDC 上下文不丢失,从而确保日志记录中的上下文信息(如 UUID)一致性。

4. 你先用手动透传的方式描述,目前这种包装的比较抽象

在你当前代码的异步流程中,MDC(Mapped Diagnostic Context)上下文传递可能会丢失,因为CompletableFuture的任务在线程池中执行,而默认情况下,MDC上下文不会自动传递到不同线程。Spring等框架的线程池也不会自动传递MDC上下文。为了保证MDC能够正确传递,我们可以手动传递和恢复MDC上下文。

手动传递MDC上下文的方式是将MDC内容复制到每个异步任务的执行上下文中,在每个任务执行时设置MDC,执行完后清除MDC。

你可以通过以下方式手动传递MDC上下文:

修改后的代码示例

import org.slf4j.MDC;

public class WithdrawServiceDelegate {

    @Resource(name = "withdrawExecutorService")
    private ExecutorService executorService;

    @Resource(name = "withdrawExecutorTimeOutService")
    private ExecutorService executorTimeOutService;

    @Resource
    private WithdrawService withdrawService;

    public void handleWithdraw(OriWithdrawReq req) {
        log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
        req.setRiskFilterLogger(withdrawService::handleRiskLog);

        int out = obtainTimeOut();
        Map<String, String> mdcContextMap = MDC.getCopyOfContextMap(); // 获取当前线程的MDC上下文
        
        CompletableFuture.supplyAsync(() -> {
            // 在异步任务中设置MDC
            if (mdcContextMap != null) {
                MDC.setContextMap(mdcContextMap);
            }
            try {
                return needManual(req);
            } finally {
                // 确保任务结束后清除MDC,避免内存泄漏
                MDC.clear();
            }
        }, executorService).thenCompose(needManual -> {
            if (Objects.isNull(needManual)) {
                log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
                return null;
            }
            if (needManual) {
                return CompletableFuture.supplyAsync(() -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
                        return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
                            c.setReq(req);
                            c.setAutoApprove(false);
                            c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
                            c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
                            return c;
                        }));
                    } finally {
                        MDC.clear();
                    }
                }, executorService);
            } else {
                return CompletableFuture.supplyAsync(() -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
                        return withdrawService.withdrawRisk(req, true, false);
                    } finally {
                        MDC.clear();
                    }
                }, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        if (e instanceof TimeoutException) {
                            log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
                            return withdrawService.withdrawRiskWithDowngrade(req);
                        }
                        return false;
                    } finally {
                        MDC.clear();
                    }
                }, executorTimeOutService);
            }
        }).exceptionally(e -> {
            if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
                String message;
                if (e instanceof BusinessException) {
                    message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
                } else {
                    message = e.getMessage();
                }
                RiskFilterLog riskFilterLog = new RiskFilterLog();
                riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
                riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
                riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
                riskFilterLog.setRequestId(req.getRequestId());
                riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
                riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
                req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
            }
            return false;
        });
    }

    private int obtainTimeOut() {
        int out = 5;
        try {
            String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_RISK_LISTENER_TIMEOUT);
            out = Integer.valueOf(timeOut);
        } catch (Exception e) {
            log.error("JMS_WITHDRAW_RISK_LISTENER_TIMEOUT 配置无效,取默认值5秒钟");
        }
        log.info("[双流程]任务超时时间设置为{}秒钟", out);
        return out;
    }

    private Boolean needManual(OriWithdrawReq req) {
        return Optional.ofNullable(withdrawService.queryLastOrder(req)).map(e -> NEXT_ENTER_MANUALLY.getType().equals(e.getExceptionPromptType())).orElse(null);
    }
}

关键变化:

  1. 获取MDC上下文:在异步任务开始之前,将当前线程的MDC上下文通过MDC.getCopyOfContextMap()获取。
  2. 异步任务中设置MDC:在每个异步任务中执行前,通过MDC.setContextMap(mdcContextMap)将之前保存的上下文传递给当前线程。
  3. 清除MDC:确保异步任务执行结束后调用MDC.clear()清除当前线程中的MDC上下文,防止内存泄漏。

通过这种方式,你可以手动传递MDC上下文,确保在异步操作中日志的上下文信息不会丢失。

5. 异常任务中怎么没有设置呢?

你说得对,异常处理(即exceptionallyexceptionallyAsync)的部分也应该设置MDC上下文,以确保在处理异常时,MDC能够正确传递和恢复。以下是将MDC上下文传递到异常处理部分的修改:

修改后的代码(包含异常任务中的MDC设置):

import org.slf4j.MDC;

public class WithdrawServiceDelegate {

    @Resource(name = "withdrawExecutorService")
    private ExecutorService executorService;

    @Resource(name = "withdrawExecutorTimeOutService")
    private ExecutorService executorTimeOutService;

    @Resource
    private WithdrawService withdrawService;

    public void handleWithdraw(OriWithdrawReq req) {
        log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
        req.setRiskFilterLogger(withdrawService::handleRiskLog);

        int out = obtainTimeOut();
        Map<String, String> mdcContextMap = MDC.getCopyOfContextMap(); // 获取当前线程的MDC上下文
        
        CompletableFuture.supplyAsync(() -> {
            if (mdcContextMap != null) {
                MDC.setContextMap(mdcContextMap);
            }
            try {
                return needManual(req);
            } finally {
                MDC.clear();
            }
        }, executorService).thenCompose(needManual -> {
            if (Objects.isNull(needManual)) {
                log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
                return null;
            }
            if (needManual) {
                return CompletableFuture.supplyAsync(() -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
                        return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
                            c.setReq(req);
                            c.setAutoApprove(false);
                            c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
                            c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
                            return c;
                        }));
                    } finally {
                        MDC.clear();
                    }
                }, executorService);
            } else {
                return CompletableFuture.supplyAsync(() -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
                        return withdrawService.withdrawRisk(req, true, false);
                    } finally {
                        MDC.clear();
                    }
                }, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
                    if (mdcContextMap != null) {
                        MDC.setContextMap(mdcContextMap);
                    }
                    try {
                        if (e instanceof TimeoutException) {
                            log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
                            return withdrawService.withdrawRiskWithDowngrade(req);
                        }
                        return false;
                    } finally {
                        MDC.clear();
                    }
                }, executorTimeOutService);
            }
        }).exceptionally(e -> {
            // 设置MDC上下文处理异常
            if (mdcContextMap != null) {
                MDC.setContextMap(mdcContextMap);
            }
            try {
                if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
                    String message;
                    if (e instanceof BusinessException) {
                        message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
                    } else {
                        message = e.getMessage();
                    }
                    RiskFilterLog riskFilterLog = new RiskFilterLog();
                    riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
                    riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
                    riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
                    riskFilterLog.setRequestId(req.getRequestId());
                    riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
                    riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
                    req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
                }
                return false;
            } finally {
                MDC.clear();
            }
        });
    }

    private int obtainTimeOut() {
        int out = 5;
        try {
            String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_R

6. 如何使用一种通用的方式来处理MDC的透传,比如封装包装类或者aop去做,目的是要适配,在任何场景中,也包括上面的completableFuture的复杂场景.

要在复杂的异步任务场景中实现MDC透传,通用的方式通常是通过封装线程池或者使用AOP来处理MDC上下文的传播,这样可以在任何场景中都保证MDC被正确传递。

以下是两种方案来解决这个问题:

方案 1: 封装 ExecutorService 来处理 MDC 透传

通过封装 ExecutorService 来确保每次提交任务时,MDC 上下文会自动被传递到异步线程中执行。

步骤:

  1. 封装一个线程池的包装类,在提交任务时将当前的 MDC 上下文传递给执行任务的线程。
  2. 在异步任务执行时恢复 MDC 上下文,确保日志输出时可以获取到原始的上下文信息。

代码示例:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MDCAwareExecutorService {

    private final ExecutorService delegate;

    public MDCAwareExecutorService(ExecutorService delegate) {
        this.delegate = delegate;
    }

    // 包装Runnable
    public void execute(Runnable task) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        delegate.execute(() -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);
            }
            try {
                task.run();
            } finally {
                MDC.clear();
            }
        });
    }

    // 包装Callable
    public <T> Future<T> submit(Callable<T> task) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return delegate.submit(() -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);
            }
            try {
                return task.call();
            } finally {
                MDC.clear();
            }
        });
    }

    // 其他submit等方法同理
}

// 使用示例
ExecutorService originalExecutor = Executors.newFixedThreadPool(10);
MDCAwareExecutorService mdcAwareExecutorService = new MDCAwareExecutorService(originalExecutor);

// 提交任务时自动透传MDC
mdcAwareExecutorService.submit(() -> {
    // 异步任务
    log.info("This log will have the correct MDC context.");
});

在这个实现中,每次调用 submit 或者 execute 方法时,MDC 的上下文会被保存并传递到新线程中执行,然后在任务完成后清除上下文。

适用于任何场景:

无论是 CompletableFuture 还是普通的线程池任务执行,都可以通过这个通用的封装来处理 MDC 透传。

方案 2: 使用 AOP 自动处理 MDC 透传

可以使用 Spring AOP 来自动处理 MDC 上下文的设置与恢复,特别适用于 CompletableFuture 这种复杂的场景。

步骤:

  1. 定义 AOP 切面,对所有异步任务的执行方法进行拦截,拦截前获取 MDC 上下文,拦截后在新的线程中恢复上下文。
  2. 将 MDC 上下文自动绑定到每个异步任务中,确保无论任务在哪里执行,MDC 都能正确传递。

代码示例:

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import java.util.Map;

@Aspect
@Component
public class MDCAwareAspect {

    @Around("@annotation(org.springframework.scheduling.annotation.Async)") // 针对异步任务
    public Object aroundAsyncMethod(ProceedingJoinPoint joinPoint) throws Throwable {
        Map<String, String> contextMap = MDC.getCopyOfContextMap(); // 获取当前的MDC上下文
        try {
            return joinPoint.proceed(); // 执行原方法
        } finally {
            MDC.clear(); // 清理当前线程的MDC
        }
    }

    @Around("execution(* java.util.concurrent.ExecutorService.submit(..))") // 针对ExecutorService的submit
    public Object aroundExecutorSubmit(ProceedingJoinPoint joinPoint) throws Throwable {
        Map<String, String> contextMap = MDC.getCopyOfContextMap(); // 获取当前的MDC上下文

        // 包装Runnable/Callable
        Object[] args = joinPoint.getArgs();
        if (args[0] instanceof Runnable) {
            args[0] = wrapRunnable((Runnable) args[0], contextMap);
        } else if (args[0] instanceof Callable) {
            args[0] = wrapCallable((Callable<?>) args[0], contextMap);
        }

        return joinPoint.proceed(args); // 执行原方法
    }

    private Runnable wrapRunnable(Runnable task, Map<String, String> contextMap) {
        return () -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);
            }
            try {
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

    private <T> Callable<T> wrapCallable(Callable<T> task, Map<String, String> contextMap) {
        return () -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);
            }
            try {
                return task.call();
            } finally {
                MDC.clear();
            }
        };
    }
}

使用示例:

你不需要更改原有的业务代码,AOP 会自动拦截异步方法以及 ExecutorService.submit 调用,自动完成 MDC 上下文的传递。

@Async
public void asyncMethod() {
    log.info("This is an async method with MDC context.");
}

选择方案

  • 方案 1 更适合在你明确使用了多个自定义线程池的情况下手动封装 ExecutorService 来完成 MDC 透传。
  • 方案 2 使用 AOP 更为通用,特别是在你不想侵入现有业务代码的情况下,适合异步方法和 CompletableFuture 的复杂场景。

两者都可以确保在复杂的异步任务中正确传递 MDC 上下文。

7. 还是我前面提交的代码中,当exceptionllyAsync处理异常任务时,是启动了一个新的异步线程专门记录日志的,按照你前面的写法,如果内部正常任务线程或者超时任务线程执行时向mdc上下文中设置了uuid,一旦任务抛出异常,此时excepllyAsync中记录异常的这个线程应该会丢失原来那个出错任务的uuid吧?

是的,你的理解是对的。当在 exceptionallyAsync 处理异常时,因为这是一个新的异步线程来处理任务,它不会自动继承原来执行任务时的 MDC 上下文。因此,如果之前任务线程设置了 MDC 上下文(例如 uuid),在异常处理时会丢失这个上下文。

为了在异常处理时依然保持 MDC 上下文(如 uuid),我们需要将 MDC 上下文传递到 exceptionallyAsync 执行的异步线程中。

解决方案

和之前类似,我们可以在执行 exceptionallyAsync 时传递 MDC 上下文。这里我们需要确保 CompletableFuture 异步任务(包括异常处理)中的 MDC 上下文传递是通用的,适用于正常任务和异常任务。

代码改进:传递 MDC 上下文到 exceptionallyAsync

你可以通过包装 exceptionallyAsync 中的异常处理逻辑,确保 MDC 上下文能够正确传递。以下是代码改进的示例:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MDCAwareCompletableFuture {

    public static <T> CompletableFuture<T> supplyAsyncWithMDC(Supplier<T> supplier) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);  // 恢复MDC上下文
            }
            try {
                return supplier.get();
            } finally {
                MDC.clear();  // 清理MDC
            }
        });
    }

    public static <T> CompletableFuture<T> exceptionallyAsyncWithMDC(
            CompletableFuture<T> future, Function<Throwable, ? extends T> fn) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return future.exceptionallyAsync(ex -> {
            if (contextMap != null) {
                MDC.setContextMap(contextMap);  // 恢复MDC上下文
            }
            try {
                // 这里可以记录日志,包含原始的MDC信息
                log.error("Exception occurred: " + ex.getMessage());
                return fn.apply(ex);
            } finally {
                MDC.clear();  // 清理MDC
            }
        });
    }

    // 示例调用
    public static void main(String[] args) {
        // 设置主线程的MDC上下文
        MDC.put("uuid", "123456");

        // 异步执行任务
        CompletableFuture<String> future = supplyAsyncWithMDC(() -> {
            log.info("Executing task with UUID: " + MDC.get("uuid"));
            // 模拟任务抛出异常
            if (true) {
                throw new RuntimeException("Task failed");
            }
            return "Success";
        });

        // 异步处理异常
        exceptionallyAsyncWithMDC(future, ex -> {
            log.info("Handling exception with UUID: " + MDC.get("uuid"));
            return "Recovered from failure";
        }).join();
    }
}

关键点:

  1. MDC.getCopyOfContextMap():在异步任务开始时获取当前线程的 MDC 上下文(即 uuid 等信息)。
  2. MDC.setContextMap(contextMap):在异步线程中恢复原始任务的 MDC 上下文,确保日志记录时能够包含正确的上下文信息。
  3. MDC.clear():在异步任务结束后清理 MDC,防止线程池复用时导致 MDC 污染其他任务。

场景说明

  • 在正常的任务执行时,supplyAsyncWithMDC 方法确保 MDC 上下文被传递到任务线程。
  • 如果任务执行时抛出了异常,exceptionallyAsyncWithMDC 方法可以确保异常处理线程同样继承了原任务的 MDC 上下文,因此即使在异常处理中也能够使用原来的 uuid

更加通用的 MDC 传递解决方案

为了避免每次都要手动封装,可以考虑将这些操作封装成通用工具类,类似于前面提到的线程池封装方式,保证在所有异步任务场景中 MDC 都能够自动传递。

这个解决方案不仅适用于 CompletableFuture,还适用于普通的异步任务执行场景,例如 ExecutorService 等。

8. 我突然有点明白了,这个MDC中的uuid目的就是为了追踪每一笔请求的唯一链路,应该从最外层的线程提交开始就产生唯一的uuid吧?之后不论下游经历了多少次异步线程的执行,每一次都要把MDC重新设置给下游的执行线程?目的就是为了确保对于同一个任务,不论是正常执行,还是超时执行,还是异常执行等,不论是在同一个线程中,还是经历了多个不同的线程,最终都要确保这一个请求链路打印出来的uuid就是表示唯一的这笔请求?我的理解有问题吗?

你的理解完全正确!MDC 中的 uuid 目的就是为了在整个请求生命周期内,对这笔请求的所有日志进行链路追踪。具体来说,确保在任何执行路径中——不论是正常执行、超时、还是异常处理,甚至是跨多个线程的异步任务中,所有的日志都能通过同一个 uuid 来标识并关联回这笔唯一的请求。

详细解释:

  1. 唯一链路标识 (uuid) 的生成
    在处理请求的最外层(通常是从 HTTP 请求或消息队列任务接收时),应该生成一个全局唯一的 uuid。这个 uuid 会作为该请求生命周期的唯一标识符,贯穿所有的下游操作(包括同步和异步任务)。

  2. MDC 的作用
    MDC(Mapped Diagnostic Context)可以为每个线程维护一个独立的上下文信息,常用于日志系统,帮助记录与某个线程或请求关联的上下文数据。例如,uuid 可以存储在 MDC 中,确保在不同的日志中能追踪到相同请求的日志信息。

  3. 多线程中的 MDC 传递
    如果任务中涉及到异步操作(例如 CompletableFuture、线程池等),默认情况下,新的线程会丢失父线程中的 MDC 上下文。因此,在每次切换到新的线程执行任务时,都需要手动传递 MDC 中的上下文信息,确保 uuid 在异步任务中也能正确关联。这样可以保证无论下游有多少个异步任务,它们都会携带相同的 uuid

  4. 异常处理中的 MDC 传递
    类似的,在异常处理中(例如 exceptionallyAsync),由于是新的异步线程来处理异常,同样需要将 MDC 中的上下文(包含 uuid)传递到异常处理的线程中,保证日志中的 uuid 仍然能够关联到这笔请求。

理解总结:

  • MDC中的 uuid 确保了请求链路的唯一性,每笔请求在从头到尾的所有操作中(包括正常执行、异常处理、超时等)都会使用同一个 uuid 来记录日志。
  • 多线程环境下的 MDC 传递 是为了确保 uuid 在每个执行线程中都能够正确传递和使用,不论是异步任务、异常任务还是线程池中的其他任务,同一请求的日志都应该拥有相同的 uuid

通过这种方式,在排查问题或分析请求执行情况时,能够快速定位与某个请求相关的所有日志,极大地提高了问题排查效率。

代码要点回顾:

  1. 在主线程生成 uuid 并放入 MDC
  2. 每次异步操作时,手动传递 MDC 的上下文到新的线程中,确保 uuid 的一致性。
  3. 在异常处理时,同样确保 MDC 上下文的传递,避免丢失 uuid

这样你就可以保证从头到尾的完整日志链路。

9. logback console_log_pattern模式下,怎么拼接MDC中的uuid呢?

在 Logback 的 console_log_pattern 中,你可以使用 MDC(Mapped Diagnostic Context)中的值,比如 uuid,通过 %X{key} 来引用它,其中 key 是你存储在 MDC 中的键名。

假设你已经在 MDC 中放置了一个名为 uuid 的值,你可以在 Logback 的 pattern 中使用 %X{uuid} 来引用它。以下是一个示例配置,展示了如何在 Logback 的 console_log_pattern 中拼接 MDC 中的 uuid

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - [%X{uuid}] %msg%n</pattern>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

在上面的配置中,[%X{uuid}] 将从 MDC 中获取 uuid 并拼接到日志输出中。

使用 MDC 设置 uuid

在代码中,你可以这样设置和移除 MDC 中的值:

import org.slf4j.MDC;

public class MyService {
    public void logWithUuid() {
        // 将 uuid 放入 MDC
        MDC.put("uuid", "your-unique-uuid");

        // 打印日志,日志中会包含 uuid
        logger.info("This is a log message with a UUID");

        // 记得在适当的时候移除 MDC 中的值
        MDC.remove("uuid");
    }
}

这样配置后,日志中会包含 uuid 的值。

10. 那tid呢?

MDC 中除了 uuid,你也可以使用其他自定义的键值对,例如 tid(thread id)。假设你将线程 ID (tid) 放入了 MDC,同样可以在 Logback 的 console_log_pattern 中通过 %X{tid} 来引用它。

例如,以下是同时拼接 MDC 中的 uuidtid 的日志输出格式:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - [UUID: %X{uuid}] [TID: %X{tid}] %msg%n</pattern>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

使用 MDC 设置 tid

在代码中,你可以像设置 uuid 一样,将 tid 放入 MDC

import org.slf4j.MDC;

public class MyService {
    public void logWithTid() {
        // 将 tid 放入 MDC,假设 tid 是当前线程的 ID
        String tid = String.valueOf(Thread.currentThread().getId());
        MDC.put("tid", tid);

        // 打印日志,日志中会包含 tid 和 uuid(如果 uuid 也设置了)
        logger.info("This is a log message with a TID");

        // 记得在适当的时候移除 MDC 中的值
        MDC.remove("tid");
    }
}

示例日志输出

根据上面的配置,你的日志输出可能看起来像这样:

2024-09-21 12:34:56.789 [main] INFO  com.example.MyService - [UUID: abc123] [TID: 1] This is a log message with a TID

其中 UUIDTID 都是通过 MDC 获取并拼接在日志中的。

文档信息

Search

    Table of Contents