前言
前面介绍了排它锁,共享锁的实现机制,本篇继续学习aqs中的另外一个内容-condition。想必学过java的都知道object.wait和object.notify,同时也应该知晓这两个方法的使用离不开synchronized关键字。synchronized是jvm级别提供的同步原语,它的实现机制隐藏在jvm实现中。作为lock系列功能中的condition,就是用来实现类似 object.wait和object.notify 对应功能的。
使用场景
为了更好的理解lock和condition的使用场景,下面我们先来实现这样一个功能:有多个生产者,多个消费者,一个产品容器,我们假设容器最多可以放3个产品,如果满了,生产者需要等待产品被消费,如果没有产品了,消费者需要等待。我们的目标是一共生产10个产品,最终消费10个产品,如何在多线程环境下完成这一挑战呢?下面是我简单实现的一个demo,仅供参考。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
package com.lock.condition.test; import java.util.linkedlist; import java.util.concurrent.timeunit; import java.util.concurrent.atomic.atomicinteger; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.reentrantlock; public class lockconditiontest { // 生产 和 消费 的最大总数 public static int totalcount = 10 ; // 已经生产的产品数 public static volatile int hasproducecount = 0 ; // 已经消费的产品数 public static volatile int hasconsumecount = 0 ; // 容器最大容量 public static int containersize = 3 ; // 使用公平策略的可重入锁,便于观察演示结果 public static reentrantlock lock = new reentrantlock( true ); public static condition notempty = lock.newcondition(); public static condition notfull = lock.newcondition(); // 容器 public static linkedlist<integer> container = new linkedlist<integer>(); // 用于标识产品 public static atomicinteger idgenerator = new atomicinteger(); public static void main(string[] args) { thread p1 = new thread( new producer(), "p-1" ); thread p2 = new thread( new producer(), "p-2" ); thread p3 = new thread( new producer(), "p-3" ); thread c1 = new thread( new consumer(), "c-1" ); thread c2 = new thread( new consumer(), "c-2" ); thread c3 = new thread( new consumer(), "c-3" ); c1.start(); c2.start(); c3.start(); p1.start(); p2.start(); p3.start(); try { c1.join(); c2.join(); c3.join(); p1.join(); p2.join(); p3.join(); } catch (exception e){ } system.out.println( " done. " ); } static class producer implements runnable{ @override public void run() { while ( true ){ lock.lock(); try { // 容器满了,需要等待非满条件 while (container.size() >= containersize){ notfull.await(); } // 到这里表明容器未满,但需要再次判断是否已经完成了任务 if (hasproducecount >= totalcount){ system.out.println(thread.currentthread().getname()+ " producer exit" ); return ; } int product = idgenerator.incrementandget(); // 把生产出来的产品放入容器 container.addlast(product); system.out.println(thread.currentthread().getname() + " product " + product); hasproducecount++; // 通知消费线程可以去消费了 notempty.signal(); } catch (interruptedexception e) { } finally { lock.unlock(); } } } } static class consumer implements runnable{ @override public void run() { while ( true ){ lock.lock(); try { if (hasconsumecount >= totalcount){ system.out.println(thread.currentthread().getname()+ " consumer exit" ); return ; } // 一直等待有产品了,再继续往下消费 while (container.isempty()){ notempty.await( 2 , timeunit.seconds); if (hasconsumecount >= totalcount){ system.out.println(thread.currentthread().getname()+ " consumer exit" ); return ; } } integer product = container.removefirst(); system.out.println(thread.currentthread().getname() + " consume " + product); hasconsumecount++; // 通知生产线程可以继续生产产品了 notfull.signal(); } catch (interruptedexception e) { } finally { lock.unlock(); } } } } } |
一次执行结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
p- 1 product 1 p- 3 product 2 p- 2 product 3 c- 3 consume 1 c- 2 consume 2 c- 1 consume 3 p- 1 product 4 p- 3 product 5 p- 2 product 6 c- 3 consume 4 c- 2 consume 5 c- 1 consume 6 p- 1 product 7 p- 3 product 8 p- 2 product 9 c- 3 consume 7 c- 2 consume 8 c- 1 consume 9 p- 1 product 10 p- 3 producer exit p- 2 producer exit c- 3 consume 10 c- 2 consumer exit c- 1 consumer exit p- 1 producer exit c- 3 consumer exit done. |
从结果可以发现已经达到我们的目的了。
深入理解condition的实现原理
上面的示例只是为了展示 lock结合condition可以实现的一种经典场景,在有了感性的认识之后,我们将一步一步来观察lock和condition是如何协作完成这一任务的,这也是本篇的核心内容。
为了更好的理解和演示这一个过程,我们使用到的锁是使用公平策略模式的,我们会使用上面例子运作的流程。我们会使用到3个生产线程,3个消费线程,分别表示 p1、p2、p3和c1、c2、c3。
condition的内部实现是使用节点链来实现的,每个条件实例对应一个节点链,我们有notempty 和 notfull 两个条件实例,所以会有两个等待节点链。
一切准备就绪 ,开始我们的探索之旅。
1、线程c3执行,然后发现没有产品可以消费,执行 notempty.await,进入等待队列中等候。
2、线程c2和线程c1执行,然后发现没有产品可以消费,执行 notempty.await,进入等待队列中等候。
3、 线程 p1 启动,得到了锁,p1开始生产产品,这时候p3抢在p2之前,执行了lock操作,结果p2和p3都处于等待状态,入同步队列等待。<喎�"/kf/ware/vc/" target="_blank" class="keylink">vcd4ncjxwpjxpbwcgywx0pq=="这里写图片描述" src="/uploadfile/collfiles/20160912/20160912092710536.png" id="theimg" src="/uploads/allimg/210202/11330054a-2.jpg" />
事情开始变得有趣了,p1执行一次生产后,执行了 notempty.signal,其效果就是把 not empty等待列表中的头节点,即c3节点移到同步等待列队中,重新参与抢占锁。
5、p3生产完了产品后,继续notempty.signal,同时释放锁,释放锁后会唤醒p2线程,然后p3在下一轮尝试获取锁的时候,再次入队。
6、接着,p2继续生产,生产后执行 notempty.signal,同时释放锁,释放锁后唤醒c3线程,然后p2在下一轮尝试取锁的时候,入列。
7、c3进行消费,你可以看到,现在 not empty等待列队中已经没有等待节点了,由于我们使用的是公平策略排它锁,这就会导致同步队列中的节点一个接着一个执行,而目前同步队列中的节点排列为一生产,一消费,这不难可以知道,接下来代码已经不会进入 wait条件了,所以一个一个轮流执行就是,比如c3,执行完了,继续notfull.signal(); 然后释放锁,入队,这里要明白,notfull.signal();这句代码其实没有作用了,因为 not full等待队列中没有任何等待线程节点。 c3执行后,状态如下图所示:
8、后面的事情我想大家都可以想得出来是怎样一步一步交替执行的了。
总结
本篇基于一个实例来演示结合lock和condition如何实现生产-消费模式,而且只讨论一种可能执行的流程,是想更简单的表述aqs底层是如何实现的。基于上面这个演示过程,针对其它的执行流程,其原来也是一样的。condition内部使用一个节点链来保存所有 wait状态的线程,当对应条件被signal的时候,就会把等待节点转移到同步队列中,继续竞争锁。原理其实并不复杂,有兴趣的朋友可以翻阅源码。
以上就是本文关于java并发等待条件的实现原理详解的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!
原文链接:https://www.2cto.com/kf/201609/546940.html