处理聚合的所有事件

下面是我写的第一个持久订阅(persistent subscription):

namespace PersistentSubscription
{
    /// <summary>
    /// Console entry point: starts the persistent subscription client.
    /// </summary>
    internal class Program
    {
        private static void Main()
        {
            var subscription = new PersistentSubscriptionClient();
            subscription.Start();
        }
    }

    /// <summary>
    /// Connects to a local Event Store node, makes sure the persistent
    /// subscription group <c>GROUP</c> exists on <c>STREAM</c>, and prints every
    /// <c>CustomerCreated</c> event that the subscription delivers.
    /// </summary>
    public class PersistentSubscriptionClient
    {
        private IEventStoreConnection _conn;
        private const string STREAM = "$ce-customer";
        private const string GROUP = "a_test_group";
        private const int DEFAULTPORT = 1113;
        private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
        private EventStorePersistentSubscriptionBase _subscription;

        /// <summary>
        /// Opens the connection, creates the subscription group if needed,
        /// subscribes, and blocks until the user presses Enter.
        /// </summary>
        public void Start()
        {
            var settings = ConnectionSettings.Create();
            using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
            {
                _conn.ConnectAsync().Wait();
                CreateSubscription();
                ConnectToSubscription();
                Console.WriteLine("waiting for events. press enter to exit");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Subscribes to <c>GROUP</c> on <c>STREAM</c> and stores the resulting
        /// subscription handle in <c>_subscription</c>.
        /// </summary>
        private void ConnectToSubscription()
        {
            var bufferSize = 10;
            var autoAck = true;

            // The handler takes two arguments, so the delegate must be the
            // two-parameter generic Action; a bare Action cannot bind to it.
            Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared;

            _subscription = _conn.ConnectToPersistentSubscription(
                STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
        }

        /// <summary>
        /// Drop callback: re-subscribes whenever the subscription is dropped,
        /// regardless of the reason.
        /// </summary>
        private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase, SubscriptionDropReason subscriptionDropReason, Exception ex)
        {
            ConnectToSubscription();
        }

        /// <summary>
        /// Event callback: deserializes the event payload as a
        /// <c>CustomerCreated</c> and writes it to the console. Failures are
        /// reported on standard error instead of being silently discarded.
        /// </summary>
        private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase, ResolvedEvent resolvedEvent)
        {
            // NOTE(security): BinaryFormatter is unsafe on untrusted input; it is
            // kept here only because the writer serializes with it as well.
            IFormatter formatter = new BinaryFormatter();
            try
            {
                // A freshly constructed MemoryStream is already positioned at 0.
                using (var stream = new MemoryStream(resolvedEvent.Event.Data))
                {
                    var customerCreated = (CustomerCreated)formatter.Deserialize(stream);
                    Console.WriteLine(customerCreated);
                }
            }
            catch (Exception e)
            {
                // Best-effort handler: keep the subscription alive, but make the
                // failure visible.
                Console.Error.WriteLine("Failed to deserialize event: " + e.Message);
            }
        }

        /// <summary>
        /// Creates the persistent subscription group, tolerating only the
        /// "already exists" failure so that the client can be restarted.
        /// </summary>
        private void CreateSubscription()
        {
            // DoNotResolveLinkTos() is the setting under discussion in the text
            // that follows this listing: with it, the subscription delivers link
            // events rather than the events they point to.
            PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .DoNotResolveLinkTos()
                .StartFromCurrent();

            try
            {
                _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
            }
            catch (AggregateException ex)
            {
                Exception inner = ex.InnerException;
                bool alreadyExists =
                    inner is InvalidOperationException &&
                    inner.Message == $"Subscription group {GROUP} on stream {STREAM} already exists";

                // Swallow only the expected "already exists" error; anything else
                // (including a missing inner exception) is rethrown.
                if (!alreadyExists)
                {
                    throw;
                }
            }
        }
    }
}

我的第一个客户端(负责写入事件的程序)如下:

 using System;
 using System.IO;
 using System.Net;
 using System.Runtime.Serialization;
 using System.Runtime.Serialization.Formatters.Binary;
 using System.Text;
 using EventStore.ClientAPI;

 namespace WritingEvents
 {
     /// <summary>
     /// Console writer: appends a single <see cref="CustomerCreated"/> event to
     /// the "customer-100" stream of a local Event Store node.
     /// </summary>
     class Program
     {
         static void Main(string[] args)
         {
             const int DEFAULTPORT = 1113;
             var settings = ConnectionSettings.Create();
             using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
             {
                 conn.ConnectAsync().Wait();

                 CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
                 EventData customerCreated1 = GetEventDataFor(c1);
                 conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
             }
         }

         /// <summary>
         /// Serializes <paramref name="customerCreated"/> with
         /// <see cref="BinaryFormatter"/> and wraps the bytes in an
         /// <c>EventData</c> carrying a new event id and no metadata.
         /// </summary>
         /// <param name="customerCreated">The event object to serialize.</param>
         /// <returns>The <c>EventData</c> to append to a stream.</returns>
         private static EventData GetEventDataFor(CustomerCreated customerCreated)
         {
             // NOTE(security): BinaryFormatter is unsafe on untrusted data; it is
             // kept because the subscriber deserializes with the same formatter.
             IFormatter formatter = new BinaryFormatter();
             byte[] customerCreatedEventByteArray;
             using (var stream = new MemoryStream())
             {
                 formatter.Serialize(stream, customerCreated);
                 customerCreatedEventByteArray = stream.ToArray();
             }

             return new EventData(
                 Guid.NewGuid(),
                 "eventType",
                 false, // isJson: the payload is BinaryFormatter output, not JSON
                 customerCreatedEventByteArray,
                 null);
         }
     }

     /// <summary>
     /// Event payload describing a newly created customer.
     /// </summary>
     [Serializable]
     public class CustomerCreated
     {
         public Guid Id { get; set; }
         public string Name { get; set; }
     }
 }

我运行服务器,然后运行客户端。 在服务器端反序列化CustomerCreated事件时,我看到一个错误。 错误是:“在解析完成之前遇到了流的结束”。

如果我更改此行:

 // Name of the stream the persistent subscription is created on and connects to.
 private const string STREAM = "$ce-customer";

改为:

 // Name of the stream the persistent subscription is created on and connects to.
 private const string STREAM = "customer-100";

然后反序列化在服务器端正常工作。

我如何处理所有客户事件 – 而不仅仅是客户100?

我在启动 Event Store 时使用了 --run-projections=all 参数,并且已经启用了所有投影(projections):

在此处输入图像描述

这个问题帮助了我: 使用Event Store Client API(.NET),如何写入流并将一个事件链接到另一个事件?

我只需要改变这个:

 PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
     .DoNotResolveLinkTos() // Specifically this line
     .StartFromCurrent();

改为:

 PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
     .ResolveLinkTos() // Specifically this line
     .StartFromCurrent();

DoNotResolveLinkTos 返回的是指向原始事件的链接事件,而 ResolveLinkTos 返回的是被链接的实际事件本身。因此,我之前是在尝试反序列化链接对象,从而导致了异常。