MPSC Queue 源码分析

开发/后端 · 阅读 1899 · 点赞 0

在阅读Netty源码中发现在创建EventLoop的时候,内部的taskQueue使用了MpscQueue,MPSC的全称是Multi producer single consumer,单消费者多生产者模型,多生产者会发生竞争要做到无锁使用了CAS,而单一消费者则不需要保证线程安全

而内部大量是用二进制的偶次方特性提高运算效率,并且解决False Sharing问题

创建一个PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)

 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }

传入的大小和 MAX_ALLOWED_MPSC_CAPACITY (1 << 30 2的30次方) 取小然后和 2048 取大值,也就是初始的chunk大小为2048最大则为1 << 30,初始的chunk为1024

  static <T> Queue<T> newMpscQueue(final int maxCapacity) {
            final int capacity = max(min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY);
            //MPSC_CHUNK_SIZE 1024 
            return newChunkedMpscQueue(MPSC_CHUNK_SIZE, capacity);
        }

继续调用父类构造器,再次判断初始chuck和capacity不能超过最大2的偶次方(2^31)

 public static int roundToPowerOfTwo(final int value) {
        if (value > MAX_POW2) {
            throw new IllegalArgumentException("There is no larger power of 2 int for value:"+value+" since it exceeds 2^31.");
        }
        if (value < 0) {
            throw new IllegalArgumentException("Given value:"+value+". Expecting value >= 0.");
        }
        final int nextPow2 = 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
        return nextPow2;
    }

获取下一个偶次方的值,去掉高位0还剩下多少位 取它的偶次方,value – 1 代表如果value恰好是偶次方则返回他本身,如果不是则取下一个距离最近的偶次方

 final int nextPow2 = 1 << (32 - Integer.numberOfLeadingZeros(value - 1));

numberOfLeadingZeros是integer提供的获取高位0的位运算二分算法,因为int是4个字节32位,第一个if无符号右移16位(刚好是一半,高位补0)如果等于0则高16位没有1,所以n + 16 , 然后左移16位将剩下的16位低位放到高位,下个判断处理1/4部分 以此类推

public static int numberOfLeadingZeros(int i) {
        // 如果是0就是32个0
        if (i == 0)
            return 32;
        // 不是0则至少有一个1
        int n = 1;
        //用2举个例子 00000000 00000000 00000000 00000010 2
        if (i >>> 16 == 0) { n += 16; i <<= 16; }
        //00000000 00000000 00000000 00000000 i >>> 16
        //00000000 00000010 00000000 00000000 i <<= 16
        // n = 1 + 16
        if (i >>> 24 == 0) { n +=  8; i <<=  8; }
        //00000000 00000000 00000000 00000000 i >>> 24
        //00000010 00000000 00000000 00000000 i <<= 8
        // n = 1 + 16 + 8
        if (i >>> 28 == 0) { n +=  4; i <<=  4; }
        //不满足
        if (i >>> 30 == 0) { n +=  2; i <<=  2; }
        //00000000 00000000 00000000 00000000 i >>> 30
        //00001000 00000000 00000000 00000000 i <<=  2
        // n = 1 + 16 + 8 + 2
        n -= i >>> 31;
        // 00001000 00000000 00000000 00000000
        // 00000000 00000000 00000000 00000000 >>> 31
        // n = n - 0
        return n;
    }

最后获取到最近的偶次方两倍的大小给到maxQueueCapacity

maxQueueCapacity = ((long) Pow2.roundToPowerOfTwo(maxCapacity)) << 1;

回过头来看一下MpscChunkedArrayQueue的结构,里面使用了大量的128byte padding,看到的第一眼就知道是用来填充cache line解决False Sharing问题,可问题是为什么使用128 byte而不是64byte?

我们知道市面上绝大部分cpu使用的是64 byte的cache line,下面简单证明一下,macos和linux均使用Intel cpu,可以看到均为64 byte 并且与L1、L2、L3等level无关

