来自 技术 2019-04-17 00:00 的文章

LinkedBlockingQueue源码解析 - CodeBear

上一篇博客,我们介绍了ArrayBlockQueue,知道了它是基于数组实现的有界阻塞队列,既然有基于数组实现的,那么一定有基于链表实现的队列了,没错,当然有,这就是我们今天的主角:LinkedBlockingQueue。ArrayBlockQueue是有界的,那么LinkedBlockingQueue是有界还是无界的呢?我觉得可以说是有界的,也可以说是无界的,为什么这么说呢?看下去你就知道了。

和上篇博客一样,我们还是先看下LinkedBlockingQueue的基本应用,然后解析LinkedBlockingQueue的核心代码。

LinkedBlockingQueue基本应用

public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue(); linkedBlockingQueue.add(15); linkedBlockingQueue.add(60); linkedBlockingQueue.offer(50); linkedBlockingQueue.put(100); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.size()); System.out.println(linkedBlockingQueue.take()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.poll()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.peek()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.remove(50)); System.out.println(linkedBlockingQueue); }

运行结果:

[15, 60, 50, 100]415[60, 50, 100]60[50, 100]50[50, 100]true[100]

代码比较简单,先试着分析下:

创建了一个LinkedBlockingQueue 。分别使用add/offer/put方法向LinkedBlockingQueue中添加元素,其中add方法执行了两次。打印出LinkedBlockingQueue:[15, 60, 50, 100]。打印出LinkedBlockingQueue的size:4。使用take方法弹出第一个元素,并打印出来:15。打印出LinkedBlockingQueue:[60, 50, 100]。使用poll方法弹出第一个元素,并打印出来:60。打印出LinkedBlockingQueue:[50, 100]。使用peek方法弹出第一个元素,并打印出来:50。打印出LinkedBlockingQueue:[50, 100]。使用remove方法,移除值为50的元素,返回true。打印出LinkedBlockingQueue:100。

代码比较简单,但是还是有些细节不明白:

底层是如何保证线程安全性的?数据保存在哪里,以什么形式保存的?offer/add/put都是往队列里面添加元素,区别是什么?poll/take/peek都是弹出队列的元素,区别是什么?

要解决上面的疑问,最好的途径还是看源码,下面我们就来看看LinkedBlockingQueue的核心源码。

LinkedBlockingQueue源码解析

构造方法

LinkedBlockingQueue提供了三个构造方法,如下图所示:我们一个一个来分析。

LinkedBlockingQueue()

public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }

无参的构造方法竟然直接把“锅”甩出去了,甩给了另外一个构造方法,但是我们要注意传的参数:Integer.MAX_VALUE。

LinkedBlockingQueue(int capacity)

public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }判断传入的capacity是否合法,如果不大于0,直接抛出异常。把传入的capacity赋值给capacity。新建一个Node节点,并且把此节点赋值给head和last字段。

这个capacity是什么呢?如果大家对代码有一定的感觉的话,应该很容易猜到这是LinkedBlockingQueue的最大容量。如果我们调用无参的构造方法来创建LinkedBlockingQueue的话,那么它的最大容量就是Integer.MAX_VALUE,我们把它称为“无界”,但是我们也可以指定最大容量,那么此队列又是一个“有界”队列了,所以有些博客很草率的说LinkedBlockingQueue是有界队列,或者是无界队列,个人认为这是不严谨的。

我们再来看看这个Node是个什么鬼:

static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }

是不是有一种莫名的亲切感,很明显,这是单向链表的实现呀,next指向的就是下一个Node。

LinkedBlockingQueue(Collection<? extends E> c)

