分布式锁设计
宋标 Lv5

Redis分布式锁实现

1.代码实现

每个方法对应一个案例,且说明了每个方法的漏洞逐步升级解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* @author songbiao
* @desc 模拟分布式锁
*/
public class DistributeLockStudy {

public static final String LOCK = "lock";
public Jedis jedis = new Jedis("127.0.0.1", 6379);

/**
*
* @param workTime 占用锁时间
* @param intervalTime 获得锁间隔时间
* @throws InterruptedException
*/
public void getLock1(long workTime, long intervalTime) throws InterruptedException {
// 存在漏洞: 当多个服务器同时走到这条语句,互斥就失效。
while (jedis.exists(LOCK)) {
System.out.println(Thread.currentThread().getName() + "间隔秒: " + intervalTime);
TimeUnit.SECONDS.sleep(intervalTime);
}
jedis.setex(LOCK, workTime, System.currentTimeMillis() + "");
System.out.println(Thread.currentThread().getName() + " 获得锁");
}

public void getLock2(long workTime, long intervalTime) throws InterruptedException {
// 解决了上述的漏洞,还存在一个问题,线程可重入
while (jedis.setnx(LOCK, System.currentTimeMillis() + "") == 0) {
System.out.println(Thread.currentThread().getName() + "间隔秒: " + intervalTime);
TimeUnit.SECONDS.sleep(intervalTime);
}
// 问题: 如果set后在这一步前宕机,将会导致死锁。
jedis.expire(LOCK, workTime);
System.out.println(Thread.currentThread().getName() + " 获得锁");
}

/**
* lua脚本解决可重入和原子性问题
* @param workTime
* @param intervalTime
* @param value 解决可重入
* @throws InterruptedException
*/
public void getLock3(String workTime, long intervalTime, String value) throws InterruptedException {
// lua脚本解决了可重入和原子性的问题,很有一个隐患的问题,如果锁超时自动释放, 当前线程还没有完成任务,其它线程获得锁就会出现线程安全问题。
String lockScript =
"local key = KEYS[1]\n" +
"local requestId = KEYS[2]\n" +
"local ttl = tonumber(KEYS[3])\n" +
"local result = redis.call('setnx', key, requestId)\n" +
"if result == 1 then\n" +
" redis.call('expire', key, ttl)\n" +
"else\n" +
" result = -1;\n" +
" local value = redis.call('get', key)\n" +
" if (value == requestId) then\n" +
" result = 1;\n" +
" redis.call('expire', key, ttl)\n" +
" end\n" +
"end\n" +
"return result";
String sha = jedis.scriptLoad(lockScript);
while ((Long)jedis.evalsha(sha, 3, LOCK, value, workTime) == -1) {
System.out.println(Thread.currentThread().getName() + "间隔秒: " + intervalTime);
TimeUnit.SECONDS.sleep(intervalTime);
}
System.out.println(Thread.currentThread().getName() + " 获得锁");
}


@Test
public void Test() throws InterruptedException {
for (int i = 0; i < 10; i++) {
String uuid = UUID.randomUUID().toString() + "";
new Thread(new Runnable() {
@Override
public void run() {
try {
new DistributeLockStudy().getLock3("60", 1, uuid);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
Thread.currentThread().join();
}

}

2.锁续约

解决锁超时业务代码还没有执行完的问题,自实现的锁续约代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
       public void getLock4(String workTime, long intervalTime, String value) throws InterruptedException {
getLock3(workTime, intervalTime, value);
watch(Thread.currentThread(), Long.parseLong(workTime), LOCK, value);
// 忙60秒钟
TimeUnit.SECONDS.sleep(60);
}

/**
* 锁续约
* @param workThread
* @param workTimeSecond
* @param key
* @param value
*/
private void watch(Thread workThread, long workTimeSecond, String key, String value) {
long base = (long) (workTimeSecond * (1.0 / 3));
AtomicLong timer = new AtomicLong(0);
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
System.out.println("[监测持有锁线程] 计时器:" + timer.get());
TimeUnit.SECONDS.sleep(1);
if (workThread.isAlive() && timer.addAndGet(1) > base) {
long addTime = jedis.ttl(LOCK) + workTimeSecond;
jedis.expire(LOCK, addTime);
System.out.println("[锁续约] 持有锁时间:" + jedis.ttl(LOCK));
timer.set(0);
} else if (!workThread.isAlive()) {
Assert.isTrue(unlock(key, value), "解锁失败!!");
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}

public boolean unlock(String key, String value) {
String unlockScript =
"local key = KEYS[1]\n" +
"local requestId = KEYS[2]\n" +
"local value = redis.call('get', key)\n" +
"if value == requestId then\n" +
" redis.call('del', key);\n" +
" return 1;\n" +
"end\n" +
"return -1";
String sha = jedis.scriptLoad(unlockScript);
System.out.println("释放锁:" + key);
return (long)jedis.evalsha(sha, 2, key, value) == 1;
}


@Test
public void addLockTime() throws InterruptedException {

new Thread(new Runnable() {
@Override
public void run() {
DistributeLockStudy dlock = new DistributeLockStudy();
try {
dlock.getLock4("10", 1, UUID.randomUUID().toString() + "");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

Thread.currentThread().join();
}

3.RedLock

事实上这类琐最大的缺点就是加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  • 在Redis的master节点上拿到了锁;
  • 但是这个加锁的key还没有同步到slave节点;
  • master故障,发生故障转移,slave节点升级为master节点;
  • 导致锁丢失。

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

Redlock实现
antirez提出的redlock算法大概是这样的:

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  • 获取当前Unix时间,以毫秒为单位。
  • 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static class RedLock {
public final static RedissonRedLock RED_LOCK;

static {
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://127.0.0.1:6380");
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://127.0.0.1:6381");
RedissonClient redissonClient3 = Redisson.create(config3);

String resourceName = "REDLOCK_KEY";

RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
// 向3个redis实例尝试加锁
RED_LOCK = new RedissonRedLock(lock1, lock2, lock3);
}

public static boolean tryLock(long waitTime, TimeUnit timeUnit) {
try {
// isLock = redLock.tryLock();
// waitTime时间内是否成功获得锁
return RED_LOCK.tryLock(waitTime, timeUnit);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

}

public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
try {
// 默认30秒租约时间,redisson自动续约
if (RedLock.tryLock(500, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + " 获得锁");
// do something...
TimeUnit.SECONDS.sleep(5);
break;
}
System.out.println(Thread.currentThread().getName() + " 等待锁");
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " 业务执行完毕..");
RedLock.RED_LOCK.unlock();
}).start();
}
}

参考:https://www.jianshu.com/p/7e47a4503b87

 评论