getconf LEVEL1_DCACHE_LINESIZE
64
#当然你也可以直接查看device
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size 
64
# MAC OS
sysctl machdep.cpu.cache.linesize
machdep.cpu.cache.linesize: 64

MpscChunkedArrayQueue padding结构如下

128 byte padding ----
8 byte long maxQueueCapacity //最大长度偶次幂
16 byte JUMP = new Object() //填充生产者数组,消费时候使用
16 byte BUFFER_CONSUMED = new Object() //填充消费者数组,消费时使用
4 byte int CONTINUE_TO_P_INDEX_CAS 
4 byte int RETRY //重试
4 byte int QUEUE_FULL //队列已满
4 byte int QUEUE_RESIZE //扩容
8 byte long producerLimit //生产上线
8 byte long producerMask // 生产者掩码
8 byte long P_LIMIT_OFFSET  //生产者索引位
128 byte padding ----
8 byte long C_INDEX_OFFSET //消费者索引位
8 byte long consumerIndex //消费者下标
8 byte long consumerMask //消费者掩码
128 byte padding ----
8 byte P_INDEX_OFFSET
8 byte producerIndex
128 byte padding ----

突然想起来jdk8+提供了@Contended注解用来自动填充padding,于是我使用JOL查看了如下对象的布局

  public class JavaContended{
        private long var0;
        @Contended
        private long var1;
    }

可以看到@Contended的实现也是基于128 byte对齐的,为啥呢? 其实这只是默认的填充大小可能是为了适配128cache line的cpu 如果恰好是cache line是128 byte而padding是64的话依然会有False Sharing问题,所以牺牲了更大的空间来换取时间

io.netty.example.echo.test.JavaContended object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     N/A
  8   4        (object header: class)    N/A
 12   4        (alignment/padding gap)   
 16   8   long JavaContended.var0        N/A
 24 128        (alignment/padding gap)   
152   8   long JavaContended.var1        N/A
160 128        (object alignment gap)    
Instance size: 288 bytes
Space losses: 132 bytes internal + 128 bytes external = 260 bytes total

Jvm提供的关于Contended一些参数

-XX:-RestrictContended Jdk1.8启用Contended
-XX:ContendedPaddingWidth 调整padding大小
-XX:-EnableContended 开启关闭

继续看mpsc queue的源码,初始化最大容量后前还会super调用父类构造器

    // 距离初始容量最近的偶次幂
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
    // 掩码做位运算用 保证永远是偶数低位为0
    long mask = (p2capacity - 1) << 1;
    // 实际存储数据的数组 + 1 本质是为了添加jump对象做链表跳转否则越界了
    E[] buffer = allocateRefArray(p2capacity + 1);
    //生产者buf
    producerBuffer = buffer;
    producerMask = mask;
    //消费者buf
    consumerBuffer = buffer;
    consumerMask = mask;
    //producerLimit = mask本质是控制数组能添加几次元素
    //因为pindex每次 + 2 所以当小于等于mask时都会直接添加
    //本质就是控制数组添加数量为数组长度-2时进入扩容方法
    soProducerLimit(mask);