public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE);//调用第二个构造方法,传入的capacity是Int的最大值,可以说 是一个无界队列。 final ReentrantLock putLock = this.putLock; putLock.lock(); //开启排他锁 try { int n = 0;//用于记录LinkedBlockingQueue的size //循环传入的c集合 for (E e : c) { if (e == null)//如果e==null,则抛出空指针异常 throw new NullPointerException(); if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常 throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e));//入队操作 ++n;//n自增 } count.set(n);//设置count } finally { putLock.unlock();//释放排他锁 } }调用第二个构造方法,传入了int的最大值,所以可以说此时LinkedBlockingQueue是无界队列。开启排他锁putLock 。定义了一个变量n,用来记录当前LinkedBlockingQueue的size。循环传入的集合,如果其中的元素为null,则抛出空指针异常,如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常,否则执行enqueue操作来进行入队,然后n进行自增。设置count为n,由此可知,count就是LinkedBlockingQueue的size了。在finally中释放排他锁putLock 。

offer

public boolean offer(E e) { if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常 final AtomicInteger count = this.count;//取出count if (count.get() == capacity)//如果count==capacity,说明到了最大容量,直接返回false return false; int c = -1;//表示size Node<E> node = new Node<E>(e);//新建Node节点 final ReentrantLock putLock = this.putLock; putLock.lock();//开启排他锁 try { if (count.get() < capacity) {//如果count<capacity,说明还没有达到最大容量 enqueue(node);//入队操作 c = count.getAndIncrement();//获得count,赋值给c后完成自增操作 if (c + 1 < capacity)//如果c+1 <capacity,说明还有剩余的空间,唤醒因为调用notFull的await方法而被阻塞的线程 notFull.signal(); } } finally { putLock.unlock();//在finally中释放排他锁 } if (c == 0)//如果c==0,说明释放putLock的时候,队列中有一个元素,则调用signalNotEmpty signalNotEmpty(); return c >= 0; }如果传进来的元素为null,则抛出异常。把本类实例的count赋值给局部变量count。如果count==capacity,说明到了最大的容量,直接返回false。定义局部变量c,用来表示size,初始值是-1。新建Node节点。开启排他锁putLock。如果count>=capacity,说明到了最大的容量,释放排他锁后,返回false,因为此时c=-1,c>=0为false;如果count<capacity,说明还有剩余空间,继续往下执行。这里需要思考一个问题,为什么第三步已经判断过了是否还有剩余空间,这里还要再判断一次呢?因为可能有多个线程都在执行add/offer/put方法,当队列没有满的时候,多个线程同时执行到第三步(第三步的时候还没有开启排他锁),然后同时往下走,所以开启排他锁后,还需要重新判断下。执行入队操作。获得count,并且赋值给c后,完成自增的操作。注意,是先赋值后自增,赋值和自增的先后顺序会直接影响到后面的判断逻辑。如果c+1<capacity,说明还有剩余的空间,唤醒因为调用notFull的await方法而被阻塞的线程。这里为什么要+1再进行判断?因为在第9步中,是先赋值后自增,也就是说局部变量c保存的还是入队之前LinkedBlockingQueue的size,所以要先进行+1操作,得到的才是当前LinkedBlockingQueue的size。在finally中,释放排他锁putLock。如果c==0,说明在释放putLock排他锁的时候,队列中有且只有一个元素,则调用signalNotEmpty方法。让我们来看看signalNotEmpty方法:

private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }

代码比较简单,就是开启排他锁,唤醒因为调用notEmpty的await方法而被阻塞的线程,但是这里需要注意,这里获得的排他锁已经不再是putLock,而是takeLock。

add

public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

add方法直接调用了offer方法,但是add和offer还不完全一样,当队列满了,如果调用offer方法,会直接返回false,但是调用add方法,会抛出"Queue full"的异常。

put

