BlockingQueue二

news/2024/7/5 2:08:05

接着上篇BlockingQueue没讲完的

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞队列,相对于其它阻塞队列,LinkedBlockingQueue可以算是LinkedBlockingQueue与SynhronoousQueue结合,LinkedtransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部结构分为数据节点、请求节点,基于CAS无锁算法实现

与前面类似不再赘述

        final boolean isData;   
        volatile Object item;  
        volatile Node next;
        volatile Thread waiter;

其中节点操作过程类似于SynchronousQueue
在这里插入图片描述
在这里插入图片描述
与SynchronousQueue有区别的是这个可以设置是否阻塞当前线程

NOW=0表示即时操作(可能失败),即不会阻塞调用线程
poll(获取并移除首元素,如果队列为空,直接返回null)
tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者则立即返回false,也不会将元素入队)

ASYNC=1表示异步操作(必然成功)
xfer被操作线程调用时,无论xfer操作过程多久完成,调用者都不会阻塞等待
offer,put,add(插入指定元素到队尾,由于是无界队列,所以会立即返回true)

SYNC=2表示同步操作(阻塞调用线程)
只有xfer操作过程达到了调用线程所期望的结果,调用者才会继续向下执行

TIMED=3表示限时同步操作

PriorityBlockingQueue

优先级队列,里面是数组,但是数组与普通数组不一样,里面的数组维护了一颗堆的二叉树
默认大小为11,但是这个可以扩容

	//默认容量
	private static final int DEFAULT_INITIAL_CAPACITY = 11;
	//最大容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
	//存储数据
    private transient Object[] queue;
	//元素个数
    private transient int size;
	//比较
    private transient Comparator<? super E> comparator;
	//锁
    private final ReentrantLock lock;
	//等待
    private final Condition notEmpty;
	
    private transient volatile int allocationSpinLock;

扩容

如果容量小于64的时候,扩容为原来两倍+2;
如果容量大于64的时候,扩容为原来1.5倍

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常操作)
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {//通过CAS操作确保只有一个线程可以扩容
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : 
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];//根据最新容量初始化一个新数组
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU
            Thread.yield();
        lock.lock();//这里重新加锁是确保数组复制操作只有一个线程能进行
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组
        }
    }

添加元素

添加元素不会阻塞线程,因为该队列是一个无界队列,因为可以扩容,所以添加元素不会出现阻塞

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

取出元素

取出元素需要判断是否为空,如果为空则需要等待,不然直接返回

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

里面主要堆的上浮与下沉

另一个上浮的方法除了比较器不同以外其它都类似,所以就讲这一个
假设我们构造的是小根堆

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
    	// 其中k就是当前放的末尾的位置
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1; //找到其父节点
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0) //如果当前放入的值大于其父节点则跳出,否则继续
                break;
            // 到这里说明当前放入的值小于其父节点,与父节点交换位置,并且k变为父节点的位置
            array[k] = e; 
            k = parent;
        }
        array[k] = key;
    }
 private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           
            while (k < half) {
                int child = (k << 1) + 1;   
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];   // 如果右孩子比左孩子小,则弄成右孩子
                if (key.compareTo((T) c) <= 0) //如果传入的值小于孩子则退出
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

LinkedBlockingDeque

与LinkedBlockingQueue类似,只是这个是可以从两端存取,而LinkedBlockingQueue是单链表只能从一边存取,同时LinkedBlockingDeque只有一把锁,如果两把锁的话容易造成下标出错

DelayQueue

其中内部也是由一个PriorityQueue维护一个优先队列

add

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

take

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();  //如果队列为空阻塞
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll(); //如果到期了就返回
                    first = null; // don't retain ref while waiting
                    if (leader != null) // 没有到期且leader不为空,等待
                        available.await();
                    else { //头节点为空,设置当前线程为头节点
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Leader-Follower线程模型

在Leader-follower线程模型中每个线程有三种模式:

leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待
follower:会一直尝试争抢leader,抢到leader之后才开始干活
processing:处理中的线程

感谢这位大佬 双子孤狼的博客


http://lihuaxi.xjx100.cn/news/307374.html

相关文章

(For Final Exam)Linux操作系统期末整理

1.linux文件权限一共10位长度第一段指文件类型 第二段指拥有者所具有的权限 第三段指所属组的成员对于这个文件具有的权限 第四段指其他人对于这个文件有没有任何权限2.shell是一种命令解释程序 3.>将标准输出信息写入一个新文件(重定向),>>将标准输出信息添加到一个…

第一章 算法在计算中的作用

第1章 算法在计算中的作用第一周 记于2022/12/4“是否存在一个通用的过程(算法)。可以自动判定任意命题是否正确?” 否 算法:一个定义明确的是可计算过程(Input -> Computational Procedure / algorithm -> Output) 算法是正确的:若对每个输入实例算法都以…

2023-余炳森五套卷-数学一

每一次考研都当最后一次,然后就也不要怕考上。 ——圙2023-余炳森5-1 T1 证明 \(\displaystyle\lim_{n\rightarrow\infty}a_n=0\) 常常可利用 \(\displaystyle\sum_{n=1}^\infty a_n\) 收敛;而至于命题二,\(a_n\in(0,\,1)\),其极限(若存在)则是可以取到端点的 \…

【信号与系统】相位卷绕以及连续信号的符号表示

相位卷绕 从一个例子入手,对于复指数信号 x ( t ) = e ( α + j ω ) t x(t)=e^{(\alpha+j\omega)t} x(t)=<

证券期货业信息技术服务连续性管理指南 连续性资源

连续性资源 概述 行业机构根据业务影响分析和信息技术 服务连续性风险评估结果&#xff0c;在信息技术服务连续性策略指导下&#xff0c;建设信息技术服务连续性 管理所需资源并定期维护&#xff0c;保障信息技术服务连续性计划顺利执行。行业机构将信息技术服务连续性作为信…

python--pip常用命令、国内PyPI镜像、使用pip安装第三方库

让我们来看看具体内容&#xff1a; 一. pip常用命令 列出已安装的包&#xff1a; pip freeze or pip list 导出requirements.txt&#xff1a; pip freeze ><目录>/requirements.txt 在线安装包&#xff08;模块库&#xff09;&#xff1a; pip install <包名>…

如何利用InVest模型估算区域产水量

1.什么是InVEST模型 InVEST模型&#xff08;Integrated Valuation of Ecosystem Services and Tradeoffs &#xff09;是生态系统服务评估与权衡模型的简称&#xff0c;是美国自然资本项目组开发的、用于评估生态系统服务功能量及其经济价值、支持生态系统管理和决策的一套模型…

牛客网刷题(BC72、BC18、BC83、BC84、BC41、BC31、BC17、BC6)

目录 一、BC72 平均身高 二、BC18 计算带余除法​编辑 三、BC83 被5整除问题​编辑 四、BC84计算y的值 五、BC41 你是天才吗&#xff1f; 六、BC31 发布信息 七、BC17 计算表达式的值 八、BC6 小飞机 **太简单的题就没有文字叙述了~ 一、BC72 平均身高 #define _CRT_S…