Java多线程异步执行任务、定时多线程执行任务
使用@Aysnc注解思路:
-
开启异步执行,配置异步线程池信息
-
使用@Async注解开启异步执行
手动调用异步
-
设置线程池
-
创建异步管理器,用于调用异步任务
-
创建异步任务生产工厂,用于执行异步任务
springboot异步@Async配置
-
package com.nsk666.framework.config; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @author niushuaikui * @description 配置@Async注解线程池 * @date 2023/4/6 10:34 */ @Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { /** * 使用@Async注解,则调用此异步线程池 */ @Bean("asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心线程数 taskExecutor.setCorePoolSize(10); //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程 taskExecutor.setMaxPoolSize(100); //缓存队列 taskExecutor.setQueueCapacity(50); //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁 taskExecutor.setKeepAliveSeconds(200); //异步方法内部线程名称 taskExecutor.setThreadNamePrefix("async-"); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 捕获使用@Async时触发的异常,记录到日志文件 */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){ return (throwable, method, obj) -> { log.error("Exception Caught in Thread - {} ",Thread.currentThread().getName()); log.error("Exception message - {}",throwable.getMessage()); log.error("Method name - {}", method.getName()); log.error("Method params - {}", JSONObject.toJSONString(obj)); }; } }
- 在合适的方法上使用@Async
手动调用异步
-
设置线程池
-
package com.nsk666.common.config.thread; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import com.nsk666.common.utils.Threads; /** * 线程池配置 **/ @Configuration public class ThreadPoolConfig { // 核心线程池大小 private int corePoolSize = 50; // 最大可创建的线程数 private int maxPoolSize = 200; // 队列最大长度 private int queueCapacity = 1000; // 线程池维护线程所允许的空闲时间 private int keepAliveSeconds = 300; @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setMaxPoolSize(maxPoolSize); executor.setCorePoolSize(corePoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } /** * 执行周期性或定时任务 */ @Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { return new ScheduledThreadPoolExecutor(corePoolSize, new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(), new ThreadPoolExecutor.CallerRunsPolicy()) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); Threads.printException(r, t); } }; } }!\[图片上传中...\]
-
创建异步任务管理器
-
package com.nsk666.framework.manager; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.nsk666.common.utils.Threads; import com.nsk666.common.utils.spring.SpringUtils; /** * 异步任务管理器 */ public class AsyncManager { /** * 操作延迟10毫秒 */ protected final int OPERATE\\\_DELAY\\\_TIME = 10; /** * 异步操作任务调度线程池 */ protected ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService"); /** * 单例模式 */ private AsyncManager() { } private static AsyncManager me = new AsyncManager(); public static AsyncManager me() { return me; } /** * 执行任务 * * @param task 任务 */ public void execute(TimerTask task) { executor.schedule(task, OPERATE\\\_DELAY\\\_TIME, TimeUnit.MILLISECONDS); } /** * 停止任务线程池 */ public void shutdown() { Threads.shutdownAndAwaitTermination(executor); } }
-
创建异步工厂
-
package com.nsk666.framework.manager.factory; import java.util.TimerTask; import com.nsk666.common.constant.Constants; import com.nsk666.common.utils.AddressUtils; import com.nsk666.common.utils.LogUtils; import com.nsk666.common.utils.ServletUtils; import com.nsk666.common.utils.ShiroUtils; import com.nsk666.common.utils.StringUtils; import com.nsk666.common.utils.spring.SpringUtils; import com.nsk666.system.domain.SysLogininfor; import com.nsk666.system.service.impl.SysLogininforServiceImpl; import eu.bitwalker.useragentutils.UserAgent; /** * 异步工厂(产生任务用) */ public class AsyncFactory { /** * 记录登录信息 * * @param username 用户名 * @param status 状态 * @param message 消息 * @param args 列表 * @return 任务task */ public static TimerTask recordLogininfor(final String username, final String status, final String message, final Object... args) { final UserAgent userAgent = UserAgent.parseUserAgentString(ServletUtils.getRequest().getHeader("User-Agent")); final String ip = ShiroUtils.getIp(); return new TimerTask() { @Override public void run() { String address = AddressUtils.getRealAddressByIP(ip); StringBuilder s = new StringBuilder(); s.append(LogUtils.getBlock(ip)); s.append(address); s.append(LogUtils.getBlock(username)); s.append(LogUtils.getBlock(status)); s.append(LogUtils.getBlock(message)); // 打印信息到日志 sys\\\_user\\\_logger.info(s.toString(), args); // 获取客户端操作系统 String os = userAgent.getOperatingSystem().getName(); // 获取客户端浏览器 String browser = userAgent.getBrowser().getName(); // 封装对象 SysLogininfor logininfor = new SysLogininfor(); logininfor.setLoginName(username); logininfor.setIpaddr(ip); logininfor.setLoginLocation(address); logininfor.setBrowser(browser); logininfor.setOs(os); logininfor.setMsg(message); // 日志状态 if (StringUtils.equalsAny(status, Constants.LOGIN_SUCCESS, Constants.LOGOUT, Constants.REGISTER)) { logininfor.setStatus(Constants.SUCCESS); } else if (Constants.LOGIN_FAIL.equals(status)) { logininfor.setStatus(Constants.FAIL); } // 插入数据 SpringUtils.getBean(SysLogininforServiceImpl.class).insertLogininfor(logininfor); } }; } }
-
在合适的地方调用
-
AsyncManager.me().execute(AsyncFactory.recordLogininfor(loginName, Constants.LOGOUT, MessageUtils.message("user.logout.success")));