Sebastian Bełczyk Simple implementation of EventSt
声明:这个实现既不健壮又不一定有效率,只是演示EventStore的事件流。
这是使用getEventStore,安装方式见该作者另外一篇:EventStore - Getting Started
保存事件的流程:
重建聚合的流程:
事件是领域事件, domain event,其对象如下:
public class Event<T> where T : Aggregate { public Guid Id { get; set; } public Guid AggregateId { get; set; } public Event() { Id = Guid.NewGuid(); } } |
比如某个具体的增加新应用的事件代码是:
public class NewApplicationAddedEvent : Event<ApplicationAggregate> { public string Name { get; set; } public string Surname { get; set; } public Guid ApplicationId { get; set; } public NewApplicationAddedEvent(string name, string surname, Guid applicationId) { Name = name; Surname = surname; ApplicationId = applicationId; } } |
聚合根实体代码:
public class ApplicationAggregate : Aggregate { public string Name { get; set; } public string Surname { get; set; } public DateTime ApplicationDate { get; set; } private void Handle(NewApplicationAddedEvent @event) { Name = @event.Name; Surname = @event.Surname; Id = @event.ApplicationId; ApplicationDate = DateTime.Now; } private void Handle(NameOnApplicationChangedEvent @event) { Name = @event.Name; Surname = @event.Surname; } } |
写入EventStore的仓储代码:
public void SaveEvent(Event<T> @event) { //产生类似ApplicationAggregate-33c5590d-13bc-4c16-8f29-1e737fea9427的流名称 var streamName = GetStreamName(@event.AggregateId); var metadata = new Dictionary<string, object> { {"AggregateType",typeof(T).FullName}, }; //把领域事件转为事件数据 保存到EventStore connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata)); } |
从EventStore中恢复重建聚合:
public T Get(Guid aggregateId) { var events = ReadAllEventsInStream(GetStreamName(aggregateId),connection); var aggregate = (T)Activator.CreateInstance(typeof (T)); foreach (var @event in events) { var appEvent =(Event) ResolvedEventToDomainEvent(@event); ApplyEventToAggragate(appEvent, aggregate); } return aggregate; } |
private void ApplyEventToAggragate(Event<T> @event, T aggregate) { var handleMethods = typeof (T).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance) .Where(x => x.Name == "Handle"); var method = handleMethods.Single(x => x.GetParameters().First().ParameterType.FullName == @event.GetType().FullName); method.Invoke(aggregate,new object[]{@event}); } |
完整代码:
public class EventStoreRepository<T> : EventStoreRpositoryBasey<T> where T : Aggregate { public T Get(Guid aggregateId) { var events = ReadAllEventsInStream(GetStreamName(aggregateId), connection); var aggregate = (T)Activator.CreateInstance(typeof(T)); foreach (var @event in events) { var appEvent = (Event<T>)ResolvedEventToDomainEvent(@event); ApplyEventToAggragate(appEvent, aggregate); } return aggregate; } private void ApplyEventToAggragate(Event<T> @event, T aggregate) { var handleMethods = typeof(T).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance) .Where(x => x.Name == "Handle"); var method = handleMethods.Single(x => x.GetParameters().First().ParameterType.FullName == @event.GetType().FullName); method.Invoke(aggregate, new object[] { @event }); } public void SaveEvent(Event<T> @event) { var streamName = GetStreamName(@event.AggregateId); var metadata = new Dictionary<string, object> { {"AggregateType",typeof(T).FullName}, }; connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata)); } } public class EventStoreRpositoryBase<T> where T : Aggregate { protected IEventStoreConnection connection; public EventStoreRpositoryBasey() { connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113)); connection.Connect(); } protected string GetStreamName(Guid aggregateId) { return string.Format("{0}-{1}", typeof(T).Name, aggregateId); } /// <summary> /// Serialize @event object to JSON and converts to byte array, builds metadata for event and builds EventData object. /// </summary> protected EventData DomainEventToEventData(Guid eventId, object @event, IDictionary<string, object> headers = null) { if (headers == null) headers = new Dictionary<string, object>(); var serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None }; var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event, serializerSettings)); var eventHeaders = new Dictionary<string, object>(headers) { { "EventClrTypeName", @event.GetType().AssemblyQualifiedName } }; var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings)); var typeName = @event.GetType().Name; return new EventData(eventId, typeName, true, data, metadata); } /// <summary> /// Reads all events in stream /// </summary> protected IEnumerable<ResolvedEvent> ReadAllEventsInStream(string streamName, IEventStoreConnection connection, int pageSize = 500) { var result = new List<ResolvedEvent>(); var coursor = StreamPosition.Start; StreamEventsSlice events = null; do { events = connection.ReadStreamEventsForward(streamName, coursor, pageSize, false); result.AddRange(events.Events); coursor = events.NextEventNumber; } while (events != null && !events.IsEndOfStream); return result; } /// <summary> /// Conversts ResolvedEvent pulled from the stream to domain event. /// </summary> protected object ResolvedEventToDomainEvent(ResolvedEvent @event) { var metadataJSON = Encoding.UTF8.GetString(@event.Event.Metadata); var metadata = JsonConvert.DeserializeObject<Dictionary<string, object>>(metadataJSON); var eventDataJSON = Encoding.UTF8.GetString(@event.Event.Data); var eventType = Assembly.GetExecutingAssembly().GetTypes().Single(x => x.AssemblyQualifiedName == metadata["EventClrTypeName"].ToString()); return JsonConvert.DeserializeObject(eventDataJSON, eventType); } } |