Java小强个人技术博客站点    手机版
当前位置: 首页 >> 理论 >> 多线程同步计数器CountDownLatch,CyclicBarrier,Semaphore

多线程同步计数器CountDownLatch,CyclicBarrier,Semaphore

17300 理论 | 2022-9-27

CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程等待其他线程一系列操作的完成。

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

我们来看两个示例

@Test
public void CountDownLatchTest1() throws Exception {
	// 这里只有一个计数器,子线程都要等待主线程释放这个计数器才会开始执行,子任务等核心任务
	CountDownLatch countDownLatch = new CountDownLatch(1);
	for (int i = 1; i <= 3; i++) {
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName() + "准备好了");
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + "开始奔跑");
		}, String.valueOf(i) + "号运动员\t").start();
	}
	System.out.println("运动员们进场");
	//睡眠,保证所有子线程创建完毕都进入run方法,执行await()方法
	Thread.sleep(500);
	System.out.println("裁判就位");
	System.out.println("预备");
	System.out.println("鸣枪");
	// 裁判释放计数器,子线程从await()堵塞中跳出
	countDownLatch.countDown();
}

上面是子任务等待计数器释放,用可以是核心任务把任务分发以后等待子任务完成

@Test
public void CountDownLatchTest2() throws InterruptedException {
	// 一共三个子任务需要执行,核心任务等子任务
	CountDownLatch countDownLatch = new CountDownLatch(3);
	System.out.println("主管开始分发各个子流程进行处理");
	for (int i = 1; i <= 3; i++) {
		new Thread(() -> {
			System.out.println("业务分支流程" + Thread.currentThread().getName() + "开始处理");
			countDownLatch.countDown();
			System.out.println("业务分支流程" + Thread.currentThread().getName() + "处理完成");
		}, String.valueOf(i)).start();
	}
	countDownLatch.await();
	System.out.println("主管业务处理结果汇总进行后续处理");
}



CyclicBarrier

CyclicBarrier同步工具类,循环屏障,通过它可以让一组线程等待至某个屏障(同步点)之后再全部同步执行。

所有线程被释放之后,CyclicBarrier可以被重新利用。

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

@Test
public void CyclicBarrierTest() throws Exception {
	// 必须给定一个int类型参数parties,来表示参与屏障等待的线程的总个数
	CyclicBarrier cb = new CyclicBarrier(3, ()->{
		System.out.println("匹配完成");
	});
	System.out.println("游戏开始匹配……");
	for (int i = 0; i < 7; i++) {
		new Thread(() -> {
			boolean target = true;
			System.out.println(Thread.currentThread().getName() + "开始匹配");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				cb.await(3, TimeUnit.SECONDS);
			} catch (Exception e) {
				target = false;
				System.out.println(Thread.currentThread().getName() + "匹配失败,请重新匹配");
			} finally {
				if (target) {
					System.out.println(Thread.currentThread().getName() + "匹配成功,开始游戏");
				}
			}
		}, String.valueOf(i) + "号玩家\t").start();
	}
	Thread.sleep(10 * 1000);
}

关于CyclicBarrier的底层执行流程总结:

1、初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选)

2、当调用await()方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;(注意:这里只是调用Runnable的run()方法,并不是调用start()方法开启另一个线程)

3、在下一个分代中,将会重置count值为parties,并且创建新的Generation实例;

4、同时会调用Condition的singalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行;(注意:当有可选的Runnable时,是执行完run()方法中的汇总操作,其他线程才会继续执行)

5、如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待

6、以上所有执行流程均在lock锁的控制范围内,不会出现并发情况

7、在下一个分代时,该屏障又可以继续使用,例如计数器是3,线程1,线程2和线程3冲破了当前屏障后,下一个分代的屏障可以去给线程4,线程5和线程6使用,也可以又给线程1,线程2和线程3使用。


CountDownLatch和CyclicBarrier两者区别

CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。

某线程中断CyclicBarrier会抛出异常,避免了所有线程无限等待。

总结

CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;

CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。


Semaphore

信号量,多线程并发控制工具,可以控制同时访问共享资源的线程个数。

@Test
public void SemaphoreTest() throws InterruptedException {
	// 一共三个信号量
	Semaphore semaphore = new Semaphore(3);
	// 5辆车需要进场
	for (int i = 0; i < 5; i++) {
		new Thread(() -> {
			try {
				System.out.println(Thread.currentThread().getName() + "需要进场");
				if (semaphore.availablePermits() == 0) {
					System.out.println(Thread.currentThread().getName() + "车位不足,等待中");
				}
				// 堵塞等待
				semaphore.acquire();
				System.out.println(Thread.currentThread().getName() + "成功进场");
				Thread.sleep(new Random().nextInt(2000));
				System.out.println(Thread.currentThread().getName() + "已经出场");
				semaphore.release();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}, i + "号车\t").start();
	}
	Thread.sleep(10 * 1000);
}


推荐您阅读更多有关于“ 多线程 Semaphore 信号量 计数器 CountDownLatch CyclicBarrier ”的文章

上一篇:并发编程之AtomicInteger,AtomicLong,LongAdder 下一篇:SpringBoot中application.yml引入多个YML文件

猜你喜欢

发表评论: