Skip to content

Instantly share code, notes, and snippets.

@developmentalmadness
Last active August 29, 2015 14:04
Show Gist options
  • Save developmentalmadness/e9c780c26b07fa5767af to your computer and use it in GitHub Desktop.
Save developmentalmadness/e9c780c26b07fa5767af to your computer and use it in GitHub Desktop.
Prototype that uses NEventStore for idempotency, thrown together in about 2-3 hours (NEventStore and Unity are the only dependencies)
using CommonDomain;
using CommonDomain.Core;
using Microsoft.Practices.Unity;
using NEventStore;
using NEventStore.Dispatcher;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace WMSEventStorePrototype
{
/// <summary>
/// Console entry point for the prototype: wires an in-memory event store to
/// <see cref="MyDispatcher"/> and then commits a series of single-event changesets.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the store and container, registers the handlers, and commits three
    /// <see cref="EventA"/> events followed by five <see cref="EventB"/> events to one stream.
    /// Any exception is written to the console in red (message, then stack trace).
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        const int eventACount = 3;
        const int eventBCount = 5;

        try
        {
            var container = new UnityContainer();
            var dispatcher = new MyDispatcher(container);

            var store = Wireup.Init()
                .UsingInMemoryPersistence()
                .InitializeStorageEngine()
                .UsingJsonSerialization()
                .UsingSynchronousDispatchScheduler(dispatcher)
                .Build();

            container.RegisterInstance<IStoreEvents>(store);
            container.RegisterInstance<MyAdapterService>(new MyAdapterService());

            dispatcher.RegisterHandler<EventA, MyAggregate>();
            dispatcher.RegisterHandler<EventB, MyAggregate>();

            using (store)
            using (var stream = store.CreateStream("I can build my own unique stream id"))
            {
                // Each event carries the revision it is expected to occupy in the stream.
                var revision = stream.StreamRevision;

                for (var i = 0; i < eventACount; i++)
                {
                    CommitSingleEvent(stream, new EventA { Version = ++revision });
                }

                for (var i = 0; i < eventBCount; i++)
                {
                    CommitSingleEvent(stream, new EventB { Version = ++revision });
                }
            }
        }
        catch (Exception ex)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
            Console.ResetColor();
        }
    }

    /// <summary>
    /// Adds <paramref name="body"/> to the stream as one event message and commits it
    /// as its own changeset.
    /// </summary>
    private static void CommitSingleEvent(IEventStream stream, object body)
    {
        stream.Add(new EventMessage { Body = body });

        // The commit id arbitrarily identifies the changeset and doesn't need to be tied to
        // the aggregate directly (unless a consistent guid could be generated from a hash).
        stream.CommitChanges(Guid.NewGuid());
    }
}
/// <summary>
/// Commit dispatcher that routes every event of a dispatched commit to the handler
/// registered for the event's runtime type. Handlers are resolved from a Unity child
/// container that has the stream and snapshot for the commit registered as instances.
/// </summary>
public class MyDispatcher : IDispatchCommits
{
    readonly IUnityContainer container;

    // Maps an event's runtime type to a callback that resolves the handler and invokes it.
    readonly Dictionary<Type, Action<IUnityContainer, IEvent>> registry = new Dictionary<Type, Action<IUnityContainer, IEvent>>();

    /// <summary>Creates a dispatcher that resolves its dependencies from <paramref name="container"/>.</summary>
    /// <param name="container">Container that must be able to resolve <see cref="IStoreEvents"/> by the time a commit is dispatched.</param>
    /// <exception cref="ArgumentNullException"><paramref name="container"/> is null.</exception>
    public MyDispatcher(IUnityContainer container)
    {
        if (container == null)
        {
            throw new ArgumentNullException("container");
        }

        this.container = container;
    }

    /// <summary>
    /// Opens the commit's stream (from the latest snapshot when one exists), registers the
    /// stream and snapshot in a child container, and invokes the registered handler for
    /// each event in the commit.
    /// </summary>
    /// <param name="commit">The commit whose events are dispatched.</param>
    /// <exception cref="ArgumentNullException"><paramref name="commit"/> is null.</exception>
    /// <exception cref="KeyNotFoundException">An event's type has no registered handler.</exception>
    public void Dispatch(ICommit commit)
    {
        if (commit == null)
        {
            throw new ArgumentNullException("commit");
        }

        var store = container.Resolve<IStoreEvents>();
        var snapshot = store.Advanced.GetSnapshot(commit.StreamId, int.MaxValue);

        IEventStream stream;
        if (snapshot != null)
        {
            // Open relative to the snapshot so the event at the snapshot's own revision is
            // not replayed on top of state that already includes it. Passing
            // snapshot.StreamRevision as a plain minimum revision would read that event again.
            // NOTE(review): relies on NEventStore's snapshot overload starting after the
            // snapshot revision - confirm against the library version in use.
            stream = store.OpenStream(snapshot, int.MaxValue);
        }
        else
        {
            // No snapshot yet: hand the handler an empty memento and replay the whole stream.
            snapshot = new Snapshot(commit.StreamId, commit.StreamRevision, new MyMemento());
            stream = store.OpenStream(commit.StreamId, int.MinValue, int.MaxValue);
        }

        using (var child = container.CreateChildContainer())
        {
            child.RegisterInstance<ISnapshot>(snapshot);
            child.RegisterInstance<IEventStream>(stream);

            foreach (var msg in commit.Events)
            {
                // TODO: keep handler around so we don't have to re-initialize on each iteration
                // (don't worry about it for now - all our commits only have a single event)
                var eventType = msg.Body.GetType();

                Action<IUnityContainer, IEvent> handler;
                if (!registry.TryGetValue(eventType, out handler))
                {
                    // Same exception type the dictionary indexer raised, but naming the event type.
                    throw new KeyNotFoundException(String.Format("No handler is registered for the event type '{0}'", eventType));
                }

                handler(child, (IEvent)msg.Body);
            }
        }
    }

    /// <summary>Nothing to release; present to satisfy the dispatcher interface.</summary>
    public void Dispose() { }

    /// <summary>
    /// Registers <typeparamref name="H"/> as the handler for events of type <typeparamref name="T"/>.
    /// A new handler is resolved from the container passed to the callback on every invocation.
    /// </summary>
    /// <typeparam name="T">Event type to route.</typeparam>
    /// <typeparam name="H">Handler type resolved from the container.</typeparam>
    /// <exception cref="ArgumentException">A handler is already registered for <typeparamref name="T"/>.</exception>
    public void RegisterHandler<T, H>()
        where H : IHandler<T>
        where T : IEvent
    {
        registry.Add(typeof(T), (unity, evt) => unity.Resolve<H>().Handle(evt));
    }
}
/// <summary>
/// Stand-in adapter that reports each update by writing one line to the console.
/// </summary>
public class MyAdapterService
{
    /// <summary>Writes an "UpdateA" line containing the id, value and version.</summary>
    public void UpdateA(int id, int value, int version)
    {
        Report("UpdateA - Id: {0}, Value: {1}, Version: {2}", id, value, version);
    }

    /// <summary>Writes an "UpdateB" line containing the id, value and version.</summary>
    public void UpdateB(int id, int value, int version)
    {
        Report("UpdateB - Id: {0}, Value: {1}, Version: {2}", id, value, version);
    }

    // Single place where the formatted line is written to standard output.
    private static void Report(string template, int id, int value, int version)
    {
        Console.WriteLine(template, id, value, version);
    }
}
/// <summary>
/// Contract for a component that handles messages on behalf of event type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">
/// Event type the handler is declared for; must implement <see cref="IEvent"/>.
/// It only distinguishes the interface instantiations - it does not appear in the method signature.
/// </typeparam>
public interface IHandler<T> where T : IEvent
{
/// <summary>Handles the given message.</summary>
/// <param name="message">The message to handle; typed as <c>object</c> rather than <typeparamref name="T"/>.</param>
void Handle(object message);
}
/// <summary>
/// Common shape of the events in this file: a message identifier and a version number.
/// </summary>
public interface IEvent
{
/// <summary>Identifier of the message. NOTE(review): nothing in this file assigns it - confirm intended use.</summary>
string MessageId { get; set; }
/// <summary>Version number carried by the event; the sample program sets it to the incremented stream revision.</summary>
int Version { get; set; }
}
/// <summary>
/// Snapshot payload for <c>MyAggregate</c>: the record id and the running totals that the
/// aggregate copies into its own state when it is constructed from a snapshot.
/// </summary>
public class MyMemento : IMemento
{
    /// <summary>Record id restored into the aggregate.</summary>
    public int RecordId { get; set; }

    /// <summary>Running total restored for A events.</summary>
    public int TotalA { get; set; }

    /// <summary>Running total restored for B events.</summary>
    public int TotalB { get; set; }

    /// <summary>Memento identifier.</summary>
    public Guid Id { get; set; }

    /// <summary>Memento version.</summary>
    public int Version { get; set; }
}
/// <summary>
/// Handler for <see cref="EventA"/> and <see cref="EventB"/>. On construction it restores
/// its totals from the snapshot payload, replays the stream's committed events to bring the
/// totals up to date, and afterwards forwards handled events to <see cref="MyAdapterService"/>.
/// </summary>
public class MyAggregate : IHandler<EventA>, IHandler<EventB>
{
    // Dispatch tables keyed by event type: one for pushing updates to the adapter,
    // one for folding replayed events into the totals.
    Dictionary<Type, Action<object>> updateRegistry = new Dictionary<Type, Action<object>>();
    Dictionary<Type, Action<object>> applyRegistry = new Dictionary<Type, Action<object>>();

    IEventStream stream;
    ISnapshot snapshot;
    MyAdapterService adapter;

    int id = 0;
    int totalA = 0;
    int totalB = 0;

    /// <summary>
    /// Restores state from <paramref name="snapshot"/> (when not null), registers the
    /// per-type callbacks, and replays every committed event of <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Stream whose committed events are replayed.</param>
    /// <param name="snapshot">Snapshot whose payload is cast to <see cref="MyMemento"/>; may be null.</param>
    /// <param name="adapter">Service that receives the update calls.</param>
    /// <exception cref="NotSupportedException">A committed event has a body type with no apply callback.</exception>
    public MyAggregate(IEventStream stream, ISnapshot snapshot, MyAdapterService adapter)
    {
        this.stream = stream;
        this.snapshot = snapshot;
        this.adapter = adapter;

        if (snapshot != null)
        {
            var memento = (MyMemento)snapshot.Payload;
            id = memento.RecordId;
            totalA = memento.TotalA;
            totalB = memento.TotalB;
        }

        Register<EventA>(applyRegistry, a => Apply(a));
        Register<EventA>(updateRegistry, a => Update(a));
        Register<EventB>(applyRegistry, b => Apply(b));
        Register<EventB>(updateRegistry, b => Update(b));

        foreach (var committed in stream.CommittedEvents)
        {
            ApplyCommitted(committed);
        }
    }

    /// <summary>Stores <paramref name="action"/> in <paramref name="registry"/> under <typeparamref name="T"/>, wrapped so it accepts an untyped message.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    private void Register<T>(IDictionary<Type, Action<object>> registry, Action<T> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        Action<object> untyped = delegate(object evt) { action((T)evt); };
        registry.Add(typeof(T), untyped);
    }

    /// <summary>Forwards <paramref name="msg"/> to the update callback registered for its runtime type.</summary>
    /// <exception cref="NotSupportedException">No update callback is registered for the message type.</exception>
    public void Handle(object msg)
    {
        Action<object> update;
        if (!updateRegistry.TryGetValue(msg.GetType(), out update))
        {
            throw new NotSupportedException(String.Format("No update handler is registered for the type '{0}'", msg.GetType()));
        }

        update(msg);
    }

    // Reports the current A total, together with the event's version, to the adapter.
    private void Update(EventA @event)
    {
        adapter.UpdateA(id, totalA, @event.Version);
    }

    // Reports the current B total, together with the event's version, to the adapter.
    private void Update(EventB @event)
    {
        adapter.UpdateB(id, totalB, @event.Version);
    }

    // Routes a replayed event message to the apply callback for its body type.
    private void ApplyCommitted(EventMessage msg)
    {
        Action<object> apply;
        if (!applyRegistry.TryGetValue(msg.Body.GetType(), out apply))
        {
            throw new NotSupportedException(String.Format("No apply handler is registered for the type '{0}'", msg.Body.GetType()));
        }

        apply(msg.Body);
    }

    // Each replayed A event counts once towards the A total.
    private void Apply(EventA @event)
    {
        totalA++;
    }

    // Each replayed B event counts once towards the B total.
    private void Apply(EventB @event)
    {
        totalB++;
    }
}
/// <summary>
/// Event of kind "A"; carries only the members required by <see cref="IEvent"/>.
/// </summary>
public class EventA : IEvent
{
    /// <summary>Version number carried by the event.</summary>
    public int Version { get; set; }

    /// <summary>Identifier of the message.</summary>
    public string MessageId { get; set; }
}
/// <summary>
/// Event of kind "B"; carries only the members required by <see cref="IEvent"/>.
/// </summary>
public class EventB : IEvent
{
    /// <summary>Version number carried by the event.</summary>
    public int Version { get; set; }

    /// <summary>Identifier of the message.</summary>
    public string MessageId { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment