第7章 Java并发包中并发队列原理剖析

    PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。内部使用二叉堆实现。

    PriorityBlockingQueue内部有一个数组queue,用来存放队列元素。allocationSpinLock是个自旋锁,通过CAS操作来保证同时只有一个线程可以扩容队列,状态为0或1。

    由于这是一个优先队列,所以有一个comparator用来比较元素大小。

    下面为构造函数:

    可知默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。

    boolean offer()

    1. if (e == null)
    2. throw new NullPointerException();
    3. // 获取独占锁
    4. final ReentrantLock lock = this.lock;
    5. lock.lock();
    6. int n, cap;
    7. Object[] array;
    8. // 扩容
    9. while ((n = size) >= (cap = (array = queue).length))
    10. tryGrow(array, cap);
    11. try {
    12. Comparator<? super E> cmp = comparator;
    13. if (cmp == null)
    14. // 通过对二叉堆的上浮操作保证最大或最小的元素总在根节点
    15. siftUpComparable(n, e, array);
    16. else
    17. // 使用了自定义比较器
    18. siftUpUsingComparator(n, e, array, cmp);
    19. size = n + 1;
    20. // 激活因调用take()方法被阻塞的线程
    21. notEmpty.signal();
    22. } finally {
    23. // 释放锁
    24. lock.unlock();
    25. }
    26. return true;
    27. }

    流程比较简单,下面主要看扩容和建堆操作。

    先看扩容。

    1. private void tryGrow(Object[] array, int oldCap) {
    2. // 由前面的代码可知,调用tryGrow函数前先获取了独占锁,
    3. // 由于扩容比较费时,此处先释放锁,
    4. // 让其他线程可以继续操作(如果满足可操作的条件的话),
    5. // 以提升并发性能
    6. lock.unlock();
    7. Object[] newArray = null;
    8. // 通过allocationSpinLock保证同时最多只有一个线程进行扩容操作。
    9. if (allocationSpinLock == 0 &&
    10. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
    11. try {
    12. // 当容量比较小时,一次只增加2容量
    13. // 比较大时增加一倍
    14. int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
    15. // 溢出检测
    16. if (newCap - MAX_ARRAY_SIZE > 0) {
    17. int minCap = oldCap + 1;
    18. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
    19. throw new OutOfMemoryError();
    20. newCap = MAX_ARRAY_SIZE;
    21. }
    22. if (newCap > oldCap && queue == array)
    23. newArray = new Object[newCap];
    24. } finally {
    25. // 释放锁,没用CAS是因为同时最多有一个线程操作allocationSpinLock
    26. }
    27. }
    28. // 如果当前线程发现有其他线程正在对队列进行扩容,
    29. // 则调用yield方法尝试让出CPU资源促使扩容操作尽快完成
    30. if (newArray == null)
    31. Thread.yield();
    32. lock.lock();
    33. if (newArray != null && queue == array) {
    34. queue = newArray;
    35. System.arraycopy(array, 0, newArray, 0, oldCap);
    36. }
    1. private static <T> void siftUpComparable(int k, T x, Object[] array) {
    2. Comparable<? super T> key = (Comparable<? super T>) x;
    3. while (k > 0) {
    4. // 获取父节点,设子节点索引为k,
    5. // 则由二叉堆的性质可知,父节点的索引总为(k - 1) >>> 1
    6. int parent = (k - 1) >>> 1;
    7. // 获取父节点对应的值
    8. Object e = array[parent];
    9. // 只有子节点的值小于父节点的值时才上浮
    10. if (key.compareTo((T) e) >= 0)
    11. break;
    12. array[k] = e;
    13. k = parent;
    14. }
    15. array[k] = key;
    16. }

    如果了解二叉堆的话,此处代码是十分容易理解的。关于二叉堆,可参看。

    E poll()

    poll方法通过调用dequeue方法使最大或最小的节点出队并将其返回。

    下面来看二叉堆的下沉操作。

    1. private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    2. if (n > 0) {
    3. Comparable<? super T> key = (Comparable<? super T>)x;
    4. int half = n >>> 1;
    5. while (k < half) {
    6. // child为两个子节点(如果有的话)中较小的那个对应的索引
    7. int child = (k << 1) + 1;
    8. Object c = array[child];
    9. int right = child + 1;
    10. // 通过比较保证child对应的为较小值的索引
    11. if (right < n &&
    12. ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
    13. c = array[child = right];
    14. if (key.compareTo((T) c) <= 0)
    15. break;
    16. // 下沉,将较小的子节点换到父节点位置
    17. array[k] = c;
    18. k = child;
    19. }
    20. array[k] = key;
    21. }
    22. }

    同上,对下沉操作有疑问的话可参考上述文章。

    void put(E e)

    调用了offer

    1. public void put(E e){
    2. offer(e);
    3. }

    E take()

    take操作的作用是获取二叉堆的根节点元素,如果队列为空则阻塞。

    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. // 阻塞可被中断
    4. lock.lockInterruptibly();
    5. E result;
    6. try {
    7. // 队列为空就将当前线程放入notEmpty条件队列
    8. // 使用while循环判断是为了避免虚假唤醒
    9. while ( (result = dequeue()) == null)
    10. notEmpty.await();
    11. } finally {
    12. lock.unlock();
    13. }
    14. return result;
    15. }

    DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每一个元素都有一个过期时间,当从队列中获取元素时只有过期元素才会出列。队列头元素是最快要过期的元素。

    第7章 Java并发包中并发队列原理剖析 - 图1

    available是由lock生成的条件变量,用以实现线程间的同步。

    leader是leader-follower模式的变体,用于减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.waitNanos(delay)等待delay时间,但是其他线程(follower)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的线程会被选举为新的leader线程。

    boolean offer(E e)

    1. final ReentrantLock lock = this.lock;
    2. lock.lock();
    3. try {
    4. // 添加新元素
    5. q.offer(e);
    6. if (q.peek() == e) {
    7. leader = null;
    8. available.signal();
    9. }
    10. return true;
    11. } finally {
    12. lock.unlock();
    13. }
    14. }

    上述代码首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素。当如果q.peek() == e,说明当前元素是最先要过期的,那么重置leader线程为null并激活available条件队列里的一个线程,告诉它队列里面有元素了。

    E take()

    获取并移除队列里面过期的元素,如果队列里面没有过期元素则等待。

    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. // 可中断
    4. lock.lockInterruptibly();
    5. try {
    6. for (;;) {
    7. E first = q.peek();
    8. // 为空则等待
    9. if (first == null)
    10. available.await();
    11. else {
    12. long delay = first.getDelay(NANOSECONDS);
    13. // 过期则成功获取
    14. if (delay <= 0)
    15. return q.poll();
    16. // 执行到此处,说明头元素未过期
    17. first = null; // don't retain ref while waiting
    18. // follower无限等待,直到被唤醒
    19. if (leader != null)
    20. available.await();
    21. else {
    22. Thread thisThread = Thread.currentThread();
    23. leader = thisThread;
    24. try {
    25. // leader等待lelay时间,则头元素必定已经过期
    26. available.awaitNanos(delay);
    27. } finally {
    28. // 重置leader,给follower称为leader的机会
    29. if (leader == thisThread)
    30. leader = null;
    31. }
    32. }
    33. }
    34. }
    35. } finally {
    36. if (leader == null && q.peek() != null)
    37. // 唤醒一个follower线程
    38. available.signal();
    39. lock.unlock();
    40. }
    41. }

    一个线程调用take方法时,会首先查看头元素是否为空,为空则直接等待,否则判断是否过期。 若头元素已经过期,则直接通过poll获取并移除,否则判断是否有leader线程。 若有leader线程则一直等待,否则自己成为leader并等待头元素过期。

    E poll()

    获取并移除头过期元素,如果没有过期元素则返回null。

    1. public E poll() {
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. E first = q.peek();
    6. // 若队列为空或没有元素过期则直接返回null
    7. if (first == null || first.getDelay(NANOSECONDS) > 0)
    8. return null;
    9. else
    10. return q.poll();
    11. } finally {
    12. lock.unlock();

    int size()

    计算队列元素个数,包含过期的和未过期的。

    相关笔记:《Java并发编程之美》阅读笔记