EventStore的简单实现

13-11-25 banq
原文作者:Sebastian Bełczyk,原文标题:Simple implementation of EventStore

声明:这个实现既不健壮又不一定有效率,只是演示EventStore的事件流。

下载演示源码.NET

本文使用的是 GetEventStore,安装方式见该作者的另一篇文章:EventStore - Getting Started

保存事件的流程:

重建聚合的流程:

事件是领域事件, domain event,其对象如下:

/// <summary>
/// Base type for domain events that belong to an aggregate of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Aggregate type the event is associated with.</typeparam>
public class Event<T> where T : Aggregate
{
    /// <summary>
    /// Creates the event and assigns it a newly generated <see cref="Id"/>.
    /// </summary>
    public Event()
    {
        this.Id = Guid.NewGuid();
    }

    /// <summary>Identifier of this event; a new GUID is generated in the constructor.</summary>
    public Guid Id { get; set; }

    /// <summary>Identifier of the aggregate instance this event refers to.</summary>
    public Guid AggregateId { get; set; }
}
<p>

比如某个具体的增加新应用的事件代码是:

/// <summary>
/// Domain event carrying the data of a newly added application.
/// </summary>
public class NewApplicationAddedEvent : Event<ApplicationAggregate>
{
    /// <summary>
    /// Creates the event from the applicant's name data and the application identifier.
    /// </summary>
    /// <param name="name">Value stored in <see cref="Name"/>.</param>
    /// <param name="surname">Value stored in <see cref="Surname"/>.</param>
    /// <param name="applicationId">Value stored in <see cref="ApplicationId"/>.</param>
    /// <remarks>
    /// The inherited AggregateId is not assigned here; callers that rely on it must set it themselves.
    /// </remarks>
    public NewApplicationAddedEvent(string name, string surname, Guid applicationId)
    {
        this.ApplicationId = applicationId;
        this.Surname = surname;
        this.Name = name;
    }

    /// <summary>First name on the application.</summary>
    public string Name { get; set; }

    /// <summary>Surname on the application.</summary>
    public string Surname { get; set; }

    /// <summary>Identifier of the application that was added.</summary>
    public Guid ApplicationId { get; set; }
}
<p>

聚合根实体代码:

/// <summary>
/// Aggregate root for an application. Its state is changed only through the private
/// <c>Handle</c> overloads, one per supported event type.
/// </summary>
public class ApplicationAggregate : Aggregate
{
    /// <summary>First name on the application.</summary>
    public string Name { get; set; }

    /// <summary>Surname on the application.</summary>
    public string Surname { get; set; }

    /// <summary>Timestamp assigned when a <see cref="NewApplicationAddedEvent"/> is handled.</summary>
    public DateTime ApplicationDate { get; set; }

    /// <summary>
    /// Applies a <see cref="NewApplicationAddedEvent"/>: copies name, surname and identifier
    /// from the event and stamps the application date.
    /// </summary>
    /// <param name="event">Event whose values are copied onto this aggregate.</param>
    private void Handle(NewApplicationAddedEvent @event)
    {
        this.Name = @event.Name;
        this.Surname = @event.Surname;

        // Id is not declared in this class; presumably it is inherited from Aggregate.
        this.Id = @event.ApplicationId;

        // NOTE(review): the date is taken from the clock at handling time, so every replay of
        // the same event yields a different ApplicationDate. Confirm whether the date should
        // be carried on the event instead.
        this.ApplicationDate = DateTime.Now;
    }

    /// <summary>
    /// Applies a <see cref="NameOnApplicationChangedEvent"/> by overwriting name and surname.
    /// </summary>
    /// <param name="event">Event carrying the new name and surname.</param>
    private void Handle(NameOnApplicationChangedEvent @event)
    {
        this.Name = @event.Name;
        this.Surname = @event.Surname;
    }
}
<p>

写入EventStore的仓储代码:

/// <summary>
/// Appends a domain event to the stream of the aggregate it refers to.
/// </summary>
/// <param name="event">Event to store; its AggregateId selects the target stream.</param>
public void SaveEvent(Event<T> @event)
{
    // Stream names look like "ApplicationAggregate-33c5590d-13bc-4c16-8f29-1e737fea9427".
    var targetStream = GetStreamName(@event.AggregateId);

    // Headers stored with the event; only the aggregate's full type name is recorded here.
    var headers = new Dictionary<string, object>();
    headers.Add("AggregateType", typeof(T).FullName);

    // Convert the domain event to event data, then append it to the stream.
    var eventData = DomainEventToEventData(@event.Id, @event, headers);
    connection.AppendToStream(targetStream, ExpectedVersion.Any, eventData);
}
<p>

从EventStore中恢复重建聚合:

