1. MDC追踪日志的底层原理?
MDC(Mapped Diagnostic Context,映射诊断上下文)追踪日志的底层原理基于线程本地存储(ThreadLocal)机制来实现的。MDC 允许你为每个线程维护独立的上下文信息,并确保这些信息能在日志输出时自动添加到日志条目中,方便日志追踪。
以下是 MDC 追踪日志的底层原理:
线程局部变量(ThreadLocal): MDC 使用
ThreadLocal
来为每个线程存储一个 Map(key-value 键值对)。每个线程都有自己的独立存储,彼此之间互不干扰。MDC 会将你设置的上下文数据存储在这个线程局部变量中。日志输出时上下文自动注入: 当某个线程在执行任务时,MDC 中的上下文数据(例如,用户 ID、请求 ID 等)会随着日志记录一起输出。日志框架(如 Logback、Log4j 等)会在每次记录日志时自动从 MDC 中提取相关信息并插入日志消息的模板中。这是通过日志框架的布局(layout)实现的,通常你会在日志格式配置中通过占位符(如
%X{key}
)来引用 MDC 数据。跨线程传递 MDC: 虽然
ThreadLocal
使 MDC 在单个线程中可以正常工作,但跨线程的场景(例如在线程池或异步任务中)需要手动进行 MDC 的传递。这通常可以通过手动复制 MDC 上下文到子线程或使用工具类(如MDC.copyOfContextMap()
和MDC.setContextMap()
)来实现。很多框架(如 Spring、ExecutorService)也提供了对 MDC 的支持,以自动传播上下文。清理 MDC: 由于 MDC 的上下文是存储在线程局部变量中,因此在每次请求或任务结束时,必须手动调用
MDC.clear()
来清理上下文信息,避免数据泄漏或干扰后续的日志记录。
MDC 使用示例:
import org.slf4j.MDC;
public class LogExample {
public static void main(String[] args) {
try {
// 设置MDC上下文
MDC.put("userId", "12345");
MDC.put("requestId", "abcde");
// 记录日志
log.info("This is a log with MDC context");
} finally {
// 清理MDC上下文
MDC.clear();
}
}
}
在日志配置中可以使用如下格式:
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg [%X{userId}, %X{requestId}]%n</pattern>
这将确保每条日志都会输出 userId
和 requestId
的值。
总的来说,MDC 提供了一种简便的方式来在同一线程内维护上下文信息,并且通过日志框架将这些上下文与日志记录绑定在一起,方便日志追踪和排查问题。
2. 结合复杂的completableFuture中启动了很多异步线程池,该如何透传MDC呢?现在有的请求有uuid,有的请求缺失了uuid
在 CompletableFuture
等异步任务场景下,由于 MDC
基于 ThreadLocal
,默认情况下 MDC 中的上下文信息不会自动传递到异步线程池中的新线程中。因此,当你在使用 CompletableFuture
执行异步任务时,MDC 中的信息可能会丢失,导致某些请求的日志中缺少必要的上下文信息(如 UUID
)。为了解决这个问题,你需要手动实现 MDC 的上下文传递,确保异步任务中的线程也能正确获取和使用主线程的 MDC 数据。
解决方案:手动传递 MDC 上下文
我们可以通过手动传递 MDC 的上下文信息,将主线程的 MDC 数据复制到异步线程中。在执行 CompletableFuture
任务时,可以通过包装任务逻辑来实现这一点。
步骤 1: 拷贝并传递 MDC
在主线程执行 CompletableFuture
任务时,先获取主线程的 MDC 上下文副本,并将其传递给异步线程。在异步线程中,设置 MDC 上下文,并在任务执行完成后清理。
实现方法
获取当前线程的 MDC 上下文: 使用
MDC.getCopyOfContextMap()
方法可以获取当前线程的 MDC 数据副本,作为Map<String, String>
。在异步线程中设置 MDC: 使用
MDC.setContextMap()
方法将主线程中的 MDC 上下文传递给异步线程。清理 MDC 上下文: 确保在异步任务结束时清理 MDC 上下文,防止数据泄漏或影响后续任务。
示例代码
以下是一个使用 CompletableFuture
进行异步操作,并且正确传递 MDC 的示例:
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncMDCExample {
// 包装异步任务,确保MDC上下文能在异步线程中传递
public static <T> CompletableFuture<T> supplyAsyncWithMDC(
CompletableFuture<T> future, ExecutorService executor, Map<String, String> contextMap) {
return future.supplyAsync(() -> {
try {
// 在异步线程中设置MDC上下文
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
// 执行任务逻辑
return performTask();
} finally {
// 确保清理MDC上下文
MDC.clear();
}
}, executor);
}
public static String performTask() {
// 模拟任务逻辑
return "Task Result";
}
public static void main(String[] args) {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
// 设置主线程的MDC上下文
MDC.put("traceId", "12345");
MDC.put("requestId", "abcde");
// 获取主线程的MDC上下文副本
Map<String, String> contextMap = MDC.getCopyOfContextMap();
// 启动多个异步任务,透传MDC上下文
CompletableFuture<String> future1 = supplyAsyncWithMDC(
new CompletableFuture<>(), executor, contextMap);
CompletableFuture<String> future2 = supplyAsyncWithMDC(
new CompletableFuture<>(), executor, contextMap);
// 等待所有任务完成
CompletableFuture.allOf(future1, future2).join();
// 获取任务结果
System.out.println(future1.join());
System.out.println(future2.join());
} finally {
// 关闭线程池
executor.shutdown();
// 清理主线程的MDC上下文
MDC.clear();
}
}
}
步骤 2: 简化MDC传递
为简化 MDC 传递的流程,可以使用一些工具类,包装 Runnable
和 Callable
,自动处理 MDC 上下文的获取、设置和清理。例如:
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;
public class MDCContextUtils {
// 包装Callable以传递MDC
public static <V> Callable<V> wrapWithMDC(Callable<V> task, Map<String, String> contextMap) {
return () -> {
try {
// 设置MDC上下文
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
// 执行任务
return task.call();
} finally {
// 清理MDC上下文
MDC.clear();
}
};
}
// 包装Runnable以传递MDC
public static Runnable wrapWithMDC(Runnable task, Map<String, String> contextMap) {
return () -> {
try {
// 设置MDC上下文
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
// 执行任务
task.run();
} finally {
// 清理MDC上下文
MDC.clear();
}
};
}
}
这样你就可以很方便地在使用 CompletableFuture
时传递 MDC 信息:
CompletableFuture<Void> future = CompletableFuture.runAsync(
MDCContextUtils.wrapWithMDC(() -> performTask(), MDC.getCopyOfContextMap()), executor);
总结
在 CompletableFuture
或其他异步场景中使用 MDC 透传,可以通过如下步骤解决:
- 使用
MDC.getCopyOfContextMap()
获取当前线程的 MDC 上下文副本。 - 使用
MDC.setContextMap()
在异步线程中设置 MDC 上下文。 - 确保在任务执行完后使用
MDC.clear()
清理上下文。
通过这些步骤,你可以确保在复杂的异步操作中,MDC 上下文不会丢失,从而保证所有请求的日志都有一致的上下文信息。
3. 有如下一段代码:
package com.riskcontrol.cron.support;
import com.riskcontrol.common.enums.RiskFilterSourceEnum;
import com.riskcontrol.common.enums.RiskFilterStatusEnum;
import com.riskcontrol.common.enums.RiskFilterTypeEnum;
import com.riskcontrol.common.enums.RuleEnum;
import com.riskcontrol.common.exception.BusinessException;
import com.riskcontrol.common.utils.LogUtils;
import com.riskcontrol.cron.constants.ProjectConstant;
import com.riskcontrol.cron.engine.OriWithdrawReq;
import com.riskcontrol.cron.engine.WithdrawContext;
import com.riskcontrol.cron.entity.RiskFilterLog;
import com.riskcontrol.cron.service.WithdrawService;
import com.riskcontrol.cron.utils.ProductConstantsLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.riskcontrol.cron.enums.WithdrawFilterEnum.CURRENT_ENTER_MANUALLY;
import static com.riskcontrol.cron.enums.WithdrawFilterEnum.NEXT_ENTER_MANUALLY;
@Component
@Slf4j
public class WithdrawServiceDelegate {
@Resource(name = "withdrawExecutorService")
private ExecutorService executorService;
@Resource(name = "withdrawExecutorTimeOutService")
private ExecutorService executorTimeOutService;
@Resource
private WithdrawService withdrawService;
/**
* 处理取款消息
*
* @param req
*/
public void handleWithdraw(OriWithdrawReq req) {
log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
// 设置风控日志记录器
req.setRiskFilterLogger(withdrawService::handleRiskLog);
int out = obtainTimeOut();
Map<String, String> mdcContextMap = LogUtils.getMDCContextMap();
CompletableFuture.supplyAsync(() -> needManual(req), executorService).thenCompose(needManual -> {
if (Objects.isNull(needManual)) {
log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
return null;
}
if (needManual) {
// 转人工处理
return CompletableFuture.supplyAsync(() -> {
log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
LogUtils.setMDCContextMap(mdcContextMap);
return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
c.setReq(req);
c.setAutoApprove(false);
c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
return c;
}));
}, executorService);
} else {
return CompletableFuture.supplyAsync(() -> {
log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
LogUtils.setMDCContextMap(mdcContextMap);
// 无论是否超时,都执行正常流程
return withdrawService.withdrawRisk(req, true, false);
}, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
if (e instanceof TimeoutException) {
// 处理5s超时,超时后同时执行降级流程
log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
LogUtils.setMDCContextMap(mdcContextMap);
return withdrawService.withdrawRiskWithDowngrade(req);
}
return false;
}, executorTimeOutService);
}
}).exceptionally(e -> {
if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
String message;
if (e instanceof BusinessException) {
message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
} else {
message = e.getMessage();
}
RiskFilterLog riskFilterLog = new RiskFilterLog();
riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setRequestId(req.getRequestId());
riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
}
return false;
});
}
/**
* 获取任务执行超时时间
*
* @return
*/
private int obtainTimeOut() {
int out = 5;
try {
String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_RISK_LISTENER_TIMEOUT);
out = Integer.valueOf(timeOut);
} catch (Exception e) {
log.error("JMS_WITHDRAW_RISK_LISTENER_TIMEOUT 配置无效,取默认值5秒钟");
}
log.info("[双流程]任务超时时间设置为{}秒钟", out);
return out;
}
/**
* 当前订单是否需要转人工
*
* @param req 当前订单信息
* @return 是否需要转人工 true:需要 false:不需要 null:无效消息
*/
private Boolean needManual(OriWithdrawReq req) {
return Optional.ofNullable(withdrawService.queryLastOrder(req)).map(e -> NEXT_ENTER_MANUALLY.getType().equals(e.getExceptionPromptType())).orElse(null);
}
}
在这段流程中,MDC在整个异步流程中的传递有没有问题?
在这段异步处理流程中,MDC
的传递方式有一些潜在的问题:
问题点
MDC
在多个异步任务中的传递不完整:虽然在异步任务启动时通过LogUtils.setMDCContextMap(mdcContextMap)
将MDC
上下文传递到新线程中,但这种做法仅限于每个具体的异步任务内部。如果多个异步任务之间存在嵌套调用或组合(如thenCompose
等),MDC
上下文可能在新的子任务中丢失。exceptionallyAsync
中的MDC
传递:你在异常处理中使用了exceptionallyAsync
来捕获异步任务中的异常,但没有显式传递MDC
上下文。如果发生异常,MDC
上下文在异常处理线程中可能会丢失。
解决方案
为了确保 MDC
在所有异步任务中都能正确传递,特别是在复杂的异步任务链(如 thenCompose
、exceptionallyAsync
等)中,我们需要在每个异步任务的线程中设置 MDC
上下文,并在任务完成后清理 MDC
。
1. 对每个异步任务封装 MDC 逻辑
可以通过对每个 CompletableFuture
操作进行封装,将 MDC
上下文传递给所有线程。
public class MDCContextUtils {
// 包装Runnable以传递MDC
public static Runnable wrapWithMDC(Runnable task, Map<String, String> contextMap) {
return () -> {
try {
// 设置MDC上下文
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
// 执行任务
task.run();
} finally {
// 清理MDC上下文
MDC.clear();
}
};
}
// 包装Supplier以传递MDC
public static <T> Supplier<T> wrapWithMDC(Supplier<T> task, Map<String, String> contextMap) {
return () -> {
try {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
return task.get();
} finally {
MDC.clear();
}
};
}
// 包装Function以传递MDC
public static <T, R> Function<T, R> wrapWithMDC(Function<T, R> task, Map<String, String> contextMap) {
return (T t) -> {
try {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
return task.apply(t);
} finally {
MDC.clear();
}
};
}
}
2. 在异步任务链中使用封装后的 MDC 传递
在处理每个 CompletableFuture
异步任务时,使用上面的封装方法确保 MDC
上下文能够在各个异步线程之间正确传递。
更新 handleWithdraw
方法中的异步调用逻辑如下:
public void handleWithdraw(OriWithdrawReq req) {
log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
// 设置风控日志记录器
req.setRiskFilterLogger(withdrawService::handleRiskLog);
int out = obtainTimeOut();
Map<String, String> mdcContextMap = LogUtils.getMDCContextMap();
CompletableFuture.supplyAsync(
MDCContextUtils.wrapWithMDC(() -> needManual(req), mdcContextMap), executorService
).thenCompose(needManual -> {
if (Objects.isNull(needManual)) {
log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
return null;
}
if (needManual) {
return CompletableFuture.supplyAsync(
MDCContextUtils.wrapWithMDC(() -> {
log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
return withdrawService.modifyExceptionPrompt(
withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
c.setReq(req);
c.setAutoApprove(false);
c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
return c;
})
);
}, mdcContextMap), executorService
);
} else {
return CompletableFuture.supplyAsync(
MDCContextUtils.wrapWithMDC(() -> {
log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
return withdrawService.withdrawRisk(req, true, false);
}, mdcContextMap), executorService
).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
if (e instanceof TimeoutException) {
log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
return withdrawService.withdrawRiskWithDowngrade(req);
}
return false;
}, executorTimeOutService);
}
}).exceptionally(e -> {
if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
String message = e instanceof BusinessException ? e.getCause() != null ? e.getCause().getMessage() : e.getMessage() : e.getMessage();
RiskFilterLog riskFilterLog = new RiskFilterLog();
riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setRequestId(req.getRequestId());
riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
}
return false;
});
}
3. exceptionallyAsync
的处理
对于 exceptionallyAsync
异常处理逻辑,也应该传递 MDC
上下文,确保在处理超时异常时 MDC
上下文不会丢失。
.exceptionallyAsync(e -> {
LogUtils.setMDCContextMap(mdcContextMap);
try {
if (e instanceof TimeoutException) {
log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
return withdrawService.withdrawRiskWithDowngrade(req);
}
return false;
} finally {
MDC.clear();
}
}, executorTimeOutService);
总结
通过对 CompletableFuture
的异步任务进行封装,手动传递 MDC
上下文,可以保证在复杂的异步任务链中 MDC
上下文不丢失,从而确保日志记录中的上下文信息(如 UUID
)一致性。
4. 你先用手动透传的方式描述,目前这种包装的比较抽象
在你当前代码的异步流程中,MDC(Mapped Diagnostic Context)上下文传递可能会丢失,因为CompletableFuture
的任务在线程池中执行,而默认情况下,MDC上下文不会自动传递到不同线程。Spring等框架的线程池也不会自动传递MDC上下文。为了保证MDC能够正确传递,我们可以手动传递和恢复MDC上下文。
手动传递MDC上下文的方式是将MDC内容复制到每个异步任务的执行上下文中,在每个任务执行时设置MDC,执行完后清除MDC。
你可以通过以下方式手动传递MDC上下文:
修改后的代码示例
import org.slf4j.MDC;
public class WithdrawServiceDelegate {
@Resource(name = "withdrawExecutorService")
private ExecutorService executorService;
@Resource(name = "withdrawExecutorTimeOutService")
private ExecutorService executorTimeOutService;
@Resource
private WithdrawService withdrawService;
public void handleWithdraw(OriWithdrawReq req) {
log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
req.setRiskFilterLogger(withdrawService::handleRiskLog);
int out = obtainTimeOut();
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap(); // 获取当前线程的MDC上下文
CompletableFuture.supplyAsync(() -> {
// 在异步任务中设置MDC
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
return needManual(req);
} finally {
// 确保任务结束后清除MDC,避免内存泄漏
MDC.clear();
}
}, executorService).thenCompose(needManual -> {
if (Objects.isNull(needManual)) {
log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
return null;
}
if (needManual) {
return CompletableFuture.supplyAsync(() -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
c.setReq(req);
c.setAutoApprove(false);
c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
return c;
}));
} finally {
MDC.clear();
}
}, executorService);
} else {
return CompletableFuture.supplyAsync(() -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
return withdrawService.withdrawRisk(req, true, false);
} finally {
MDC.clear();
}
}, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
if (e instanceof TimeoutException) {
log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
return withdrawService.withdrawRiskWithDowngrade(req);
}
return false;
} finally {
MDC.clear();
}
}, executorTimeOutService);
}
}).exceptionally(e -> {
if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
String message;
if (e instanceof BusinessException) {
message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
} else {
message = e.getMessage();
}
RiskFilterLog riskFilterLog = new RiskFilterLog();
riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setRequestId(req.getRequestId());
riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
}
return false;
});
}
private int obtainTimeOut() {
int out = 5;
try {
String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_RISK_LISTENER_TIMEOUT);
out = Integer.valueOf(timeOut);
} catch (Exception e) {
log.error("JMS_WITHDRAW_RISK_LISTENER_TIMEOUT 配置无效,取默认值5秒钟");
}
log.info("[双流程]任务超时时间设置为{}秒钟", out);
return out;
}
private Boolean needManual(OriWithdrawReq req) {
return Optional.ofNullable(withdrawService.queryLastOrder(req)).map(e -> NEXT_ENTER_MANUALLY.getType().equals(e.getExceptionPromptType())).orElse(null);
}
}
关键变化:
- 获取MDC上下文:在异步任务开始之前,将当前线程的MDC上下文通过
MDC.getCopyOfContextMap()
获取。 - 异步任务中设置MDC:在每个异步任务中执行前,通过
MDC.setContextMap(mdcContextMap)
将之前保存的上下文传递给当前线程。 - 清除MDC:确保异步任务执行结束后调用
MDC.clear()
清除当前线程中的MDC上下文,防止内存泄漏。
通过这种方式,你可以手动传递MDC上下文,确保在异步操作中日志的上下文信息不会丢失。
5. 异常任务中怎么没有设置呢?
你说得对,异常处理(即exceptionally
和exceptionallyAsync
)的部分也应该设置MDC上下文,以确保在处理异常时,MDC能够正确传递和恢复。以下是将MDC上下文传递到异常处理部分的修改:
修改后的代码(包含异常任务中的MDC设置):
import org.slf4j.MDC;
public class WithdrawServiceDelegate {
@Resource(name = "withdrawExecutorService")
private ExecutorService executorService;
@Resource(name = "withdrawExecutorTimeOutService")
private ExecutorService executorTimeOutService;
@Resource
private WithdrawService withdrawService;
public void handleWithdraw(OriWithdrawReq req) {
log.info("进入熔断流程处理,正常流程和熔断流程异步进行");
req.setRiskFilterLogger(withdrawService::handleRiskLog);
int out = obtainTimeOut();
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap(); // 获取当前线程的MDC上下文
CompletableFuture.supplyAsync(() -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
return needManual(req);
} finally {
MDC.clear();
}
}, executorService).thenCompose(needManual -> {
if (Objects.isNull(needManual)) {
log.info("[双流程]当前消息 requestId {} 非法,消息丢弃", req.getRequestId());
return null;
}
if (needManual) {
return CompletableFuture.supplyAsync(() -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
log.info("[双流程]当前消息 requestId {} 上一笔请求状态为-4,直接进入人工处理", req.getRequestId());
return withdrawService.modifyExceptionPrompt(withdrawService.weaveLogicForWithdrawContext(new WithdrawContext(), c -> {
c.setReq(req);
c.setAutoApprove(false);
c.setExceptionPromptType(CURRENT_ENTER_MANUALLY.getType());
c.setExceptionPrompt(CURRENT_ENTER_MANUALLY.getFilterMsg());
return c;
}));
} finally {
MDC.clear();
}
}, executorService);
} else {
return CompletableFuture.supplyAsync(() -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
log.info("[双流程][正常流程]当前消息 requestId {} 进入正常处理流程", req.getRequestId());
return withdrawService.withdrawRisk(req, true, false);
} finally {
MDC.clear();
}
}, executorService).orTimeout(out, TimeUnit.SECONDS).exceptionallyAsync(e -> {
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
if (e instanceof TimeoutException) {
log.info("[双流程][熔断流程]当前消息 requestId {} 进入超时熔断处理流程", req.getRequestId());
return withdrawService.withdrawRiskWithDowngrade(req);
}
return false;
} finally {
MDC.clear();
}
}, executorTimeOutService);
}
}).exceptionally(e -> {
// 设置MDC上下文处理异常
if (mdcContextMap != null) {
MDC.setContextMap(mdcContextMap);
}
try {
if (req.isNeedLogger() && Objects.nonNull(req.getRiskFilterLogger())) {
String message;
if (e instanceof BusinessException) {
message = Objects.nonNull(e.getCause()) ? e.getCause().getMessage() : e.getMessage();
} else {
message = e.getMessage();
}
RiskFilterLog riskFilterLog = new RiskFilterLog();
riskFilterLog.setFilterType(RiskFilterTypeEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setStatus(RiskFilterStatusEnum.INITIAL.getCode());
riskFilterLog.setSource(RiskFilterSourceEnum.WITHDRAW_RISK.getCode());
riskFilterLog.setRequestId(req.getRequestId());
riskFilterLog.setRiskRuleAction(RuleEnum.TRANSFERRED_TO_MANUAL_REVIEW.getRuleAction());
riskFilterLog.setErrorMsg(StringUtils.substring(message, 0, 255));
req.getRiskFilterLogger().accept(true, true, req.getRequestId(), riskFilterLog);
}
return false;
} finally {
MDC.clear();
}
});
}
private int obtainTimeOut() {
int out = 5;
try {
String timeOut = ProductConstantsLoader.obtainProductConstantRedis(ProjectConstant.PRODUCT_CONSTANTS_C66, ProjectConstant.PRODUCT_CONSTANTS_TYPE_0014, ProjectConstant.JMS_WITHDRAW_R
6. 如何使用一种通用的方式来处理MDC的透传,比如封装包装类或者aop去做,目的是要适配,在任何场景中,也包括上面的completableFuture的复杂场景.
要在复杂的异步任务场景中实现MDC透传,通用的方式通常是通过封装线程池或者使用AOP来处理MDC上下文的传播,这样可以在任何场景中都保证MDC被正确传递。
以下是两种方案来解决这个问题:
方案 1: 封装 ExecutorService
来处理 MDC 透传
通过封装 ExecutorService
来确保每次提交任务时,MDC 上下文会自动被传递到异步线程中执行。
步骤:
- 封装一个线程池的包装类,在提交任务时将当前的 MDC 上下文传递给执行任务的线程。
- 在异步任务执行时恢复 MDC 上下文,确保日志输出时可以获取到原始的上下文信息。
代码示例:
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MDCAwareExecutorService {
private final ExecutorService delegate;
public MDCAwareExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
// 包装Runnable
public void execute(Runnable task) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
delegate.execute(() -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
task.run();
} finally {
MDC.clear();
}
});
}
// 包装Callable
public <T> Future<T> submit(Callable<T> task) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return delegate.submit(() -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
return task.call();
} finally {
MDC.clear();
}
});
}
// 其他submit等方法同理
}
// 使用示例
ExecutorService originalExecutor = Executors.newFixedThreadPool(10);
MDCAwareExecutorService mdcAwareExecutorService = new MDCAwareExecutorService(originalExecutor);
// 提交任务时自动透传MDC
mdcAwareExecutorService.submit(() -> {
// 异步任务
log.info("This log will have the correct MDC context.");
});
在这个实现中,每次调用 submit
或者 execute
方法时,MDC 的上下文会被保存并传递到新线程中执行,然后在任务完成后清除上下文。
适用于任何场景:
无论是 CompletableFuture
还是普通的线程池任务执行,都可以通过这个通用的封装来处理 MDC 透传。
方案 2: 使用 AOP 自动处理 MDC 透传
可以使用 Spring AOP 来自动处理 MDC 上下文的设置与恢复,特别适用于 CompletableFuture
这种复杂的场景。
步骤:
- 定义 AOP 切面,对所有异步任务的执行方法进行拦截,拦截前获取 MDC 上下文,拦截后在新的线程中恢复上下文。
- 将 MDC 上下文自动绑定到每个异步任务中,确保无论任务在哪里执行,MDC 都能正确传递。
代码示例:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import java.util.Map;
@Aspect
@Component
public class MDCAwareAspect {
@Around("@annotation(org.springframework.scheduling.annotation.Async)") // 针对异步任务
public Object aroundAsyncMethod(ProceedingJoinPoint joinPoint) throws Throwable {
Map<String, String> contextMap = MDC.getCopyOfContextMap(); // 获取当前的MDC上下文
try {
return joinPoint.proceed(); // 执行原方法
} finally {
MDC.clear(); // 清理当前线程的MDC
}
}
@Around("execution(* java.util.concurrent.ExecutorService.submit(..))") // 针对ExecutorService的submit
public Object aroundExecutorSubmit(ProceedingJoinPoint joinPoint) throws Throwable {
Map<String, String> contextMap = MDC.getCopyOfContextMap(); // 获取当前的MDC上下文
// 包装Runnable/Callable
Object[] args = joinPoint.getArgs();
if (args[0] instanceof Runnable) {
args[0] = wrapRunnable((Runnable) args[0], contextMap);
} else if (args[0] instanceof Callable) {
args[0] = wrapCallable((Callable<?>) args[0], contextMap);
}
return joinPoint.proceed(args); // 执行原方法
}
private Runnable wrapRunnable(Runnable task, Map<String, String> contextMap) {
return () -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
task.run();
} finally {
MDC.clear();
}
};
}
private <T> Callable<T> wrapCallable(Callable<T> task, Map<String, String> contextMap) {
return () -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
return task.call();
} finally {
MDC.clear();
}
};
}
}
使用示例:
你不需要更改原有的业务代码,AOP 会自动拦截异步方法以及 ExecutorService.submit
调用,自动完成 MDC 上下文的传递。
@Async
public void asyncMethod() {
log.info("This is an async method with MDC context.");
}
选择方案
- 方案 1 更适合在你明确使用了多个自定义线程池的情况下手动封装
ExecutorService
来完成 MDC 透传。 - 方案 2 使用 AOP 更为通用,特别是在你不想侵入现有业务代码的情况下,适合异步方法和
CompletableFuture
的复杂场景。
两者都可以确保在复杂的异步任务中正确传递 MDC 上下文。
7. 还是我前面提交的代码中,当exceptionllyAsync处理异常任务时,是启动了一个新的异步线程专门记录日志的,按照你前面的写法,如果内部正常任务线程或者超时任务线程执行时向mdc上下文中设置了uuid,一旦任务抛出异常,此时excepllyAsync中记录异常的这个线程应该会丢失原来那个出错任务的uuid吧?
是的,你的理解是对的。当在 exceptionallyAsync
处理异常时,因为这是一个新的异步线程来处理任务,它不会自动继承原来执行任务时的 MDC
上下文。因此,如果之前任务线程设置了 MDC
上下文(例如 uuid
),在异常处理时会丢失这个上下文。
为了在异常处理时依然保持 MDC
上下文(如 uuid
),我们需要将 MDC
上下文传递到 exceptionallyAsync
执行的异步线程中。
解决方案
和之前类似,我们可以在执行 exceptionallyAsync
时传递 MDC
上下文。这里我们需要确保 CompletableFuture
异步任务(包括异常处理)中的 MDC
上下文传递是通用的,适用于正常任务和异常任务。
代码改进:传递 MDC
上下文到 exceptionallyAsync
你可以通过包装 exceptionallyAsync
中的异常处理逻辑,确保 MDC
上下文能够正确传递。以下是代码改进的示例:
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class MDCAwareCompletableFuture {
public static <T> CompletableFuture<T> supplyAsyncWithMDC(Supplier<T> supplier) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return CompletableFuture.supplyAsync(() -> {
if (contextMap != null) {
MDC.setContextMap(contextMap); // 恢复MDC上下文
}
try {
return supplier.get();
} finally {
MDC.clear(); // 清理MDC
}
});
}
public static <T> CompletableFuture<T> exceptionallyAsyncWithMDC(
CompletableFuture<T> future, Function<Throwable, ? extends T> fn) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return future.exceptionallyAsync(ex -> {
if (contextMap != null) {
MDC.setContextMap(contextMap); // 恢复MDC上下文
}
try {
// 这里可以记录日志,包含原始的MDC信息
log.error("Exception occurred: " + ex.getMessage());
return fn.apply(ex);
} finally {
MDC.clear(); // 清理MDC
}
});
}
// 示例调用
public static void main(String[] args) {
// 设置主线程的MDC上下文
MDC.put("uuid", "123456");
// 异步执行任务
CompletableFuture<String> future = supplyAsyncWithMDC(() -> {
log.info("Executing task with UUID: " + MDC.get("uuid"));
// 模拟任务抛出异常
if (true) {
throw new RuntimeException("Task failed");
}
return "Success";
});
// 异步处理异常
exceptionallyAsyncWithMDC(future, ex -> {
log.info("Handling exception with UUID: " + MDC.get("uuid"));
return "Recovered from failure";
}).join();
}
}
关键点:
MDC.getCopyOfContextMap()
:在异步任务开始时获取当前线程的 MDC 上下文(即uuid
等信息)。MDC.setContextMap(contextMap)
:在异步线程中恢复原始任务的 MDC 上下文,确保日志记录时能够包含正确的上下文信息。MDC.clear()
:在异步任务结束后清理 MDC,防止线程池复用时导致 MDC 污染其他任务。
场景说明
- 在正常的任务执行时,
supplyAsyncWithMDC
方法确保MDC
上下文被传递到任务线程。 - 如果任务执行时抛出了异常,
exceptionallyAsyncWithMDC
方法可以确保异常处理线程同样继承了原任务的MDC
上下文,因此即使在异常处理中也能够使用原来的uuid
。
更加通用的 MDC 传递解决方案
为了避免每次都要手动封装,可以考虑将这些操作封装成通用工具类,类似于前面提到的线程池封装方式,保证在所有异步任务场景中 MDC
都能够自动传递。
这个解决方案不仅适用于 CompletableFuture
,还适用于普通的异步任务执行场景,例如 ExecutorService
等。
8. 我突然有点明白了,这个MDC中的uuid目的就是为了追踪每一笔请求的唯一链路,应该从最外层的线程提交开始就产生唯一的uuid吧?之后不论下游经历了多少次异步线程的执行,每一次都要把MDC重新设置给下游的执行线程?目的就是为了确保对于同一个任务,不论是正常执行,还是超时执行,还是异常执行等,不论是在同一个线程中,还是经历了多个不同的线程,最终都要确保这一个请求链路打印出来的uuid就是表示唯一的这笔请求?我的理解有问题吗?
你的理解完全正确!MDC
中的 uuid
目的就是为了在整个请求生命周期内,对这笔请求的所有日志进行链路追踪。具体来说,确保在任何执行路径中——不论是正常执行、超时、还是异常处理,甚至是跨多个线程的异步任务中,所有的日志都能通过同一个 uuid
来标识并关联回这笔唯一的请求。
详细解释:
唯一链路标识 (
uuid
) 的生成:
在处理请求的最外层(通常是从 HTTP 请求或消息队列任务接收时),应该生成一个全局唯一的uuid
。这个uuid
会作为该请求生命周期的唯一标识符,贯穿所有的下游操作(包括同步和异步任务)。MDC
的作用:
MDC
(Mapped Diagnostic Context)可以为每个线程维护一个独立的上下文信息,常用于日志系统,帮助记录与某个线程或请求关联的上下文数据。例如,uuid
可以存储在MDC
中,确保在不同的日志中能追踪到相同请求的日志信息。多线程中的
MDC
传递:
如果任务中涉及到异步操作(例如CompletableFuture
、线程池等),默认情况下,新的线程会丢失父线程中的MDC
上下文。因此,在每次切换到新的线程执行任务时,都需要手动传递MDC
中的上下文信息,确保uuid
在异步任务中也能正确关联。这样可以保证无论下游有多少个异步任务,它们都会携带相同的uuid
。异常处理中的
MDC
传递:
类似的,在异常处理中(例如exceptionallyAsync
),由于是新的异步线程来处理异常,同样需要将MDC
中的上下文(包含uuid
)传递到异常处理的线程中,保证日志中的uuid
仍然能够关联到这笔请求。
理解总结:
- MDC中的
uuid
确保了请求链路的唯一性,每笔请求在从头到尾的所有操作中(包括正常执行、异常处理、超时等)都会使用同一个uuid
来记录日志。 - 多线程环境下的
MDC
传递 是为了确保uuid
在每个执行线程中都能够正确传递和使用,不论是异步任务、异常任务还是线程池中的其他任务,同一请求的日志都应该拥有相同的uuid
。
通过这种方式,在排查问题或分析请求执行情况时,能够快速定位与某个请求相关的所有日志,极大地提高了问题排查效率。
代码要点回顾:
- 在主线程生成
uuid
并放入MDC
。 - 每次异步操作时,手动传递
MDC
的上下文到新的线程中,确保uuid
的一致性。 - 在异常处理时,同样确保
MDC
上下文的传递,避免丢失uuid
。
这样你就可以保证从头到尾的完整日志链路。
9. logback console_log_pattern模式下,怎么拼接MDC中的uuid呢?
在 Logback 的 console_log_pattern
中,你可以使用 MDC
(Mapped Diagnostic Context)中的值,比如 uuid
,通过 %X{key}
来引用它,其中 key
是你存储在 MDC
中的键名。
假设你已经在 MDC
中放置了一个名为 uuid
的值,你可以在 Logback 的 pattern
中使用 %X{uuid}
来引用它。以下是一个示例配置,展示了如何在 Logback 的 console_log_pattern
中拼接 MDC
中的 uuid
:
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - [%X{uuid}] %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
在上面的配置中,[%X{uuid}]
将从 MDC
中获取 uuid
并拼接到日志输出中。
使用 MDC
设置 uuid
在代码中,你可以这样设置和移除 MDC
中的值:
import org.slf4j.MDC;
public class MyService {
public void logWithUuid() {
// 将 uuid 放入 MDC
MDC.put("uuid", "your-unique-uuid");
// 打印日志,日志中会包含 uuid
logger.info("This is a log message with a UUID");
// 记得在适当的时候移除 MDC 中的值
MDC.remove("uuid");
}
}
这样配置后,日志中会包含 uuid
的值。
10. 那tid呢?
MDC
中除了 uuid
,你也可以使用其他自定义的键值对,例如 tid
(thread id)。假设你将线程 ID (tid
) 放入了 MDC
,同样可以在 Logback 的 console_log_pattern
中通过 %X{tid}
来引用它。
例如,以下是同时拼接 MDC
中的 uuid
和 tid
的日志输出格式:
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - [UUID: %X{uuid}] [TID: %X{tid}] %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
使用 MDC
设置 tid
在代码中,你可以像设置 uuid
一样,将 tid
放入 MDC
:
import org.slf4j.MDC;
public class MyService {
public void logWithTid() {
// 将 tid 放入 MDC,假设 tid 是当前线程的 ID
String tid = String.valueOf(Thread.currentThread().getId());
MDC.put("tid", tid);
// 打印日志,日志中会包含 tid 和 uuid(如果 uuid 也设置了)
logger.info("This is a log message with a TID");
// 记得在适当的时候移除 MDC 中的值
MDC.remove("tid");
}
}
示例日志输出
根据上面的配置,你的日志输出可能看起来像这样:
2024-09-21 12:34:56.789 [main] INFO com.example.MyService - [UUID: abc123] [TID: 1] This is a log message with a TID
其中 UUID
和 TID
都是通过 MDC
获取并拼接在日志中的。
文档信息
- 本文作者:Marshall