LinkedTransferQueue 一个基于链表的无边界阻塞队列。这个队列按照生产者FIFO(先进先出)的顺序提供元素。队列的头节点是等待时间最长的生产者,队尾是等待时间最短的生产者。它有个特殊的方法 transfer,若调用时有消费者等待,则直接匹配将元素转移给消费者,若没有消费者等待则自旋后阻塞。



public class LinkedTransferQueue
/** head of the queue; null until first enqueue */ transient volatile Node head; /** tail of the queue; null until first append */ private transient volatile Node tail; /** * Queue nodes. Uses Object, not E, for items to allow forgetting * them after use. Relies heavily on Unsafe mechanics to minimize * unnecessary ordering constraints: Writes that are intrinsically * ordered wrt other accesses or CASes use simple relaxed forms. */ //链表节点,存储元素和阻塞的消费者 static final class Node {
//初始化的时候,若是消费者则为false,生产者为true final boolean isData; // false if this is a request node //初始化的时候如果是生产者不会为空,之后用于匹配节点 volatile Object item; // initially non-null if isData; CASed to match volatile Node next; //若等待中的线程需要被阻塞,则存储阻塞的线程 volatile Thread waiter; // null until waiting }}

内部的入队列和出队列调用的都是同一个方法 xfer(),通过一个特殊的转换标识来区分,下面的注释很清楚了,搞清楚每个方法对应的特殊值。

/* * Possible values for "how" argument in xfer method. */private static final int NOW   = 0; // for untimed poll, tryTransferprivate static final int ASYNC = 1; // for offer, put, addprivate static final int SYNC  = 2; // for transfer, takeprivate static final int TIMED = 3; // for timed poll, tryTransfer

transfer 方法,若有消费者等待直接将元素转移给消费者。

/** * Transfers the element to a consumer, waiting if necessary to do so. * * 

More precisely, transfers the specified element immediately * if there exists a consumer already waiting to receive it (in * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), * else inserts the specified element at the tail of this queue * and waits until the element is received by a consumer. * * @throws NullPointerException if the specified element is null */public void transfer(E e) throws InterruptedException {

if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); }}public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;}public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException();}


public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException();}public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException();}public E poll() {
return xfer(null, false, NOW, 0);}

重点看下转移方法 xfer

/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) //生产者进来,元素不能为空 throw new NullPointerException(); //若需要,为当前进来的线程创建一个节点,插入到队列 Node s = null; // the node to append, if needed retry: for (;;) {
// restart on append race //找第一个节点,不匹配继续向下找,直到下一个节点为空退出 for (Node h = head, p = h; p != null;) {
// find & match first node boolean isData = p.isData; Object item = p.item; //节点被取消或者已被转移就不匹配 if (item != p && (item != null) == isData) {
// unmatched //看当前线程是否和节点的操作类型一致 if (isData == haveData) // can't match //一致的话退出,看是否需要加入队尾 break; //操作不一致代表匹配上了,执行交换元素 if (p.casItem(item, e)) {
// match //为了节省 CAS 操作的开销,节点完成转换后,不会立即更新头节点 for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton //>=2个节点匹配了,才更新第三个节点为头节点 if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext(); break; } // advance and retry //CAS失败 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) //若现在的头节点的下一个节点没有匹配成功,不更新头节点 break; // unless slack < 2 } //唤醒等待的线程 LockSupport.unpark(p.waiter); return LinkedTransferQueue.
cast(item); } } Node n = p.next; //节点不匹配,往后移 p = (p != n) ? n : (h = head); // Use head if p offlist } //没有限制时间的 poll、tryTransfer 会直接退出,其他的方法需要加入队尾 if (how != NOW) {
// No matches available if (s == null) //构建一个新节点 s = new Node(e, haveData); //尝试加入到队尾 Node pred = tryAppend(s, haveData); if (pred == null) //前驱节点为空代表入队失败,重头再来一遍 continue retry; // lost race vs opposite mode if (how != ASYNC) //需要等待被匹配 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting }}


/** * Tries to append node s as tail. * * @param s the node to append * @param haveData true if appending in data mode * @return null on failure due to losing race with append in * different mode, else s's predecessor, or s itself if no * predecessor */private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) {
// move p to last node and append Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) {
//队列为空,初始化 if (casHead(null, s)) return s; // initialize } //检查尾部节点能否作为前驱 else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode //尾节点变动,向下遍历,找到最后的节点,继续循环 else if ((n = p.next) != null) // not last; keep traversing p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list //设置自己为尾节点 else if (!p.casNext(null, s)) //CAS失败,找到最后的节点,继续循环 p = p.next; // re-read on CAS failure else {
if (p != t) {
// update if slack now >= 2 //和设置头节点一样,不立即修改尾节点 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } //返回前驱 return p; } }}/** * Returns true if a node with the given mode cannot be * appended to this node because this node is unmatched and * has opposite data mode. */final boolean cannotPrecede(boolean haveData) {
boolean d = isData; Object x; //操作类型和s不匹配,并且没有转移成功,表明尾节点不能作为前驱 return d != haveData && (x = item) != this && (x != null) == d;}


若是方法 transfer, take timed 为 false
若是方法 timed poll, tryTransfer timed 为 true

/** * Spins/yields/blocks until node s is matched or caller gives up. * * @param s the waiting node * @param pred the predecessor of s, or s itself if it has no * predecessor, or null if unknown (the null case does not occur * in any current calls but may in possible future extensions) * @param e the comparison value for checking match * @param timed if true, wait only until timeout elapses * @param nanos timeout in nanosecs, used only if timed is true * @return matched item, or e if unmatched on interrupt or timeout */private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) {
Object item = s.item; if (item != e) {
// matched // assert item != s; //s节点已经被转换了,清除s s.forgetContents(); // avoid garbage //阻塞的线程被相反的操作唤醒后,在这里退出 return LinkedTransferQueue.
cast(item); } //线程被中断,或者任务已超时 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) {
// cancel //从队列中移除 unsplice(pred, s); //被中断或者超时自动唤醒,在这里退出 return e; } //计算需要自旋的次数 if (spins < 0) {
// establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) {
// spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } else if (s.waiter == null) {
//自旋达到最大限制仍不能匹配成功 s.waiter = w; // request unpark then recheck } else if (timed) {
nanos = deadline - System.nanoTime(); if (nanos > 0L) //设置了超时时间,但还没有超时,阻塞剩余 nanos 时间 LockSupport.parkNanos(this, nanos); } else {
//transfer, take 操作直接阻塞,等待被唤醒 LockSupport.park(this); } }}

LinkedTransferQueue 可以看做是 LinkedBlockingQueue 和 SynchronousQueue 的结合

LinkedBlockingQueue 能够存储元素,但是内部使用了大量的锁,并发效率不高。
SynchronousQueue 内部不能存储元素,可能会导致大量生产者阻塞。


