对缓冲的Observable进行排序

我有一批令牌非常快,而且处理器相对较慢。 令牌有三种子类型,我希望它们按优先级处理。 因此,我希望令牌在生成后进行缓冲,并等待处理并按优先级对缓冲区进行排序。

这是我的课程:

public enum Priority { High = 3, Medium = 2, Low = 1 } public class Base : IComparable { public int Id { get; set; } public int CompareTo(Base other) { return Id.CompareTo(other.Id); } } public class Foo : Base { } public class Bar : Base { } public class Baz : Base { } public class Token : IComparable { private readonly string _toString; public Foo Foo { get; } public Bar Bar { get; } public Baz Baz { get; } public Priority Priority => Baz == null ? Bar == null ? Priority.High : Priority.Medium : Priority.Low; public int CompareTo(Token other) { if (Priority > other.Priority) { return -1; } if (Priority < other.Priority) { return 1; } switch (Priority) { case Priority.High: return Foo.CompareTo(other.Foo); case Priority.Medium: return Bar.CompareTo(other.Bar); case Priority.Low: return Baz.CompareTo(other.Baz); default: throw new ArgumentOutOfRangeException(); } } public override string ToString() { return _toString; } public Token(Foo foo) { _toString = $"{nameof(Foo)}:{foo.Id}"; Foo = foo; } public Token(Foo foo, Bar bar) : this(foo) { _toString += $":{nameof(Bar)}:{bar.Id}"; Bar = bar; } public Token(Foo foo, Baz baz) : this(foo) { _toString += $":{nameof(Baz)}:{baz.Id}"; Baz = baz; } } 

这是我的生产者代码:

 var random = new Random(); var bazId = 0; var barId = 0; var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1)) .Select(Convert.ToInt32) .Take(3) select new Token(new Foo { Id = id })) .Publish(); var barTokens = (from fooToken in fooTokens from id in Observable.Range(0, random.Next(5, 10)) .Select(_ => Interlocked.Increment(ref barId)) select new Token(fooToken.Foo, new Bar { Id = id })) .Publish(); var bazTokens = (from barToken in barTokens from id in Observable.Range(0, random.Next(1, 5)) .Select(_ => Interlocked.Increment(ref bazId)) select new Token(barToken.Foo, new Baz { Id = id })) .Publish(); var tokens = bazTokens.Merge(barTokens) .Merge(fooTokens) .Do(dt => { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); // Subscription bazTokens.Connect(); barTokens.Connect(); fooTokens.Connect(); 

但是我对如何缓冲和排序令牌感到有些困惑。 如果我这样做,令牌似乎同时产生和消耗,这表明在幕后有一些缓冲,但我无法控制它。

 tokens.Subscribe(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); 

如果我使用TPL Dataflow ActionBlock ,我可以看到令牌正确生成并正确处理,但我仍然不确定如何进行排序。

 var proc = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); tokens.Subscribe(dt => proc.Post(dt)); 

任何想法或指示下一步将不胜感激!

更新:

我得到了一些工作。 我添加了一个帮助程序来清理显示测试数据的代码:

 private static void Display(Token dt, ConsoleColor col, int? wait = null) { if (wait.HasValue) { Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value)); } Console.ForegroundColor = col; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); } 

我添加了一个SortedSet

 var set = new SortedSet(); var tokens = bazTokens .Merge(barTokens) .Merge(fooTokens) .Do(dt => Display(dt, ConsoleColor.Red)); tokens.Subscribe(dt => set.Add(dt)); 

我还添加了一个消费者,虽然我不喜欢我的实现:

 var source = new CancellationTokenSource(); Task.Run(() => { while (!source.IsCancellationRequested) { var dt = set.FirstOrDefault(); if (dt == null) { continue; } if (set.Remove(dt)) { Display(dt, ConsoleColor.Green, 250); } } }, source.Token); 

所以,现在我得到了我正在寻找的结果,但是a)我对于轮询而不满意b)如果我想要多个消费者,我将遇到竞争条件。 所以,如果有人有,我仍然在寻找更好的实施方案!

你想要的容器是一个优先级队列,遗憾的是.net运行时没有实现(c ++ stl / cli中有,但是priority_queue不能用于其他语言)。