添加元素的方法

 @Override
    public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true)
        {
            //生产上线值
            long producerLimit = lvProducerLimit();
            //生产者index 初始是0
            pIndex = lvProducerIndex();
            //此处仅在处于扩容期间QUEUE_RESIZE cas pindex期间 其他线程进入会等于1
            if ((pIndex & 1) == 1)
            {
                continue;
            }
           
            mask = this.producerMask;
            buffer = this.producerBuffer;
            //消费者index <= 消费上线 触发扩容,否则pIndex+2
            if (producerLimit <= pIndex)
            {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result)
                {
                    case CONTINUE_TO_P_INDEX_CAS://跳出Switch执行casProducerIndex
                        break;
                    case RETRY: //重试
                        continue;
                    case QUEUE_FULL: //满了入队失败
                        return false;
                    case QUEUE_RESIZE: //尝试扩容或入队
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }

            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        //获取下标对应的数组偏移量
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        //invalid缓存行使其它线程从内存重新读取
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

这个方法就是用来判断switch的分支

 private int offerSlowPath(long mask, long pIndex, long producerLimit)
    {
        //生产index 没poll过所以是0
        final long cIndex = lvConsumerIndex();
        long bufferCapacity = getCurrentBufferCapacity(mask);

        if (cIndex + bufferCapacity > pIndex)
        {
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
            {
                // retry from top
                return RETRY;
            }
            else
            {
                // continue to pIndex CAS
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        // maxQueueCapacity - (pIndex - cIndex) 满了
        else if (availableInQueue(pIndex, cIndex) <= 0)
        {
            // offer should return false;
            return QUEUE_FULL;
        }
        // 否则就把pIndex + 1然后返回resize
        else if (casProducerIndex(pIndex, pIndex + 1))
        {
            // trigger a resize
            return QUEUE_RESIZE;
        }
        else
        {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

扩容方法,本质是在构建一个链表,总结一下如果初始长度为2那么则是数组套链表每一次都会新建一个链表放到旧数组里,因为pindex + 2 永远和plimit相等直到容量满了返回false,如果长度大于2那么则会填满数组(例如初始长度4(其实+1是5)当添加的元素数量等于长度-2时则会进行扩容,将当前元素放入新数组中并放入旧数组,然后在前面添加一个jump此时数组长度刚好为5

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
    {
        assert (e != null && s == null) || (e == null || s != null);
        //oldBuffer的长度
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try
        {
            //创建一个和原来长度一样的数组
            newBuffer = allocateRefArray(newBufferLength);
        }
        catch (OutOfMemoryError oom)
        {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }
        // 将新数组指向 producerBuffer 也就是每次resize传递进来的都是newBuffer
        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
        //原数组的内存地址偏移量
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        //新数组的内存地址偏移量
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
        //将元素e添加到新数组中
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        //将新数组添加到旧数组的下一个索引位
        //比如 offer 1,2
        //长这个样子[1,[2](new)](old)
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        //计算剩余剩余可用大小
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");
        //赋值给ProducerLimit
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
        //生产者索引 + 2
        soProducerIndex(pIndex + 2);
        //将旧数组原位置添加jump占位
        //长这个样子[1,JUMP,[2](new)](old)
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

由于是单消费者模式,无并发问题所以poll方法就比较简单了

public E poll()
    {
        //就是初始的生产者数组
        final E[] buffer = consumerBuffer;
        //从0开始
        final long index = lpConsumerIndex();
        //和生产者的初始mask一样
        final long mask = consumerMask;
        //获取数组初始元素内存偏移量
        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        //获取数组index位置的元素
        Object e = lvRefElement(buffer, offset);
        if (e == null)
        {
            //可以拿到元素但是没有拿到 可能是因为cache line 还没有share本线程还看不到
            //自旋去获取
            if (index != lvProducerIndex())
            {
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                //读写索引一致则确实没有元素可以拿了
                return null;
            }
        }
        //如果是JUMP则下一个元素必定是一个新数组的引用
        if (e == JUMP)
        {
            final E[] nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        //将该位置为空(容量不变)
        soRefElement(buffer, offset, null); // release element null
        //和pindex一样每次 + 2
        soConsumerIndex(index + 2); // release cIndex
        //返回命中的元素
        return (E) e;
    }

nextBuffer其实和resize差不多,将当前consumerBuffer改为下一个数组,并且将当前位置使用BUFFER_CONSUMED填充用来标记已经消费过了

 private E[] nextBuffer(final E[] buffer, final long mask)
    {
        final long offset = nextArrayOffset(mask);
        final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
        consumerBuffer = nextBuffer;
        consumerMask = (length(nextBuffer) - 2) << 1;
        soRefElement(buffer, offset, BUFFER_CONSUMED);
        return nextBuffer;
    }

总结

多生产者模型和NioEventLoop需要同时处理多个事件比较契合,而且做到无锁并发提高了性能,通过链表方式扩容减少了array copy,巧妙利用偶次方计算长度,临界值,大量使用内存偏移+unsafe cas极大提高了性能和原子性解决无锁处理并发