/// <summary>
/// Rebuilds an aggregate by reading every event in its stream and applying each one in order.
/// </summary>
/// <param name="aggregateId">Identifier of the aggregate whose stream is read.</param>
/// <returns>A new <typeparamref name="T"/> instance with all stored events applied.</returns>
public T Get(Guid aggregateId)
{
    var events = ReadAllEventsInStream(GetStreamName(aggregateId), connection);

    // Start from a blank instance created through the parameterless constructor.
    var aggregate = (T)Activator.CreateInstance(typeof(T));

    foreach (var @event in events)
    {
        // Cast to the generic Event<T>: that is the type ApplyEventToAggragate accepts,
        // and no non-generic Event type is declared.
        var appEvent = (Event<T>)ResolvedEventToDomainEvent(@event);
        ApplyEventToAggragate(appEvent, aggregate);
    }

    return aggregate;
}
<p>

根据流名称,从EventStore中读取这个流中的所有事件,调用ApplyEventToAggragate重建,ApplyEventToAggragate具体代码:

/// <summary>
/// Finds the non-public instance method named "Handle" on <typeparamref name="T"/> whose single
/// parameter has the runtime type of <paramref name="event"/>, and invokes it on the aggregate.
/// </summary>
/// <param name="event">Domain event to apply.</param>
/// <param name="aggregate">Aggregate instance that receives the event.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when no matching handler exists, or when more than one matches.
/// </exception>
private void ApplyEventToAggragate(Event<T> @event, T aggregate)
{
    // Resolve the runtime type once rather than once per candidate method.
    var eventType = @event.GetType();

    var handler = typeof(T)
        .GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
        .Where(candidate => candidate.Name == "Handle")
        .SingleOrDefault(candidate =>
        {
            // Requiring exactly one parameter keeps parameterless or multi-parameter
            // overloads named Handle from breaking the lookup or the invocation.
            var parameters = candidate.GetParameters();
            return parameters.Length == 1 && parameters[0].ParameterType == eventType;
        });

    if (handler == null)
    {
        throw new InvalidOperationException(
            string.Format("{0} has no Handle method accepting {1}.", typeof(T).FullName, eventType.FullName));
    }

    handler.Invoke(aggregate, new object[] { @event });
}
<p>

重建时,首先将从EventStore读出的ResolvedEvent转换为领域事件(Domain Event),然后通过反射调用聚合根实体上与该事件类型对应的Handle方法,逐个应用事件以恢复聚合状态。

完整代码:

/// <summary>
/// Repository that stores domain events for aggregates of type <typeparamref name="T"/> and
/// rebuilds an aggregate by replaying the events of its stream.
/// </summary>
/// <typeparam name="T">Aggregate type handled by this repository.</typeparam>
public class EventStoreRepository<T> : EventStoreRpositoryBase<T> where T : Aggregate
{
    /// <summary>
    /// Rebuilds an aggregate by reading every event in its stream and applying each one in order.
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate whose stream is read.</param>
    /// <returns>A new <typeparamref name="T"/> instance with all stored events applied.</returns>
    public T Get(Guid aggregateId)
    {
        var events = ReadAllEventsInStream(GetStreamName(aggregateId), connection);

        // Start from a blank instance created through the parameterless constructor.
        var aggregate = (T)Activator.CreateInstance(typeof(T));

        foreach (var @event in events)
        {
            var appEvent = (Event<T>)ResolvedEventToDomainEvent(@event);
            ApplyEventToAggragate(appEvent, aggregate);
        }

        return aggregate;
    }

    /// <summary>
    /// Finds the non-public instance method named "Handle" on <typeparamref name="T"/> whose single
    /// parameter has the runtime type of <paramref name="event"/>, and invokes it on the aggregate.
    /// </summary>
    /// <param name="event">Domain event to apply.</param>
    /// <param name="aggregate">Aggregate instance that receives the event.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when no matching handler exists, or when more than one matches.
    /// </exception>
    private void ApplyEventToAggragate(Event<T> @event, T aggregate)
    {
        // Resolve the runtime type once rather than once per candidate method.
        var eventType = @event.GetType();

        var handler = typeof(T)
            .GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
            .Where(candidate => candidate.Name == "Handle")
            .SingleOrDefault(candidate =>
            {
                // Requiring exactly one parameter keeps parameterless or multi-parameter
                // overloads named Handle from breaking the lookup or the invocation.
                var parameters = candidate.GetParameters();
                return parameters.Length == 1 && parameters[0].ParameterType == eventType;
            });

        if (handler == null)
        {
            throw new InvalidOperationException(
                string.Format("{0} has no Handle method accepting {1}.", typeof(T).FullName, eventType.FullName));
        }

        handler.Invoke(aggregate, new object[] { @event });
    }

