Java多线程异步执行任务、定时多线程执行任务

使用@Aysnc注解思路:

  • 开启异步执行,配置异步线程池信息

  • 使用@Async注解开启异步执行

手动调用异步

  • 设置线程池

  • 创建异步管理器,用于调用异步任务

  • 创建异步任务生产工厂,用于执行异步任务

springboot异步@Async配置

  • package com.nsk666.framework.config;
    
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author niushuaikui
     * @description 配置@Async注解线程池
     * @date 2023/4/6 10:34
     */
    @Configuration
    @EnableAsync
    @Slf4j
    public class AsyncConfig  implements AsyncConfigurer {
        /**
             * 使用@Async注解,则调用此异步线程池 
            */
        @Bean("asyncPoolTaskExecutor")
        public ThreadPoolTaskExecutor executor(){
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //核心线程数
            taskExecutor.setCorePoolSize(10);
            //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
            taskExecutor.setMaxPoolSize(100);
            //缓存队列
            taskExecutor.setQueueCapacity(50);
            //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
            taskExecutor.setKeepAliveSeconds(200);
            //异步方法内部线程名称
            taskExecutor.setThreadNamePrefix("async-");
            /**
             * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
             * 通常有以下四种策略:
             * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
             * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
             * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
             * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
             */
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            taskExecutor.initialize();
            return taskExecutor;
        }
    
        /**
         *  捕获使用@Async时触发的异常,记录到日志文件
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
            return (throwable, method, obj) -> {
                log.error("Exception Caught in Thread - {} ",Thread.currentThread().getName());
                log.error("Exception message - {}",throwable.getMessage());
                log.error("Method name - {}", method.getName());
                log.error("Method params - {}", JSONObject.toJSONString(obj));
    
            };
        }
    
    }
    
  • 在合适的方法上使用@Async

手动调用异步

  • 设置线程池

  • package com.nsk666.common.config.thread;
    
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import com.nsk666.common.utils.Threads;
    
    /**
     * 线程池配置 
     **/
    @Configuration
    public class ThreadPoolConfig {
        // 核心线程池大小
        private int corePoolSize = 50;
    
        // 最大可创建的线程数
        private int maxPoolSize = 200;
    
        // 队列最大长度
        private int queueCapacity = 1000;
    
        // 线程池维护线程所允许的空闲时间
        private int keepAliveSeconds = 300;
    
        @Bean(name = "threadPoolTaskExecutor")
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setMaxPoolSize(maxPoolSize);
            executor.setCorePoolSize(corePoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setKeepAliveSeconds(keepAliveSeconds);
            // 线程池对拒绝任务(无线程可用)的处理策略
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    
        /**
         * 执行周期性或定时任务
         */
        @Bean(name = "scheduledExecutorService")
        protected ScheduledExecutorService scheduledExecutorService() {
            return new ScheduledThreadPoolExecutor(corePoolSize,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
                    new ThreadPoolExecutor.CallerRunsPolicy()) {
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    super.afterExecute(r, t);
                    Threads.printException(r, t);
                }
            };
        }
    }!\[图片上传中...\]
    
  • 创建异步任务管理器

  • package com.nsk666.framework.manager;
    
    import java.util.TimerTask;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.nsk666.common.utils.Threads;
    import com.nsk666.common.utils.spring.SpringUtils;
    
    /**
     * 异步任务管理器 
     */
    public class AsyncManager {
        /**
         * 操作延迟10毫秒
         */
        protected final int OPERATE\\\_DELAY\\\_TIME = 10;
    
        /**
         * 异步操作任务调度线程池
         */
        protected ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");
    
        /**
         * 单例模式
         */
        private AsyncManager() {
        }
    
        private static AsyncManager me = new AsyncManager();
    
        public static AsyncManager me() {
            return me;
        }
    
        /**
         * 执行任务
         *
         * @param task 任务
         */
        public void execute(TimerTask task) {
            executor.schedule(task, OPERATE\\\_DELAY\\\_TIME, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 停止任务线程池
         */
        public void shutdown() {
            Threads.shutdownAndAwaitTermination(executor);
        }
    }
    
  • 创建异步工厂

  • package com.nsk666.framework.manager.factory;
    
    import java.util.TimerTask; 
    import com.nsk666.common.constant.Constants;
    import com.nsk666.common.utils.AddressUtils;
    import com.nsk666.common.utils.LogUtils;
    import com.nsk666.common.utils.ServletUtils;
    import com.nsk666.common.utils.ShiroUtils;
    import com.nsk666.common.utils.StringUtils;
    import com.nsk666.common.utils.spring.SpringUtils; 
    import com.nsk666.system.domain.SysLogininfor; 
    import com.nsk666.system.service.impl.SysLogininforServiceImpl;
    import eu.bitwalker.useragentutils.UserAgent;
    
    /**
     * 异步工厂(产生任务用) 
     */
    public class AsyncFactory {
    
        /**
         * 记录登录信息
         *
         * @param username 用户名
         * @param status   状态
         * @param message  消息
         * @param args     列表
         * @return 任务task
         */
        public static TimerTask recordLogininfor(final String username, final String status, final String message, final Object... args) {
            final UserAgent userAgent = UserAgent.parseUserAgentString(ServletUtils.getRequest().getHeader("User-Agent"));
            final String ip = ShiroUtils.getIp();
            return new TimerTask() {
                @Override
                public void run() {
                    String address = AddressUtils.getRealAddressByIP(ip);
                    StringBuilder s = new StringBuilder();
                    s.append(LogUtils.getBlock(ip));
                    s.append(address);
                    s.append(LogUtils.getBlock(username));
                    s.append(LogUtils.getBlock(status));
                    s.append(LogUtils.getBlock(message));
                    // 打印信息到日志
                    sys\\\_user\\\_logger.info(s.toString(), args);
                    // 获取客户端操作系统
                    String os = userAgent.getOperatingSystem().getName();
                    // 获取客户端浏览器
                    String browser = userAgent.getBrowser().getName();
                    // 封装对象
                    SysLogininfor logininfor = new SysLogininfor();
                    logininfor.setLoginName(username);
                    logininfor.setIpaddr(ip);
                    logininfor.setLoginLocation(address);
                    logininfor.setBrowser(browser);
                    logininfor.setOs(os);
                    logininfor.setMsg(message);
                    // 日志状态
                    if (StringUtils.equalsAny(status, Constants.LOGIN_SUCCESS, Constants.LOGOUT, Constants.REGISTER)) {
                        logininfor.setStatus(Constants.SUCCESS);
                    } else if (Constants.LOGIN_FAIL.equals(status)) {
                        logininfor.setStatus(Constants.FAIL);
                    }
                    // 插入数据
                    SpringUtils.getBean(SysLogininforServiceImpl.class).insertLogininfor(logininfor);
                }
            };
        }
    
    
    }
    
  • 在合适的地方调用

  • AsyncManager.me().execute(AsyncFactory.recordLogininfor(loginName, Constants.LOGOUT, 
    MessageUtils.message("user.logout.success")));