什么类型的IProducerConsumerCollection 用于我的任务?

我只有100个传感器,每个“测量”自己的数据。 我只有一个DataSender应该从“传感器”发送信息。 应发送最新信息。

通道的带宽可能小于100个传感器产生的数据。 在这种情况下,可以跳过一些数据 – 但我们应该“大致公平”。 例如,我们可以跳过每个传感器的每秒测量值。

我不知道每个传感器生成数据的频率,但通常它们会经常生成数据。

在我的其他post之后:

  • 如何创建总是在单独的线程中运行的单例?
  • 修改了生产者/消费者的例子,有什么问题吗?

我已经决定我有经典的制片人/消费者问题,有:

  • 100名制片人和
  • 1消费者

我被建议使用BlockingCollectionBlockingCollection的唯一问题 – 一旦添加了项目,就无法替换它。 但是在我的应用程序中,如果传感器产生一个新值,并且Consumer没有处理之前的值,则应该替换该值。

我应该使用ConcurentDictionaryConcurentBag来执行该任务吗?

从概念上讲, 我只需要一个包含100个元素的数组。

传感器#33应将其值替换为数组[33]:

 | Sensor | Value | |--------|-------| | 1 | | | 2 | | | 3 | | /......../......./ | 32 | | | 33 | 101.9 | | 34 | | /......../......./ | 98 | | | 99 | | | 100 | | 

Consumer应该从array[33]获取值,如果不为null,则发送它并将数组[33]设置为null。 Consumer应该对数组中的任何非空值做出反应。

我认为你应该实现自己的IProducerConsumerCollection 。 这就是为什么它是一个界面:这样你就可以轻松制作自己的界面了。

您可以使用DictionaryQueue来确保接收数据是公平的,即如果您只有一个设备可以非常快速地生成数据,那么您将不会仅从此设备发送数据。

 public class DeviceDataQueue : IProducerConsumerCollection> { private readonly object m_lockObject = new object(); private readonly Dictionary m_data = new Dictionary(); private readonly Queue m_queue = new Queue(); //some obviously implemented methods elided, just make sure they are thread-safe public int Count { get { return m_queue.Count; } } public object SyncRoot { get { return m_lockObject; } } public bool IsSynchronized { get { return true; } } public bool TryAdd(Tuple item) { var device = item.Item1; var data = item.Item2; lock (m_lockObject) { if (!m_data.ContainsKey(device)) m_queue.Enqueue(device); m_data[device] = data; } return true; } public bool TryTake(out Tuple item) { lock (m_lockObject) { if (m_queue.Count == 0) { item = null; return false; } var device = m_queue.Dequeue(); var data = m_data[device]; m_data.Remove(device); item = Tuple.Create(device, data); return true; } } } 

沿着这些方向使用时:

 Queue = new BlockingCollection>( new DeviceDataQueue()); Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue); Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue); while (true) { var tuple = Queue.Take(); var device = tuple.Item1; var data = tuple.Item2; Console.WriteLine("{0}: Device {1} produced data at {2}.", DateTime.Now, device.Id, data.Created); Thread.Sleep(TimeSpan.FromSeconds(2)); } 

它产生以下输出:

 30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43. 30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44. 30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47. 30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49. 30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51. 30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54. 

而不是使用其他数据结构,做另一个技巧。 您的集合中的元素无法替换,但您可以存储迷你容器,而不是存储实际值。 如果要替换,实际上是替换容器中的值,而不是替换容器。

 class ElementFromQueue { public object SensorData; } ... ElementFromQueue elem = new ElementFromQueue(); elem.SensorData = new object(); ... queue.Add(elem); //Element is in queue now ... elem.SensorData = new object(); //Update the data, simulating replace 

或者只是创建一个指向传感器编号的索引队列。 弹出一个值时,将从另一个可更新的集合中查询最新的传感器值