当你有多个进程或线程访问相同的数据时,竞争条件是一个威胁。本文探讨了在发现竞争条件后如何测试它们。
Incrmnt
你在一个名为“Incrmnt”的火热新创公司工作,该公司只做一件事情,并且做得比较好。
你展示一个全局计数器和一个加号,用户可以点击加号,此时计数器加一。这太简单了,而且容易使人上瘾。毫无疑问这就是接下来的大事情。
投资者们争先恐后的进入了董事会,但你有一个大问题。
竞争条件
在你的内测中,Abraham和Belinda是如此的兴奋,以至于每个人都点了100次加号按钮。你的服务器日志显示了200次请求,但计数器却显示为173。很明显,有一些请求没有被加上。
先将“Incrmnt变成了一坨屎”的新闻抛到脑后,你检查下代码(本文用到的所有代码都能在Github上找到)。
1
2
3
4
5
6
7
8
9
10
|
# incrmnt.py import db def increment(): count = db.get_count() new_count = count + 1 db.set_count(new_count) return new_count |
你的Web服务器使用多进程处理流量请求,所以这个函数能在不同的线程中同时执行。如果你没掌握好时机,将会发生:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# 线程1和线程2在不同的进程中同时执行 # 为了展示的目的,在这里并排放置 # 在垂直方向分开它们,以说明在每个时间点上执行什么代码 # Thread 1(线程1) # Thread 2(线程2) def increment(): def increment(): # get_count returns 0 count = db.get_count() # get_count returns 0 again count = db.get_count() new_count = count + 1 # set_count called with 1 db.set_count(new_count) new_count = count + 1 # set_count called with 1 again db.set_count(new_count) |
所以尽管增加了两次计数,但最终只增加了1。
你知道你可以修改这个代码,变为线程安全的,但是在你那么做之前,你还想写一个测试证明竞争的存在。
重现竞争
在理想情况下,测试应该尽可能的重现上面的场景。竞争的关键因素是:
?两个 get_count 调用必须在两个 set_count 调用之前执行,从而使得两个线程中的计数具有相同的值。
set_count 调用,什么时候执行都没关系,只要它们都在 get_count 调用之后即可。
简单起见,我们试着重现这个嵌套的情形。这里整 个Thread 2 在 Thread 1 的首个 get_count 调用之后执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# Thread 1 # Thread 2 def increment(): # get_count returns 0 count = db.get_count() def increment(): # get_count returns 0 again count = db.get_count() # set_count called with 1 new_count = count + 1 db.set_count(new_count) # set_count called with 1 again new_count = count + 1 db.set_count(new_count) |
before_after 是一个库,它提供了帮助重现这种情形的工具。它可以在一个函数之前或之后插入任意代码。
before_after 依赖于 mock 库,它用来补充一些功能。如果你不熟悉 mock,我建议阅读一些优秀的文档。文档中特别重要的部分是 Where To Patch。
我们希望,Thread 1 调用 get_count 后,执行全部的 Thread 2 ,之后恢复执行 Thread 1。
我们的测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# test_incrmnt.py import unittest import before_after import db import incrmnt class TestIncrmnt(unittest.TestCase): def setUp( self ): db.reset_db() def test_increment_race( self ): # after a call to get_count, call increment with before_after.after( 'incrmnt.db.get_count' , incrmnt.increment): # start off the race with a call to increment incrmnt.increment() count = db.get_count() self .assertEqual(count, 2 ) |
在首次 get_count 调用之后,我们使用 before_after 的上下文管理器 after 来插入另外一个 increment 的调用。
在默认情况下,before_after只调用一次 after 函数。在这个特殊的情况下这是很有用的,因为否则的话堆栈会溢出(increment调用get_count,get_coun t也调用 increment,increment 又调用get_count…)。
这个测试失败了,因为计数等于1,而不是2。现在我们有一个重现了竞争条件的失败测试,一起来修复。
防止竞争
我们将要使用一个简单的锁机制来减缓竞争。这显然不是理想的解决方案,更好的解决方法是使用原子更新进行数据存储——但这种方法能更好地示范 before_after 在测试多线程应用程序上的作用。
在 incrmnt.py 中添加一个新函数:
1
2
3
4
5
|
# incrmnt.py def locking_increment(): with db.get_lock(): return increment() |
它保证在同一时间只有一个线程对计数进行读写操作。如果一个线程试图获取锁,而锁被另外一个线程保持,将会引发 CouldNotLock 异常。
现在我们增加这样一个测试:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# test_incrmnt.py def test_locking_increment_race( self ): def erroring_locking_increment(): # Trying to get a lock when the other thread has it will cause a # CouldNotLock exception - catch it here or the test will fail with self .assertRaises(db.CouldNotLock): incrmnt.locking_increment() with before_after.after( 'incrmnt.db.get_count' , erroring_locking_increment): incrmnt.locking_increment() count = db.get_count() self .assertEqual(count, 1 ) |
现在在同一时间,就只有一个线程能够增加计数了。
减缓竞争
我们这里还有一个问题,通过上边这种方式,如果两个请求冲突,一个不会被登记。为了缓解这个问题,我们可以让 increment 重新链接服务器(有一个简洁的方式,就是用类似 funcy retry 的东西):
1
2
3
4
5
6
7
8
|
# incrmnt.py def retrying_locking_increment(): @retry (tries = 5 , errors = db.CouldNotLock) def _increment(): return locking_increment() return _increment() |
当我们需要比这种方法提供的更大规模的操作时,可以将 increment 作为一个原子更新或事务转移到我们的数据库中,让其在远离我们的应用程序的地方承担责任。
总结
Incrmnt 现在不存在竞争了,人们可以愉快地点击一整天,而不用担心自己不被计算在内。
这是一个简单的例子,但是 before_after 可以用于更复杂的竞争条件,以确保你的函数能正确地处理所有情形。能够在单线程环境中测试和重现竞争条件是一个关键,它能让你更确定你正在正确地处理竞争条件。