ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交、线程管理、监控等方法。
下面是ThreadPoolExecutor类的构造方法源码,其他创建线程池的方法最终都会导向这个构造方法,共有7个参数:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; } |
这些参数都通过volatile修饰:
1
2
3
4
5
6
7
8
9
10
|
public class ThreadPoolExecutor extends AbstractExecutorService { private final BlockingQueue<Runnable> workQueue; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; // 是否允许核心线程被回收 private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; } |
corePoolSize:核心线程数
线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置allowCoreThreadTimeout=true后,空闲的核心线程超过存活时间也会被回收)。
大于核心线程数的线程,在空闲时间超过keepAliveTime后会被回收。
线程池刚创建时,里面没有一个线程,当调用 execute() 方法添加一个任务时,如果正在运行的线程数量小于corePoolSize,则马上创建新线程并运行这个任务。
maximumPoolSize:最大线程数
线程池允许创建的最大线程数量。
当添加一个任务时,核心线程数已满,线程池还没达到最大线程数,并且没有空闲线程,工作队列已满的情况下,创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。
keepAliveTime:空闲线程存活时间
当一个可被回收的线程的空闲时间大于keepAliveTime,就会被回收。
可被回收的线程:
设置allowCoreThreadTimeout=true的核心线程。大于核心线程数的线程(非核心线程)。
unit:时间单位
keepAliveTime的时间单位:
1
2
3
4
5
6
7
|
TimeUnit.NANOSECONDS TimeUnit.MICROSECONDS TimeUnit.MILLISECONDS // 毫秒 TimeUnit.SECONDS TimeUnit.MINUTES TimeUnit.HOURS TimeUnit.DAYS |
workQueue:工作队列
新任务被提交后,会先添加到工作队列,任务调度时再从队列中取出任务。工作队列实现了BlockingQueue接口。
JDK默认的工作队列有五种:
1.ArrayBlockingQueue 数组型阻塞队列:数组结构,初始化时传入大小,有界,FIFO,使用一个重入锁,默认使用非公平锁,入队和出队共用一个锁,互斥。
2。LinkedBlockingQueue 链表型阻塞队列:链表结构,默认初始化大小为Integer.MAX_VALUE,有界(近似无解),FIFO,使用两个重入锁分别控制元素的入队和出队,用Condition进行线程间的唤醒和等待。
3.SynchronousQueue 同步队列:容量为0,添加任务必须等待取出任务,这个队列相当于通道,不存储元素。
4.PriorityBlockingQueue 优先阻塞队列:无界,默认采用元素自然顺序升序排列。
5.DelayQueue 延时队列:无界,元素有过期时间,过期的元素才能被取出。
threadFactory:线程工厂
创建线程的工厂,可以设定线程名、线程编号等。
默认线程工厂:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
handler:拒绝策略
当线程池线程数已满,并且工作队列达到限制,新提交的任务使用拒绝策略处理。可以自定义拒绝策略,拒绝策略需要实现RejectedExecutionHandler接口。
JDK默认的拒绝策略有四种:
1.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
2.DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态。
3.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
4.CallerRunsPolicy:由调用线程处理该任务。
默认拒绝策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
/** * The default rejected execution handler */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException( "Task " + r.toString() + " rejected from " + e.toString()); } } |
自定义线程池工具
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 线程池工厂工具 * * @author 向振华 * @date 2021/04/11 10:24 */ public class ThreadPoolFactory { /** * 生成固定大小的线程池 * * @param threadName 线程名称 * @return 线程池 */ public static ExecutorService createFixedThreadPool(String threadName) { AtomicInteger threadNumber = new AtomicInteger( 0 ); return new ThreadPoolExecutor( // 核心线程数 desiredThreadNum(), // 最大线程数 desiredThreadNum() * 2 , // 空闲线程存活时间 60L, // 空闲线程存活时间单位 TimeUnit.SECONDS, // 工作队列 new ArrayBlockingQueue<>( 1024 ), // 线程工厂 new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, threadName + "-" + threadNumber.getAndIncrement()); } }, // 拒绝策略 new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { //尝试阻塞式加入任务队列 executor.getQueue().put(r); } catch (Exception e) { //保持线程的中断状态 Thread.currentThread().interrupt(); } } } }); } /** * 理想的线程数,使用 2倍cpu核心数 */ public static int desiredThreadNum() { return Runtime.getRuntime().availableProcessors() * 2 ; } } |
到此这篇关于Java多线程之线程池七个参数详解的文章就介绍到这了,更多相关java线程池详解内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/Anenan/article/details/115603481