前言
无论是synchronized还是lock,都运行在线程级别上,必须运行在同一个jvm中。如果竞争资源的进程不在同一个jvm中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问。
分布式锁的实现一般有三种方式,使用mysql数据库行锁,基于redis的分布式锁,以及基于zookeeper的分布式锁。本文中我们重点看一下redis如何实现分布式锁。
首先,看一下用于实现分布式锁的两个redis基础命令:
setnx key value
这里的setnx,是"set if not exists"的缩写,表示当指定的key值不存在时,为key设定值为value。如果key存在,则设定失败。
setex key timeout value
setex命令为指定的key设置值及其过期时间(以秒为单位)。如果key已经存在,setex命令将会替换旧的值。
基于这两个指令,我们能够实现:
使用setnx 命令,保证同一时刻只有一个线程能够获取到锁使用setex 命令,保证锁会超期释放,从而不因一个线程长期占有一个锁而导致死锁。
这里将两个命令结合在一起使用的原因是,在正常情况下,如果只使用setnx 命令,使用完成后使用delete命令删除锁进行释放,不存在什么问题。但是如果获取分布式锁的线程在运行中挂掉了,那么锁将不被释放。如果使用setex 设置了过期时间,即使线程挂掉,也可以自动进行锁的释放。
手写redis分布式锁
接下来,我们基于redis+spring手写实现一个分布式锁。首先配置jedis连接池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@configuration public class config { @bean public jedispool jedispool(){ jedispoolconfig jedispoolconfig= new jedispoolconfig(); jedispoolconfig.setmaxidle( 100 ); jedispoolconfig.setminidle( 1 ); jedispoolconfig.setmaxwaitmillis( 2000 ); jedispoolconfig.settestonborrow( true ); jedispoolconfig.settestonreturn( true ); jedispool jedispool= new jedispool(jedispoolconfig, "127.0.0.1" , 6379 ); return jedispool; } } |
实现redislock分布式锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
public class redislock implements lock { @autowired jedispool jedispool; private static final string key = "lock" ; private threadlocal<string> threadlocal = new threadlocal<>(); @override public void lock() { boolean b = trylock(); if (b) { return ; } try { timeunit.milliseconds.sleep( 50 ); } catch (exception e) { e.printstacktrace(); } lock(); //递归调用 } @override public boolean trylock() { setparams setparams = new setparams(); setparams.ex( 10 ); setparams.nx(); string s = uuid.randomuuid().tostring(); jedis resource = jedispool.getresource(); string lock = resource.set(key, s, setparams); resource.close(); if ( "ok" .equals(lock)) { threadlocal.set(s); return true ; } return false ; } //解锁判断锁是不是自己加的 @override public void unlock(){ //调用lua脚本解锁 string script= "if redis.call(\"get\",keys[1]==argv[1] then\n" + " return redis.call(\"del\",keys[1])\n" + "else\n" + " return 0\n" + "end" ; jedis resource = jedispool.getresource(); object eval=resource.eval(script, arrays.aslist(key),arrays.aslist(threadlocal.get())); if (integer.valueof(eval.tostring())== 0 ){ resource.close(); throw new runtimeexception( "解锁失败" ); } /* *不写成下面这种也是因为不是原子操作,和ex、nx相同 string s = resource.get(key); if (threadlocal.get().equals(s)){ resource.del(key); } */ resource.close(); } @override public void lockinterruptibly() throws interruptedexception { } @override public boolean trylock( long time, timeunit unit) throws interruptedexception { return false ; } @override public condition newcondition() { return null ; } } |
简单对上面代码中需要注意的地方做一解释:
加锁过程中,使用setparams 同时设置nx和ex的值,保证原子操作通过threadlocal保存key对应的value,通过value来判断锁是否当前线程自己加的,避免线程错乱解锁释放锁的过程中,使用lua脚本进行删除,保证redis在执行此脚本时不执行其他操作,从而保证操作的原子性
但是,这段手写的代码可能会存在一个问题,就是不能保证业务逻辑一定能被执行完成,因为设置了锁的过期时间可能导致过期。
redisson
基于上面存在的问题,我们可以使用redisson分布式可重入锁。redisson内部提供了一个监控锁的看门狗,它的作用是在redisson实例被关闭前,不断的延长锁的有效期。
引入依赖:
1
2
3
4
5
|
<dependency> <groupid>org.redisson</groupid> <artifactid>redisson</artifactid> <version> 3.10 . 7 </version> </dependency> |
配置redissonclient,然后我们对常用方法进行测试。
1
2
3
4
5
6
7
8
9
10
|
@configuration public class redissonconfig { @bean public redissonclient redissonclient(){ config config= new config(); config.usesingleserver().setaddress( "redis://127.0.0.1:6379" ); redissonclient redissonclient= redisson.create(config); return redissonclient; } } |
lock()
先写一个测试接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@getmapping ( "/lock" ) public string test() { rlock lock = redissonclient.getlock( "lock" ); lock.lock(); system.out.println(thread.currentthread().getname()+ " get redisson lock" ); try { system.out.println( "do something" ); timeunit.seconds.sleep( 20 ); } catch (interruptedexception e) { e.printstacktrace(); } lock.unlock(); system.out.println(thread.currentthread().getname()+ " release lock" ); return "locked" ; } |
进行测试,同时发送两个请求,redisson锁生效:
lock(long leasetime, timeunit unit)
redisson可以给lock()方法提供leasetime参数来指定加锁的时间,超过这个时间后锁可以自动释放。测试接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@getmapping ( "/lock2" ) public string test2() { rlock lock = redissonclient.getlock( "lock" ); lock.lock( 10 ,timeunit.seconds); system.out.println(thread.currentthread().getname()+ " get redisson lock" ); try { system.out.println( "do something" ); timeunit.seconds.sleep( 20 ); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println(thread.currentthread().getname()+ " release lock" ); return "locked" ; } |
运行结果:
可以看出,在第一个线程还没有执行完成时,就释放了redisson锁,第二个线程进入后,两个线程可以同时执行被锁住的代码逻辑。这样可以实现无需调用unlock方法手动解锁。
trylock(long waittime, long leasetime, timeunit unit)
trylock方法会尝试加锁,最多等待waittime秒,上锁以后过leasetime秒自动解锁;如果没有等待时间,锁不住直接返回false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@getmapping ( "/lock3" ) public string test3() { rlock lock = redissonclient.getlock( "lock" ); try { boolean res = lock.trylock( 5 , 30 , timeunit.seconds); if (res){ try { system.out.println(thread.currentthread().getname()+ " 获取到锁,返回true" ); system.out.println( "do something" ); timeunit.seconds.sleep( 20 ); } finally { lock.unlock(); system.out.println(thread.currentthread().getname()+ " 释放锁" ); } } else { system.out.println(thread.currentthread().getname()+ " 未获取到锁,返回false" ); } } catch (interruptedexception e) { e.printstacktrace(); } return "lock" ; } |
运行结果:
可见在第一个线程获得锁后,第二个线程超过等待时间仍未获得锁,返回false放弃获得锁的过程。
除了以上单机redisson锁以外,还支持我们之前提到过的哨兵模式和集群模式,只需要改变config的配置即可。以集群模式为例:
1
2
3
4
5
6
7
8
9
10
11
12
|
@bean public redissonclient redissonclient(){ config config= new config(); config.useclusterservers().addnodeaddress( "redis://172.20.5.170:7000" ) .addnodeaddress( "redis://172.20.5.170:7001" ) .addnodeaddress( "redis://172.20.5.170:7002" ) .addnodeaddress( "redis://172.20.5.170:7003" ) .addnodeaddress( "redis://172.20.5.170:7004" ) .addnodeaddress( "redis://172.20.5.170:7005" ); redissonclient redissonclient = redisson.create(config); return redissonclient; } |
redlock红锁
下面介绍一下redisson红锁redissonredlock,该对象也可以用来将多个rlock对象关联为一个红锁,每个rlock对象实例可以来自于不同的redisson实例。
redissonredlock针对的多个redis节点,这多个节点可以是集群,也可以不是集群。当我们使用redissonredlock时,只要在大部分节点上加锁成功就算成功。看一下使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@getmapping ( "/testredlock" ) public void testredlock() { config config1 = new config(); config1.usesingleserver().setaddress( "redis://172.20.5.170:6379" ); redissonclient redissonclient1 = redisson.create(config1); config config2 = new config(); config2.usesingleserver().setaddress( "redis://172.20.5.170:6380" ); redissonclient redissonclient2 = redisson.create(config2); config config3 = new config(); config3.usesingleserver().setaddress( "redis://172.20.5.170:6381" ); redissonclient redissonclient3 = redisson.create(config3); string resourcename = "redlock" ; rlock lock1 = redissonclient1.getlock(resourcename); rlock lock2 = redissonclient2.getlock(resourcename); rlock lock3 = redissonclient3.getlock(resourcename); redissonredlock redlock = new redissonredlock(lock1, lock2, lock3); boolean islock; try { islock = redlock.trylock( 5 , 30 , timeunit.seconds); if (islock) { system.out.println( "do something" ); timeunit.seconds.sleep( 20 ); } } catch (exception e) { e.printstacktrace(); } finally { redlock.unlock(); } } |
相对于单redis节点来说,redissonredlock的优点在于防止了单节点故障造成整个服务停止运行的情况;并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。使用redissonredlock,性能方面会比单节点redis分布式锁差一些,但可用性比普通锁高很多。
总结
到此这篇关于巧用redis实现分布式锁详细介绍的文章就介绍到这了,更多相关redis分布式锁内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/Monsterof/article/details/122090628