ThreadPoolTaskExecutor


有些异步线程场景,需要我们获取主线程对应的上下文信息,否则无法进行逻辑处理
1、MDC —— 存放请求链路追踪的 traceID,子线程拿到它之后才能继续进行链路追踪。
2、Request Headers 设置的特殊标识信息,存放在 TransmittableThreadLocal中,子线程需要知道相关信息。
3、自定义的一些线程上下文 ThreadLocal 变量

使用原生的 ExecutorService 有诸多不便,我们必须每次都手动把线程上下文的值复制进去,会产生很多冗余代码。
ThreadPoolTaskExecutor 可以通过 TaskDecorator 解决这个问题,使我们的代码可以稍稍优化那么一点点。

package com.seed.server.config.components;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.seed.server.entity.User;
import com.seed.server.utils.UserContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author chenshang
 */
@Getter
@Component
@Slf4j
public class ThreadPoolComponent {

    /**
     * 通用线程池
     */
    private ExecutorService commonThreadPool;
    private ExecutorService changeLogThreadPool;
    private ThreadPoolTaskExecutor studythreadPoolTaskExecutor;
    private ThreadPoolTaskExecutor demothreadPoolTaskExecutor;


    @PostConstruct
    private void init() {
        // 线程异常的时候的策略,打印日志
        RejectedExecutionHandler handler = (r, executor) -> log.error("pool rejected, activeCount={}, corePoolSize={}, maxPoolSize={}, largestPoolSize={}, poolSize={}, queueSize={}", executor.getActiveCount(), executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getLargestPoolSize(), executor.getPoolSize(), CollectionUtils.size(executor.getQueue()));

        commonThreadPool = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("commonThreadPool-%d").build(), handler);
        changeLogThreadPool = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d")
                                                                                                                                              .build(), handler);
        studythreadPoolTaskExecutor = createThreadPoolTaskExecutor(4, 8, new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d").build(), handler);
        demothreadPoolTaskExecutor = createThreadPoolTaskExecutor(5, 10, new ThreadFactoryBuilder().setNameFormat("changeLogThreadPool-%d").build(), handler);
    }

    private ThreadPoolTaskExecutor createThreadPoolTaskExecutor(int corePoolSize, int maximumPoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maximumPoolSize);
        executor.setThreadFactory(threadFactory);
        executor.setRejectedExecutionHandler(handler);
        executor.setTaskDecorator(runnable -> {
            // 这里获取是mdc的上下文,也可以获取RequestContextHolder,具体根据你的业务需要操作即可
            Map<String, String> mdc = MDC.getCopyOfContextMap();
            User user = UserContext.getUser();
            return () -> {
                try {
                    Optional.ofNullable(mdc).ifPresent(MDC::setContextMap);
                    Optional.ofNullable(user).ifPresent(UserContext::setUser);
                    runnable.run();
                } finally {
                    // 务必记得clear,否则可能会产生内存泄露
                    MDC.clear();
                    UserContext.clear();
                }
            };
        });
        executor.initialize();
        return executor;
    }

    @PreDestroy
    private void destroy() {
        try {
            commonThreadPool.shutdown();
            changeLogThreadPool.shutdown();
            demothreadPoolTaskExecutor.shutdown();
        } catch (Exception e) {
            log.error("ThreadPoolComponent.destroy shutdown error", e);
        }
    }
}

评论