Spring线程池异步传递MDC信息

Spring线程池异步传递MDC信息,第1张

Spring线程池异步传递MDC信息

目录

1. 什么是MDC

2. 引入MDC打印步骤

2.1 pom依赖 

2.2 log4j2打印日志配置文件

3 步骤演示

3.1 单线程业务使用示例

postman查询示例

查询代码

查询日志

3.2 自定义MDC异步线程池

 自定义异步MDC线程池代码

 初始化线程池

通过注解和注入方式使用

入口代码

结果示例

3.3 包装单个线程 

包装MDCRunable 

包装MDCCallable

声明普通线程池

使用包装线程

4 总结


1. 什么是MDC

MDC,英文全称是 Mapped Diagnostic Context,含义是映射调试上下文。它是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的类。MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。

2. 引入MDC打印步骤

基于springboot项目中集成MDC。

1 引入log4j2的jar包

2 配置log4j3的配置

3 在打印日志前往MDC中塞入值

2.1 pom依赖 


    4.0.0

    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.18.RELEASE
         
    

    com.example.demo
    elasticsearch
    0.0.1-SNAPSHOT
    elasticsearch
    Demo project for Spring Boot

    
        1.8
        6.8.5
        1.18.12
    

    
        
        
            org.springframework.boot
            spring-boot-starter
            
                
                    spring-boot-starter-logging
                    org.springframework.boot
                
            
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.elasticsearch.client
            elasticsearch-rest-high-level-client
            ${elasticsearch.client.version}
        

        
            org.elasticsearch
            elasticsearch
            ${elasticsearch.client.version}
        

        
            org.projectlombok
            lombok
            ${lombok.version}
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
         
            org.springframework.boot
            spring-boot-starter-log4j2
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


2.2 log4j2打印日志配置文件

resource目录下创建log4j.xml配置如下

 application.properties中添加:

##  这个是做es留下来的,不需要写
host.address.array==192.168.40.129:9200,192.168.40.129:9201,192.168.40.129:9202

logging.config=classpath:log4j.xml
logging.level.root=info

log4j.xml中添加: 



    
        
        
            
                [%-5p] %d %c [%X{REQUEST_UUID}] - %m%n
            
        
        
        
            
                [%-5p] %d %c %X{REQUEST_UUID} - %m%n
            
        
    
    
        
            
        
            
        
            
        
    

3 步骤演示 3.1 单线程业务使用示例

设想场景,查询根据书名查询书籍的信息。

postman查询示例

一般在header中设置属性requestId为32位随机数,以保证这次请求唯一,在后端比较好查询这次数据流。

查询代码

这里简单的从request中header获取前端传入的requestId。

当然也有更优雅的方式,比方自定义filter,controller执行前的filter中提前初始化MDC信息。

@GetMapping("/{bookName}")
    public String queryBookInfos(HttpServletRequest request,@PathVariable String bookName){
        String requestId = request.getHeader("requestId");

        if(StringUtils.isEmpty(requestId)){
            requestId=new UUIDGenerator().toString();
        }

        // 这里的key值需要与log4j.xml中的MDC传递的信息关键字一致
        MDC.put("REQUEST_UUID", requestId);

        // 模拟查询数据库打印
        log.info("开始查询名字叫{}的书籍",bookName);

        MDC.clear();

        return "已经查到书籍"+bookName;
    }
查询日志

查询的日志中有MDC的requestid信息,方便定位问题。

