请教下Disruptor 流水线的问题

14-08-29 ztzcom
         

在使用Disruptor 实现流水线模式,要求任务1和任务2处理完后再处理任务3,如果任务1或者任务2出现异常任务3将不会处理,不知道如何实现,请教各位大神

相关代码

var disruptor = new Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

var handler1 = new ValueAdditionHandler();

var handler2 = new ValueAdditionHandler1();

var handler3 = new Persisten();

disruptor.HandleExceptionsWith(new ExceptionHandler());

disruptor.HandleEventsWith(handler1, handler2).Then(handler3);

var ringBuffer = disruptor.Start();

var total = 100;

var i = 0;

while (i < total)

{

long sequenceNo = ringBuffer.Next();

ValueEntry entry = ringBuffer[sequenceNo];

entry.Value = _random.Next();

ringBuffer.Publish(sequenceNo);

i++;

}

看了下Disruptor 源码 没有对这样的情况进行处理,LMAX的文章里面有说可以处理这类情况。

以下是lmax 发布事件核心代码:

long nextSequence = _sequence.Value + 1L;

while (true)

{

try

{

long availableSequence = _sequenceBarrier.WaitFor(nextSequence);

while (nextSequence <= availableSequence)

{

evt = _ringBuffer[nextSequence];

_eventHandler.OnNext(evt, nextSequence, nextSequence == availableSequence);

nextSequence++;

}

_sequence.LazySet(nextSequence - 1L);

}

catch (AlertException)

{

if (!_running.ReadFullFence())

{

break;

}

}

catch (Exception ex)

{

_exceptionHandler.HandleEventException(ex, nextSequence, evt);

_sequence.LazySet(nextSequence);

nextSequence++;

}

}

         

banq
2014-08-30 09:32

推荐从Disruptor外部去考虑,从事件和消息角度考虑,结合apache kafka一起考虑。

参考kafka在大数据处理中如何对消息实现事务:http://www.jdon.com/45698#23143267

[该贴被banq于2014-08-31 10:07修改过]

ztzcom
2014-09-02 14:18

查看banq 关于LMAX的文章中有介绍说LMAX 改变内存状态时事件必须确保已经持久化

请问banq lmax 是如何保证的呢?

查看Disruptor 源码没有看到相关的例子或实现