跳到主要内容

22、Java JUC源码分析 - 队列-SynchronousQueue

SynchronousQueue是一种比较特殊的阻塞队列,不同于之前的阻塞队列,特点为:

1、 每次put必须有take的存在,也就是说生产者将一个元素put传递给一个消费者,消费者不存在就put不成功;

2、 内部没有容量来维持存放元素,所以size,迭代等一些方法没有意义;

3、 使用cas操作,没有使用锁;

4、 通过栈(非公平),队列(公平)2中结构来支持公平\非公平策略;

newCachedThreadPool线程池使用了这种队列。

<span style="font-size:18px;">abstract static class Transferer {
    /**
     * put和take都调用这个函数
	 * e不为null,put操作,表示生产者将e转交给一个消费者
	 * e为null,take操作,表示消费者获取一个生产者转交的数据
	 * timed和nanos支持超时
     */
    abstract Object transfer(Object e, boolean timed, long nanos);
}</span>

这个是栈和队列的公共基类,所有的put,take操作最后都是调用这个transfer方法。

先来看下非公平的栈。

看下栈的SNode结构:

 

static final class SNode {
    volatile SNode next;        // next node in stack
    volatile SNode match;       // 本节点的匹配节点
    volatile Thread waiter;     // 等待线程,用于park,unpark
    Object item;                // put时data,take时null
    int mode; //节点模式
    // Note: item and mode fields don't need to be volatile
    // since they are always written before, and read after,
    // other volatile/atomic operations.
    // 利用到到volatile语义的内存屏障
    SNode(Object item) {
        this.item = item;
    }

    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * 匹配当前和s节点
     */
    boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { //match为null,则cas修改为s
            Thread w = waiter; //如果节点有等待的线程,那就置null,unpark
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s; //如果match已经存在或者cas失败,那就直接匹配match跟s
    }

    /**
     * 匹配线程修改成自己
     */
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    boolean isCancelled() {
        return match == this;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

再看下栈的结构:

static final class TransferStack extends Transferer {
    
    /* 节点状态 */
    /** 未匹配的消费者,take的时候 */
    static final int REQUEST    = 0;
    /** 未匹配的生产者,put的时候 */
    static final int DATA       = 1;
    /** 有其他线程匹配 */
    static final int FULFILLING = 2;

    /** true:已经有节点匹配 */
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** The head (top) of the stack */
    volatile SNode head;

    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }

    /**
     * Puts or takes an item.
     */
    Object transfer(Object e, boolean timed, long nanos) {
        ...
		...
    }

    /**
     * Spins/blocks until node s is matched by a fulfill operation.
     *
     * @param s the waiting node
     * @param timed true if timed wait
     * @param nanos timeout value
     * @return matched node, or s if cancelled
     */
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        ...
		...
    }

    /**
     * 如果s节点是头结点、栈里面空的或者头结点有其他线程在匹配,那就自旋等等,说不定自旋一下,等下一次就有机会了
     */
    boolean shouldSpin(SNode s) {
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }

