请教下Disruptor 流水线的问题
在使用Disruptor 实现流水线模式,要求任务1和任务2处理完后再处理任务3,如果任务1或者任务2出现异常任务3将不会处理,不知道如何实现,请教各位大神
相关代码
var disruptor = new Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
var handler1 = new ValueAdditionHandler();
var handler2 = new ValueAdditionHandler1();
var handler3 = new Persisten();
disruptor.HandleExceptionsWith(new ExceptionHandler());
disruptor.HandleEventsWith(handler1, handler2).Then(handler3);
var ringBuffer = disruptor.Start();
var total = 100;
var i = 0;
while (i < total)
{
long sequenceNo = ringBuffer.Next();
ValueEntry entry = ringBuffer[sequenceNo];
entry.Value = _random.Next();
ringBuffer.Publish(sequenceNo);
i++;
}
看了下Disruptor 源码 没有对这样的情况进行处理,LMAX的文章里面有说可以处理这类情况。
以下是lmax 发布事件核心代码:
long nextSequence = _sequence.Value + 1L;
while (true)
{
try
{
long availableSequence = _sequenceBarrier.WaitFor(nextSequence);
while (nextSequence <= availableSequence)
{
evt = _ringBuffer[nextSequence];
_eventHandler.OnNext(evt, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
_sequence.LazySet(nextSequence - 1L);
}
catch (AlertException)
{
if (!_running.ReadFullFence())
{
break;
}
}
catch (Exception ex)
{
_exceptionHandler.HandleEventException(ex, nextSequence, evt);
_sequence.LazySet(nextSequence);
nextSequence++;
}
}