Last active
August 29, 2015 14:04
-
-
Save developmentalmadness/e9c780c26b07fa5767af to your computer and use it in GitHub Desktop.
NEventStore for idempotency prototype thrown together in about 2-3 hours (NEventStore and Unity are the only dependencies)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using CommonDomain; | |
using CommonDomain.Core; | |
using Microsoft.Practices.Unity; | |
using NEventStore; | |
using NEventStore.Dispatcher; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace WMSEventStorePrototype | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
try | |
{ | |
var container = new UnityContainer(); | |
var dispatcher = new MyDispatcher(container); | |
var store = Wireup.Init() | |
.UsingInMemoryPersistence() | |
.InitializeStorageEngine() | |
.UsingJsonSerialization() | |
.UsingSynchronousDispatchScheduler(dispatcher) | |
.Build(); | |
container.RegisterInstance<IStoreEvents>(store); | |
container.RegisterInstance<MyAdapterService>(new MyAdapterService()); | |
dispatcher.RegisterHandler<EventA, MyAggregate>(); | |
dispatcher.RegisterHandler<EventB, MyAggregate>(); | |
using (store) | |
{ | |
using (var stream = store.CreateStream("I can build my own unique stream id")) | |
{ | |
var rev = stream.StreamRevision; | |
stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); | |
stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); | |
stream.CommitChanges(Guid.NewGuid()); // the commit id arbitrarily identifies the changeset and doesn't need to be tied to the aggregate directly (unless I could generate a consitent guid from my hash) | |
} | |
} | |
} | |
catch (Exception ex) | |
{ | |
Console.ForegroundColor = ConsoleColor.Red; | |
Console.WriteLine(ex.Message); | |
Console.WriteLine(ex.StackTrace); | |
Console.ResetColor(); | |
} | |
} | |
} | |
public class MyDispatcher : IDispatchCommits | |
{ | |
IUnityContainer container; | |
Dictionary<Type, Action<IUnityContainer, IEvent>> registry = new Dictionary<Type, Action<IUnityContainer, IEvent>>(); | |
public MyDispatcher(IUnityContainer container) | |
{ | |
this.container = container; | |
} | |
public void Dispatch(ICommit commit) | |
{ | |
var store = container.Resolve<IStoreEvents>(); | |
int minRevision = int.MinValue; | |
var snapshot = store.Advanced.GetSnapshot(commit.StreamId, int.MaxValue); | |
if (snapshot != null) | |
minRevision = snapshot.StreamRevision; | |
else | |
snapshot = new Snapshot(commit.StreamId, commit.StreamRevision, new MyMemento()); | |
var stream = store.OpenStream(commit.StreamId, minRevision, int.MaxValue); | |
using (var child = container.CreateChildContainer()) | |
{ | |
child.RegisterInstance<ISnapshot>(snapshot); | |
child.RegisterInstance<IEventStream>(stream); | |
foreach (var msg in commit.Events) | |
{ | |
// TODO: keep handler around so we don't have to re-initialize on each iteration (don't worry about it for now - all our commits only have a single event) | |
registry[msg.Body.GetType()](child, (IEvent)msg.Body); | |
} | |
} | |
} | |
public void Dispose() { } | |
public void RegisterHandler<T, H>() | |
where H : IHandler<T> | |
where T : IEvent | |
{ | |
registry.Add(typeof(T), (unity, evt) => ((IHandler<T>)unity.Resolve(typeof(H))).Handle(evt)); | |
} | |
} | |
public class MyAdapterService | |
{ | |
public void UpdateA(int id, int value, int version) | |
{ | |
Console.WriteLine("UpdateA - Id: {0}, Value: {1}, Version: {2}", id, value, version); | |
} | |
public void UpdateB(int id, int value, int version) | |
{ | |
Console.WriteLine("UpdateB - Id: {0}, Value: {1}, Version: {2}", id, value, version); | |
} | |
} | |
public interface IHandler<T> where T : IEvent | |
{ | |
void Handle(object message); | |
} | |
public interface IEvent | |
{ | |
string MessageId { get; set; } | |
int Version { get; set; } | |
} | |
public class MyMemento : IMemento | |
{ | |
public int RecordId { get; set; } | |
public int TotalA { get; set; } | |
public int TotalB { get; set; } | |
public Guid Id | |
{ | |
get; set; | |
} | |
public int Version | |
{ | |
get; set; | |
} | |
} | |
public class MyAggregate : IHandler<EventA>, IHandler<EventB> | |
{ | |
IEventStream stream; | |
ISnapshot snapshot; | |
MyAdapterService adapter; | |
int id = 0; | |
int totalA = 0; | |
int totalB = 0; | |
public MyAggregate(IEventStream stream, ISnapshot snapshot, MyAdapterService adapter) | |
{ | |
this.stream = stream; | |
this.snapshot = snapshot; | |
this.adapter = adapter; | |
if (snapshot != null) | |
{ | |
var m = (MyMemento)snapshot.Payload; | |
id = m.RecordId; | |
totalA = m.TotalA; | |
totalB = m.TotalB; | |
} | |
Register<EventA>(applyRegistry, Apply); | |
Register<EventB>(applyRegistry, Apply); | |
Register<EventA>(updateRegistry, Update); | |
Register<EventB>(updateRegistry, Update); | |
foreach (var msg in stream.CommittedEvents) | |
{ | |
Apply(msg); | |
} | |
} | |
Dictionary<Type, Action<object>> updateRegistry = new Dictionary<Type, Action<object>>(); | |
Dictionary<Type, Action<object>> applyRegistry = new Dictionary<Type, Action<object>>(); | |
private void Register<T>(IDictionary<Type, Action<object>> registry, Action<T> action) | |
{ | |
if (action == null) throw new ArgumentNullException("action"); | |
registry.Add(typeof(T), evt => action((T)evt)); | |
} | |
public void Handle(object msg) | |
{ | |
Action<object> handler; | |
if (updateRegistry.TryGetValue(msg.GetType(), out handler)) | |
{ | |
handler(msg); | |
} | |
else | |
{ | |
throw new NotSupportedException(String.Format("No update handler is registered for the type '{0}'", msg.GetType())); | |
} | |
} | |
private void Update(EventA @event) | |
{ | |
adapter.UpdateA(id, totalA, @event.Version); | |
} | |
private void Update(EventB @event) | |
{ | |
adapter.UpdateB(id, totalB, @event.Version); | |
} | |
private void Apply(EventMessage msg) | |
{ | |
Action<object> handler; | |
if (applyRegistry.TryGetValue(msg.Body.GetType(), out handler)) | |
{ | |
handler(msg.Body); | |
} | |
else | |
{ | |
throw new NotSupportedException(String.Format("No apply handler is registered for the type '{0}'", msg.Body.GetType())); | |
} | |
} | |
private void Apply(EventA @event) | |
{ | |
totalA++; | |
} | |
private void Apply(EventB @event) | |
{ | |
totalB++; | |
} | |
} | |
public class EventA : IEvent | |
{ | |
public int Version { get; set; } | |
public string MessageId | |
{ | |
get; set; | |
} | |
} | |
public class EventB : IEvent | |
{ | |
public int Version { get; set; } | |
public string MessageId | |
{ | |
get; set; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment