Tag: concurrency

TPL Dataflow仅在完成所有源数据块时保证完成

两个完成转换后,如何重新编写代码完成的代码? 我认为完成意味着它被标记为完成并且“出队列”是空的? public Test() { broadCastBlock = new BroadcastBlock(i => { return i; }); transformBlock1 = new TransformBlock(i => { Console.WriteLine(“1 input count: ” + transformBlock1.InputCount); Thread.Sleep(50); return (“1_” + i); }); transformBlock2 = new TransformBlock(i => { Console.WriteLine(“2 input count: ” + transformBlock1.InputCount); Thread.Sleep(20); return (“2_” + i); }); processorBlock = new ActionBlock(i => […]

更新ConcurrentDictionary中的值字段

我试图更新ConcurrentDictionary中的条目,如下所示: class Class1 { public int Counter { get; set; } } class Test { private ConcurrentDictionary dict = new ConcurrentDictionary(); public void TestIt() { foreach (var foo in dict) { foo.Value.Counter = foo.Value.Counter + 1; // Simplified example } } } 基本上我需要遍历字典并更新每个Value上的字段。 我从文档中了解到我需要避免使用Value属性。 相反,我认为我需要使用TryUpdate,除了我不想替换我的整个对象。 相反,我想更新对象上的字段。 在PFX团队博客上阅读此博客条目之后:也许我需要使用AddOrUpdate并且在添加委托中什么都不做。 有没有人对如何做到这一点有任何见解? 我在字典中有成千上万的对象,我需要每30秒左右更新一次。 创建新的以更新属性可能是不可行的。 我需要克隆现有对象,更新它并替换字典中的对象。 我还需要在克隆/添加周期的持续时间内锁定它。 呸。 我想做的是迭代对象并尽可能直接更新Counter属性。 我的最新研究让我看到了Parallel.ForEach,听起来不错,但它不应该用于更新状态的动作。 […]

Reactive Extensions:订阅者中的并发

我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果。 所以我可能还没有得到它 。 我有一个源,它将数据发送到流中的速度比订阅者可以使用它的速度快。 我更喜欢配置流,以便使用另一个线程从流中为每个新项调用订阅者,以便订阅者同时运行多个线程。 我能够确保订户的线程安全性。 以下示例演示了此问题: Observable.Interval( TimeSpan.FromSeconds(1)) .Do( x => Console.WriteLine(“{0} Thread: {1} Source value: {2}”, DateTime.Now, Thread.CurrentThread.ManagedThreadId, x)) .ObserveOn(NewThreadScheduler.Default) .Subscribe(x => { Console.WriteLine(“{0} Thread: {1} Observed value: {2}”, DateTime.Now, Thread.CurrentThread.ManagedThreadId, x); Thread.Sleep(5000); // Simulate long work time }); 控制台输出如下所示(删除日期): 4:25:20 PM Thread: 6 Source value: 0 4:25:20 PM Thread: 11 Observed […]

NewThreadScheduler.Default计划在同一个线程上的所有工作

我目前正试图用RX .NET来解决并发问题,并对某些事情感到困惑。 我想并行运行四个相对较慢的任务,所以我假设NewThreadScheduler.Default将是要走的路,因为它“代表一个在一个单独的线程上调度每个工作单元的对象”。 。 这是我的设置代码: static void Test() { Console.WriteLine(“Starting. Thread {0}”, Thread.CurrentThread.ManagedThreadId); var query = Enumerable.Range(1, 4); var obsQuery = query.ToObservable(NewThreadScheduler.Default); obsQuery.Subscribe(DoWork, Done); Console.WriteLine(“Last line. Thread {0}”, Thread.CurrentThread.ManagedThreadId); } static void DoWork(int i) { Thread.Sleep(500); Console.WriteLine(“{0} Thread {1}”, i, Thread.CurrentThread.ManagedThreadId); } static void Done() { Console.WriteLine(“Done. Thread {0}”, Thread.CurrentThread.ManagedThreadId); } 我假设“X线程Y”每次都会输出不同的线程ID,但实际输出是: Starting. Thread 1 […]

为什么ConcurrentDictionary.GetOrAdd(key,valueFactory)允许调用valueFactory两次?

我使用并发字典作为线程安全的静态缓存,并注意到以下行为: 来自GetOrAdd上的MSDN文档 : 如果在不同的线程上同时调用GetOrAdd,可能会多次调用addValueFactory,但是对于每次调用,它的键/值对可能不会添加到字典中。 我希望能够保证工厂只被召唤一次。 是否有任何方法可以使用ConcurrentDictionary API执行此操作而无需借助我自己的单独同步(例如锁定valueFactory)? 我的用例是valueFactory在动态模块中生成类型,所以如果同时运行同一个键的两个valueFactories,我点击: System.ArgumentException: Duplicate type name within an assembly.

Singleton仔细检查并发问题

fallowing子句来自jetbrains.net在阅读了这篇以及网上的其他文章之后,我仍然不明白在第一个线程进入锁之后如何返回null。 有人确实理解它可以帮助我并以更人性化的方式解释它吗? “考虑以下代码: public class Foo { private static Foo instance; private static readonly object padlock = new object(); public static Foo Get() { if (instance == null) { lock (padlock) { if (instance == null) { instance = new Foo(); } } } return instance; } }; 给定上述代码,初始化Foo实例的写入可能会延迟,直到写入实例值,从而产生实例返回处于单元化状态的对象的可能性。 为了避免这种情况,必须使实例值变为易失性。 “