有些异步线程场景,需要我们获取主线程对应的上下文信息,否则无法进行逻辑处理
1、MDC —— 保存请求链路追踪的 traceID,子线程需要拿到它才能继续进行链路追踪。
2、Request Headers 设置的特殊标识信息,存放在 TransmittableThreadLocal中,子线程需要知道相关信息。
3、自定义的一些线程上下文 ThreadLocal 变量
使用原生的 ExecutorService 有诸多不便,我们必须每次都手动把线程上下文的值复制进去,会产生很多冗余代码
ThreadPoolTaskExecutor 可以通过 TaskDecorator 解决这个问题,使我们的代码可以稍稍优化那么一点点
package com.seed.server.config.components;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.seed.server.entity.User;
import com.seed.server.utils.UserContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author chenshang
*/
@Getter
@Component
@Slf4j
public class ThreadPoolComponent {
/**
* 通用线程池
*/
private ExecutorService commonThreadPool;
private ExecutorService changeLogThreadPool;
private ThreadPoolTaskExecutor studythreadPoolTaskExecutor;
private ThreadPoolTaskExecutor demothreadPoolTaskExecutor;
@PostConstruct
private void init() {
// 线程异常的时候的策略,打印日志
RejectedExecutionHandler handler = (r, executor) -> log.error("pool rejected, activeCount={}, corePoolSize={}, maxPoolSize={}, largestPoolSize={}, poolSize={}, queueSize={}", executor.getActiveCount(), executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getLargestPoolSize(), executor.getPoolSize(), CollectionUtils.size(executor.getQueue()));
commonThreadPool = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("commonThreadPool-%d").build(), handler);
changeLogThreadPool = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d")
.build(), handler);
studythreadPoolTaskExecutor = createThreadPoolTaskExecutor(4, 8, new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d").build(), handler);
demothreadPoolTaskExecutor = createThreadPoolTaskExecutor(5, 10, new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d").build(), handler);
}
private ThreadPoolTaskExecutor createThreadPoolTaskExecutor(int corePoolSize, int maximumPoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maximumPoolSize);
executor.setThreadFactory(threadFactory);
executor.setRejectedExecutionHandler(handler);
executor.setTaskDecorator(runnable -> {
// 这里获取是mdc的上下文,也可以获取RequestContextHolder,具体根据你的业务需要操作即可
Map<String, String> mdc = MDC.getCopyOfContextMap();
User user = UserContext.getUser();
return () -> {
try {
Optional.ofNullable(mdc).ifPresent(MDC::setContextMap);
Optional.ofNullable(user).ifPresent(UserContext::setUser);
runnable.run();
} finally {
// 务必记得clear,否则可能会产生内存泄露
MDC.clear();
UserContext.clear();
}
};
});
executor.initialize();
return executor;
}
@PreDestroy
private void destroy() {
try {
commonThreadPool.shutdown();
changeLogThreadPool.shutdown();
demothreadPoolTaskExecutor.shutdown();
} catch (Exception e) {
log.error("ThreadPoolComponent.destroy shutdown error", e);
}
}
}