    /// <summary>
    /// Appends a domain event to the stream of the aggregate it refers to.
    /// </summary>
    /// <param name="event">Event to store; its AggregateId selects the target stream.</param>
    public void SaveEvent(Event<T> @event)
    {
        // Stream names have the form "<aggregate type name>-<aggregate id>".
        var streamName = GetStreamName(@event.AggregateId);

        // Headers stored with the event; only the aggregate's full type name is recorded here.
        var metadata = new Dictionary<string, object>
            {
                {"AggregateType", typeof(T).FullName},
            };

        // ExpectedVersion.Any is passed, so no specific stream version is requested.
        connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata));
    }
}
 
/// <summary>
/// Shared plumbing for repositories of aggregates of type <typeparamref name="T"/>: owns the
/// connection, derives stream names, and converts between domain events and stored events.
/// </summary>
/// <typeparam name="T">Aggregate type; its name is the prefix of every stream name.</typeparam>
public class EventStoreRpositoryBase<T> where T : Aggregate
{
    /// <summary>Connection used for reads and writes; created and connected in the constructor.</summary>
    protected IEventStoreConnection connection;

    /// <summary>
    /// Creates a connection to the loopback address on port 1113 and connects immediately.
    /// </summary>
    public EventStoreRpositoryBase()
    {
        // NOTE(review): every instance opens its own connection and nothing in this class
        // closes it. Confirm who owns the connection's lifetime.
        connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
        connection.Connect();
    }

    /// <summary>
    /// Builds the stream name for an aggregate instance as "&lt;type name&gt;-&lt;aggregate id&gt;".
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate instance.</param>
    /// <returns>The stream name, e.g. "ApplicationAggregate-33c5590d-13bc-4c16-8f29-1e737fea9427".</returns>
    protected string GetStreamName(Guid aggregateId)
    {
        return string.Format("{0}-{1}", typeof(T).Name, aggregateId);
    }

    /// <summary>
    /// Serializes the event to JSON bytes, serializes the headers plus an "EventClrTypeName"
    /// entry as metadata, and wraps both in an EventData object.
    /// </summary>
    /// <param name="eventId">Identifier passed to the EventData constructor.</param>
    /// <param name="event">Event object to serialize.</param>
    /// <param name="headers">Optional extra metadata entries; an empty set is used when null.</param>
    /// <returns>The EventData built from the serialized event and metadata.</returns>
    protected EventData DomainEventToEventData(Guid eventId, object @event, IDictionary<string, object> headers = null)
    {
        if (headers == null)
        {
            headers = new Dictionary<string, object>();
        }

        var serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
        var eventClrType = @event.GetType();

        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event, serializerSettings));

        // Copy the caller's headers and record the CLR type so the event can be
        // deserialized to the right type when it is read back.
        var eventHeaders = new Dictionary<string, object>(headers)
                {
                    {
                        "EventClrTypeName", eventClrType.AssemblyQualifiedName
                    }
                };
        var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
        var typeName = eventClrType.Name;

        // The third argument is understood to flag the payload as JSON; confirm against the EventData API.
        return new EventData(eventId, typeName, true, data, metadata);
    }

    /// <summary>
    /// Reads all events in a stream, page by page, from the start of the stream.
    /// </summary>
    /// <param name="streamName">Name of the stream to read.</param>
    /// <param name="connection">Connection to read from; this parameter hides the field of the same name.</param>
    /// <param name="pageSize">Maximum number of events requested per read.</param>
    /// <returns>All events read, in the order the pages returned them.</returns>
    protected IEnumerable<ResolvedEvent> ReadAllEventsInStream(string streamName, IEventStoreConnection connection, int pageSize = 500)
    {
        var collected = new List<ResolvedEvent>();
        var cursor = StreamPosition.Start;
        StreamEventsSlice slice;

        do
        {
            slice = connection.ReadStreamEventsForward(streamName, cursor, pageSize, false);
            collected.AddRange(slice.Events);

            // Continue from where the previous page ended.
            cursor = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);

        return collected;
    }

    /// <summary>
    /// Converts a ResolvedEvent pulled from the stream to a domain event, using the
    /// "EventClrTypeName" metadata entry to pick the target type.
    /// </summary>
    /// <param name="event">Stored event whose data and metadata are JSON encoded as UTF-8.</param>
    /// <returns>The deserialized domain event.</returns>
    protected object ResolvedEventToDomainEvent(ResolvedEvent @event)
    {
        var metadataJSON = Encoding.UTF8.GetString(@event.Event.Metadata);
        var metadata = JsonConvert.DeserializeObject<Dictionary<string, object>>(metadataJSON);
        var eventDataJSON = Encoding.UTF8.GetString(@event.Event.Data);

        // Look the type name up once instead of once per type scanned below.
        var clrTypeName = metadata["EventClrTypeName"].ToString();

        // Only types in the executing assembly are considered.
        var eventType =
            Assembly.GetExecutingAssembly().GetTypes().Single(x => x.AssemblyQualifiedName == clrTypeName);

        return JsonConvert.DeserializeObject(eventDataJSON, eventType);
    }

}
<p>

1
猜你喜欢