    /**
     * Unlinks s from the stack.
     */
    void clean(SNode s) {
        ...
		...
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

transfer等几个相关方法,放上面太长了,单独提出来学习下:

Object transfer(Object e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
	 *
	 * transfer主要处理3种情况:
	 * 1.栈为空或跟头结点模式相同,那就入栈,等待匹配
	 * 2.可以匹配,那就入栈修改头结点标记FULFILLING|mode,然后匹配出栈
	 * 3.发现其他线程在匹配,那就帮忙把匹配的节点出栈unlink
     */

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA; //根据e来决定mode,e入参取决于是put还是take操作

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // 超时了,
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {//没有超时或者不需要超时,那就新建node入栈
                SNode m = awaitFulfill(s, timed, nanos); //入栈后就等待其他线程来匹配,匹配后返回匹配的节点
                if (m == s) {               // 如果返回的匹配节点就是自己,那说明节点被取消
                    clean(s); //清理,返回null
                    return null;
                }
				//上面s为head,这里head变化了,说明有其他线程入栈,然后匹配唤醒了s,则推进下
				//考虑先transfer(e,true,10),然后在10时间内transfer(null,false,0)情况
                if ((h = head) != null && h.next == s) 
                    casHead(h, s.next);     // help s's fulfiller
                return (mode == REQUEST) ? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // 在mode不同,头结点没有其他在匹配情况下
            if (h.isCancelled())            // 头结点被取消,那就重新设置head
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//否则新节点入栈
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m为s的匹配节点
                    if (m == null) {        // m丢失了,可能被其他线程匹配了
                        casHead(s, null);   // 出栈,重新走主流程
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) { //s和m匹配,方法修改m的match为s,并unpark m上等待线程
                        casHead(s, mn);     // 弹出s和m节点
                        return (mode == REQUEST) ? m.item : s.item;
                    } else                  // 不匹配,说明有其他线程在匹配
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // 到这里说明栈顶在匹配了,那就推进下匹配流程,类似上面那个else if流程,只是没返回
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

/** 可用的处理器个数 */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/** 限时最大的空旋 */
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

/** 不限时最大空旋 */
static final int maxUntimedSpins = maxTimedSpins * 16;

/** 空旋的阈值 */
static final long spinForTimeoutThreshold = 1000L;

/**
 * 自旋等待匹配或者cancel掉
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    /*
     * When a node/thread is about to block, it sets its waiter
     * field and then rechecks state at least one more time
     * before actually parking, thus covering race vs
     * fulfiller noticing that waiter is non-null so should be
     * woken.
     * 阻塞park前先设置节点的waiter线程,这样匹配的时候可以唤醒该线程
	 * 
     * When invoked by nodes that appear at the point of call
     * to be at the head of the stack, calls to park are
     * preceded by spins to avoid blocking when producers and
     * consumers are arriving very close in time.  This can
     * happen enough to bother only on multiprocessors.
     * 节点调用时,如果正好在栈顶,通过自旋运气好,说不定下一次就匹配了。多处理器时可能发生这种情况。
	 * 
     * The order of checks for returning out of main loop
     * reflects fact that interrupts have precedence over
     * normal returns, which have precedence over
     * timeouts. (So, on timeout, one last check for match is
     * done before giving up.) Except that calls from untimed
     * SynchronousQueue.{poll/offer} don't check interrupts
     * and don't wait at all, so are trapped in transfer
     * method rather than calling awaitFulfill.
     */
    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    SNode h = head;
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0); //自旋次数
    for (;;) { //自旋
        if (w.isInterrupted())
            s.tryCancel(); //线程被中断了,那就cancel,match设置成自己
        SNode m = s.match;
        if (m != null) // 匹配节点存在就返回
            return m;
        if (timed) { 
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(); //超时就cancel
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0; //空旋减1
        else if (s.waiter == null) 
            s.waiter = w; // establish waiter so can park next iter 空旋结束还没有匹配,设置waiter,下一次会park
        else if (!timed) //如果不限制超时,那就park
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold) //限制了超时,看看自旋的阈值,通俗讲就是看看,自旋划算还是park划算
            LockSupport.parkNanos(this, nanos);
    }
}

/**
 * Returns true if node s is at head or there is an active
 * fulfiller.
 */
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

/**
 * 从栈中unlink节点s
 */
void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    /*
     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.
     */

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next); //从头结点到past节点去掉连续的cancel节点

    // Unsplice embedded nodes
    while (p != null && p != past) { //上面是去掉连续的cancel节点,这里去掉不连续的cancel节点
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

处理还是挺复杂的,主要是不像之前那些用锁来控制。看的时候想下假如有多个线程,各自不同操作,会出现什么情况,然后跟代码走一遍。

公平的队列的结构不想看了,transfer的流程跟栈差不多,模式相同入队列,不同则从头结点匹配。参考里面那个哥们2种情况都有,真强悍。

这个阻塞队列看的太累,改天还要再看看。