网站首页
Java
站长
开源
框架
理论
JS
Linux
DB
服务器
NET
生活
软件
PHP
其他
您的位置:首页 > 理论 > 并发编程BlockingQueue、BlockingDeque
并发编程BlockingQueue、BlockingDeque
2022-10-2    2218    0

微信截图_20221002150837.jpg


BlockingQueue

也叫做阻塞队列,在某些情况下对BlockingQueue的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

当队列满了的时候进行入队列操作

当队列空了的时候进行出队列操作

阻塞队列一共有四套方法用来进行增、删、查,当每套方法对应的操作不能马上执行时会有不同的反应,下面这个表格就分类列出了这些方法:

微信截图_20221002150032.jpg

这4类方法有不同的行为

1,Throws Exception

如果操作不能立即被执行,那么将抛出异常

2.Special Value

如果操作不能被立即执行,那么将返回一个异常值(一般情况下是true/false)

3.Blocks

如果操作不能被立即执行,那么操作将会阻塞,直至可以执行。

4Times Out

如果操作不能被立即执行,那么操作将会阻塞,直至可以执行,或者超时时间到。函数的返回值说明了操作是否执行成功。

说明:不能向BlockingQueue中插入null,否则会报NullPointerException


BlockingQueue的实现类

ArrayBlockingQueue

特点:ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变

排序方式:ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

DelayQueue

DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现java.util.concurrent.Delayed接口。Delayed接口getDelay()方法的返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象。由于还继承了Comparable接口,所以DelayedQueue中的元素需要进行排序,一般情况,我们都是按元素过期时间的优先级进行排序。

LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

PriorityBlockingQueue

PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。

SynchronousQueue

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。


其他队列

PriorityQueue

PriorityQueue 一个基于优先级的无界优先级队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。该队列不允许使用 null 元素也不允许插入不可比较的对象(没有实现Comparable接口的对象)。PriorityQueue 队列的头指排序规则最小那哥元素。如果多个元素都是最小值则随机选一个。PriorityQueue 是一个无界队列,但是初始的容量(实际是一个Object[]),随着不断向优先级队列添加元素,其容量会自动扩容,无需指定容量增加策略的细节。

ConcurrentLinkedQueue

一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。


BlockingDeque

是java.util.concurrent包中的一个双端队列,向其中加入元素或从中取出元素都是线程安全的,如果不完全不能对BlockingDequeue插入或者取出元素,那么将会阻塞线程,deque 是 “Double Ended Queue”的简称。因此一个deque可以从两端插入和取出元素的。

一个线程可以插入元素到队列的任一端。如果队列full,那么线程将会阻塞,直到其他线程从队列中取出一个元素为止。如果队列empty,那么从队列中取元素的线程将会阻塞,直到其他线程插入一个元素为止


BlockingDeque 方法

BlockingDeque有4组不同的方法用于插入,移除以及检查双端队列中的元素。如果不能立即执行所请求的操作,则每组方法的行为都不同。这是一个方法表:

微信截图_20221002144928.jpg

这4类方法有不同的行为

1,Throws Exception

如果操作不能立即被执行,那么将抛出异常

2.Special Value

如果操作不能被立即执行,那么将返回一个异常值(一般情况下是true/false)

3.Blocks

如果操作不能被立即执行,那么操作将会阻塞,直至可以执行。

4Times Out

如果操作不能被立即执行,那么操作将会阻塞,直至可以执行,或者超时时间到。函数的返回值说明了操作是否执行成功。


相关双端队列

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

ArrayDeque

无容量大小限制,容量按需增长;非线程安全队列,无同步策略,不支持多线程安全访问;当用作栈时,性能优于Stack,当用于队列时,性能优于LinkedList;两端都可以操作;具有fail-fast特征;不能存储null;支持双向迭代器遍历。


示例代码:

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    public static void main(final String[] arguments) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
    static class Producer implements Runnable {
        private BlockingQueue<Integer> queue;
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            Random random = new Random();
            try {
                int result = random.nextInt(100);
                Thread.sleep(1000);
                queue.put(result);
                System.out.println("Added: " + result);
                result = random.nextInt(100);
                Thread.sleep(1000);
                queue.put(result);
                System.out.println("Added: " + result);
                result = random.nextInt(100);
                Thread.sleep(1000);
                queue.put(result);
                System.out.println("Added: " + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static class Consumer implements Runnable {
        private BlockingQueue<Integer> queue;
        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            try {
                System.out.println("Removed: " + queue.take());
                System.out.println("Removed: " + queue.take());
                System.out.println("Removed: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

打印内容:

Added: 73
Removed: 73
Added: 80
Removed: 80
Added: 25
Removed: 25

Process finished with exit code 0


END

上一篇: MySQL的sql_mode模式说明
下一篇: Future机制
发表评论:
您的网名:
个人主页:
编辑内容: