public voID Save<T>(T aggregate,int? expectedVersion = null) where T : AggregateRoot { if (expectedVersion != null && _eventStore.Get(typeof(T),aggregate.ID,expectedVersion.Value).Any()) throw new ConcurrencyException(aggregate.ID); var i = 0; foreach (var @event in aggregate.GetUncommittedChanges()) { // ... [irrelevant code removed] ... _eventStore.Save(typeof(T),@event); _publisher.Publish(@event); } aggregate.MarkChangesAsCommitted(); }
令我不安的是,这个方法是在聚合被告知将它们标记为已提交之前提交要发布给订阅者的事件.因此,如果观察给定事件的事件处理程序阻塞,则聚合将不会提交先前事件处理程序可能已被通知的已提交更改.
为什么我不会将_publisher.Publish(@event)移动到aggregate.MarkChangesAsCommitted()之后,就像这样.我错过了什么?
public voID Save<T>(T aggregate,expectedVersion.Value).Any()) throw new ConcurrencyException(aggregate.ID); var events = aggregate.GetUncommittedChanges(); foreach (var @event in events) { // ... [irrelevant code removed] ... _eventStore.Save(typeof(T),@event); } aggregate.MarkChangesAsCommitted(); _publisher.Publish(events); }解决方法 这两种方法都存在问题,因为Save和Publish之间可能存在错误,无论调用这两种方法的顺序如何.这可能导致未发布的事件被发布或保存的事件未被发布.存在内存状态损坏(在聚合对象中)的问题也存在(尽管可以通过简单地捕获由事件处理程序产生的错误来处理).
该问题的一个解决方案是使用两阶段提交(例如,如果您的事件存储是基于sql Server且发布者是基于MSMQ的,则可用).但是,这具有性能,可伸缩性和 *** 作含义,并且不允许后期订阅者(见下文).
更好的方法是允许对事件感兴趣的各方将它们从事件存储中拉出(理想情况下,将其与某种通知机制相结合或长时间轮询以使其更具“反应性”).这将跟踪最后收到的事件的责任转移给订户,允许
>迟到的订阅者(在存储事件后很长时间内加入)接收旧事件以及新事件,
>没有两阶段提交的可靠性.
在搜索“使用事件存储作为队列”之类的内容时,您应该找到更多有关此方法的信息,而Greg的答案中的视频可能会为此添加很多内容.
一个常见的算法就是这个:
>事件存储为每个保存的事件分配一个检查点令牌(例如,序列号);
>订阅者向事件存储区询问新事件(定期,基于长轮询,对推送通知做出反应等),从他们知道的最后一个检查点令牌(如果有的话)开始,
>事件存储从该检查点令牌开始发送较新的事件以及新的检查点令牌,
>订阅者处理事件,并且如果可能的话,以原子方式存储新的检查点令牌以及它们产生的任何副作用;
>如果无法进行原子保存,他们可以在产生副作用后存储新的检查点令牌,并且他们需要一种方法来忽略他们已经看到的事件,以防中间存在错误(事件处理被认为是“幂等”);
>订阅者再次从#2开始.
我想补充一点,我不认为忽略保存/发布问题生产就绪的事件存储.有关替代方案,请参阅Greg Young’s Event Store或(目前或多或少未维护)NEventStore.
总结以上是内存溢出为你收集整理的事件 – CQRS存储库/事件发布者全部内容,希望文章能够帮你解决事件 – CQRS存储库/事件发布者所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)