pop6 's Notes
1
Toggle navigation
pop6 's Notes
主页
About Me
归档
标签
SpringBoot笔记(9)-异步调用
2021-02-07 15:25:58
1165
0
0
pop6
> 参考: > * [How To Do @Async in Spring](https://www.baeldung.com/spring-async) * [Spring @Async rest controller example – Spring @EnableAsync](https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/) * [Creating Asynchronous Methods](https://spring.io/guides/gs/async-method/) * [SpringBoot学习笔记(十七:异步调用) ](https://juejin.cn/post/6850418116464230408) # 简介 Web场景中,对于一些耗时的任务,为了保证及时响应,会进行异步调用。例如大批量数据的保存,异步调用执行,执行结果通过邮件、工作台等反馈。 异步调用本质是将异步任务放到单独线程执行,不阻塞主线程。在Java中,可使用 `ThreadPool`、`CompletableFuture` 实现异步任务,Spring 提供了 `@Async` 以支持异步调用。 # 启用异步调用 将`@EnableAsync`添加到Java Bean上,即可启用异步调用支持。 如下,将`@EnableAsync`加到SpringBoot启动类上(@SpringBootApplication标识的类会被注册为Bean)。 ``` @SpringBootApplication @EnableAsync public class RestApplication { ... } ``` 或者在其它Java Bean添加`@EnableAsync`。 ``` @Configuration @EnableAsync public class AsyncConfig { ... } ``` `@EnableAsync`注解有4个属性,一般情况使用默认值即可。 * **annotation**:设置标识异步调用的注解类型。默认情况下,将同时检测Spring的`@Async`和EJB 3.1 `@javax.ejb.Asynchronous`注解。 * **proxyTargetClass**:是否创建子类代理CGLIB,默认为false,即接口代理;仅当mode属性设置为 `AdviceMode.PROXY` 时此属性才有效。 * **mode**:代理类型。默认值为 `AdviceMode.PROXY`,即JDK代理。JDK代理不支持自调用,因为直接调用的本地方法,绕过了代理。如果想要支持自调用,则选择 `AdviceMode.ASPECTJ` 增强方式,并将Spring AOP切换为AspectJ即可。 * **order**:表示AsyncAnnotationBeanPostProcessor的顺序。默认值是 `Ordered.LOWEST_PRECEDENCE`,为了在所有其他后处理器之后运行,因此它可以为现有代理添加增强代码,而不是使用双重代理。 # 进行异步调用 `@Async`注解用于标识一个方法为异步的,该注解标记方法的内部代码会被放到单独线程去执行。 使用默认的`@EnableAsync`配置,需要注意两件事情: 1. 只能调用public方法。 2. **不支持自调用**,因为这绕过了动态代理方法,直接调用的目标类方法。 上面两种问题并非无法解决,将`@EnableAsync`的mode切换为`AdviceMode.ASPECTJ`,并将Spring AOP切换为AspectJ即可。Spring AOP切换为AspectJ可参考 [Spring Boot教程(20) – 用AspectJ实现AOP内部调用](https://fookwood.com/spring-boot-tutorial-20-aspectj)。 ## 无返回值 无返回值的异步方法如下: ``` @Async @Override public void execute0() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); } ``` 调用者如下: ``` @ApiOperation(value = "测试异步调用", notes = "使用@Async,无参") @GetMapping(path = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public ResponseResult test() throws Exception { log.info(">>>> 启动异步任务提交"); this.asyncService.execute0(); log.info("<<<< 结束异步任务提交"); return ResponseResult.success("调用成功"); } ``` 调用日志如下:可以看到 “提交异步调用” 的线程是 `nio-8085-exec-1`,“进行异步调用”的线程是 `task-1`。 ``` 2021-02-05 15:08:45.103 INFO 27624 --- [nio-8085-exec-1] o.x.r.controller.AsyncController : >>>> 启动异步任务提交 2021-02-05 15:08:45.106 INFO 27624 --- [nio-8085-exec-1] o.x.r.controller.AsyncController : <<<< 结束异步任务提交 2021-02-05 15:08:45.109 INFO 27624 --- [ task-1] o.x.r.service.impl.AsyncServiceImpl : >>>> 异步任务开始 2021-02-05 15:08:50.110 INFO 27624 --- [ task-1] o.x.r.service.impl.AsyncServiceImpl : <<<< 异步任务结束,耗时:5000ms ``` ## 有返回值 有返回值的异步方法如下:`ListenableFuture`为Spring提供的异步接口,继承自 `Future` ,提供了执行 callback 功能,`AsyncResult`为`ListenableFuture`实现类。 ``` @Async @Override public ListenableFuture<String> execute1() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return new AsyncResult<>("Hello World!"); } ``` 调用者如下: ``` @ApiOperation(value = "测试异步调用", notes = "使用@Async,有参") @GetMapping(path = "/test1", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public ResponseResult test1() throws Exception { log.info(">>>> 启动异步任务提交"); ListenableFuture<String> ret = this.asyncService.execute1(); ret.addCallback( r -> { System.out.println("异步执行成功:" + r); }, e -> { System.out.println("异步执行失败:" + e.getMessage()); } ); log.info("<<<< 结束异步任务提交"); return ResponseResult.success("调用成功"); } ``` 调用日志如下:调用者和执行者处于不同线程,执行结果触发相关回调,可以获取执行结果。 ``` 2021-02-05 15:15:10.398 INFO 27624 --- [nio-8085-exec-3] o.x.r.controller.AsyncController : >>>> 启动异步任务提交 2021-02-05 15:15:10.424 INFO 27624 --- [ task-2] o.x.r.service.impl.AsyncServiceImpl : >>>> 异步任务开始 2021-02-05 15:15:10.429 INFO 27624 --- [nio-8085-exec-3] o.x.r.controller.AsyncController : <<<< 结束异步任务提交 2021-02-05 15:15:15.426 INFO 27624 --- [ task-2] o.x.r.service.impl.AsyncServiceImpl : <<<< 异步任务结束,耗时:5001ms 异步执行成功:Hello World! ``` 也可以使用`CompletableFuture`替换`ListenableFuture`作为异步返回值。 ``` @Async @Override public CompletableFuture<String> execute1() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return CompletableFuture.completedFuture("Hello World!"); } ``` ## 合并多个异步结果 存在多个异步调研。 ``` @Async @Override public ListenableFuture<String> execute1() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return new AsyncResult<>("Hello World!"); } @Async @Override public ListenableFuture<String> execute2() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return new AsyncResult<>("Java!"); } ``` 自旋检测异步结果,合并结果。 ``` @ApiOperation(value = "测试多异步调用", notes = "使用@Async,有参") @GetMapping(path = "/test2", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public ResponseResult test2() throws Exception { log.info(">>>> 启动异步任务提交"); ListenableFuture<String> ret0 = this.asyncService.execute1(); ListenableFuture<String> ret1 = this.asyncService.execute2(); while (ret0.isDone() && ret1.isDone()) { Thread.sleep(100); } System.out.println("异步执行结果:" + ret0.get() + ret1.get()); log.info("<<<< 结束异步任务提交"); return ResponseResult.success("调用成功"); } ``` 如果方法的异步结果为`CompletableFuture`,可使用`CompletableFuture.allOf(ret0, ret1).join()`合并结果。 # 自定义Executor `@Async`本质是基于线程池进行异步任务提交,Spring Boot使用线程池的顺序如下。 1. 默认情况,Spring将搜索 `org.springframework.core.task.TaskExecutor` 的Bean。 2. 若步骤1未找到,则寻找 `java.util.concurrent.Executor` 且name为 `taskExecutor` 的Bean。 3. 若步骤1、2都未找到,将使用 `org.springframework.core.task.SimpleAsyncTaskExecutor`。 我们可以在应用级别、方法级别自定义Executor。如果未定义,SpringBoot内部会构造一个ThreadPoolTaskExecutor对象,其配置如下,可看到`maxPoolSIze、queueCapacity`过大,因此一般还是根据应用情况进行自定义。 ![](https://leanote.com/api/file/getImage?fileId=60e71bf9ab6441194c000a86) ## 方法级别Executor 定义一个配置类,返回Executor Bean对象。 ``` @Configuration @EnableAsync public class AsyncConfig { @Value("${executor.asyncInvoke.corePoolSize}") private int corePoolSize; @Value("${executor.asyncInvoke.maxPoolSize}") private int maxPoolSize; @Value("${executor.asyncInvoke.queueCapacity}") private int queueCapacity; @Value("${executor.asyncInvoke.threadNamePrefix}") private String threadNamePrefix; @Bean("alice") public ThreadPoolTaskExecutor asyncInvoke() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("alice-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; } @Bean("bob") public ThreadPoolTaskExecutor asyncInvoke2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("bob-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; } } ``` `@Async`注解标记时指定线程池。 ``` @Async("alice") @Override public ListenableFuture<String> execute1() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return new AsyncResult<>("Hello World!"); } @Async("bob") @Override public ListenableFuture<String> execute2() throws Exception { log.info(">>>> 异步任务开始"); long start = System.currentTimeMillis(); Thread.sleep(5000); log.info("<<<< 异步任务结束,耗时:" + (System.currentTimeMillis() - start) + "ms"); return new AsyncResult<>("Java!"); } ``` ## 应用级别Executor 实现 `AsyncConfigurer` 接口,重写 `getAsyncExecutor()` 方法,该线程池是 `@Async` 的默认线程池。 ``` public class AsyncCommonConfig implements AsyncConfigurer { @Value("${executor.asyncInvoke.corePoolSize}") private int corePoolSize; @Value("${executor.asyncInvoke.maxPoolSize}") private int maxPoolSize; @Value("${executor.asyncInvoke.queueCapacity}") private int queueCapacity; @Value("${executor.asyncInvoke.threadNamePrefix}") private String threadNamePrefix; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(threadNamePrefix); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; } } ``` 根据`@Async`使用线程池的规则,也可以在AsyncConfig返回一个`TaskExecutor Bean`作为`@Async`的默认线程池。但是以这种方式,当定义多个`TaskExecutor`时,`@Async`会选取多个Bean中任意一个。 ``` @Configuration @EnableAsync public class AsyncConfig { ... @Bean public ThreadPoolTaskExecutor asyncInvoke() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(threadNamePrefix); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; } ``` # 异常处理 对于有返回值的异步调用,可以根据执行结果callback处理异常,或者调用`future.get()`阻塞主线程等待结果。 ``` ListenableFuture<String> ret = this.asyncService.execute1(); ret.addCallback( r -> { System.out.println("异步执行成功:" + r); }, e -> { System.out.println("异步执行失败:" + e.getMessage()); } ); ``` 对于无返回值的异步调用,异常不会传递到调用线程,因此需要进行一些配置。实现 `AsyncUncaughtExceptionHandler` 接口,定义异常处理。 ``` @Slf4j public class AsyncInvokeException implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.error("Async invoke error; msg: " + ex + "; method: " + method.getName() + "; params: " + Arrays.toString(params)); } } ``` 继承`AsyncConfigurer`接口,重写 `getAsyncUncaughtExceptionHandler()`方法,该方法返回自定义的异步调用异常。 ``` @Configuration @EnableAsync public class AsyncCommonConfig implements AsyncConfigurer { @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncInvokeException(); } } ``` # 内部原理 @Async内部原理是 "**AOP + 线程池**",Spring增强@Async标记的方法,调用时将增强的方法提交到线程池执行。更多细节见 [Spring @EnableAsync 注解原理](https://plentymore.github.io/2018/12/29/Spring-EnableAsync-%E6%B3%A8%E8%A7%A3%E5%8E%9F%E7%90%86/)。
上一篇:
OLAP笔记
下一篇:
并发(1)-volitale、synchronized、final内存语义
0
赞
1165 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网
提交评论
立即登录
, 发表评论.
没有帐号?
立即注册
0
条评论
More...
文档导航
没有帐号? 立即注册