[DEBUG] 2022-01-24 15:46:03,560 org.springframework.web.servlet.DispatcherServlet [] - GET "/v1/book/%E4%B8%89%E5%9B%BD%E6%BC%94%E4%B9%89", parameters={}
[DEBUG] 2022-01-24 15:46:03,564 org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping [] - Mapped to public java.lang.String com.example.demo.elasticsearch.controller.BookController.queryBookInfos(javax.servlet.http.HttpServletRequest,java.lang.String)
[INFO ] 2022-01-24 15:46:03,575 com.example.demo.elasticsearch.controller.BookController [123456789123456789] - 开始查询名字叫三国演义的书籍
[DEBUG] 2022-01-24 15:46:03,584 org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor [] - Using 'text/plain', given [**, text/plain, *
public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    
    public MdcThreadPoolTaskExecutor(int corePoolSize, int maxPoolSize,
        int keepAliveTime, int queueCapacity, String poolNamePrefix) {
        setCorePoolSize(corePoolSize);
        setMaxPoolSize(maxPoolSize);
        setKeepAliveSeconds(keepAliveTime);
        setQueueCapacity(queueCapacity);
        setThreadNamePrefix(poolNamePrefix);
    }

    
    private Map getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    
    @Override
    public void execute(@NonNull Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    @NonNull
    @Override
    public Future submit(@NonNull Runnable task) {
        return super.submit(wrap(task, getContextForTask()));
    }

    @NonNull
    @Override
    public  Future submit(@NonNull Callable task) {
        return super.submit(wrap(task, getContextForTask()));
    }

    
    private static  Callable wrap(final Callable task, final Map context) {
        return () -> {
            if (!context.isEmpty()) {
                MDC.setContextMap(context);
            }

            try {
                return task.call();
            } finally {
                if (!context.isEmpty()) {
                    MDC.clear();
                }
            }
        };
    }

    
    private static Runnable wrap(final Runnable runnable, final Map context) {
        return () -> {
            if (!context.isEmpty()) {
                MDC.setContextMap(context);
            }

            try {
                runnable.run();
            } finally {
                if (!context.isEmpty()) {
                    MDC.clear();
                }
            }
        };
    }

}
 初始化线程池
@Configuration
public class MyConfig {

    @Bean(name = "mdcThreadPoolTaskExecutor" )
    public AsyncTaskExecutor mdcThreadPoolTaskExecutor() {
        // 这里也可以用metrics来监控线程池,上报数据到prometheus
        return new MdcThreadPoolTaskExecutor(null,3,
            5, 2, TimeUnit.SECONDS,1000,"mdcThreadPoolTaskExecutor");
    }
}
通过注解和注入方式使用

注解方式,主要使用@Async注解

@Slf4j
@Component
public class IndexService {

    @Async("mdcThreadPoolTaskExecutor")
    public void doSomething(Long  networkId){
        log.info("errptsfsdfds  {}",networkId);
    }
}

注入方式,有三种注入,这里列举一种注入。

    @Bean(name = "newMdcThreadPoolTaskExecutor" )
    public MdcThreadPoolTaskExecutor newMdcThreadPoolTaskExecutor() {
        // 这里也可以用metrics来监控线程池,上报数据到prometheus
        return new MdcThreadPoolTaskExecutor(null,3,
            5, 2, TimeUnit.SECONDS,1000,"mdcThreadPoolTaskExecutor");
    }
@Slf4j
@Component
public class IndexService {

    @Autowired
    private MdcThreadPoolTaskExecutor mdcThreadPoolTaskExecutor;

    public void doSomething(Long networkId) {
        // 如果使用这种方式,不需要自定义线程池,使用原始线程池,自定义单个线程,并在单个线程种包装MDC信息即可
        mdcThreadPoolTaskExecutor.submit(() -> {
            log.info("对  {} 做了 *** 作", networkId);
        });
    }

}
入口代码
@Slf4j
@RestController
@RequestMapping("/v1/index")
public class IndexController {

    @Autowired
    private IndexService indexService;

    @GetMapping
    public String get(HttpServletRequest request){

        String requestId = request.getHeader("requestId");

        if(StringUtils.isEmpty(requestId)){
            requestId=new UUIDGenerator().toString();
        }

        // 模拟要查询的列表,每个列表值开启了一个新的线程
        Long[] networkIds={123L,45L,67L,78L,899L,1000L};
        MDC.put("REQUEST_UUID",requestId);

        String s = MDC.get("REQUEST_UUID");
        for(Long networkId:networkIds){

            // 这里传入的reuqest+列表值,组成新的requestId,注意新的requestId长度
            MDC.put("REQUEST_UUID",s+"-"+networkId);
            indexService.doSomething(networkId);
        }

        return "HHHHH";
    }
}
结果示例

 PS:这里不探讨分布式调用链。

3.3 包装单个线程 

包装单个线程也是一种实现方式,利用构造函数可以携带额外的参数,比较灵活。

包装MDCRunable 
public class MDCRunable implements Runnable {

    private Map copyOfContextMap;

    private Runnable runnable;

    public MDCRunable(Runnable runnable) {
        this.copyOfContextMap = MDC.getCopyOfContextMap();
        this.runnable = runnable;
    }

    @Override
    public void run() {
        if (!copyOfContextMap.isEmpty()) {
            MDC.setContextMap(copyOfContextMap);
        }
        try {
            runnable.run();
        } finally {
            if (!copyOfContextMap.isEmpty()) {
                MDC.clear();
            }
        }
    }
}
包装MDCCallable
public class MDCCallable implements Callable {

    private Map copyOfContextMap;

    private Callable callable;

    public MDCCallable(Callable callable) {
        this.copyOfContextMap = MDC.getCopyOfContextMap();
        this.callable = callable;
    }

    @Override
    public Object call() throws Exception {
        if (!copyOfContextMap.isEmpty()) {
            MDC.setContextMap(copyOfContextMap);
        }
        try {
            return callable.call();
        } finally {
            if (!copyOfContextMap.isEmpty()) {
                MDC.clear();
            }
        }

    }
}
 
声明普通线程池 

  @Bean(name = "threadPoolTaskExecutor" )
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        // 这里也可以用metrics来监控线程池,上报数据到prometheus
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(22);
        threadPoolTaskExecutor.setMaxPoolSize(22);
        threadPoolTaskExecutor.setThreadNamePrefix("pool");

        return threadPoolTaskExecutor;
    }
使用包装线程
@Slf4j
@Component
public class IndexService {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void doSomething(Long networkId) {
        // 如果使用这种方式,不需要自定义线程池,使用原始线程池,自定义单个线程,并在单个线程种包装MDC信息即可
        threadPoolTaskExecutor.execute(
            new MDCRunable(() -> log.info("对  {} 做了 *** 作", networkId))
        );
    }

}
4 总结

这里不介绍分布式调用链这里的线程池调用写得有点问题,带有执行的结果应该用线程池submit,没有执行结果的直接用execute即可。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5718639.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)