现有的非MS容器可以填充此角色,您需要搜索并查看结果以选择满足您需求的容器。

使用Dataflow,您可以过滤令牌,使每个优先级在管道中沿着不同的路径向下。 通过在每个优先级类型链接上使用谓词来过滤令牌。 然后由您决定如何根据优先级给出偏好。

排序:

 var highPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); var midPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); var lowPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); var proc = new BufferBlock(); proc.LinkTo(highPriority, dt => dt.Priority == Priority.High); proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium); proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low); tokens.Subscribe(dt => proc.Post(dt)); 

优先考虑优先级较高的项目的一种方法是允许多于默认的顺序处理。 您可以通过为每个优先级块设置MaxDegreeOfParallelism来实现。

给予偏好:

 var highPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 3} var highPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }, highPriOptions); var midPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 2} var midPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }, midPriOptions); var lowPriority = new ActionBlock(dt => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}"); }); var proc = new BufferBlock(); proc.LinkTo(highPriority, dt => dt.Priority == Priority.High); proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium); proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low); tokens.Subscribe(dt => proc.Post(dt)); 

这些样本绝不是完整的,但至少应该给你这个想法。

好吧,所以我用一个普通的lock来访问SortedSet ,然后增加了消费者的数量,它似乎工作正常,所以尽管我还没有能够提出完整的RX或拆分RX / TPL DataFlow解决方案,这现在做我想要的,所以我只会显示我在原始问题中更新后所做的更改并将其留在那里。

 var set = new SortedSet(); var locker = new object(); var tokens = bazTokens .Merge(barTokens) .Merge(fooTokens) .Do(dt => Display(dt, ConsoleColor.Red)); tokens.Subscribe(dt => { lock (locker) { set.Add(dt); } }); for (var i = 0; i < Environment.ProcessorCount; i++) { Task.Run(() => { while (!source.IsCancellationRequested) { Token dt; lock (locker) { dt = set.FirstOrDefault(); } if (dt == null) { continue; } bool removed; lock (locker) { removed = set.Remove(dt); } if (removed) { Display(dt, ConsoleColor.Green, 750); } } }, source.Token); } 

感谢发布解决方案的人,感谢您花费的时间。

我认为这里的难题是你真正追求的是拉模型的结果,基于快速,热,推动源。 您似乎想要的是尚未收到的“最高”优先级,但问题是“收到了什么?” 如果你有多个订阅者,以不同的速度运行,他们每个人都可以拥有自己对“最高”的看法。

所以我看待它的方式是你想要将源合并到一种被动的,优先级(排序的)队列中,当观察者准备就绪时从中拉出结果。

我通过使用一个信号回到缓冲区接近说“我的一个观察者现在已经准备好看到优先列表的状态”。 这是通过使用接收可观察关闭信号的缓冲区重载来实现的。 该缓冲区包含收到的新元素列表,我只是将其合并到最后一个列表中,“最高”。

代码只是针对这个问题的演示代码 – 可能存在错误:

  using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RxTests { class Program { static void Main(string[] args) { var p = new Program(); p.TestPrioritisedBuffer(); Console.ReadKey(); } void TestPrioritisedBuffer() { var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Do((source) => Console.WriteLine("Source1:"+source)); var source2 = Observable.Interval(TimeSpan.FromSeconds(5)).Scan((x,y)=>(x+100)).Do((source) => Console.WriteLine("Source2:" + source)); ; BehaviorSubject closingSelector = new BehaviorSubject(true); var m = Observable.Merge(source1, source2). Buffer(closingSelector). Select(s => new { list =s.ToList(), max=(long)0 }). Scan((x, y) => { var list = x.list.Union(y.list).OrderBy(k=>k); var max = list.LastOrDefault(); var res = new { list = list.Take(list.Count()-1).ToList(), max= max }; return res; } ). Do((sorted) => Console.WriteLine("Sorted max:" + sorted.max + ". Priority queue length:" + sorted.list.Count)). ObserveOn(Scheduler.Default); //observe on other thread m.Subscribe((v)=> { Console.WriteLine("Observed: "+v.max); Thread.Sleep(3000); closingSelector.OnNext(true); }) ; } } }