EventStore的简单实现

Sebastian Bełczyk Simple implementation of EventSt

声明:这个实现既不健壮又不一定有效率,只是演示EventStore的事件流。

下载演示源码.NET

这是使用getEventStore,安装方式见该作者另外一篇:EventStore - Getting Started

保存事件的流程:

重建聚合的流程:

事件是领域事件, domain event,其对象如下:


public class Event<T> where T : Aggregate
{
public Guid Id { get; set; }
public Guid AggregateId { get; set; }
public Event()
{
Id = Guid.NewGuid();
}
}

比如某个具体的增加新应用的事件代码是:


public class NewApplicationAddedEvent : Event<ApplicationAggregate>
{
public string Name { get; set; }
public string Surname { get; set; }
public Guid ApplicationId { get; set; }

public NewApplicationAddedEvent(string name, string surname, Guid applicationId)
{
Name = name;
Surname = surname;
ApplicationId = applicationId;
}
}

聚合根实体代码:


public class ApplicationAggregate : Aggregate
{
public string Name { get; set; }
public string Surname { get; set; }
public DateTime ApplicationDate { get; set; }

private void Handle(NewApplicationAddedEvent @event)
{
Name = @event.Name;
Surname = @event.Surname;
Id = @event.ApplicationId;
ApplicationDate = DateTime.Now;
}

private void Handle(NameOnApplicationChangedEvent @event)
{
Name = @event.Name;
Surname = @event.Surname;
}
}

写入EventStore的仓储代码:


public void SaveEvent(Event<T> @event)
{
//产生类似ApplicationAggregate-33c5590d-13bc-4c16-8f29-1e737fea9427的流名称
var streamName = GetStreamName(@event.AggregateId);
var metadata = new Dictionary<string, object>
{
{
"AggregateType",typeof(T).FullName},
};
//把领域事件转为事件数据 保存到EventStore
connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata));
}

从EventStore中恢复重建聚合:


public T Get(Guid aggregateId)
{
var events = ReadAllEventsInStream(GetStreamName(aggregateId),connection);

var aggregate = (T)Activator.CreateInstance(typeof (T));

foreach (var @event in events)
{
var appEvent =(Event) ResolvedEventToDomainEvent(@event);
ApplyEventToAggragate(appEvent, aggregate);
}

return aggregate;
}

根据流名称,从EventStore中读取这个流中的所有事件,调用ApplyEventToAggragate重建,ApplyEventToAggragate具体代码:

private void ApplyEventToAggragate(Event<T> @event, T aggregate)
{
var handleMethods = typeof (T).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
.Where(x => x.Name == "Handle");

var method = handleMethods.Single(x => x.GetParameters().First().ParameterType.FullName == @event.GetType().FullName);

method.Invoke(aggregate,new object[]{@event});
}

首先将EventStore中的ResolvedEvent 格式转为Domain Event,然后再调用聚合根实体的重建方法。

完整代码:


public class EventStoreRepository<T> : EventStoreRpositoryBasey<T> where T : Aggregate
{

public T Get(Guid aggregateId)
{
var events = ReadAllEventsInStream(GetStreamName(aggregateId), connection);

var aggregate = (T)Activator.CreateInstance(typeof(T));

foreach (var @event in events)
{
var appEvent = (Event<T>)ResolvedEventToDomainEvent(@event);
ApplyEventToAggragate(appEvent, aggregate);
}

return aggregate;
}

private void ApplyEventToAggragate(Event<T> @event, T aggregate)
{
var handleMethods = typeof(T).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
.Where(x => x.Name == "Handle");

var method = handleMethods.Single(x => x.GetParameters().First().ParameterType.FullName == @event.GetType().FullName);

method.Invoke(aggregate, new object[] { @event });
}

public void SaveEvent(Event<T> @event)
{
var streamName = GetStreamName(@event.AggregateId);
var metadata = new Dictionary<string, object>
{
{
"AggregateType",typeof(T).FullName},
};
connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata));
}
}

public class EventStoreRpositoryBase<T> where T : Aggregate
{
protected IEventStoreConnection connection;

public EventStoreRpositoryBasey()
{
connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
connection.Connect();
}

protected string GetStreamName(Guid aggregateId)
{
return string.Format(
"{0}-{1}", typeof(T).Name, aggregateId);
}

/// <summary>
/// Serialize @event object to JSON and converts to byte array, builds metadata for event and builds EventData object.
/// </summary>
protected EventData DomainEventToEventData(Guid eventId, object @event, IDictionary<string, object> headers = null)
{
if (headers == null) headers = new Dictionary<string, object>();
var serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };

var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event, serializerSettings));

var eventHeaders = new Dictionary<string, object>(headers)
{
{
"EventClrTypeName", @event.GetType().AssemblyQualifiedName
}
};
var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
var typeName = @event.GetType().Name;

return new EventData(eventId, typeName, true, data, metadata);
}

/// <summary>
/// Reads all events in stream
/// </summary>
protected IEnumerable<ResolvedEvent> ReadAllEventsInStream(string streamName, IEventStoreConnection connection, int pageSize = 500)
{
var result = new List<ResolvedEvent>();
var coursor = StreamPosition.Start;
StreamEventsSlice events = null;
do
{
events = connection.ReadStreamEventsForward(streamName, coursor, pageSize, false);
result.AddRange(events.Events);
coursor = events.NextEventNumber;

} while (events != null && !events.IsEndOfStream);

return result;
}

/// <summary>
/// Conversts ResolvedEvent pulled from the stream to domain event.
/// </summary>
protected object ResolvedEventToDomainEvent(ResolvedEvent @event)
{
var metadataJSON = Encoding.UTF8.GetString(@event.Event.Metadata);
var metadata = JsonConvert.DeserializeObject<Dictionary<string, object>>(metadataJSON);
var eventDataJSON = Encoding.UTF8.GetString(@event.Event.Data);

var eventType =
Assembly.GetExecutingAssembly().GetTypes().Single(x => x.AssemblyQualifiedName == metadata[
"EventClrTypeName"].ToString());

return JsonConvert.DeserializeObject(eventDataJSON, eventType);
}

}