Java小强个人技术博客站点    手机版
当前位置: 首页 >> Java >> Curator分布式锁

Curator分布式锁

14470 Java | 2022-2-5

分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。


zookeeper安装单机模式

http://www.javacui.com/opensource/445.html 

SpringBoot集成Curator实现Zookeeper基本操作

http://www.javacui.com/tool/615.html 

SpringBoot集成Curator实现Watch事件监听

http://www.javacui.com/tool/616.html 


Zookeeper实现分布式锁的机制

使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。


锁分类

InterProcessSemaphoreMutex:分布式不可重入排它锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量


Shared Lock 分布式非可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-lock.html 

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。

Shared Reentrant Lockf分布式可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html 

此锁可以重入,但是重入几次需要释放几次。

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Shared Reentrant Read Write Lock可重入读写锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html 

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

Multi Shared Lock 多共享锁

官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html 

多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。

Shared Semaphore共享信号量

官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html 

一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。

有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。

如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。

acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。


编码测试

package com.example.springboot;

import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
 * @Auther: Java小强
 * @Date: 2022/2/4 - 19:33
 * @Decsription: com.example.springboot
 * @Version: 1.0
 */
@SpringBootTest(classes = Application.class)
public class CuratorTest {
    @Autowired
    private ZkConfiguration zk;

    // 共享信号量,多个信号量
    @Test
    public void testInterProcessSemaphoreV22() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取2个许可
                    Collection<Lease> acquire = semaphore.acquire(2);
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnAll(acquire);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
    }

    // 共享信号量
    @Test
    public void testInterProcessSemaphoreV2() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
    }

    // 多重共享锁
    @Test
    public void testInterProcessMultiLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 可重入锁
        final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");
        // 不可重入锁
        final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");
        // 创建多重锁对象
        final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取参数集合中的所有锁
                    lock.acquire();

                    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
                    System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock1 是可重入锁, 所以可以继续获取锁
                    System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock2 是不可重入锁, 所以获取锁失败
                    System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
    }

    // 分布式读写锁
    @Test
    public void testReadWriteLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建共享可重入读写锁
        final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");
        final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    locl1.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    locl1.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    lock2.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
    }

    // 分布式可重入排它锁
    @Test
    public void testInterProcessMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式可重入排它锁
        final InterProcessLock lock = new InterProcessMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
//        顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁
    }


    // 分布式不可重入排它锁
    @Test
    void testInterProcessSemaphoreMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式不可重入排它锁
        final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    // 测试锁重入
                    Thread.sleep(2 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.acquire();
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    lock2.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        while (true) {
        }
//        顺序不一定,但是必须是获取后再释放其他线程才能获取到锁
    }
}

END

推荐您阅读更多有关于“ 分布式 Curator 排它锁 读写锁 信号量 ”的文章

上一篇:zookeeper安装集群模式 下一篇:SpringBoot集成Curator实现Watch事件监听

猜你喜欢

发表评论: