How does Disruptor implement a high-performance queue? Many newcomers find this unclear, so this article walks through the relevant source code step by step, starting from a small demo.

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg}

object DisruptorTest {
  val disruptor = {
    // Used to pre-populate every ring buffer slot with a placeholder event.
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    val threadFactory = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    // Ring buffer of size 4 (must be a power of 2), single producer, blocking wait strategy.
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE,
      new BlockingWaitStrategy())
    // ThenHandler is chained after TestHandler.
    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    disruptor
  }

  // Copies the argument into the preallocated event in place.
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

First, look at the Disruptor constructor:

    public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final ThreadFactory threadFactory,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             new BasicExecutor(threadFactory));
    }

Next, look at RingBuffer.create. It ends up in the fill method, which stores the result of eventFactory.newInstance() in every slot of the ring buffer as its default value:

    // RingBuffer
    public static <E> RingBuffer<E> create(ProducerType producerType,
            EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,
            WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    // RingBufferFields, the superclass of RingBuffer
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

Now look at disruptor.start(), the entry point for consuming events:

    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain:

    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

Now go back to consumerInfo.start(executor) inside disruptor.start(). Here executor = new BasicExecutor(threadFactory), and BasicExecutor creates a new thread for every task passed to execute. The number of entries in consumerRepository is bounded (one per handler), so creating a new thread each time is not a problem:

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy) {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }

    // EventProcessorInfo, the ConsumerInfo stored in consumerRepository
    @Override
    public void start(final java.util.concurrent.Executor executor) {
        // EventProcessor extends Runnable
        // executor = BasicExecutor
        executor.execute(eventprocessor);
    }

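To make the thread-per-task behaviour concrete, here is a minimal sketch in plain Java. ThreadPerTaskExecutor, startedThreads and demo are hypothetical names used only for illustration, not Disruptor classes. The sketch assumes nothing more than what is described above: each execute call asks the ThreadFactory for a new thread and starts it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the idea behind BasicExecutor: one new thread per task.
final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;
    private final AtomicInteger started = new AtomicInteger();

    ThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable task) {
        // No pooling and no queue: every task gets its own thread.
        Thread thread = factory.newThread(task);
        started.incrementAndGet();
        thread.start();
    }

    int startedThreads() {
        return started.get();
    }

    // Runs two tasks and reports how many threads were started for them.
    static int demo() {
        ThreadPerTaskExecutor executor = new ThreadPerTaskExecutor(Thread::new);
        CountDownLatch done = new CountDownLatch(2);
        executor.execute(done::countDown);
        executor.execute(done::countDown);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.startedThreads();
    }
}
```

With two handlers, as in the demo, such an executor would start two long-lived threads in total, which is why the lack of pooling is harmless here.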
    public final class BatchEventProcessor<T> implements EventProcessor {
        @Override
        public void run() {
            if (running.compareAndSet(IDLE, RUNNING)) {
                sequenceBarrier.clearAlert();
                notifyStart();
                try {
                    if (running.get() == RUNNING) {
                        processEvents();
                    }
                } finally {
                    notifyShutdown();
                    running.set(IDLE);
                }
            } else {
                if (running.get() == RUNNING) {
                    throw new IllegalStateException("Thread is already running");
                } else {
                    earlyExit();
                }
            }
        }

        private void processEvents() {
            T event = null;
            long nextSequence = sequence.get() + 1L;
            while (true) {
                try {
                    // Blocks until at least nextSequence is available.
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null) {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    // Handle everything that is available as one batch.
                    while (nextSequence <= availableSequence) {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    sequence.set(availableSequence);
                } catch (final TimeoutException e) {
                    notifyTimeout(sequence.get());
                } catch (final AlertException ex) {
                    if (running.get() != RUNNING) {
                        break;
                    }
                } catch (final Throwable ex) {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
    }

executor.execute, that is BasicExecutor.execute(eventProcessor), runs the event processor asynchronously on its own thread, which means BatchEventProcessor.run is invoked.

This raises a question: if every handler runs asynchronously, how do multiple eventHandlers process events in order?

Look at what processEvents does. It first reads BatchEventProcessor.sequence and adds 1 to get nextSequence. It then calls sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain the highest usable sequence, availableSequence.

Here is the BlockingWaitStrategy.waitFor implementation:

    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
                        SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            lock.lock();
            try {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

If cursorSequence (the ring buffer's published position) is less than sequence (the position the batchEventProcessor wants to read), the batchEventProcessor blocks on the condition until the producer catches up. It then spins until dependentSequence has reached sequence and returns the value of dependentSequence as availableSequence. The batchEventProcessor handles every event up to and including availableSequence in one batch and then updates its own sequence.
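The block-then-drain behaviour described above can be sketched without the library. This is a simplified illustration, not Disruptor's code: BlockingWaitSketch, publish and demo are hypothetical names, there is only a cursor (no dependentSequence and no alert handling), and the cursor is updated while holding the lock to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class BlockingWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private volatile long cursor = -1L; // last published sequence

    // Producer side: advance the cursor and wake any waiting consumer.
    void publish(long sequence) {
        lock.lock();
        try {
            cursor = sequence;
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: block until `sequence` is published, then return the highest
    // published sequence, which may be well past the one that was asked for.
    long waitFor(long sequence) throws InterruptedException {
        if (cursor < sequence) {
            lock.lock();
            try {
                while (cursor < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor;
    }

    // Publishes 0..count-1 from another thread and consumes them in batches.
    static List<Long> demo(int count) {
        BlockingWaitSketch barrier = new BlockingWaitSketch();
        Thread producer = new Thread(() -> {
            for (long s = 0; s < count; s++) {
                barrier.publish(s);
            }
        });
        List<Long> consumed = new ArrayList<>();
        producer.start();
        long next = 0L;
        try {
            while (next < count) {
                long available = barrier.waitFor(next);
                while (next <= available) { // drain the whole batch in one go
                    consumed.add(next);
                    next++;
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

However the two threads interleave, the consumer sees each sequence exactly once and in order; only the batch sizes vary from run to run.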
dependentSequence is set in the ProcessingSequenceBarrier constructor:

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;

        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
                                  final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                // No upstream handlers: depend directly on the ring buffer cursor.
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    }

The ProcessingSequenceBarrier is created in Disruptor.createEventProcessors by the call final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences). createEventProcessors is called from Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith:

    public class Disruptor<T> {
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return createEventProcessors(new Sequence[0], handlers);
        }

        EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                                   final EventHandler<? super T>[] eventHandlers) {
            checkNotStarted();
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                final EventHandler<? super T> eventHandler = eventHandlers[i];
                final BatchEventProcessor<T> batchEventProcessor =
                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
                if (exceptionHandler != null) {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);
                }
                consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                processorSequences[i] = batchEventProcessor.getSequence();
            }
            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
            return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
        }
    }

    public class EventHandlerGroup<T> {
        private final Disruptor<T> disruptor;
        private final ConsumerRepository<T> consumerRepository;
        private final Sequence[] sequences;

        EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
                          final Sequence[] sequences) {
            this.disruptor = disruptor;
            this.consumerRepository = consumerRepository;
            this.sequences = Arrays.copyOf(sequences, sequences.length);
        }

        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            // The sequences of this group become the barrier sequences of the next handlers.
            return disruptor.createEventProcessors(sequences, handlers);
        }

        public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
            return handleEventsWith(handlers);
        }
    }

EventHandlerGroup keeps a copy of the sequences of the batchEventProcessors it was created for. In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler) uses then to pass TestHandler's sequence to ThenHandler as its barrier sequence. ThenHandler's dependentSequence is therefore TestHandler's sequence rather than the ring buffer cursor, so ThenHandler can only process an event after TestHandler has finished with it.
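The dependency between the two handlers can be illustrated with a small sketch that uses only AtomicLong counters. It is an assumption-laden simplification: DependentSequenceSketch, waitFor and demo are hypothetical names, there is no ring buffer or wait strategy, and both stages simply spin. What it keeps is the gating rule: the first stage waits on the producer's cursor, the second stage waits on the first stage's sequence.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DependentSequenceSketch {
    // Spin until `dependency` has reached at least `sequence`, then return its value.
    static long waitFor(AtomicLong dependency, long sequence) {
        long available;
        while ((available = dependency.get()) < sequence) {
            Thread.yield();
        }
        return available;
    }

    // Returns how many events the second stage saw before the first stage had handled them.
    static int demo(int count) {
        AtomicLong cursor = new AtomicLong(-1);   // producer position
        AtomicLong firstSeq = new AtomicLong(-1); // progress of the first handler
        boolean[] handledByFirst = new boolean[count];
        int[] violations = new int[1];

        Thread first = new Thread(() -> {
            long next = 0;
            while (next < count) {
                long available = waitFor(cursor, next); // gated on the producer
                for (; next <= available; next++) {
                    handledByFirst[(int) next] = true;
                }
                firstSeq.set(available); // publish progress only after the batch is done
            }
        });
        Thread second = new Thread(() -> {
            long next = 0;
            while (next < count) {
                long available = waitFor(firstSeq, next); // gated on the first handler
                for (; next <= available; next++) {
                    if (!handledByFirst[(int) next]) {
                        violations[0]++;
                    }
                }
            }
        });
        first.start();
        second.start();
        for (long s = 0; s < count; s++) {
            cursor.set(s);
        }
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return violations[0];
    }
}
```

Because the second stage never reads past firstSeq, it cannot observe an event the first stage has not finished, even though both run on separate threads.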
Next, look at disruptor.publishEvent(translator, i), which puts data into the ringBuffer. Disruptor.publishEvent delegates to RingBuffer.publishEvent:

    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0);
    }

    private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
        try {
            translator.translateTo(get(sequence), sequence, arg0);
        } finally {
            // Publish even if the translator throws, so the claimed slot is never left unpublished.
            sequencer.publish(sequence);
        }
    }

    public E get(long sequence) {
        return elementAt(sequence);
    }

get(sequence) uses sequence (the ring buffer position) to fetch the preallocated object from the ring buffer array. The translator overwrites that object's fields in place, so the slot now holds the new values. publish then advances the cursor, and waitStrategy.signalAllWhenBlocking() wakes any consumers that are blocked waiting for new events.
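As a rough illustration of the claim, translate in place, publish sequence, here is a single-producer sketch in plain Java. PublishSketch and Slot are hypothetical names, a BiConsumer stands in for the translator, and the sketch deliberately omits what the real sequencer.next() has to do: it never checks whether consumers have finished with a slot before reusing it.

```java
import java.util.function.BiConsumer;

final class PublishSketch {
    // Mutable event object that lives in the ring for its whole lifetime.
    static final class Slot {
        int id = -1;
    }

    private final Slot[] slots;
    private final int mask;
    private long nextToClaim = 0L;      // single producer only, so no synchronization
    private volatile long cursor = -1L; // last published sequence

    PublishSketch(int size) { // size is assumed to be a power of two
        slots = new Slot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Slot(); // preallocate, like fill(eventFactory)
        }
        mask = size - 1;
    }

    Slot get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    <A> void publishEvent(BiConsumer<Slot, A> translator, A arg) {
        long sequence = nextToClaim++; // claim the next sequence
        try {
            translator.accept(get(sequence), arg); // overwrite the existing object in place
        } finally {
            cursor = sequence; // publish: make the slot visible to readers of the cursor
        }
    }

    long cursor() {
        return cursor;
    }
}
```

Publishing reuses the object that was already in the slot, so no event object is allocated per message.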
    // AbstractSequencer
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    // SingleProducerSequencer
    @Override
    public void publish(long sequence) {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

With the flow clear, here are the key concepts behind it.

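ringbuffer

Memory utilization is high: the buffer causes no fragmentation and wastes almost nothing. At any given moment the memory touched by the processing logic is concentrated in one region, so the structure adapts well to different systems and performs well on them. The physical layout is simple and uniform, which makes out-of-bounds accesses and dangling-pointer bugs less likely, and when something does go wrong it is easy to analyze and debug at the memory level. Systems built this way tend to stay robust.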
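The constructor shown earlier requires the buffer size to be a power of 2 and stores indexMask = bufferSize - 1. A minimal sketch of why that matters follows; RingIndexSketch, isPowerOfTwo and indexFor are hypothetical helper names, not part of the library.

```java
final class RingIndexSketch {
    // True when bufferSize is a positive power of two, i.e. exactly one bit is set.
    static boolean isPowerOfTwo(int bufferSize) {
        return bufferSize >= 1 && Integer.bitCount(bufferSize) == 1;
    }

    // Maps an ever-increasing sequence onto a slot index with a bit mask instead of %.
    static int indexFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        return (int) (sequence & (bufferSize - 1));
    }
}
```

With the demo's buffer size of 4, sequences 0, 1, 2, 3 map to slots 0, 1, 2, 3, and sequence 4 wraps back to slot 0, so the same four preallocated objects are reused for all 100 published events.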
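cpu cache

When the CPU accesses main memory it has to wait, which leaves a large share of its computing capacity idle and lowers overall throughput. Because memory accesses tend to cluster around hot spots, placing a layer of faster but more expensive (relative to main memory) storage between the CPU and memory as a cache is extremely cost-effective.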
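A common way to see the effect of access locality is to walk the same data in two different orders. The sketch below is only an illustration with hypothetical names (LocalitySketch and its methods). Both methods compute the same sum; on typical hardware the row-by-row walk tends to be faster for large grids because it reads neighbouring elements, but the timing depends on the machine and JVM and is not measured here.

```java
final class LocalitySketch {
    // Builds a rows x cols grid where grid[r][c] = r + c.
    static int[][] filled(int rows, int cols) {
        int[][] grid = new int[rows][cols];
        for (int r = 0; r < rows; r++) {
            for (int c = 0; c < cols; c++) {
                grid[r][c] = r + c;
            }
        }
        return grid;
    }

    // Walks each row from start to end, so consecutive reads are adjacent in memory.
    static long sumRowMajor(int[][] grid) {
        long sum = 0;
        for (int r = 0; r < grid.length; r++) {
            for (int c = 0; c < grid[r].length; c++) {
                sum += grid[r][c];
            }
        }
        return sum;
    }

    // Walks column by column, so consecutive reads jump between different row arrays.
    // Assumes a non-empty rectangular grid.
    static long sumColumnMajor(int[][] grid) {
        long sum = 0;
        for (int c = 0; c < grid[0].length; c++) {
            for (int r = 0; r < grid.length; r++) {
                sum += grid[r][c];
            }
        }
        return sum;
    }
}
```

A ring buffer that is consumed in sequence order resembles the first walk: the references it reads sit next to each other in one array.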