/// <summary>
/// Repository that rebuilds aggregates of type <typeparamref name="T"/> by replaying the events
/// stored in the aggregate's stream, and appends new events to that stream.
/// </summary>
/// <typeparam name="T">Aggregate type handled by this repository.</typeparam>
public class EventStoreRepository<T> : EventStoreRpositoryBase<T> where T : Aggregate
{
    /// <summary>
    /// Loads every event from the stream of the given aggregate and applies them, in stream order,
    /// to a fresh instance of <typeparamref name="T"/> created via <see cref="Activator"/>.
    /// </summary>
    /// <param name="aggregateId">Identifier used to build the stream name.</param>
    /// <returns>The aggregate instance after all stored events have been applied.</returns>
    public T Get(Guid aggregateId)
    {
        var events = ReadAllEventsInStream(GetStreamName(aggregateId), connection);
        var aggregate = (T)Activator.CreateInstance(typeof(T));

        foreach (var @event in events)
        {
            var appEvent = (Event<T>)ResolvedEventToDomainEvent(@event);
            ApplyEventToAggragate(appEvent, aggregate);
        }

        return aggregate;
    }

    /// <summary>
    /// Invokes, through reflection, the non-public instance method named "Handle" on the aggregate
    /// whose single parameter has exactly the runtime type of <paramref name="event"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown by <c>Single</c> when zero or more than one matching handler exists.
    /// </exception>
    private void ApplyEventToAggragate(Event<T> @event, T aggregate)
    {
        var eventType = @event.GetType();

        var method = typeof(T)
            .GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
            .Single(x => x.Name == "Handle" && HasSingleParameterOfType(x, eventType));

        method.Invoke(aggregate, new object[] { @event });
    }

    /// <summary>
    /// Returns true when the method takes exactly one parameter of the given type.
    /// </summary>
    private static bool HasSingleParameterOfType(MethodInfo method, Type parameterType)
    {
        // Checking the length first keeps parameterless "Handle" overloads from blowing up the lookup.
        var parameters = method.GetParameters();
        return parameters.Length == 1 && parameters[0].ParameterType == parameterType;
    }

    /// <summary>
    /// Appends the event to the stream of the aggregate it belongs to, using
    /// <c>ExpectedVersion.Any</c>. The full name of <typeparamref name="T"/> is stored
    /// in the event metadata under the "AggregateType" key.
    /// </summary>
    /// <param name="event">Event to persist; its <c>AggregateId</c> selects the stream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public void SaveEvent(Event<T> @event)
    {
        if (@event == null)
        {
            throw new ArgumentNullException("event");
        }

        var streamName = GetStreamName(@event.AggregateId);
        var metadata = new Dictionary<string, object>
        {
            { "AggregateType", typeof(T).FullName },
        };

        connection.AppendToStream(streamName, ExpectedVersion.Any, DomainEventToEventData(@event.Id, @event, metadata));
    }
}

/// <summary>
/// Base class holding the Event Store connection and the helpers used to serialize,
/// read and deserialize events for aggregates of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Aggregate type; its name is the prefix of the stream name.</typeparam>
public class EventStoreRpositoryBase<T> where T : Aggregate
{
    protected IEventStoreConnection connection;

    /// <summary>
    /// Creates a connection to the loopback address on port 1113 and connects immediately.
    /// </summary>
    public EventStoreRpositoryBase()
    {
        // NOTE(review): the connection is opened per repository instance and never closed here —
        // confirm who owns its lifetime.
        connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
        connection.Connect();
    }

    /// <summary>
    /// Builds the stream name as "{aggregate type name}-{aggregate id}".
    /// </summary>
    protected string GetStreamName(Guid aggregateId)
    {
        return string.Format("{0}-{1}", typeof(T).Name, aggregateId);
    }

    /// <summary>
    /// Serializes the event to UTF-8 encoded JSON and wraps it in an <c>EventData</c> object.
    /// The metadata consists of the supplied headers plus an "EventClrTypeName" entry holding
    /// the assembly-qualified name of the event's runtime type.
    /// </summary>
    /// <param name="eventId">Identifier assigned to the stored event.</param>
    /// <param name="event">Event object to serialize.</param>
    /// <param name="headers">Optional extra metadata entries; treated as empty when null.</param>
    /// <returns>The event data whose type name is the short name of the event's runtime type.</returns>
    protected EventData DomainEventToEventData(Guid eventId, object @event, IDictionary<string, object> headers = null)
    {
        if (headers == null)
        {
            headers = new Dictionary<string, object>();
        }

        var serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
        var eventType = @event.GetType();

        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event, serializerSettings));

        // The CLR type name travels in the metadata so the event can be deserialized to the right type later.
        var eventHeaders = new Dictionary<string, object>(headers)
        {
            { "EventClrTypeName", eventType.AssemblyQualifiedName }
        };
        var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));

        return new EventData(eventId, eventType.Name, true, data, metadata);
    }

    /// <summary>
    /// Reads all events of a stream, from the start, page by page.
    /// </summary>
    /// <param name="streamName">Name of the stream to read.</param>
    /// <param name="connection">Connection used for reading.</param>
    /// <param name="pageSize">Number of events requested per read.</param>
    /// <returns>All events read, in the order the slices returned them.</returns>
    protected IEnumerable<ResolvedEvent> ReadAllEventsInStream(string streamName, IEventStoreConnection connection, int pageSize = 500)
    {
        var result = new List<ResolvedEvent>();
        var cursor = StreamPosition.Start;
        StreamEventsSlice slice;

        do
        {
            slice = connection.ReadStreamEventsForward(streamName, cursor, pageSize, false);
            result.AddRange(slice.Events);
            cursor = slice.NextEventNumber;
        }
        while (!slice.IsEndOfStream);

        return result;
    }

    /// <summary>
    /// Converts a <c>ResolvedEvent</c> pulled from the stream back to a domain event.
    /// The target type is looked up among the types of the executing assembly by the
    /// assembly-qualified name stored in the "EventClrTypeName" metadata entry.
    /// </summary>
    /// <param name="event">Event as returned by the stream read.</param>
    /// <returns>The deserialized event object.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown by <c>Single</c> when no type (or more than one) in the executing assembly matches.
    /// </exception>
    protected object ResolvedEventToDomainEvent(ResolvedEvent @event)
    {
        var metadataJSON = Encoding.UTF8.GetString(@event.Event.Metadata);
        var metadata = JsonConvert.DeserializeObject<Dictionary<string, object>>(metadataJSON);
        var eventDataJSON = Encoding.UTF8.GetString(@event.Event.Data);

        // Resolve the name once instead of once per candidate type.
        var clrTypeName = metadata["EventClrTypeName"].ToString();
        var eventType = Assembly.GetExecutingAssembly()
            .GetTypes()
            .Single(x => x.AssemblyQualifiedName == clrTypeName);

        return JsonConvert.DeserializeObject(eventDataJSON, eventType);
    }
}
|