一、Zookeeper实现分布式锁思路
1.1.分布式锁应具备哪些条件
分布式锁应该具备以下条件:
- 在分布式环境下,在一个时间点,最多被一个请求方持有
- 高可用、高性能的获取和释放
- 具备可重入性
- 具备失效机制,防止死锁
- 具备非阻塞特性,即没有获取到锁将直接返回失败
1.2.ZooKeeper实现分布式锁所具备的特性
1.2.1.高可用、高性能
- ZooKeeper的数据是内存读取的,本身就具备高性能特性
- ZooKeeper具备很好的故障恢复能力
- 在集群模式下,当Leader节点宕机后,当前集群会选出新的Leader处理请求
- ZooKeeper的ZAB协议保证了数据一致性问题
1.2.2.失效机制
ZooKeeper的有序临时节点,在客户端宕机或者断开后,会自动删除.这保证了不会因为环境问题来导致节点不会删除.
1.3.用ZooKeeper如何实现分布式锁
分布式锁本质,就是多个资源竞争者对一份资源的排他占有.
那么在ZooKeeper中我们可以使用临时有序节点来完成这项工作:
假设我们定义当前分布式锁的名称为lock,
我们在ZooKeeper中定义了一个根目录假设为: /dlock/locks
那么我们就可以在/dlock/locks/lock下创建临时有序节点,当多个竞争者都想获取这把锁时,就会出现以下情况:
[zk: 127.0.0.1:2181(CONNECTED) 85] ls /dlock/locks/lock
[lock0000000000, lock0000000001, lock0000000002, lock0000000003, lock0000000004, lock0000000005, lock0000000006, lock0000000007, lock0000000008, lock0000000009, lock0000000010, lock0000000011, lock0000000012, lock0000000013, lock0000000014, lock0000000015, lock0000000016, lock0000000017, lock0000000018, lock0000000019, lock0000000020, lock0000000021, lock0000000022, lock0000000023, lock0000000024, lock0000000025, lock0000000026, lock0000000027, lock0000000028, lock0000000029, lock0000000030, lock0000000031, lock0000000032, lock0000000033, lock0000000034, lock0000000035, lock0000000036, lock0000000037, lock0000000038, lock0000000039, lock0000000040, lock0000000041, lock0000000042, lock0000000043, lock0000000044, lock0000000045, lock0000000046, lock0000000047, lock0000000048, lock0000000049, lock0000000050, lock0000000051, lock0000000052, lock0000000053, lock0000000054, lock0000000055, lock0000000056, lock0000000057, lock0000000058, lock0000000059, lock0000000060, lock0000000061, lock0000000062, lock0000000063, lock0000000064, lock0000000065, lock0000000066, lock0000000067, lock0000000068, lock0000000069, lock0000000070, lock0000000071, lock0000000072, lock0000000073, lock0000000074, lock0000000075, lock0000000076, lock0000000077, lock0000000078, lock0000000079, lock0000000080, lock0000000081, lock0000000082, lock0000000083, lock0000000084, lock0000000085, lock0000000086, lock0000000087, lock0000000088, lock0000000089, lock0000000090, lock0000000091, lock0000000092, lock0000000093, lock0000000094, lock0000000095, lock0000000096, lock0000000097, lock0000000098, lock0000000099]
上面的例子我是用了100个线程并发竞争,则在/dlock/locks/lock下创建了100个临时有序节点.
怎么定义谁持有这把锁呢?
因为这些创建的节点是有序的,我们可以定义最小的那个节点就是锁的持有者.
怎么释放锁呢?
你可以主动删除你创建的临时节点
假如现在你的线程创建了节点:lock0000000000,那么你调用ZooKeeper.delete后删除该节点,可以认为是释放锁
释放后,别的竞争者怎么获取到当前锁呢?
我们可以利用ZooKeeper的watcher机制,但是我们不能监控通过getChildren监控/dlock/locks/lock节点变化,因为当lock0000000000被删除后,它会通知所有监控/dlock/locks/lock子节点变化的客户端,进而引发羊群效应,实际上这也是没有太大益处的,因为下一个持有锁的一定是最小的节点,而该节点一定是离lock0000000000节点最近的节点,因为/dlock/locks/lock下的节点是有序的.
在本例子中,我们只需要在lock0000000001节点上添加lock0000000000的watcher,可以通过exists的方式添加.
锁的重入,我们怎么判断?
我们可以在锁的内部维护一个int 变量入int state=1,当重入时,我们去对该变量加1,当释放锁时,判断state==1,如果true,删除当前节点,否则只对state减1
下面我们来实现一个demo级的ZooKeeper分布式锁.
二、ZooKeeper分布式锁代码
接口定义:
package com.xu.dlock.core;
public interface DLock {
/**
* 阻塞获取锁
* @return
*/
boolean lock();
/**
* 释放锁
* @return
*/
boolean unlock();
}
package com.xu.dlock.core;
public interface DLockFactory {
/**
* 拿到对象
* @param key
* @return
*/
DLock getLock(String key);
}
获取锁的工厂实现:
@Setter
@Getter
public class ZkDLockFactory implements DLockFactory {
private String DLOCK_ROOT_PATH="/dlock/locks";
private ZooKeeper zooKeeper;
public DLock getLock(String key) {
String path = getPath(key);
try {
zooKeeper.create(path,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
}finally {
try {
Stat stat= zooKeeper.exists(path,false);
if(stat == null){
return null;
}
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
DLock lock = new ZkLock(path,key,zooKeeper);
return lock;
}
private String getPath(String key) {
return DLOCK_ROOT_PATH+"/"+key;
}
}
锁的实现:
package com.xu.dlock.core.lock;
import com.xu.dlock.core.DLock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class ZkLock implements DLock {
private String path;
private String name;
private String lockPath ;
private ZooKeeper zk;
private int state ;
public ZkLock(String path, String name, ZooKeeper zk) {
this.path = path;
this.name = name;
this.zk = zk;
this.state=0;
}
public boolean lock() {
boolean flag= lockInternal();
if(flag){
state++;
}
return flag;
}
private boolean lockInternal(){
try {
String result = zk.create(getPath(), "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
this.lockPath = result;
List<String> waits = zk.getChildren(path, false);
Collections.sort(waits);
String[] paths=result.split("/");
String curNodeName = paths[paths.length-1];
if (waits.get(0).equalsIgnoreCase(curNodeName)) {
return true;
}
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < waits.size(); i++) {
String cur = waits.get(i);
if (!cur.equalsIgnoreCase(curNodeName)) {
continue;
}
String prePath = path+"/"+waits.get(i - 1);
zk.exists(prePath, new Watcher() {
public void process(WatchedEvent event) {
latch.countDown();
}
});
break;
}
latch.await();
return true;
} catch (Exception e) {
System.out.println(e.getMessage());
}
return false;
}
private String getPath() {
return path+"/"+name;
}
public boolean unlock() {
if(state>1){
state--;
return true;
}
try {
Stat stat=zk.exists(lockPath,false);
int version= stat.getVersion();
zk.delete(lockPath,version);
state--;
return true;
} catch (Exception e) {
System.out.println("unlock:"+lockPath+" ,exception,");
}
return false;
}
}
测试代码:
package com.xu.dlock.test;
import com.xu.dlock.core.DLock;
import com.xu.dlock.core.factory.ZkDLockFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class DLockTest {
public final static Random random = new Random();
public static void main(String[] args) {
ZooKeeper zk = getZkClient();
CountDownLatch latch = new CountDownLatch(100);
ZkDLockFactory factory = new ZkDLockFactory();
factory.setZooKeeper(zk);
for(int i = 0;i<100;i++){
int finalI = i;
Thread t = new Thread(()->{
exec(factory);
System.out.println("Thread_"+ finalI +"释放锁完成");
latch.countDown();
},"Thread_"+i);
t.start();
}
try {
latch.await();
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("测试完成");
}
public static void exec(ZkDLockFactory factory){
DLock lock=factory.getLock("lock");
System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁");
boolean flag=lock.lock();
System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁,结果:"+flag);
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(30));
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("Thread:"+Thread.currentThread().getName()+",释放锁锁");
}
public static ZooKeeper getZkClient(){
try {
ZooKeeper zooKeeper = new ZooKeeper("192.168.56.101:2181", 200000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接成功");
}
}
});
return zooKeeper;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}