处理聚合的所有事件
请参阅下面的第一个永久订阅:
namespace PersistentSubscription { internal class Program { private static void Main() { var subscription = new PersistentSubscriptionClient(); subscription.Start(); } } public class PersistentSubscriptionClient { private IEventStoreConnection _conn; private const string STREAM = "$ce-customer"; private const string GROUP = "a_test_group"; private const int DEFAULTPORT = 1113; private static readonly UserCredentials User = new UserCredentials("admin", "changeit"); private EventStorePersistentSubscriptionBase _subscription; public void Start() { var settings = ConnectionSettings.Create(); using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT))) { _conn.ConnectAsync().Wait(); CreateSubscription(); ConnectToSubscription(); Console.WriteLine("waiting for events. press enter to exit"); Console.ReadLine(); } } private void ConnectToSubscription() { var bufferSize = 10; var autoAck = true; Action eventAppeared = EventAppeared; _subscription = _conn.ConnectToPersistentSubscription(STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck); } private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase, SubscriptionDropReason subscriptionDropReason, Exception ex) { ConnectToSubscription(); } private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase, ResolvedEvent resolvedEvent) { MemoryStream stream = new MemoryStream(resolvedEvent.Event.Data); IFormatter formatter = new BinaryFormatter(); stream.Seek(0, SeekOrigin.Begin); try { CustomerCreated customerCreated = (CustomerCreated)formatter.Deserialize(stream); Console.WriteLine(customerCreated); } catch (Exception e) { var test = "test"; } } private void CreateSubscription() { PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create() .DoNotResolveLinkTos() .StartFromCurrent(); try { _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait(); } catch (AggregateException ex) { if (ex.InnerException.GetType() != typeof(InvalidOperationException) && ex.InnerException?.Message != $"Subscription group {GROUP} on stream {STREAM} already exists") { throw; } } } } }
我的第一个客户如下:
using System; using System.IO; using System.Net; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; using System.Text; using EventStore.ClientAPI; namespace WritingEvents { class Program { static void Main(string[] args) { const int DEFAULTPORT = 1113; var settings = ConnectionSettings.Create(); using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT))) { conn.ConnectAsync().Wait(); CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" }; EventData customerCreated1 = GetEventDataFor(c1); conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait(); } } private static EventData GetEventDataFor(CustomerCreated customerCreated) { IFormatter formatter = new BinaryFormatter(); MemoryStream stream = new MemoryStream(); formatter.Serialize(stream, customerCreated); byte[] customerCreatedEventByteArray = stream.ToArray(); return new EventData( Guid.NewGuid(), "eventType", true, customerCreatedEventByteArray, null ); } } [Serializable] public class CustomerCreated { public Guid Id { get; set; } public string Name { get; set; } } }
我运行服务器,然后运行客户端。 在服务器端反序列化CustomerCreated事件时,我看到一个错误。 错误是:“在解析完成之前遇到了流的结束”。
如果我更改此行:
private const string STREAM = "$ce-customer";
对此:
private const string STREAM = "customer-100";
然后反序列化在服务器端正常工作。
我如何处理所有客户事件 – 而不仅仅是客户100?
我在启动Event Store时有--run-projections=all
。 我也启用了所有预测:
这个问题帮助了我: 使用Event Store Client API(.NET),如何写入流并将一个事件链接到另一个事件?
我只需要改变这个:
PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create() .DoNotResolveLinkTos() //Specifically this line .StartFromCurrent();
对此:
PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create() .ResolveLinkTos() //Specifically this line .StartFromCurrent();
DoNotResolveLinkTos
获取原始事件的链接 ,而ResolveLinkTos获取实际事件本身。 因此,我试图反序列化链接对象,这导致exception。