分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
zookeeper安装单机模式
http://www.javacui.com/opensource/445.html
SpringBoot集成Curator实现Zookeeper基本操作
http://www.javacui.com/tool/615.html
SpringBoot集成Curator实现Watch事件监听
http://www.javacui.com/tool/616.html
Zookeeper实现分布式锁的机制
使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。
创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。
如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。
如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。
比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。
锁分类
InterProcessSemaphoreMutex:分布式不可重入排它锁
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
Shared Lock 分布式非可重入锁
官网地址:http://curator.apache.org/curator-recipes/shared-lock.html
InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。
Shared Reentrant Lockf分布式可重入锁
官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
此锁可以重入,但是重入几次需要释放几次。
InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。
Shared Reentrant Read Write Lock可重入读写锁
官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。
读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。
Multi Shared Lock 多共享锁
官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html
多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。
Shared Semaphore共享信号量
官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html
一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。
有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。
如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。
acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。
编码测试
package com.example.springboot; import com.example.springboot.tool.ZkConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.*; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.TimeUnit; /** * @Auther: Java小强 * @Date: 2022/2/4 - 19:33 * @Decsription: com.example.springboot * @Version: 1.0 */ @SpringBootTest(classes = Application.class) public class CuratorTest { @Autowired private ZkConfiguration zk; // 共享信号量,多个信号量 @Test public void testInterProcessSemaphoreV22() throws Exception { CuratorFramework client = zk.curatorFramework(); // 创建一个信号量, Curator 以公平锁的方式进行实现 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { String threadName = Thread.currentThread().getName(); // 获取2个许可 Collection<Lease> acquire = semaphore.acquire(2); System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(2 * 1000); semaphore.returnAll(acquire); System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { String threadName = Thread.currentThread().getName(); // 获取一个许可 Lease lease = semaphore.acquire(); System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(2 * 1000); semaphore.returnLease(lease); System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>"); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } } // 共享信号量 @Test public void testInterProcessSemaphoreV2() throws Exception { CuratorFramework client = zk.curatorFramework(); // 创建一个信号量, Curator 以公平锁的方式进行实现 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { String threadName = Thread.currentThread().getName(); // 获取一个许可 Lease lease = semaphore.acquire(); System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(2 * 1000); semaphore.returnLease(lease); System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { String threadName = Thread.currentThread().getName(); // 获取一个许可 Lease lease = semaphore.acquire(); System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(2 * 1000); semaphore.returnLease(lease); System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>"); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } } // 多重共享锁 @Test public void testInterProcessMultiLock() throws Exception { CuratorFramework client = zk.curatorFramework(); // 可重入锁 final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock"); // 不可重入锁 final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock"); // 创建多重锁对象 final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2)); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); // 获取参数集合中的所有锁 lock.acquire(); // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入 System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS)); // interProcessLock1 是可重入锁, 所以可以继续获取锁 System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS)); // interProcessLock2 是不可重入锁, 所以获取锁失败 System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS)); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } } // 分布式读写锁 @Test public void testReadWriteLock() throws Exception { CuratorFramework client = zk.curatorFramework(); // 创建共享可重入读写锁 final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock"); final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock"); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); locl1.writeLock().acquire(); // 获取锁对象 System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); locl1.readLock().acquire(); // 获取读锁,锁降级 System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); locl1.readLock().release(); System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<"); locl1.writeLock().release(); System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); lock2.writeLock().acquire(); // 获取锁对象 System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); lock2.readLock().acquire(); // 获取读锁,锁降级 System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); lock2.readLock().release(); System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<"); lock2.writeLock().release(); System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } } // 分布式可重入排它锁 @Test public void testInterProcessMutex() throws Exception { CuratorFramework client = zk.curatorFramework(); // 分布式可重入排它锁 final InterProcessLock lock = new InterProcessMutex(client, "/lock"); final InterProcessLock lock2 = new InterProcessMutex(client, "/lock"); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); lock.acquire(); // 获取锁对象 System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); lock.acquire(); // 测试锁重入 System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); lock.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); lock.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); lock.acquire(); // 获取锁对象 System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); lock.acquire(); // 测试锁重入 System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(1 * 1000); lock.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); lock.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } // 顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁 } // 分布式不可重入排它锁 @Test void testInterProcessSemaphoreMutex() throws Exception { CuratorFramework client = zk.curatorFramework(); // 分布式不可重入排它锁 final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock"); final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock"); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().getName(); lock.acquire(); // 获取锁对象 System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); // 测试锁重入 Thread.sleep(2 * 1000); lock.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { String threadName = Thread.currentThread().getName(); lock2.acquire(); System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>"); Thread.sleep(2 * 1000); lock2.release(); System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<"); } catch (Exception e) { e.printStackTrace(); } } }).start(); while (true) { } // 顺序不一定,但是必须是获取后再释放其他线程才能获取到锁 } }
END
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: