起步
Python 提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁。
通俗点说就是当没有写锁时,就可以加读锁且任意线程可以同时加;而写锁只能有一个线程,且必须在没有读锁时才能加上。
简单的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import threading class RWlock( object ): def __init__( self ): self ._lock = threading.Lock() self ._extra = threading.Lock() self .read_num = 0 def read_acquire( self ): with self ._extra: self .read_num + = 1 if self .read_num = = 1 : self ._lock.acquire() def read_release( self ): with self ._extra: self .read_num - = 1 if self .read_num = = 0 : self ._lock.release() def write_acquire( self ): self ._lock.acquire() def write_release( self ): self ._lock.release() |
这是读写锁的一个简单的实现,self.read_num 用来保存获得读锁的线程数,这个属性属于临界区,对其操作也要加锁,所以这里需要一个保护内部数据的额外的锁 self._extra 。
但是这个锁是不公平的。理想情况下,线程获得所的机会应该是一样的,不管线程是读操作还是写操作。而从上述代码可以看到,读请求都会立即设置 self.read_num += 1,不管有没有获得锁,而写请求想要获得锁还得等待 read_num 为 0 。
所以这个就造成了只有锁没有被占用或者没有读请求时,可以获得写权限。我们应该想办法避免读模式锁长期占用。
读写锁的优先级
读写锁也有分 读优先 和 写优先。上面的代码就属于读优先。
如果要改成写优先,那就换成去记录写线程的引用计数,读和写在同时竞争时,可以让写线程增加写的计数,这样可使读线程的读锁一直获取不到, 因为读线程要先判断写的引用计数,若不为0,则等待其为 0,然后进行读。这部分代码不罗列了。
但这样显然不够灵活。我们不需要两个相似的读写锁类。我们希望重构我们代码,使它更强大。
改进
为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件 threading.Condition 用来处理哪方优先的通知。计数引用可以扩大语义:正数:表示正在读操作的线程数,负数:表示正在写操作的线程数(最多-1)
在获取读操作时,先然后判断时候有等待的写线程,没有,进行读操作,有,则等待读的计数加 1 后等待 Condition 通知;等待读的计数减 1,计数引用加 1,继续读操作,若条件不成立,循环等待;
在获取写操作时,若锁没有被占用,引用计数减 1,若被占用,等待写线程数加 1,等待写条件 Condition 的通知。
读模式和写模式的释放都是一样,需要根据判断去通知对应的 Condition:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
class RWLock( object ): def __init__( self ): self .lock = threading.Lock() self .rcond = threading.Condition( self .lock) self .wcond = threading.Condition( self .lock) self .read_waiter = 0 # 等待获取读锁的线程数 self .write_waiter = 0 # 等待获取写锁的线程数 self .state = 0 # 正数:表示正在读操作的线程数 负数:表示正在写操作的线程数(最多-1) self .owners = [] # 正在操作的线程id集合 self .write_first = True # 默认写优先,False表示读优先 def write_acquire( self , blocking = True ): # 获取写锁只有当 me = threading.get_ident() with self .lock: while not self ._write_acquire(me): if not blocking: return False self .write_waiter + = 1 self .wcond.wait() self .write_waiter - = 1 return True def _write_acquire( self , me): # 获取写锁只有当锁没人占用,或者当前线程已经占用 if self .state = = 0 or ( self .state < 0 and me in self .owners): self .state - = 1 self .owners.append(me) return True if self .state > 0 and me in self .owners: raise RuntimeError( 'cannot recursively wrlock a rdlocked lock' ) return False def read_acquire( self , blocking = True ): me = threading.get_ident() with self .lock: while not self ._read_acquire(me): if not blocking: return False self .read_waiter + = 1 self .rcond.wait() self .read_waiter - = 1 return True def _read_acquire( self , me): if self .state < 0 : # 如果锁被写锁占用 return False if not self .write_waiter: ok = True else : ok = me in self .owners if ok or not self .write_first: self .state + = 1 self .owners.append(me) return True return False def unlock( self ): me = threading.get_ident() with self .lock: try : self .owners.remove(me) except ValueError: raise RuntimeError( 'cannot release un-acquired lock' ) if self .state > 0 : self .state - = 1 else : self .state + = 1 if not self .state: if self .write_waiter and self .write_first: # 如果有写操作在等待(默认写优先) self .wcond.notify() elif self .read_waiter: self .rcond.notify_all() elif self .write_waiter: self .wcond.notify() read_release = unlock write_release = unlock |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000016900930