文章目录
  1. 1. Disruptor中主要的等待策略有哪些?
  2. 2. BlockingWaitStrategy
  3. 3. SleepingWaitStrategy
  4. 4. BusySpinWaitStrategy
    1. 4.1. 如何实现利用线程亲和性绑定线程到具体的CPU?
  5. 5. YieldingWaitStrategy
  6. 6. 盘点等待策略
  7. 7. 扩展:单一写原则

我们接着介绍Disruptor高性能实现之道–等待策略。

等待策略waitStrategy是一种决定一个消费者如何等待生产者将event对象放入Disruptor的方式/策略。

等待策略waitStrategy是一个接口,它的所有实现都是针对消费者生效的。

Disruptor中主要的等待策略有哪些?

Disruptor中,等待策略waitStrategy有四个实现,分别是:

  • BlockingWaitStrategy:使用锁和条件变量实现的阻塞策略。如果不是将吞吐量和低延迟放在首位,则可以使用该策略。一般来说,这个策略的表现是中规中矩比较稳定的,它不会使CPU的负载飙高。

    虽然客观上说, BlockingWaitStrategy是最低效的策略,但其也是CPU使用率最低和最稳定的策略。

    在BlockingWaitStrategy内部维护了一个重入锁ReentrantLock和Condition;

  • SleepingWaitStrategy:性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;

    SleepingWaitStrategy是一种无锁的方式,它的CPU使用率也比较低。具体的实现原理为:循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).

    SleepingWaitStrategy优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。SleepingWaitStrategy最好用在不需要低延迟,而且事件发布对于生产者的影响比较小的情况下。比如异步日志功能。

  • YieldingWaitStrategy:性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中推荐使用此策略,例如CPU开启超线程的特性;

    虽然YieldingWaitStrategy性能最好,但是它的实现机制是让出cpu使用权,保证cpu不会空闲,从而使得cpu始终处于工作态,因此该策略会使用100%的CPU,因此建议慎用。

  • BusySpinWaitStrategy:该策略原则上来说应当是性能最高的,它将线程绑定在特定的CPU内核,但是同时该策略也是部署过程中最为苛刻的策略。

    BusySpinWaitStrategy发挥高性能的前提是事件处理线程比物理内核数目还要小的场景。例如:在禁用超线程技术的时候。

BlockingWaitStrategy

BlockingWaitStrategy是Disruptor中唯一使用到锁的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public final class BlockingWaitStrategy implements WaitStrategy
{
// 可重入锁
private final Lock lock = new ReentrantLock();
// 条件变量
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 如果生产者新发布了事件,但是依赖的其他消费者还没处理完,则等待所依赖的消费者先处理
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}

BlockingWaitStrategy的类长度不到100行,使用了Lock+Condition 实现了线程等待和唤醒操作。从而实现了生产者与消费者之间的同步。

消费者通过waitFor等待RingBuffer指定位置是否有可用数据,当存在可用数据,则消费者被唤醒。

1
2
3
4
5
6
7
8
9
/**
* @see Sequencer#publish(long)
*/
@Override
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
  • 如果生产者新发布了事件,但是依赖的其他消费者还没处理完,则等待所依赖的消费者先处理
  • 生产者新发布时间,会唤醒等待中的消费者。

SleepingWaitStrategy

SleepingWaitStrategy没有用到锁,这表明它无需调用signalAllWhenBlocking方法做唤醒处理。

SleepingWaitStrategy核心是通过Thread.yield + LockSupport.parkNanos,实现生产者和消费者之间的同步。

也就是说省去了生产线程的通知操作,官方源码注释如下:

1
2
3
4
* This strategy is a good compromise between performance and CPU resource.
* Latency spikes can occur after quiet periods. It will also reduce the impact
* on the producing thread as it will not need signal any conditional variables
* to wake up the event handling thread.

大意是说,SleepingWaitStrategy策略在性能和CPU资源消耗之间取得了平衡,接下来去看看关键代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final int DEFAULT_RETRIES = 200;
private static final long DEFAULT_SLEEP = 100;
private final int retries;
private final long sleepTimeNs;
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException
{
long availableSequence;
int counter = retries; // 默认值为DEFAULT_RETRIES = 200;
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}

waitFor 方法核心是while循环,我们可以看到,while循环没有任何的break操作,他就是个死循环。

counter默认值为200,自旋重试一定次数,如果在重试过程中,出现了可用sequence,也就是生产者往RingBuffer中生产了数据,则直接返回可用的序列号。

只要消费者没有等到可用的数据,就会一直循环,执行applyWaitMethod。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert();
if (counter > 100)
{
--counter;
}
else if (counter > 0)
{
--counter;
Thread.yield();
}
else
{
LockSupport.parkNanos(sleepTimeNs);
}
return counter;
}

这里的核心就是counter计数器,完全是无锁的。

当计数器高于100时就执行减一的操作(最快响应),当计数器在100到0之间时每次都交出CPU执行时间(最省资源),其他时候就睡眠固定时间:

如果重试指定次数以后,还是没有可用序列号,则继续自旋重试:

  • 0-100:每重试一次,便调用Thread.yield方法,让渡CPU的使用权,让其它线程可以使用CPU。当该线程再次获取CPU使用权时,继续重试,如果还没有可用的序列号,则继续放弃CPU使用权等待。此循环最多100次。
  • 加入在等待过程中还是没有可用的序列号,则调用LockSupport.parkNanos方法阻塞消费线程,阻塞时长通过SleepingWaitStrategy构造方法设置,一直阻塞到出现了可用的sequence(一直阻塞到生产者生产了数据)。
  • 当LockSupport.parkNanos方法由于超时返回后,还没有可用的sequence序列号,则该线程获取CPU使用权以后,可能继续调用LockSupport.parkNanos方法阻塞线程。

跟其它几种等待策略相比,它既没有直接使用锁,也没有直接自旋。属于一种在性能和CPU资源之间折中的方案。

BusySpinWaitStrategy

BusySpinWaitStrategy的实现代码行数只有几十行,从它的注释可以看出: 该策略将线程绑定到了特定的CPU内核。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s waiting on a barrier.
* <p>
* This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best
* used when threads can be bound to specific CPU cores.
*/
public final class BusySpinWaitStrategy implements WaitStrategy
{
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
}

当没有可用sequence时,消费者会一直执行while循环,具体的逻辑为 ThreadHints.onSpinWait();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final MethodHandle ON_SPIN_WAIT_METHOD_HANDLE;
public static void onSpinWait()
{
// Call java.lang.Thread.onSpinWait() on Java SE versions that support it. Do nothing otherwise.
// This should optimize away to either nothing or to an inlining of java.lang.Thread.onSpinWait()
if (null != ON_SPIN_WAIT_METHOD_HANDLE)
{
try
{
ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
}
catch (final Throwable ignore)
{
}
}
}

当ON_SPIN_WAIT_METHOD_HANDLE 不为空,则执行 ON_SPIN_WAIT_METHOD_HANDLE.invokeExact(); 底层是一个native方法。

那么我们可以猜想,如果ON_SPIN_WAIT_METHOD_HANDLE为空,那么这个外层的while循环就是一个纯粹的自旋操作,也就是说这个操作非常消耗CPU。

ON_SPIN_WAIT_METHOD_HANDLE为空是一个比较严重的场景,它的初始化逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# com.lmax.disruptor.util.ThreadHints
static
{
final MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle = null;
try
{
methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
}
catch (final Exception ignore)
{
}
ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
}

可以看到,这里的methodHandle其实就是Thread类中的onSpinWait方法,

如果Thread类没有onSpinWait方法那么使用BusySpinWaitStrategy作为等待策略就在RingBuffer中没有数据时,消费线程就会执行自旋空转,这个操作很耗费CPU。

那么问题就变成了,Thread类中是否存在onSpinWait 方法的问题了。

有趣的是,onSpinWait方法在JDK1.9之后才添加到了Thread类中,也就是说,对于JDK1.8(包括1.8)之前的用户而言,使用BusySpinWaitStrategy就意味着,找不到Thread类的onSpinWait方法,而最终导致消费者阻塞在waitFor方法上,执行无意义的自旋操作,把CPU负载打满(就是一个while(true)死循环)。

在jdk1.9及以上版本中,Thread.onSpinWait是有意义的。它会通知CPU当前线程处于循环查询的状态,CPU得知该状态后就会调度更多CPU资源给其他线程,从而缓解死循环对当前cpu核的压力。

