文章目录
  1. 1. 对于数组元素预加载的补充解释
  2. 2. Disruptor是如何通过位运算提升取模效率的?
  3. 3. 缓存行填充与局部性原理
  4. 4. 再次分析Disruptor对变量的缓存行填充原理
  5. 5. 无锁的Disruptor
    1. 5.1. 举个栗子
    2. 5.2. Disruptor如何进行CAS
    3. 5.3. CAS指令
    4. 5.4. CAS就没有什么问题么?

在之前的文章中,我们讨论了Disruptor高性能实现机制中的:

  • RingBuffer环形队列及内存预加载
  • 缓存行填充避免伪共享

本文开始之前先对之前没有讲到的细节进行补充。

对于数组元素预加载的补充解释

private void fill(EventFactory<E> eventFactory)
{
    for (int i = 0; i < bufferSize; i++)
    {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

一次性填充慢整个数组,这样做是一个比较有技巧的做法,Disruptor通过填充慢数组,在运行时改变对象的值来达到防止Java垃圾回收(GC)产生的系统开销。

换句话说就是它不需要垃圾回收。

Disruptor是如何通过位运算提升取模效率的?

我们已经知道,RingBufferSize为2的N次方时,可以通过位于运算提升取模效率,公式为:

seq & (ringBufferSize - 1) == index

即:当前event的sequence与RingBufferSize-1的差进行位于运算,就等价于sequence Mod RingBufferSize,但是效率更高。

在Disruptor的源码中具体是如何利用该机制的?

@Override
public E get(long sequence)
{
    return elementAt(sequence);
}

Disruptor通过get(sequence)从RingBuffer中取出下一个可用的sequence位于RingBuffer中的下标,具体实现在elementAt方法中。

// com.lmax.disruptor.RingBufferFields#elementAt
protected final E elementAt(long sequence)
{
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

可以看到elementAt是通过UNSAFE直接调用底层方法getObject,通过递增序列号获取与序列号对应的数组元素。

缓存行填充与局部性原理

我们知道Disruptor是通过缓存行填充避免了伪共享问题。

实际上这与 “局部性原理” 息息相关。

解释下什么叫做:局部性原理。

程序的局部性原理指的是在一段时间内程序的执行会限定在一个局部范围内。
这里的“局部性”可以从两个方面来理解,一个是时间局部性,另一个是空间局部性。

时间局部性指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;如果某条数据被访问,不久之后这条数据很可能再次被访问。

而空间局部性是指某块内存一旦被访问,不久之后这块内存附近的内存也很可能被访问。

CPU缓存读写就利用了局部性原理。

当CPU从主内存加载数据A时,它会将数据A缓存至CPU的高速缓存cache中。除了A会被缓存,A附近的数据也会被缓存。

根据局部性原理分析,由于A会被访问,那么A周围的其他数据也很有可能会被访问,如果一并加载则会提升程序的性能。

但是由于多核CPU同时修改同一缓存行,导致缓存行失效后重新加载主内存,因此出现了伪共享的问题。

再次分析Disruptor对变量的缓存行填充原理

首先看一下Disruptor中对 INITIAL_CURSOR_VALUE 的特殊处理。

1
2
3
4
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;

RingBuffer继承于RingBufferField

1
abstract class RingBufferFields<E> extends RingBufferPad

RingBufferFields继承于RingBufferPad

1
2
3
4
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}

那么我们就知道,在INITIAL_CURSOR_VALUE前后各填充了7个long型变量。

前面的 7 个来自继承的 RingBufferPad 类,后面的 7 个则是直接定义在 RingBuffer 类里
面。

这14个变量没有任何实际的用途。既不会去读也不会去写他们。

padding.png

可以看到,常量INITIAL_CURSOR_VALUE前后各填充了7个long型变量,无论CPU高速缓存如何加载缓存行(一个缓存行8个long型长度),整个缓存行都没有会发生变更的数据,这个8个long类型的缓存行无论如何加载上面的内存行,都能够读到常量,且不会加载除了常量的其他变量。

而INITIAL_CURSOR_VALUE是一个常量,也不会进行修改。所以一旦它被加载到CPU Cache 之后,只要被频繁地读取访问,就不会再被换出 Cache 了。这也就意味着对于这个值的读取速度,会是一直是 CPU Cache 的访问速度,而不是内存的访问速度

这有效的解决了伪共享的问题。

无锁的Disruptor

JUC中的队列BlockingQueue是通过加锁实现对生产者和消费者的协调。

加锁就意味着需要牺牲高性能,换来线程安全。

有没有办法既能高性能,还能线程安全?

Disruptor给出的答案是,“无锁”。

无锁,并不是完全消除锁,而是指没有OS层面的锁。

Disruptor通过CAS(Compare And Swap)指令实现了无锁化。具体的指令是cmpxchg,本文会做简单讲解。感兴趣的读者可以自行搜索资料了解详细内容。

简单解释下CAS具体干了什么事情。

CAS, 比较并交换,Compare And Swap。顾名思义,就是通过比较值是否发生变化,决定是否要重新赋值。
如果在操作期间,值没有被其他线程操作,那么就将期望的值赋值给它,否则发现期望的值与旧值不等,说明值已经变更,则不执行操作,返回操作失败。

举个栗子

比如说,旧值oldValue为1,期望的值expectValue为1,新值newValue为2。如果没有其他线程修改旧值,那么

  • expectValue == oldValue
  • 将newValue写入,当前值为2

如果在操作过程中,oldValue被其他线程操作修改为2,那么当前线程的expectValue(1)与oldValue(2)比较就不等,写入失败。

Disruptor如何进行CAS

我们知道Disruptor核心数据结构为RingBuffer,Disruptor为RingBuffer分配了一个Sequence对象,用于标识RingBuffer的头和尾,这个标识不是通过指针实现的,而是通过序号。

这个序号也就是Sequence。

虽然逻辑上RingBuffer是一个环形数组,但是在内存中是以一个普通的数组形式存在的。

RingBuffer中通过对比序号的方式对生产者和消费者间的资源进行协调。

每当生产者要往队列中加入新数据,生产者都会将当前sequence + 准备加入队列的数据量,与消费者所在位置进行比较,以判断是否存在足够的空间放这些数据,而不至于覆盖掉消费者没有消费的数据。

用体育术语就叫“套圈”。

如图所示:ringBufferSize=16,生产者当前sequence指向20,消费者sequence指向27。

cas.png

我们简单计算一下这个场景下,生产者能否继续写入4个元素。

  • 对于消费者而言,27 MOD 16 = 11
  • 对于生产者而言,20 + 4 = 24(预计写入的最大序号),24 MOD 16 = 8
  • 生产者若成功写入4个元素,则sequence指向数组的第8个位置,8 < 11, 表明生产者没有覆盖消费者的进度。
  • 生产者不需要等待消费者,直接生产数据即可。而且并不会覆盖消费者未处理完的数据。

cas1.png

实际上,Disruptor的代码实现就是通过compareAndSet方法实现了CAS无锁化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically add the supplied value.
*
* @param increment The value to add to the sequence.
* @return The value after the increment.
*/
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}

我们可以看到,这里通过while循环不断尝试CAS操作,如果CAS操作不成功就会自旋重试,这个操作并没有使用OS层面的锁,因此效率要大幅高于OS层面的锁机制(管程)。

addAndGet调用了compareAndSet方法:

1
2
3
4
5
6
7
8
9
10
11
/**
* Perform a compare and set operation on the sequence.
*
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}

可以看到最终是调用了UNSAFE的compareAndSwapLong方法,该方法为native方法,在JVM层面调用了CAS指令。

CAS指令

上文我们提到,Disruptor的CAS最终调用的是CPU层面的机器指令cmpxchg

该指令的详细描述:

1
2
3
compxchg [ax] (隐式参数,EAX 累加器),
[bx] (源操作数地址),
[cx] (目标操作数地址)

简单解释下:

  • cmpxchg指令有三个操作数,操作数ax不在指令里面出现,是一个隐式的操作数,准确地说它是EAX累加寄存器里面的值。
  • 操作数bx是源操作数,指令会对比这个操作数和上面的累加寄存器里面的值是否相等,如果相等
    CPU 会把 ZF(也就是条件码寄存器里面零标志位的值)设置为 1,然后再把操作数cx(也就是目标操作数)设置到源操作数的地址上。
  • 如果不相等的话,就把源操作数里面的值设置到累加器寄存器里面

由于cmpxchg是cpu级别的指令,因此直接调用就可以保证cas操作的原子性。

由于去除了OS层面的锁,即便CAS存在比较操作与自旋操作,其本质也是无锁化操作,这种无锁化机制消除了上下文切换,对于CPU极为友好,因此运行效率很快。

事实上,在JUC包中,也提供了大量的CAS相关工作类方便我们操作,这些类一般以atomic开头,如果去研究其实现,我们同样会发现最终是通过UNSAFE调用了底层的CAS实现,实现无锁化操作,减少上下文切换,提升代码运行速率。

加锁导致的上下文切换之所以会显著影响代码运行速度,主要原因在于获取锁的过程中,CPU需要等待OS层面的锁竞争结果,对于没有获取锁的线程需要进行挂起,此时就需要进行上下文切换。

上下文切换会把挂起线程的寄存器中的数据放到线程栈,该操作会导致加载到高速缓存中的数据也失效,进而导致程序运行速率比无锁更慢。

CAS就没有什么问题么?

当然CAS操作同样也会存在缺点,那就是由于CAS操作本身需要进行对比,如果不相等则会一直自旋(busy-wait),这样的操作会使得cpu的负载升高,全功率满负荷运行。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 对于数组元素预加载的补充解释
  2. 2. Disruptor是如何通过位运算提升取模效率的?
  3. 3. 缓存行填充与局部性原理
  4. 4. 再次分析Disruptor对变量的缓存行填充原理
  5. 5. 无锁的Disruptor
    1. 5.1. 举个栗子
    2. 5.2. Disruptor如何进行CAS
    3. 5.3. CAS指令
    4. 5.4. CAS就没有什么问题么?
Fork me on GitHub