服务器之家

服务器之家 > 正文

Spring Boot 异步框架的使用详解

时间:2021-07-10 10:27     来源/作者:JavaDog

1. 前言

随着数据量和调用量的增长,用户对应用的性能要求越来越高。另外,在实际的服务中,还存在着这样的场景:系统在组装数据的时候,对于数据的各个部分的获取实际上是没有前后依赖关系的。这些问题都很容易让我们想到将这些同步调用全都改造为异步调用。不过自己实现起来比较麻烦,还容易出错。好在spring已经提供了该问题的解决方案,而且使用起来十分方便。

2.spring异步执行框架的使用方法

2.1 maven 依赖

spring异步执行框架的相关bean包含在spring-context和spring-aop模块中,所以只要引入上述的模块即可。

2.2 开启异步任务支持

spring提供了@enableasync的注解来标注是否启用异步任务支持。使用方式如下:

?
1
2
3
4
@configuration
@enableasync
public class appconfig {
}

note: @enableasync必须要配合@configuration使用,否则会不生效

2.3 方法标记为异步调用

将同步方法的调用改为异步调用也很简单。对于返回值为void的方法,直接加上@async注解即可。对于有返回值的方法,除了加上上述的注解外,还需要将方法的返回值修改为future类型和将返回值用asyncresult包装起来。如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
// 无返回值的方法直接加上注解即可。
@async
public void method1() {
 ...
}
 
// 有返回值的方法需要修改返回值。
@async
public future<object> method2() {
 ...
 return new asyncresult<>(object);

2.4 方法调用

对于void的方法,和普通的调用没有任何区别。对于非void的方法,由于返回值是future类型,所以需要用get()方法来获取返回值。如下所示:

?
1
2
3
4
5
6
7
8
9
10
public static void main(string[] args) {
  service.method1();
  future<object> futureresult = service.method2();
  object result;
  try {
     result = futureresult.get();
    } catch (interruptedexception | executionexception e) {
      ...
    }
}

3. 原理简介

这块的源码的逻辑还是比较简单的,主要是spring帮我们生成并管理了一个线程池,然后方法调用的时候使用动态代理将方法的执行包装为callable类型并提交到线程池中执行。核心的实现逻辑在asyncexecutioninterceptor类的invoke()方法中。如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@override
public object invoke(final methodinvocation invocation) throws throwable {
  class<?> targetclass = (invocation.getthis() != null ? aoputils.gettargetclass(invocation.getthis()) : null);
  method specificmethod = classutils.getmostspecificmethod(invocation.getmethod(), targetclass);
  final method userdeclaredmethod = bridgemethodresolver.findbridgedmethod(specificmethod);
 
  asynctaskexecutor executor = determineasyncexecutor(userdeclaredmethod);
  if (executor == null) {
    throw new illegalstateexception(
        "no executor specified and no default executor set on asyncexecutioninterceptor either");
  }
 
  callable<object> task = new callable<object>() {
    @override
    public object call() throws exception {
      try {
        object result = invocation.proceed();
        if (result instanceof future) {
          return ((future<?>) result).get();
        }
      }
      catch (executionexception ex) {
        handleerror(ex.getcause(), userdeclaredmethod, invocation.getarguments());
      }
      catch (throwable ex) {
        handleerror(ex, userdeclaredmethod, invocation.getarguments());
      }
      return null;
    }
  };
 
  return dosubmit(task, executor, invocation.getmethod().getreturntype());
}

4.自定义taskexecutor及异常处理

4.1自定义taskexecutor

spring查找taskexecutor逻辑是:

1. 如果spring context中存在唯一的taskexecutor bean,那么就使用这个bean。

2. 如果1中的bean不存在,那么就会查找是否存在一个beanname为taskexecutor且是java.util.concurrent.executor实例的bean,有则使用这个bean。

3. 如果1、2中的都不存在,那么spring就会直接使用默认的executor,即simpleasynctaskexecutor。

在第2节的实例中,我们直接使用的是spring默认的taskexecutor。但是对于每一个新的任务,simpleaysnctaskexecutor都是直接创建新的线程来执行,所以无法重用线程。具体的执行的代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@override
public void execute(runnable task, long starttimeout) {
  assert.notnull(task, "runnable must not be null");
  runnable tasktouse = (this.taskdecorator != null ? this.taskdecorator.decorate(task) : task);
  if (isthrottleactive() && starttimeout > timeout_immediate) {
    this.concurrencythrottle.beforeaccess();
    doexecute(new concurrencythrottlingrunnable(tasktouse));
  }
  else {
    doexecute(tasktouse);
  }
}
 
protected void doexecute(runnable task) {
  thread thread = (this.threadfactory != null ? this.threadfactory.newthread(task) : createthread(task));
  thread.start();
}

所以我们在使用的时候,最好是使用自定义的taskexecutor。结合上面描述的spring查找taskexecutor的逻辑,最简单的自定义的方法是使用@bean注解。示例如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
// threadpooltaskexecutor的配置基本等同于线程池
@bean("taskexecutor")
public executor getasyncexecutor() {
  threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor();
  taskexecutor.setmaxpoolsize(max_pool_size);
  taskexecutor.setcorepoolsize(core_pool_size);
  taskexecutor.setqueuecapacity(core_pool_size * 10);
  taskexecutor.setthreadnameprefix("wssys-async-task-thread-pool");
  taskexecutor.setwaitfortaskstocompleteonshutdown(true);
  taskexecutor.setawaitterminationseconds(60 * 10);
  taskexecutor.setrejectedexecutionhandler(new threadpoolexecutor.abortpolicy());
  return taskexecutor;
}

另外,spring还提供了一个asyncconfigurer接口,通过实现该接口,除了可以实现自定义executor以外,还可以自定义异常的处理。代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@configuration
@slf4j
public class asyncconfig implements asyncconfigurer {
 
  private static final int max_pool_size = 50;
 
  private static final int core_pool_size = 20;
 
  @override
  @bean("taskexecutor")
  public executor getasyncexecutor() {
    threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor();
 
    taskexecutor.setmaxpoolsize(max_pool_size);
    taskexecutor.setcorepoolsize(core_pool_size);
    taskexecutor.setqueuecapacity(core_pool_size * 10);
    taskexecutor.setthreadnameprefix("async-task-thread-pool");
    taskexecutor.setwaitfortaskstocompleteonshutdown(true);
    taskexecutor.setawaitterminationseconds(60 * 10);
    taskexecutor.setrejectedexecutionhandler(new threadpoolexecutor.abortpolicy());
    return taskexecutor;
  }
 
  @override
  public asyncuncaughtexceptionhandler getasyncuncaughtexceptionhandler() {
    return (ex, method, params) -> log.error("invoke async method occurs error. method: {}, params: {}",
      method.getname(), json.tojsonstring(params), ex);
  }
 
}

note:

spring还提供了一个asyncconfigurersupport类,该类也实现了asyncconfigurer接口,且方法的返回值都是null,旨在提供一个方便的实现。

当getasyncexecutor()方法返回null的时候,spring会使用默认的处理器(强烈不推荐)。

当getasyncuncaughtexceptionhandler()返回null的时候,spring会使用simpleasyncuncaughtexceptionhandler来处理异常,该类会打印出异常的信息。

所以对该类的使用,最佳的实践是继承该类,并且覆盖实现getasyncexecutor()方法。

4.2 异常处理

spring异步框架对异常的处理如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 所在类:asyncexecutionaspectsupport
protected void handleerror(throwable ex, method method, object... params) throws exception {
  if (future.class.isassignablefrom(method.getreturntype())) {
    reflectionutils.rethrowexception(ex);
  }
  else {
    // could not transmit the exception to the caller with default executor
    try {
      this.exceptionhandler.handleuncaughtexception(ex, method, params);
    }
    catch (throwable ex2) {
      logger.error("exception handler for async method '" + method.togenericstring() +
          "' threw unexpected exception itself", ex2);
    }
  }
}

从代码来看,如果返回值是future类型,那么直接将异常抛出。如果返回值不是future类型(基本上包含的是所有返回值void类型的方法,因为如果方法有返回值,必须要用future包装起来),那么会调用handleuncaughtexception方法来处理异常。

注意:在handleuncaughtexception()方法中抛出的任何异常,都会被spring catch住,所以没有办法将void的方法再次抛出并传播到上层调用方的!!!

关于spring 这个设计的缘由我的理解是:既然方法的返回值是void,就说明调用方不关心方法执行是否成功,所以也就没有必要去处理方法抛出的异常。如果需要关心异步方法是否成功,那么返回值改为boolean就可以了。

4.4 最佳实践的建议

  1. @async可以指定方法执行的executor,用法:@async("mytaskexecutor")。推荐指定executor,这样可以避免因为executor配置没有生效而spring使用默认的executor的问题。
  2. 实现接口asyncconfigurer的时候,方法getasyncexecutor()必须要使用@bean,并指定bean的name。如果不使用@bean,那么该方法返回的executor并不会被spring管理。用java doc api的原话是:is not a fully managed spring bean.(具体含义没有太理解,不过亲测不加这个注解无法正常使用)
  3. 由于其本质上还是基于代理实现的,所以如果一个类中有a、b两个异步方法,而a中存在对b的调用,那么调用a方法的时候,b方法不会去异步执行的。
  4. 在异步方法上标注@transactional是无效的。
  5. future.get()的时候,最好使用get(long timeout, timeunit unit)方法,避免长时间阻塞。
  6. listenablefuture和completablefuture也是推荐使用的,他们相比future,提供了对异步调用的各个阶段或过程进行介入的能力。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://juejin.im/post/5c3fe71ff265da61223a966c

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
2021德云社封箱演出完整版 2021年德云社封箱演出在线看
2021德云社封箱演出完整版 2021年德云社封箱演出在线看 2021-03-15
返回顶部