public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常 int c = -1;//表示size Node<E> node = new Node<E>(e);//新建Node节点 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;//获得count putLock.lockInterruptibly();//开启排他锁 try { //如果到了最大容量,调用notFull的await方法,等待唤醒,用while循环,是为了防止虚假唤醒 while (count.get() == capacity) { notFull.await(); } enqueue(node);//入队 c = count.getAndIncrement();//count先赋值给c后,再进行自增操作 if (c + 1 < capacity)//如果c+1<capacity,调用notFull的signal方法,唤醒因为调用notFull的await方法而被阻塞的线程 notFull.signal(); } finally { putLock.unlock();//释放排他锁 } if (c == 0)//如果队列中有一个元素,唤醒因为调用notEmpty的await方法而被阻塞的线程 signalNotEmpty(); }如果传入的元素为NULL,则抛出异常。定义一个局部变量c,来表示size,初始值是-1。新建Node节点。把本类实例中的count赋值给局部变量count。开启排他锁putLock。如果到了最大容量,则调用notFull的await方法,阻塞当前线程,等待其他线程调用notFull的signal方法来唤醒自己,这里用while循环是为了防止虚假唤醒。执行入队操作。count先赋值给c后,再进行自增操作。如果c+1<capacity,说明还有剩余的空间,则调用notFull的signal方法,唤醒因为调用notFull的await方法而被阻塞的线程。释放排他锁putLock。如果队列中有且只有一个元素,唤醒因为调用notEmpty的await方法而被阻塞的线程。

enqueue

private void enqueue(Node<E> node) { last = last.next = node; }

入队操作是不是特别简单,就是把传入的Node节点,赋值给last节点的next字段,再赋值给last字段,从而形成一个单向链表。

小总结

至此offer/add/put的核心源码已经分析完毕,我们来做一个小总结,offer/add/put都是添加元素的方法,不过他们之间还是有所区别的,当队列满了,调用以上三个方法会出现不同的情况:

offer:直接返回false。add:虽然内部也调用了offer方法,但是队列满了,会抛出异常。put:线程会阻塞住,等待唤醒。

size

public int size() { return count.get(); }

没什么好说的,count记录着LinkedBlockingQueue的size,获得后返回就是了。

take

public E take() throws InterruptedException { E x; int c = -1;//size final AtomicInteger count = this.count;//获得count final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//开启排他锁 try { while (count.get() == 0) {//说明目前队列中没有数据 notEmpty.await();//阻塞,等待唤醒 } x = dequeue();//出队 c = count.getAndDecrement();//先赋值,后自减 if (c > 1)//如果size>1,说明在出队之前,队列中有至少两个元素 notEmpty.signal();//唤醒因为调用notEmpty的await方法而被阻塞的线程 } finally { takeLock.unlock();//释放排他锁 } if (c == capacity)//如果队列中还有一个剩余空间 signalNotFull(); return x; }定义局部变量c,用来表示size,初始值是-1。把本类实例的count字段赋值给临时变量count。开启响应中断的排他锁takeLock 。如果count==0,说明目前队列中没有数据,就阻塞当前线程,等待唤醒,直到其他线程调用了notEmpty的signal方法唤醒了当前线程。用while循环是为了防止虚假唤醒。进行出队操作。count先赋值给c后,在进行自减操作,这里需要注意是先赋值,后自减。如果c>1,也就是size>1,结合上面的先赋值,后自减,可知如果满足条件,说明在出队之前,队列中至少有两个元素,则调用notEmpty的signal方法,唤醒因为调用notEmpty的await方法而被阻塞的线程。释放排他锁takeLock 。如果执行出队后,队列中有且只有一个剩余空间,换个说法,就是执行出队操作前,队列是满的,则调用signalNotFull方法。

我们再来看下signalNotFull方法:

private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }开启排他锁,注意这里的排他锁是putLock 。调用notFull的signal方法,唤醒因为调用notFull的await方法而被阻塞的线程。释放排他锁putLock 。

poll

public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }

相比take方法,最大的区别就如果队列为空,执行take方法会阻塞当前线程,直到被唤醒,而poll方法,直接返回null。

peek

public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }

peek方法,只是拿到头节点的值,但是不会移除该节点。

dequeue

private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }

没什么好说的,就是弹出元素,并且移除弹出的元素。

小总结

至此take/poll/peek的核心源码已经分析完毕,我们来做一个小总结,take/poll/peek都是获得头节点值的方法,不过他们之间还是有所区别的:

take:当队列为空,会阻塞当前线程,直到被唤醒。会进行出队操作,移除获得的节点。poll:当队列为空,直接返回null。会进行出队操作,移除获得的节点。peek:当队列为空,直接返回null。不会移除节点。

LinkedBlockingQueue的核心源码分析到这里完毕了,谢谢大家。