Rx operator to distinct sequences

IMPORTANT: for a description of the results and more details, please also have a look at my answer.

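I need to group and filter a sequence of objects/events that are usually replicated, and buffer them with a TimeSpan interval. I'll try to explain it better with a marble diagram: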

XXXXXYYYZZZZXXYZZ 

would produce

 X---Y---Z---X---Y---Z 

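where X, Y and Z are different event types and '---' represents the interval. In addition, I would also like to distinguish by a key property, which is available on all types because they have a common base class: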

 X, Y, Z : A 

and A contains a property Key. Using the notation Xa to mean X.Key = a, the final sample would be:

 Xa-Xb-Xa-Yb-Yc-Za-Za-Zc-Zb-Zc 

would produce

 Xa-Xb---Yb-Yc-Za-Zc-Zb 
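To pin down the intended semantics, here is a small model in plain C# (no Rx). It assumes fixed, non-overlapping windows like the ones `Buffer(TimeSpan)` produces, and it uses hypothetical timestamps, since the question gives none: inside each window only the first occurrence of each (Type, Key) pair is kept. `DistinctPerWindow` is a hypothetical helper for illustration, not an Rx operator.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical timestamps (seconds) for the keyed sample Xa-Xb-Xa-Yb-Yc-Za-Za-Zc-Zb-Zc.
var input = new (double Time, string Type, char Key)[]
{
    (0, "X", 'a'), (1, "X", 'b'), (2, "X", 'a'),
    (5, "Y", 'b'), (6, "Y", 'c'), (6, "Z", 'a'), (7, "Z", 'a'),
    (7, "Z", 'c'), (8, "Z", 'b'), (9, "Z", 'c'),
};

var windows = DistinctPerWindow(input, period: 5);

// Prints one line per window: "Xa-Xb" and "Yb-Yc-Za-Zc-Zb".
foreach (var window in windows)
    Console.WriteLine(string.Join("-", window.Select(e => e.Type + e.Key)));

// Puts each event into the fixed window [n * period, (n + 1) * period) that contains it,
// then keeps only the first occurrence of each (Type, Key) pair inside that window.
// Windows without events are skipped here, whereas Rx's Buffer would emit empty lists.
static List<List<(double Time, string Type, char Key)>> DistinctPerWindow(
    IEnumerable<(double Time, string Type, char Key)> events, double period) =>
    events
        .GroupBy(e => (int)Math.Floor(e.Time / period))
        .OrderBy(w => w.Key)
        .Select(w => w.GroupBy(e => (e.Type, e.Key)).Select(g => g.First()).ToList())
        .ToList();
```

In Rx terms this roughly corresponds to buffering first and de-duplicating inside each buffer, e.g. `source.Buffer(period).Select(b => b.Distinct(comparer).ToList())` with a comparer over (Type, Key); whether it reproduces the expected output exactly depends on where the window boundaries actually fall.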

Can anyone help me put together the Linq operators needed (probably DistinctUntilChanged and Buffer) to achieve this behavior? Thanks

Update 18.08.12

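As requested, I'll try to give a better explanation. We have devices that collect events and send them to a web service. These devices have old logic (which we can't change because of backward compatibility) and they keep sending an event until they receive an acknowledgment; after the acknowledgment, they send the next event in their queue, and so on. Events contain the network address of the unit and some other attributes that distinguish the events in each device's queue. An event looks like this: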

    class Event
    {
        public string NetworkAddress { get; }
        public string EventCode { get; }
        public string AdditionalAttribute { get; }
    }

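The goal is to process, every 5 seconds, the distinct events received from all devices, storing the information in the database (that's why we don't want to do it in batches) and sending the ack to the devices. Let's take an example with two devices and some events: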

    Device 'a':
    Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
    Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
    Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'

    Device 'b':
    Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
    Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
    Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
    Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'

    Pn are the operations done by our server, explained later

A possible marble diagram (input streams + output stream):

    Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
    Device 'b'          : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...
    Time                : ------------[1s]-----------[2s]------------[3s]------------[4s]-
    DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-

    P1: Server stores and acknowledges [a1] and [b1]
    P2: Server stores and acknowledges [b2]
    P3: Server stores and acknowledges [a2] and [b3]
    P4: Server stores and acknowledges [a3] and [b4]

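In the end I think it may be a simple combination of basic operators, but I'm new to Rx and a bit confused, since there seem to be many operators (or combinations of operators) that produce the same output stream.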

Update 19.08.12

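Please keep in mind that this code runs on a server and should run for days without memory leaks... I'm not sure about the behavior of subjects. At the moment, for each event I call a push operation on a service, which calls OnNext on a Subject on which I should build the query (if I'm not wrong about the use of subjects).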

Update 20.08.12

Current implementation, including a validation test; this is what I tried, and it seems to be what @yamen suggested as well:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics.Contracts;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public interface IEventService
    {
        // Persists the events
        void Add(IEnumerable<Event> events);
    }

    public class Event
    {
        public string Description { get; set; }
    }

    /// <summary>
    /// Implements the logic to handle events.
    /// </summary>
    public class EventManager : IDisposable
    {
        private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);

        private readonly Subject<EventMessage> subject = new Subject<EventMessage>();
        private readonly IDisposable subscription;
        private readonly object locker = new object();
        private readonly IEventService eventService;

        /// <summary>
        /// Initializes a new instance of the <see cref="EventManager"/> class.
        /// </summary>
        /// <param name="scheduler">The scheduler.</param>
        public EventManager(IEventService eventService, IScheduler scheduler)
        {
            this.eventService = eventService;
            this.subscription = this.CreateQuery(scheduler);
        }

        /// <summary>
        /// Pushes the event.
        /// </summary>
        /// <param name="eventMessage">The event message.</param>
        public void PushEvent(EventMessage eventMessage)
        {
            Contract.Requires(eventMessage != null);
            this.subject.OnNext(eventMessage);
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
        }

        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Dispose unmanaged resources
            }

            this.subject.Dispose();
            this.subscription.Dispose();
        }

        private IDisposable CreateQuery(IScheduler scheduler)
        {
            var buffered = this.subject
                .DistinctUntilChanged(new EventComparer())
                .Buffer(EventHandlingPeriod, scheduler);

            var query = buffered
                .Subscribe(this.HandleEvents);

            return query;
        }

        private void HandleEvents(IList<EventMessage> eventMessages)
        {
            Contract.Requires(eventMessages != null);
            var events = eventMessages.Select(this.SelectEvent);
            this.eventService.Add(events);
        }

        private Event SelectEvent(EventMessage message)
        {
            return new Event { Description = "evaluated description" };
        }

        private class EventComparer : IEqualityComparer<EventMessage>
        {
            public bool Equals(EventMessage x, EventMessage y)
            {
                return x.NetworkAddress == y.NetworkAddress
                    && x.EventCode == y.EventCode
                    && x.Attribute == y.Attribute;
            }

            public int GetHashCode(EventMessage obj)
            {
                var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
                return s.GetHashCode();
            }
        }
    }

    public class EventMessage
    {
        public string NetworkAddress { get; set; }
        public byte EventCode { get; set; }
        public byte Attribute { get; set; }
        // Other properties
    }

And the test:

    public void PushEventTest()
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";

        var eventServiceMock = new Mock<IEventService>();
        var scheduler = new TestScheduler();
        var target = new EventManager(eventServiceMock.Object, scheduler);

        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };

        // First 5-second window: A1 at 0s, B1 at 1s, A1 again at 2s
        scheduler.Schedule(() => target.PushEvent(eventMessageA1));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));
        scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

        // Expected: one batch with the two distinct events A1 and B1
        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());

        // Second window: B1 again, 3s after the current virtual time (6s), i.e. at 9s
        scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));
        scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);

        // Expected: one batch containing only B1
        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
    }

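Also, I want to stress again that it is very important that the software can run for days without problems, handling thousands of messages. To be clear: the test does not pass with the current implementation.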
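One likely reason for the failure can be reproduced without Rx. The sketch below replays the test's timeline (A1 at 0s, B1 at 1s, A1 at 2s, B1 at 9s, 5-second windows) through plain-LINQ stand-ins for three orderings of the operators. The helpers `Buffer`, `DistinctUntilChanged` and `DistinctForever` are hypothetical models of the Rx operators with similar names, and the strings "A1"/"B1" stand for messages that `EventComparer` treats as equal.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// PushEventTest timeline in seconds: A1 at 0, B1 at 1, A1 at 2, then B1 at 6 + 3 = 9.
var pushes = new (int Time, string Msg)[] { (0, "A1"), (1, "B1"), (2, "A1"), (9, "B1") };
const int Period = 5;   // EventHandlingPeriod
const int Windows = 2;  // windows [0, 5) and [5, 10), checked at 6s and 11s

// Current query: DistinctUntilChanged over the whole stream, then Buffer.
var ducThenBuffer = Buffer(DistinctUntilChanged(pushes), Period, Windows);

// Alternative: Buffer first, then Distinct inside each buffer.
var bufferThenDistinct = Buffer(pushes, Period, Windows)
    .Select(b => b.Distinct().ToList())
    .ToList();

// Alternative: a stream-wide Distinct before Buffer.
var distinctThenBuffer = Buffer(DistinctForever(pushes), Period, Windows);

Console.WriteLine(string.Join(", ", ducThenBuffer.Select(b => b.Count)));      // 3, 1
Console.WriteLine(string.Join(", ", bufferThenDistinct.Select(b => b.Count))); // 2, 1
Console.WriteLine(string.Join(", ", distinctThenBuffer.Select(b => b.Count))); // 2, 0

// Model of Buffer(TimeSpan): one list per fixed window, empty windows included.
static List<List<string>> Buffer(IEnumerable<(int Time, string Msg)> source, int period, int windows)
{
    var items = source.ToList();
    return Enumerable.Range(0, windows)
        .Select(w => items.Where(p => p.Time / period == w).Select(p => p.Msg).ToList())
        .ToList();
}

// Model of DistinctUntilChanged: drops an element only if it equals the previous one.
static IEnumerable<(int Time, string Msg)> DistinctUntilChanged(IEnumerable<(int Time, string Msg)> source)
{
    var hasPrevious = false;
    var previous = "";
    foreach (var p in source)
    {
        if (!hasPrevious || p.Msg != previous)
            yield return p;
        hasPrevious = true;
        previous = p.Msg;
    }
}

// Model of Distinct: remembers every key it has ever seen, for as long as it runs.
static IEnumerable<(int Time, string Msg)> DistinctForever(IEnumerable<(int Time, string Msg)> source)
{
    var seen = new HashSet<string>();
    foreach (var p in source)
        if (seen.Add(p.Msg))
            yield return p;
}
```

Under these assumptions only "buffer first, then Distinct per buffer" yields the 2 and 1 the test expects: DistinctUntilChanged compares neighbours only, so A1, B1, A1 all pass, while a stream-wide Distinct also drops B1 in the second window and keeps every key it has seen in memory for the lifetime of the subscription. Separately, the mock verifies `It.Is<List<Event>>`, whereas `HandleEvents` passes the lazy result of `Select`, so the match probably also requires materializing it with `ToList()`.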

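I'm not sure this is exactly what you want, but you might be able to group the elements explicitly with the group keyword, and then manipulate the various IObservables separately before recombining them.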

For example, if we have class definitions such as

    class A
    {
        public char Key { get; set; }
    }

    class X : A { }
    ...

and a Subject

    Subject<A> subject = new Subject<A>();

then we can write

    var buffered = from a in subject
                   group a by new { Type = a.GetType(), Key = a.Key } into g
                   from buffer in g.Buffer(TimeSpan.FromMilliseconds(300))
                   where buffer.Any()
                   select new
                   {
                       Count = buffer.Count,
                       Type = buffer.First().GetType().Name,
                       Key = buffer.First().Key
                   };

    buffered.Do(Console.WriteLine).Subscribe();

We can test this with the data you provided:

    subject.OnNext(new X { Key = 'a' });
    Thread.Sleep(100);
    subject.OnNext(new X { Key = 'b' });
    Thread.Sleep(100);
    subject.OnNext(new X { Key = 'a' });
    Thread.Sleep(100);
    ...
    subject.OnCompleted();

and get the output you gave:

    { Count = 2, Type = X, Key = a }
    { Count = 1, Type = X, Key = b }
    { Count = 1, Type = Y, Key = b }
    { Count = 1, Type = Y, Key = c }
    { Count = 2, Type = Z, Key = a }
    { Count = 2, Type = Z, Key = c }
    { Count = 1, Type = Z, Key = b }
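The output above can be reproduced by a plain-C# model of the query, assuming the elements arrive exactly 100 ms apart (the `Thread.Sleep(100)` calls) and that each group's `Buffer` timer starts when that group is first emitted, which is when the `from buffer in g.Buffer(...)` clause subscribes to it. Under those assumptions the 300 ms windows are aligned per group rather than globally, which is why Xa, Za and Zc each get Count = 2. `GroupedBufferCounts` is a hypothetical helper for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The keyed sample, one element every 100 ms (times in ms), as with Thread.Sleep(100).
var sample = new[] { "Xa", "Xb", "Xa", "Yb", "Yc", "Za", "Za", "Zc", "Zb", "Zc" };
var pushes = sample.Select((s, i) => (Time: i * 100, Type: s[0], Key: s[1])).ToList();

var lines = GroupedBufferCounts(pushes, periodMs: 300);
foreach (var line in lines)
    Console.WriteLine(line);

// Models: group a by (Type, Key) into g, from buffer in g.Buffer(period), where buffer.Any().
// Assumes each group's windows start at that group's first element, and reports the
// non-empty buffers in the order their windows close.
static List<string> GroupedBufferCounts(List<(int Time, char Type, char Key)> events, int periodMs) =>
    events
        .GroupBy(e => (e.Type, e.Key))
        .SelectMany(g =>
        {
            var start = g.First().Time;
            return g
                .GroupBy(e => (e.Time - start) / periodMs)
                .Select(w => (
                    Close: start + (w.Key + 1) * periodMs,
                    Text: new { Count = w.Count(), Type = g.Key.Type, Key = g.Key.Key }.ToString()));
        })
        .OrderBy(r => r.Close)
        .Select(r => r.Text)
        .ToList();
```

With real `Thread.Sleep` timing the arrival times jitter, so an element close to a window edge may land in a different buffer and the counts can vary between runs.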

Not sure whether this is exactly what you want, but it seems to support your use case.

First, let's define the base class to work with (you can easily modify it to suit your needs):

    public class MyEvent
    {
        public string NetworkAddress { set; get; }
        public string EventCode { set; get; }
    }

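Let's set up your devices as an array of IObservable<MyEvent>; you will probably supply these in a different way, and the code below would of course need to change to accommodate that. Each device produces its values with a random delay of between 0.5 and 1.5 seconds.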

    var deviceA = new MyEvent[]
    {
        new MyEvent() { NetworkAddress = "A", EventCode = "1" },
        new MyEvent() { NetworkAddress = "A", EventCode = "1" },
        new MyEvent() { NetworkAddress = "A", EventCode = "2" }
    };

    var deviceB = new MyEvent[]
    {
        new MyEvent() { NetworkAddress = "B", EventCode = "1" },
        new MyEvent() { NetworkAddress = "B", EventCode = "2" },
        new MyEvent() { NetworkAddress = "B", EventCode = "2" },
        new MyEvent() { NetworkAddress = "B", EventCode = "3" }
    };

    var random = new Random();

    // Each event is emitted 500 to 1500 ms after the previous one (Concat runs the delayed inner observables in sequence)
    var deviceARand = deviceA.ToObservable()
        .Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500, 1500))))
        .Concat();
    var deviceBRand = deviceB.ToObservable()
        .Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500, 1500))))
        .Concat();

    var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand };

Now let's take all these individual device streams, make each of them "distinct", and merge them into one master stream:

    // DistinctUntilChanged is applied to each device's stream on its own, then the streams are merged
    var stream = devices
        .Select(device => device.DistinctUntilChanged(a => a.EventCode))
        .Merge();

Once you have that, consuming this stream periodically is just a matter of buffering it with Buffer:

 stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ }); 
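DistinctUntilChanged only compares each element with the one immediately before it, so it matters whether it sees a single device's stream or a stream that already interleaves several devices. The plain-C# sketch below keys on EventCode as the answer does and compares both orders; the arrival times are hypothetical, and `DistinctUntilChanged` and `MergeByTime` are hypothetical models of the Rx operators, not Rx itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical arrival times (ms) of the resent events, keyed by EventCode as in the answer.
var deviceA = new (int Time, string Device, string Code)[] { (100, "A", "1"), (900, "A", "1"), (1700, "A", "2") };
var deviceB = new (int Time, string Device, string Code)[] { (500, "B", "1"), (1300, "B", "2"), (2100, "B", "2"), (2900, "B", "3") };

// De-duplicate each device's own stream, then merge.
var perDevice = MergeByTime(
    DistinctUntilChanged(deviceA, e => e.Code),
    DistinctUntilChanged(deviceB, e => e.Code));

// Merge first, then de-duplicate: neighbours may now come from different devices.
var mergedFirst = DistinctUntilChanged(MergeByTime(deviceA, deviceB), e => e.Code);

Console.WriteLine(Show(perDevice));   // A1 B1 B2 A2 B3
Console.WriteLine(Show(mergedFirst)); // A1 B2 B3

static string Show(IEnumerable<(int Time, string Device, string Code)> events) =>
    string.Join(" ", events.Select(e => e.Device + e.Code));

// Model of Merge for already-timestamped streams: interleave by arrival time.
static IEnumerable<(int Time, string Device, string Code)> MergeByTime(
    params IEnumerable<(int Time, string Device, string Code)>[] streams) =>
    streams.SelectMany(s => s).OrderBy(e => e.Time);

// Model of DistinctUntilChanged(keySelector): skips an element whose key equals the previous key.
static IEnumerable<T> DistinctUntilChanged<T, TKey>(IEnumerable<T> source, Func<T, TKey> keySelector)
{
    var comparer = EqualityComparer<TKey>.Default;
    using var enumerator = source.GetEnumerator();
    if (!enumerator.MoveNext())
        yield break;

    var previousKey = keySelector(enumerator.Current);
    yield return enumerator.Current;

    while (enumerator.MoveNext())
    {
        var key = keySelector(enumerator.Current);
        if (!comparer.Equals(previousKey, key))
            yield return enumerator.Current;
        previousKey = key;
    }
}
```

In the merged-first version, B1 and A2 are dropped only because the preceding event, coming from the other device, happens to have the same EventCode; keeping the de-duplication on each device's stream before the merge avoids comparing events across devices.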

After searching and experimenting, I put together some code that produces the output I expect:

    static void Main(string[] args)
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";

        var comparer = new EventComparer();
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 };
        var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 };

        var queue = new BlockingCollection<EventMessage>();

        // producing: every 2 seconds, add the whole sample list to the queue
        Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(
            l => list.ToList().ForEach(m =>
            {
                Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId);
                queue.Add(m);
            }));

        // subscribing
        queue.GetConsumingEnumerable()
            .ToObservable()
            .Buffer(TimeSpan.FromSeconds(5))
            .Subscribe(e =>
            {
                Console.WriteLine("Queue contains {0} items", queue.Count);
                e.Distinct(comparer).ToList().ForEach(m =>
                    Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count));
            });

        Console.WriteLine("Type enter to exit");
        Console.ReadLine();
    }

    public class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            var result = x.NetworkAddress == y.NetworkAddress
                && x.EventCode == y.EventCode
                && x.Attribute == y.Attribute;
            return result;
        }

        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }

    public class EventMessage
    {
        public string NetworkAddress { get; set; }
        public byte EventCode { get; set; }
        public byte Attribute { get; set; }

        public override string ToString()
        {
            const string Format = "{0} ({1}, {2})";
            var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute);
            return s;
        }
    }

However, while monitoring the application, it seems that this causes a memory leak. My questions now are:

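  • What causes the memory leak? [see the update below]
  • Is this the best approach? (If I put the Distinct on the first observable, I don't get the other events in the next buffer, but the items in each buffer should be isolated from the other buffers.)
  • How can I write a test using the test scheduler?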

Update

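It seems that memory grows only for the first few minutes, after which it stabilizes. I'll run a long test. Of course, that would be perfectly acceptable behavior.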

Update 26.08.12

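  • As I already mentioned in the previous update, memory usage only increases (slowly) during the first few minutes after startup. After 8 hours the consumed memory is stable, with normal fluctuations in the range of a few KB.
  • This question is very similar to mine, and the Drain extension suggested there could be applied nicely to my problem (still to be validated).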

Anyway, I think my remaining open question is unit testing with the test scheduler.

Thanks, Francesco