分布式锁及其实现

为什么需要分布式锁

​ 在单机部署的项目中,多线程间的并发控制可以由Java相关的并发处理API来控制线程间的通信和互斥。但是在分布式集群的系统中,单机部署情况下的并发控制策略就会失效了,单纯的Java API是不具备分布式环境下的并发控制能力的;所以这就需要一种跨JVM的互斥机制来控制对共享资源的访问,这就是分布式锁要解决的问题了

​ 在分布式场景下,CAP理论已经证明了任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项;所以为了保证在分布式环境下的数据最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等

分布式锁的特性

  1. 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  2. 高可用、高性能的获取锁与释放锁
  3. 具备可重入特性
  4. 具备锁失效机制,防止死锁
  5. 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁的三种实现方案

数据库实现

​ 数据库实现分布式锁主要是依赖唯一索引

(唯一索引:不允许具有索引值相同的行,从而禁止重复的索引或键值。数据库会在创建该索引时检查是否有重复的键值,并在每次使用 INSERT 或 UPDATE 语句时进行检查)

实现的思路:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,因为做了唯一索引,所以即使多个请求同时提交到数据库,都只会保证只有一个操作能够成功,插入成功则获取到该方法的锁,执行完成后删除对应的行数据释放锁

1
2
3
4
5
6
7
8
9
CREATE TABLE `distributed_lock` (
`id` int(11) NOT NULL COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '方法名(需要锁住的方法名)',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `index_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

数据库实现分布式锁的增强

  1. 该分布式锁依赖数据库的可用性,如果数据库是单点且挂掉,那么分布式锁功能失效
    • 解决方案:
      • 多机部署,数据同步,数据库主备切换
  2. 同一个线程在释放锁之前,行数据一直存在,无法再次插入数据;这种情况下该分布式锁不具备可重入性
    • 解决方案:在表中新增一列用于记录当前获取到锁的机器和线程信息,在该线程再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁
  3. 没有锁失效的机制可能会出现在获取锁之后,数据库宕机,对应的行数据没有被删除,等到数据库服务器恢复后,表中的数据仍然存在,从而无法再获取到锁;或者释放锁失败
    • 解决方案
      • 在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;此时也需要根据业务需求考虑定时任务的执行时间,不能过长或者过短
      • 多机部署,数据同步,数据库主备切换
  4. 阻塞锁特性,在代码逻辑中增加失败重试机制(while循环),根据业务需求多次去获取锁直到成功或者达到失败次数后返回等等

数据库实现分布式锁的问题

​ 虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。

​ 但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁

Redis实现分布式锁

实现思路

- setnx:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0

1. 获取锁的时候,使用setnx加锁,锁的value值可以是一个随机生成的UUID,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁
2. 获取锁的时候设置一个获取锁的超时时间,若超过这个时间则放弃获取锁
3. 释放锁的时候,通过随机生成的UUID去匹对锁的键值对是否对应,若是则执行delete释放锁



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* redis实现分布式锁
*/
@Component
public class DistributedLock {
@Autowired
private JedisPool jedisPool;

/**
* 加锁
* @param lockName 存放redis中的key
* @param acquireTimeOut 分布式锁的过期时间
* @param timeout 获取锁的超时时间
* @return
*/
public String lockWithTimeOut(String lockName, int acquireTimeOut, long timeout) {
/**
* 先setnx key是否成功;
* 成功则设置随机值(UUID),然后设置过期时间,返回随机值给释放锁用
*
* 失败则计算获取锁的超时时间,时间未到则自旋获取锁直到成功或者达到超时时间
*/
String identifier = UUID.randomUUID().toString().replaceAll("-", "");
timeout = System.currentTimeMillis() + timeout;
String reIdentifier = "";

Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.select(0);
//带超时时间的循环获取锁实现锁阻塞特性
while(System.currentTimeMillis() < timeout){
Long setnx = jedis.setnx(lockName, identifier);
if (setnx != null && setnx == 1){
//设置过期时间
jedis.expire(lockName, acquireTimeOut);
reIdentifier = identifier;
break;
}else {
//这一步很重要
//如果key已经存在,查看过期时间,如果该key无过期时间则重新设置过期时间,以免发生死锁
Long ttl = jedis.ttl(lockName);
if (ttl == -1){
jedis.expire(lockName, acquireTimeOut);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("线程中断");
Thread.currentThread().interrupt();
}
}
}
} catch (Exception e) {
//TODO 处理异常
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return reIdentifier;
}

/**
* 释放锁
* @param lockName 锁的名称
* @param identifier 锁的标识(用来验证锁中的val是否一致)
* @return
*/
public boolean releaseLock(String lockName, String identifier){
Jedis jedis = null;
boolean flag = false;
try {
jedis = jedisPool.getResource();
jedis.select(0);
jedis.watch(lockName);
String result = jedis.get(lockName);
if (result != null && identifier.equals(result)){
Transaction multi = jedis.multi();
multi.del(lockName);
List<Object> exec = multi.exec();
if (exec != null && exec.size() > 0){
flag = true;
}
}
jedis.unwatch();
return flag;
} catch (Exception e) {
//TODO 处理异常
e.printStackTrace();
return false;
} finally {
if (jedis != null)
jedis.close();
}
}

}


#### Redis分布式锁的增强

1. 锁失效时间
- 锁失效的时间需要根据实际业务需求来设置一个合适的值
- 如果设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题
- 如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间
2. 可利用while循环去获取锁,可以设置重试间隔时间和最大重试时间来实现锁阻塞特性
3. 不可重入
- 解决方案
- 线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者;释放锁的时候将这些信息删除
4. 单点故障
- 解决方案:
- Redis集群,Redis主从



#### Redis实现分布式锁存在的问题

​ 这类最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

1. 在Redis的master节点上拿到了锁
2. 但是这个加锁的key还没有同步到slave节点
3. master故障,发生故障转移,slave节点升级为master节点
4. 导致锁丢失



### Zookeeper实现分布式锁

实现思路:

​ 每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点(EPHEMERAL_SEQUENTIAL)

​ 使用Zookeeper可以实现的分布式锁是阻塞的,客户端可以通过在ZK中创建瞬时有序节点,并且在节点上绑定监听器,一旦节点发生变化,ZK会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是那么自己就获取到锁,反之则继续等待

​ 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题,因为瞬时节点在会话断开后就会自动删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
* Zookeeper 实现分布式锁
*/
public class ZooKeeperLock implements Watcher {

// ZK对象
private ZooKeeper zk = null;
// 分布式锁的根节点
private String rootLockNode;
// 竞争资源,用来生成子节点名称
private String lockName;
// 当前锁
private String currentLock;
// 等待的锁(前一个锁)
private String waitLock;
// 计数器(用来在加锁失败时阻塞加锁线程)
private CountDownLatch countDownLatch;
// 超时时间
private int sessionTimeout = 30000;


/**
* 构造器中创建ZK链接,创建锁的根节点
*
* @param zkAddress ZK的地址
* @param rootLockNode 根节点名称
* @param lockName 子节点名称
*/
public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
this.rootLockNode = rootLockNode;
this.lockName = lockName;
try {
/**
* 创建连接,zkAddress格式为:IP:PORT
* watcher监听器为自身
*/
zk = new ZooKeeper(zkAddress, this.sessionTimeout, this);
/**
* 检测锁的根节点是否存在,不存在则创建
*/
Stat stat = zk.exists(rootLockNode, false);
if (null == stat) {
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
*
* @return
*/
public boolean lock() {
if (this.tryLock()) {
System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!");
return true;
} else {
return waitOtherLock(this.waitLock, this.sessionTimeout);
}
}

public boolean tryLock() {
// 分隔符
String split = "_lock_";
if (this.lockName.contains("_lock_")) {
throw new RuntimeException("lockName can't contains '_lock_' ");
}
try {
/**
* 创建锁节点(临时有序节点)并且得到节点名称
*
* path: 根节点/子锁名称+分隔符
*/
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println("线程【" + Thread.currentThread().getName()
+ "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");

/**
* 获取所有子节点
*/
List<String> nodes = zk.getChildren(this.rootLockNode, false);
/**
* 获取所有正在竞争lockName的锁
*/
List<String> lockNodes = new ArrayList<String>();
for (String nodeName : nodes) {
if (nodeName.split(split)[0].equals(this.lockName)) {
lockNodes.add(nodeName);
}
}
Collections.sort(lockNodes);

/**
* 获取最小节点与当前锁节点比对加锁
*
* 比对最小节点的名称是否跟刚才创建的临时节点名称一致
* 一致则证明当前加锁成功
*/
String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
if (this.currentLock.equals(currentLockPath)) {
return true;
}

/**
* 加锁失败,设置前一节点为等待锁节点
*/
String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
this.waitLock = lockNodes.get(preNodeIndex);

} catch (Exception e) {
e.printStackTrace();
}
return false;
}

/**
* 等待获取锁,带超时时间
*
* @param waitLock 当前节点的前一个锁
* @param sessionTimeout 等待获取锁的超时时间
* @return
*/
private boolean waitOtherLock(String waitLock, int sessionTimeout) {
boolean islock = false;
try {
// 监听等待锁节点
String waitLockNode = this.rootLockNode + "/" + waitLock;
Stat stat = zk.exists(waitLockNode, true);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName()
+ "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
/**
* 设置计数器,使用计数器阻塞线程,带超时时间
*/
this.countDownLatch = new CountDownLatch(1);
islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
if (islock) {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
} else {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁失败...");
}
} else {
islock = true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return islock;
}

/**
* 释放分布式锁
*
* @throws InterruptedException
*/
public void unlock() throws InterruptedException {
try {
Stat stat = zk.exists(this.currentLock, false);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
zk.delete(this.currentLock, -1);
this.currentLock = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} finally {
zk.close();
}
}

/**
* 节点监听器回调
*
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
/**
* 监听节点删除的事件
* 计数器减一,恢复线程操作
*/
if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
this.countDownLatch.countDown();
}
}
}

Curator的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Curator 实现的分布式锁:
* InterProcessMutex: 分布式可重入排它锁
* InterProcessSemaphoreMutex: 分布式排它锁
* InterProcessReadWriteLock: 分布式读写锁
* InterProcessMultiLock: 将多个锁作为单个实体管理的容器
*/
public class CuratorLock {

public static void main(String[] args) {
/**
* 设置重试策略,创建zk客户端
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 启动客户端
client.start();
/**
* 创建分布式可重入排他锁,监听客户端为client,锁的根节点为/locks
*/
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
/**
* 加锁操作
* public boolean acquire(long time, TimeUnit unit)
* 第一个参数是超时时间
* 第二个参数是时间的单位
*/
mutex.acquire(3, TimeUnit.SECONDS);

/**
* 释放锁
*/
mutex.release();
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
}
}

ZK实现分布式锁的问题

使用Zookeeper也有可能带来并发问题:由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了,就可能产生并发问题。

​ 这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点(所以选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡)

三者的比较

  1. 从性能角度(从高到低)
    • 缓存 > Zookeeper >= 数据库
  2. 从可靠性角度(从高到低)
    • Zookeeper > 缓存 > 数据库