回过头来,BusySpinWaitStrategy的注释告诉我们:如果使用该策略,尽量绑定线程到固定的CPU核心。但是同样的,该策略与YieldingWaitStrategy策略相比,会出现当没有可用序列号时长期占用CPU而让出CPU使用权(死循环),导致其它线程无法获取CPU使用权。

如何实现利用线程亲和性绑定线程到具体的CPU?

那么这个操作又该如何实现呢?

通过使用net.openhft.affinity包,就可以实现线程亲和性,它会强制你的应用线程运行在特定的一个或多个cpu上。

maven依赖为:


net.openhft
affinity
3.0.6

在初始化Disruptor实例时,ThreadFactory参数传入affinity线程亲和工厂。

以Spring项目中实例化Disruptor为例:

disruptor-init.png


openhft-affinity.png

YieldingWaitStrategy

YieldingWaitStrategy相比于SleepingWaitStrategy,实现机制就很激进,它完全基于Thread.yield出让cpu使用权,让CPU利用率保持在100%。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final class YieldingWaitStrategy implements WaitStrategy
{
private static final int SPIN_TRIES = 100;
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = SPIN_TRIES;
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}

当消费者没有获取到可用的sequence,则循环执行applyWaitMethod。直到存在可用的sequence,就返回该sequence。

返回sequence之后就可以根据该sequence从RingBuffer中get出这个sequence对应的event,执行业务操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert();
// counter默认为100,在减小到0之前不会进入if分支
if (0 == counter)
{
Thread.yield();
}
else
{
--counter;
}
return counter;
}
}
  • 首先,counter默认为100,在减小到0之前不会进入if分支,直接进入else,执行减1操作。

    也就是说,首先会自旋重试100次(此值可设置,默认100次),如果在重试过程中,存在可用的序列号,则直接返回可用的序列号。

  • 如果自旋了100次,counter减到0了,还是没有得到可用的sequence序列号,那么就会调用Thread.yield方法,让渡CPU的使用权,让其它线程可以争抢到CPU使用权。当该线程再次获取CPU使用权时,继续该过程:如果没有可用的序列号,则继续放弃CPU使用权等待。

从分析我们可以看出,YieldingWaitStrategy基本上是在等待sequence期间,不断的通过Thread.yield出让CPU的使用权,因此这个策略会让CPU使用率保持在100%的满负荷,生产中强烈推荐 不要使用

盘点等待策略

  • BlockingWaitStrategy:基于ReentrantLock的等待&&唤醒机制实现等待逻辑,该策略是Disruptor的默认策略,比较节省CPU,生产环境推荐使用;
  • BusySpinWaitStrategy:持续自旋,不推荐使用,会造成CPU负载100%;
  • DummyWaitStrategy:返回的Sequence值为0,正常情况下不使用
  • LiteBlockingWaitStrategy:基于BlockingWaitStrategy的轻量级等待策略,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,因此不建议使用
  • TimeoutBlockingWaitStrategy:带超时的等待策略,超时后会执行业务指定的处理逻辑
  • LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy的策略,当没有锁竞争的时候会省去唤醒操作
  • SleepingWaitStrategy:三段式策略,第一阶段自旋,第二阶段执行Thread.yield让出CPU,第三阶段睡眠执行时间,反复的睡眠
  • YieldingWaitStrategy:二段式策略,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  • PhasedBackoffWaitStrategy:四段式策略,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,该成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy三个中的一个

扩展:单一写原则

在并发系统中提高性能最好的方式之一就是单一写原则,Disruptor中生产者就体现了这一原则。

如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。

单一写的好处在于:完全不需要考虑同步多个写线程,写入操作没有上下文切换,并且是线程安全的(写入串行化)。

关于单一写原则,可以阅读: https://mechanical-sympathy.blogspot.com/2011/09/single-writer-principle.html

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. Disruptor中主要的等待策略有哪些?
  2. 2. BlockingWaitStrategy
  3. 3. SleepingWaitStrategy
  4. 4. BusySpinWaitStrategy
    1. 4.1. 如何实现利用线程亲和性绑定线程到具体的CPU?
  5. 5. YieldingWaitStrategy
  6. 6. 盘点等待策略
  7. 7. 扩展:单一写原则
Fork me on GitHub