请教下Disruptor 流水线的问题

ztzcom 14-08-29
         

在使用Disruptor 实现流水线模式,要求任务1和任务2处理完后再处理任务3,如果任务1或者任务2出现异常任务3将不会处理,不知道如何实现,请教各位大神
相关代码
var disruptor = new Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
var handler1 = new ValueAdditionHandler();
var handler2 = new ValueAdditionHandler1();
var handler3 = new Persisten();
disruptor.HandleExceptionsWith(new ExceptionHandler());
disruptor.HandleEventsWith(handler1, handler2).Then(handler3);
var ringBuffer = disruptor.Start();
var total = 100;
var i = 0;
while (i < total)
{
long sequenceNo = ringBuffer.Next();
ValueEntry entry = ringBuffer[sequenceNo];
entry.Value = _random.Next();
ringBuffer.Publish(sequenceNo);
i++;
}

看了下Disruptor 源码 没有对这样的情况进行处理,LMAX的文章里面有说可以处理这类情况。
以下是lmax 发布事件核心代码:
long nextSequence = _sequence.Value + 1L;
while (true)
{
try
{
long availableSequence = _sequenceBarrier.WaitFor(nextSequence);
while (nextSequence <= availableSequence)
{
evt = _ringBuffer[nextSequence];
_eventHandler.OnNext(evt, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

_sequence.LazySet(nextSequence - 1L);
}
catch (AlertException)
{
if (!_running.ReadFullFence())
{
break;
}
}
catch (Exception ex)
{
_exceptionHandler.HandleEventException(ex, nextSequence, evt);
_sequence.LazySet(nextSequence);
nextSequence++;
}
}

         

banq
2014-08-30 09:32

推荐从Disruptor外部去考虑,从事件和消息角度考虑,结合apache kafka一起考虑。
参考kafka在大数据处理中如何对消息实现事务:http://www.jdon.com/45698#23143267
[该贴被banq于2014-08-31 10:07修改过]

ztzcom
2014-09-02 14:18

查看banq 关于LMAX的文章中有介绍说LMAX 改变内存状态时事件必须确保已经持久化
请问banq lmax 是如何保证的呢?
查看Disruptor 源码没有看到相关的例子或实现