为每种特定的同步问题提供了解决方案,同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier 和Exchanger
Semaphore
Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package concurrent; import concurrent.thread.SemaphoreThread; import java.util.concurrent.Semaphore; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreTest { public static void main(String[] args) { //在Thread里声明并不是同一个对象 Semaphore semaphore = new Semaphore( 3 ); SemaphoreThread testA = new SemaphoreThread( "A" , semaphore); SemaphoreThread testB = new SemaphoreThread( "B" , semaphore); SemaphoreThread testC = new SemaphoreThread( "C" , semaphore); SemaphoreThread testD = new SemaphoreThread( "D" , semaphore); SemaphoreThread testE = new SemaphoreThread( "E" , semaphore); SemaphoreThread testF = new SemaphoreThread( "F" , semaphore); SemaphoreThread testG = new SemaphoreThread( "G" , semaphore); testA.start(); testB.start(); testC.start(); testD.start(); testE.start(); testF.start(); testG.start(); } } |
线程写法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Semaphore; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreThread extends Thread { private static final Logger logger = LogManager.getLogger(SemaphoreThread. class ); //创建有3个信号量的信号量计数器 public Semaphore semaphore; public SemaphoreThread(String name, Semaphore semaphore) { setName(name); this .semaphore = semaphore; } @Override public void run() { try { logger.debug(getName() + " 取号等待... " + System.currentTimeMillis()); //取出一个信号 semaphore.acquire(); logger.debug(getName() + " 提供服务... " + System.currentTimeMillis()); sleep( 1000 ); logger.debug(getName() + " 完成服务... " + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug(getName() + " 释放... " + System.currentTimeMillis()); //释放一个信号 semaphore.release(); } } |
执行结果【以下所有输出结果中[]中为线程名称- 后为输出的内容】:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
[C] - C 取号等待... 1470642024037 [F] - F 取号等待... 1470642024036 [E] - E 取号等待... 1470642024036 [B] - B 取号等待... 1470642024037 [D] - D 取号等待... 1470642024037 [A] - A 取号等待... 1470642023965 [D] - D 提供服务... 1470642024039 [C] - C 提供服务... 1470642024039 [G] - G 取号等待... 1470642024036 [F] - F 提供服务... 1470642024040 [D] - D 完成服务... 1470642025039 [C] - C 完成服务... 1470642025039 [D] - D 释放... 1470642025040 [F] - F 完成服务... 1470642025040 [C] - C 释放... 1470642025041 [B] - B 提供服务... 1470642025042 [A] - A 提供服务... 1470642025042 [F] - F 释放... 1470642025043 [E] - E 提供服务... 1470642025043 [A] - A 完成服务... 1470642026043 [B] - B 完成服务... 1470642026043 [B] - B 释放... 1470642026043 [A] - A 释放... 1470642026043 [G] - G 提供服务... 1470642026044 [E] - E 完成服务... 1470642026045 [E] - E 释放... 1470642026045 [G] - G 完成服务... 1470642027045 [G] - G 释放... 1470642027046 |
可以看到,当3个信号量被领取完之后,之后的线程会阻塞在领取信号的位置,当有信号量释放之后才会继续执行。
CountDownLatch
CountDownLatch【倒计时锁】,线程中调用countDownLatch.await()使进程进入阻塞状态,当达成指定次数后(通过countDownLatch.countDown())继续执行每个线程中剩余的内容。
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; public class package concurrent; import concurrent.thread.CountDownLatchThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CountDownLatchTest { private static final Logger logger = LogManager.getLogger(CountDownLatchTest. class ); public static void main(String[] args) throws InterruptedException { //设定当达成三个计数时触发 CountDownLatch countDownLatch = new CountDownLatch( 3 ); new CountDownLatchThread( "A" , countDownLatch).start(); new CountDownLatchThread( "B" , countDownLatch).start(); new CountDownLatchThread( "C" , countDownLatch).start(); new CountDownLatchThread( "D" , countDownLatch).start(); new CountDownLatchThread( "E" , countDownLatch).start(); for ( int i = 3 ; i > 0 ; i--) { Thread.sleep( 1000 ); logger.debug(i); countDownLatch.countDown(); } } } |
线程类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; public class CountDownLatchThread extends Thread { private static final Logger logger = LogManager.getLogger(CountDownLatchThread. class ); //计数器 private CountDownLatch countDownLatch; public CountDownLatchThread(String name, CountDownLatch countDownLatch) { setName(name); this .countDownLatch = countDownLatch; } @Override public void run() { logger.debug( "执行操作..." ); try { sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug( "等待计数器达到标准..." ); try { //让线程进入阻塞状态,等待计数达成后释放 countDownLatch.await(); logger.debug( "计数达成,继续执行..." ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
[E] - 执行操作... [B] - 执行操作... [A] - 执行操作... [C] - 执行操作... [D] - 执行操作... [main] DEBUG concurrent.CountDownLatchTest - 3 [B] - 等待计数器达到标准... [E] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D] - 等待计数器达到标准... [A] - 等待计数器达到标准... [main] DEBUG concurrent.CountDownLatchTest - 2 [main] DEBUG concurrent.CountDownLatchTest - 1 [E] - 计数达成,继续执行... [C] - 计数达成,继续执行... [B] - 计数达成,继续执行... [D] - 计数达成,继续执行... [A] - 计数达成,继续执行... |
CyclicBarrier
CyclicBarrier【Cyclic周期,循环的 Barrier屏障,障碍】循环的等待阻塞的线程个数到达指定数量后使参与计数的线程继续执行并可执行特定线程(使用不同构造函数可以不设定到达后执行),其他线程仍处于阻塞等待再一次达成指定个数。
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package concurrent; import concurrent.thread.CyclicBarrierThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest { private static final Logger logger = LogManager.getLogger(CyclicBarrierTest. class ); public static void main(String[] args) { //可以使用CyclicBarrier(int parties)不设定到达后执行的内容 CyclicBarrier cyclicBarrier = new CyclicBarrier( 5 , () -> { logger.debug( "---计数到达后执行的内容----" ); } ); new CyclicBarrierThread( "A" , cyclicBarrier).start(); new CyclicBarrierThread( "B" , cyclicBarrier).start(); new CyclicBarrierThread( "C" , cyclicBarrier).start(); new CyclicBarrierThread( "D" , cyclicBarrier).start(); new CyclicBarrierThread( "E" , cyclicBarrier).start(); new CyclicBarrierThread( "A2" , cyclicBarrier).start(); new CyclicBarrierThread( "B2" , cyclicBarrier).start(); new CyclicBarrierThread( "C2" , cyclicBarrier).start(); new CyclicBarrierThread( "D2" , cyclicBarrier).start(); new CyclicBarrierThread( "E2" , cyclicBarrier).start(); //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程, // 那么当达到5个数量时,只会执行达到时的五个线程的内容, // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束 // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束 } } |
线程类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierThread extends Thread { private static final Logger logger = LogManager.getLogger(CyclicBarrierThread. class ); private CyclicBarrier cyclicBarrier; public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) { super (name); this .cyclicBarrier = cyclicBarrier; } @Override public void run() { logger.debug( "执行操作..." ); try { int time = new Random().nextint( 10 ) * 1000 ; logger.debug( "休眠" + time/ 1000 + "秒" ); sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug( "等待计数器达到标准..." ); try { //让线程进入阻塞状态,等待计数达成后释放 cyclicBarrier.await(); logger.debug( "计数达成,继续执行..." ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } |
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
[A] - 执行操作... [A] - 休眠 0 秒 [E2] - 执行操作... [E2] - 休眠 5 秒 [D2] - 执行操作... [D2] - 休眠 4 秒 [C2] - 执行操作... [C2] - 休眠 4 秒 [B2] - 执行操作... [B2] - 休眠 6 秒 [A2] - 执行操作... [A2] - 休眠 8 秒 [E] - 执行操作... [E] - 休眠 5 秒 [D] - 执行操作... [D] - 休眠 0 秒 [C] - 执行操作... [C] - 休眠 3 秒 [B] - 执行操作... [B] - 休眠 7 秒 [A] - 等待计数器达到标准... [D] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D2] - 等待计数器达到标准... [C2] - 等待计数器达到标准... [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [C2] - 计数达成,继续执行... [A] - 计数达成,继续执行... [C] - 计数达成,继续执行... [D2] - 计数达成,继续执行... [D] - 计数达成,继续执行... [E2] - 等待计数器达到标准... [E] - 等待计数器达到标准... [B2] - 等待计数器达到标准... [B] - 等待计数器达到标准... [A2] - 等待计数器达到标准... [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [E] - 计数达成,继续执行... [B2] - 计数达成,继续执行... [E2] - 计数达成,继续执行... [B] - 计数达成,继续执行... [A2] - 计数达成,继续执行... |
可以想象成以前不正规的长途汽车站的模式:
不正规的长途汽车站会等待座位坐满之后才发车,到达目的地之后继续等待然后循环进行。每个人都是一个Thread,上车后触发cyclicBarrier.await();,当坐满时就是达到指定达成数的时候,车辆发车就是达成后统一执行的内容,发车后车上的人们就可以聊天之类的操作了【我们暂且理解为上车后人们就都不能动了O(∩_∩)O~】。
CountDownLatch与CyclicBarrier区别:
CountDownLatch是一个或多个线程等待计数达成后继续执行,await()调用并没有参与计数。
CyclicBarrier则是N个线程等待彼此执行到零界点之后再继续执行,await()调用的同时参与了计数,并且CyclicBarrier支持条件达成后执行某个动作,而且这个过程是循环性的。
Exchanger
Exchanger 用于线程间进行数据交换
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。 Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
用法示例:以下是重点介绍的一个类,该类使用 Exchanger 在线程间交换缓冲区,因此,在需要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程。 测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package concurrent; import concurrent.pojo.ExchangerPojo; import concurrent.thread.ExchangerThread; import java.util.HashMap; import java.util.concurrent.Exchanger; public class ExchangerTest { public static void main(String[] args) { Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>(); new ExchangerThread( "A" , exchanger).start(); new ExchangerThread( "B" , exchanger).start(); } } |
实体类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package concurrent.pojo; import com.alibaba.fastjson.JSON; import java.util.Date; import java.util.List; public class ExchangerPojo { private int intVal; private String strVal; private List<String> strList; private Date date; public ExchangerPojo( int intVal, String strVal, List<String> strList, Date date) { this .intVal = intVal; this .strVal = strVal; this .strList = strList; this .date = date; } public int getIntVal() { return intVal; } public void setIntVal( int intVal) { this .intVal = intVal; } public String getStrVal() { return strVal; } public void setStrVal(String strVal) { this .strVal = strVal; } public List<String> getStrList() { return strList; } public void setStrList(List<String> strList) { this .strList = strList; } public Date getDate() { return date; } public void setDate(Date date) { this .date = date; } @Override public String toString() { return JSON.toJSONString( this ); } } |
线程类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package concurrent.thread; import concurrent.pojo.ExchangerPojo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.Exchanger; public class ExchangerThread extends Thread { private Exchanger<HashMap<String, ExchangerPojo>> exchanger; private static final Logger logger = LogManager.getLogger(ExchangerThread. class ); public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) { super (name); this .exchanger = exchanger; } @Override public void run() { HashMap<String, ExchangerPojo> map = new HashMap<>(); logger.debug(getName() + "提供者提供数据..." ); Random random = new Random(); for ( int i = 0 ; i < 3 ; i++) { int index = random.nextint( 10 ); List<String> list = new ArrayList<>(); for ( int j = 0 ; j < index; j++) { list.add( "list ---> " + j); } ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据" , list, new Date()); map.put( "第" + i + "个数据" , pojo); } try { int time = random.nextint( 10 ); logger.debug(getName() + "等待" + time + "秒...." ); for ( int i = time; i > 0 ; i--) { sleep( 1000 ); logger.debug(getName() + "---->" + i); } //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了 HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map); time = random.nextint( 10 ); logger.debug(getName() + "接受到数据等待" + time + "秒...." ); for ( int i = time; i > 0 ; i--) { sleep( 1000 ); logger.debug(getName() + "---->" + i); } getMap.forEach((x, y) -> { logger.debug(x + " -----> " + y.toString()); } ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
[B] - B提供者提供数据... [A] - A提供者提供数据... [A] - A等待 2 秒.... [B] - B等待 0 秒.... [A] - A----> 2 [A] - A----> 1 [B] - B接受到数据等待 1 秒.... [A] - A接受到数据等待 4 秒.... [B] - B----> 1 [A] - A----> 4 [B] - 第 0 个数据 -----> { "date" : 1470652252049 , "intVal" : 5 , "strList" :[ "list ---> 0" , "list ---> 1" , "list ---> 2" , "list ---> 3" , "list ---> 4" ], "strVal" : "A提供的数据" } [B] - 第 1 个数据 -----> { "date" : 1470652252049 , "intVal" : 1 , "strList" :[ "list ---> 0" ], "strVal" : "A提供的数据" } [B] - 第 2 个数据 -----> { "date" : 1470652252049 , "intVal" : 4 , "strList" :[ "list ---> 0" , "list ---> 1" , "list ---> 2" , "list ---> 3" ], "strVal" : "A提供的数据" } [A] - A----> 3 [A] - A----> 2 [A] - A----> 1 [A] - 第 0 个数据 -----> { "date" : 1470652252057 , "intVal" : 1 , "strList" :[ "list ---> 0" ], "strVal" : "B提供的数据" } [A] - 第 1 个数据 -----> { "date" : 1470652252057 , "intVal" : 6 , "strList" :[ "list ---> 0" , "list ---> 1" , "list ---> 2" , "list ---> 3" , "list ---> 4" , "list ---> 5" ], "strVal" : "B提供的数据" } [A] - 第 2 个数据 -----> { "date" : 1470652252057 , "intVal" : 6 , "strList" :[ "list ---> 0" , "list ---> 1" , "list ---> 2" , "list ---> 3" , "list ---> 4" , "list ---> 5" ], "strVal" : "B提供的数据" } |
Phaser
Phaser个人感觉兼具了CountDownLatch与CyclicBarrier的功能,并提供了分阶段的能力。
实现分阶段的CyclicBarrier的功能
测试代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest. class ); public static void main(String[] args) { Phaser phaser = new Phaser() { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected Boolean onAdvance( int phase, int registeredParties) { logger.debug( "阶段--->" + phase); logger.debug( "注册的线程数量--->" + registeredParties); return super .onAdvance(phase, registeredParties); } } ; for ( int i = 3 ; i > 0 ; i--) { new PhaserThread( "第" + i + "个" , phaser).start(); } } } |
线程代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.Phaser; public class PhaserThread extends Thread { private Phaser phaser; private static final Logger logger = LogManager.getLogger(PhaserThread. class ); public PhaserThread(String name, Phaser phaser) { super (name); this .phaser = phaser; //把当前线程注册到Phaser this .phaser.register(); logger.debug( "name为" + name + "的线程注册了" + this .phaser.getRegisteredParties() + "个线程" ); } @Override public void run() { logger.debug( "进入..." ); phaser.arrive(); for ( int i = 6 ; i > 0 ; i--) { int time = new Random().nextint( 5 ); try { logger.debug( "睡眠" + time + "秒" ); sleep(time * 1000 ); if (i == 1 ) { logger.debug( "未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug( "最后一次触发,并注销自身" ); phaser.arriveAndDeregister(); logger.debug( "未完成的线程数量:" + phaser.getUnarrivedParties()); } else { logger.debug( "未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug(i + "--->触发并阻塞..." ); phaser.arriveAndAwaitAdvance(); //相当于CyclicBarrier.await(); logger.debug( "未完成的线程数量:" + phaser.getUnarrivedParties()); } } catch (InterruptedException e) { e.printStackTrace(); } } logger.debug( "注销完成之后注册的线程数量--->" + phaser.getRegisteredParties()); } } |
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
[main] - name为第 3 个的线程注册了 1 个线程 [main] - name为第 2 个的线程注册了 2 个线程 [main] - name为第 1 个的线程注册了 3 个线程 [第 3 个] - 进入... [第 2 个] - 进入... [第 3 个] - 睡眠 2 秒 [第 2 个] - 睡眠 1 秒 [第 1 个] - 进入... [第 1 个] - 阶段---> 0 [第 1 个] - 注册的线程数量---> 3 [第 1 个] - 睡眠 4 秒 [第 2 个] - 未完成的线程数量: 3 [第 2 个] - 6 --->触发并阻塞... [第 3 个] - 未完成的线程数量: 2 [第 3 个] - 6 --->触发并阻塞... [第 1 个] - 未完成的线程数量: 1 [第 1 个] - 6 --->触发并阻塞... [第 1 个] - 阶段---> 1 [第 1 个] - 注册的线程数量---> 3 [第 1 个] - 未完成的线程数量: 3 [第 3 个] - 未完成的线程数量: 3 [第 2 个] - 未完成的线程数量: 3 [第 1 个] - 睡眠 1 秒 [第 3 个] - 睡眠 0 秒 [第 2 个] - 睡眠 4 秒 [第 3 个] - 未完成的线程数量: 3 [第 3 个] - 5 --->触发并阻塞... [第 1 个] - 未完成的线程数量: 2 [第 1 个] - 5 --->触发并阻塞... [第 2 个] - 未完成的线程数量: 1 [第 2 个] - 5 --->触发并阻塞... [第 2 个] - 阶段---> 2 [第 2 个] - 注册的线程数量---> 3 [第 2 个] - 未完成的线程数量: 3 [第 3 个] - 未完成的线程数量: 3 [第 1 个] - 未完成的线程数量: 3 [第 2 个] - 睡眠 0 秒 [第 3 个] - 睡眠 2 秒 [第 2 个] - 未完成的线程数量: 3 [第 1 个] - 睡眠 2 秒 [第 2 个] - 4 --->触发并阻塞... [第 3 个] - 未完成的线程数量: 2 [第 1 个] - 未完成的线程数量: 2 [第 3 个] - 4 --->触发并阻塞... [第 1 个] - 4 --->触发并阻塞... [第 1 个] - 阶段---> 3 [第 1 个] - 注册的线程数量---> 3 [第 1 个] - 未完成的线程数量: 3 [第 3 个] - 未完成的线程数量: 3 [第 2 个] - 未完成的线程数量: 3 [第 1 个] - 睡眠 2 秒 [第 3 个] - 睡眠 1 秒 [第 2 个] - 睡眠 4 秒 [第 3 个] - 未完成的线程数量: 3 [第 3 个] - 3 --->触发并阻塞... [第 1 个] - 未完成的线程数量: 2 [第 1 个] - 3 --->触发并阻塞... [第 2 个] - 未完成的线程数量: 1 [第 2 个] - 3 --->触发并阻塞... [第 2 个] - 阶段---> 4 [第 2 个] - 注册的线程数量---> 3 [第 2 个] - 未完成的线程数量: 3 [第 3 个] - 未完成的线程数量: 3 [第 1 个] - 未完成的线程数量: 3 [第 2 个] - 睡眠 2 秒 [第 1 个] - 睡眠 2 秒 [第 3 个] - 睡眠 4 秒 [第 2 个] - 未完成的线程数量: 3 [第 1 个] - 未完成的线程数量: 3 [第 2 个] - 2 --->触发并阻塞... [第 1 个] - 2 --->触发并阻塞... [第 3 个] - 未完成的线程数量: 1 [第 3 个] - 2 --->触发并阻塞... [第 3 个] - 阶段---> 5 [第 3 个] - 注册的线程数量---> 3 [第 3 个] - 未完成的线程数量: 3 [第 1 个] - 未完成的线程数量: 3 [第 2 个] - 未完成的线程数量: 3 [第 3 个] - 睡眠 2 秒 [第 1 个] - 睡眠 3 秒 [第 2 个] - 睡眠 0 秒 [第 2 个] - 未完成的线程数量: 3 [第 2 个] - 最后一次触发,并注销自身 [第 2 个] - 未完成的线程数量: 2 [第 2 个] - 注销完成之后注册的线程数量---> 2 [第 3 个] - 未完成的线程数量: 2 [第 3 个] - 最后一次触发,并注销自身 [第 3 个] - 未完成的线程数量: 1 [第 3 个] - 注销完成之后注册的线程数量---> 1 [第 1 个] - 未完成的线程数量: 1 [第 1 个] - 最后一次触发,并注销自身 [第 1 个] - 阶段---> 6 [第 1 个] - 注册的线程数量---> 0 [第 1 个] - 未完成的线程数量: 0 [第 1 个] - 注销完成之后注册的线程数量---> 0 |
上面代码中,当所有线程进行到arriveAndAwaitAdvance()时会触发计数并且将线程阻塞,等计数数量等于注册线程数量【即所有线程都执行到了约定的地方时,会放行,是所有线程得以继续执行,并触发onAction事件】。我们可以在onAction中根据不同阶段执行不同内容的操作。
实现分阶段的CountDownLatch的功能
只需将上面的测试类更改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; import static jodd.util.ThreadUtil.sleep; public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest. class ); public static void main(String[] args) { //这里其实相当于已经注册了3个线程,但是并没有实际的线程 int coutNum= 3 ; Phaser phaser = new Phaser(coutNum) { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected Boolean onAdvance( int phase, int registeredParties) { logger.debug( "阶段--->" + phase); logger.debug( "注册的线程数量--->" + registeredParties); return registeredParties==coutNum; //当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser } } ; for ( int i = 3 ; i > 0 ; i--) { new PhaserThread( "第" + i + "个" , phaser).start(); } //当phaser未终止时循环注册这块儿可以使用实际的业务处理 while (!phaser.isTerminated()) { sleep( 1000 ); logger.debug( "触发一次" ); phaser.arrive(); //相当于countDownLatch.countDown(); } } } |
总结
以上就是本文关于Java多线程同步器代码详解的全部内容,希望对大家有所帮助。如有不足之处,欢迎留言指出。
原文链接:http://blog.csdn.net/wfg18801733667/article/details/52179510