同步线程协同程序

我试图让线程在之前等待对方,以便保持同步。

在我的实际程序中,我有很多IObjectObserved对象(在他们自己的线程上)发送事件,我想保持所有内容同步,所以IObjectListener (在它自己的线程上)可以监听这些对象之一50次然后订阅另一个及时赶上第51届活动。

我还没有那么远,但我认为同步线程是主要问题。 我设法通过使用AutoResetEvent的双向信令来实现这一点。 有没有更好的方法来做到这一点?

 class Program { static EventWaitHandle _ready = new AutoResetEvent(true); static EventWaitHandle _go = new AutoResetEvent(false); static EventWaitHandle _ready1 = new AutoResetEvent(true); static EventWaitHandle _go1 = new AutoResetEvent(false); static EventWaitHandle _ready2 = new AutoResetEvent(true); static EventWaitHandle _go2 = new AutoResetEvent(false); static void Main(string[] args) { new Thread(Waiter).Start(); new Thread(Waiter1).Start(); new Thread(Waiter2).Start(); for (; ; ) { _ready.WaitOne(); _ready1.WaitOne(); _ready2.WaitOne(); Console.WriteLine("new round"); _go.Set(); _go1.Set(); _go2.Set(); } } static void Waiter() { for (; ; ) { _go.WaitOne(); Thread.Sleep(1000); Console.WriteLine("Waiter run"); _ready.Set(); } } static void Waiter1() { for (; ; ) { _go1.WaitOne(); Thread.Sleep(5000); Console.WriteLine("water1 run"); _ready1.Set(); } } static void Waiter2() { for (; ; ) { _go2.WaitOne(); Thread.Sleep(500); Console.WriteLine("water2 run"); _ready2.Set(); } } } 

你可以简化一些事情。

  • 您可以使用一个CountdownEvent而不是等待3个句柄发出信号。 cde.Wait将阻止,直到它被发出3次信号。
  • 您还可以使用SemaphoreSlim释放多个线程,而不是使用3个不同的句柄。 sem.Release(3)将解锁最多3个在sem.Wait()上阻塞的线程。

 static CountdownEvent cde = new CountdownEvent(3); static SemaphoreSlim sem = new SemaphoreSlim(3); static void X() { new Thread(Waiter).Start(); new Thread(Waiter).Start(); new Thread(Waiter).Start(); for (; ; ) { cde.Wait(); Debug.WriteLine("new round"); cde.Reset(3); sem.Release(3); } } static void Waiter() { for (; ; ) { sem.Wait(); Thread.Sleep(1000); Debug.WriteLine("Waiter run"); cde.Signal(); } } 

请注意,现在所有线程都可以重用相同的代码。


编辑

正如评论中所述,一个线程可能会窃取另一个线程。 如果您不希望这种情况发生, Barrier将完成这项工作:

  private static Barrier _barrier; private static void SynchronizeThreeThreads() { _barrier = new Barrier(3, b => Debug.WriteLine("new round")); new Thread(Waiter).Start(); new Thread(Waiter).Start(); new Thread(Waiter).Start(); //let the threads run for 5s Thread.Sleep(5000); } static void Waiter() { while(true) { Debug.WriteLine("Thread {0} done.", Thread.CurrentThread.ManagedThreadId); _barrier.SignalAndWait(); } } 

您将3个参与者添加到屏障中。 到达障碍的第一和第二参与者(即,执行_barrier.SignalAndWait() )将阻止,直到其余参与者也到达。 当所有参与者都已到达障碍时,他们将全部被释放并进入另一轮。

请注意,我将一个lambda传递给了barrier构造函数 – 这就是“post-phase action”,一个将所有参与者到达障碍之后执行的动作, 然后释放它们。

在可能的情况下,你真的不应该阻止协作执行的线程,特别是如果涉及多个线程的话。

下面是使用async/await和custom awaiters的原始逻辑(保留循环)的实现,没有阻塞代码。 在实现这样的协同程序时,自定义awaiters非常方便。

 using System; using System.Threading; using System.Threading.Tasks; class Program { static void Main(string[] args) { new Program().RunAsync().Wait(); } async Task RunAsync() { var ready1 = new CoroutineEvent(initialState: true); var go1 = new CoroutineEvent(initialState: false); var ready2 = new CoroutineEvent(initialState: true); var go2 = new CoroutineEvent(initialState: false); var ready3 = new CoroutineEvent(initialState: true); var go3 = new CoroutineEvent(initialState: false); var waiter1 = Waiter(1, go1, ready1); var waiter2 = Waiter(2, go2, ready2); var waiter3 = Waiter(3, go3, ready3); while (true) { await ready1.WaitAsync(); ready1.Reset(); await ready2.WaitAsync(); ready2.Reset(); await ready3.WaitAsync(); ready2.Reset(); Console.WriteLine("new round"); go1.Set(); go2.Set(); go3.Set(); } } async Task Waiter(int n, CoroutineEvent go, CoroutineEvent ready) { while (true) { await go.WaitAsync(); go.Reset(); await Task.Delay(500).ConfigureAwait(false); Console.WriteLine("Waiter #" + n + " + run, thread: " + Thread.CurrentThread.ManagedThreadId); ready.Set(); } } public class CoroutineEvent { volatile bool _signalled; readonly Awaiter _awaiter; public CoroutineEvent(bool initialState = true) { _signalled = initialState; _awaiter = new Awaiter(this); } public bool IsSignalled { get { return _signalled; } } public void Reset() { _signalled = false; } public void Set() { var wasSignalled = _signalled; _signalled = true; if (!wasSignalled) _awaiter.Continue(); } public Awaiter WaitAsync() { return _awaiter; } public class Awaiter: System.Runtime.CompilerServices.INotifyCompletion { volatile Action _continuation; readonly CoroutineEvent _owner; internal Awaiter(CoroutineEvent owner) { _owner = owner; } static void ScheduleContinuation(Action continuation) { ThreadPool.QueueUserWorkItem((state) => ((Action)state)(), continuation); } public void Continue() { lock (this) { var continuation = _continuation; _continuation = null; if (continuation != null) ScheduleContinuation(continuation); } } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { lock (this) return _owner.IsSignalled; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { lock (this) { if (_continuation != null) throw new InvalidOperationException(); if (_owner.IsSignalled) ScheduleContinuation(continuation); else _continuation = continuation; } } } } } 

首先,谢谢你们的帮助。 我想我会在这里发布我的最终解决方案以获得完整性。

我使用了Barrier方法,对交替运行(集合)线程进行了一些更改。 此代码交换一个类型线程和两个类型线程之间的执行。

 static void Main(string[] args) { SynchronizeThreeThreads(); Console.ReadKey(); } private static Barrier oneBarrier; private static Barrier twoBarrier; private static EventWaitHandle TwoDone = new AutoResetEvent(false); private static EventWaitHandle OneDone = new AutoResetEvent(false); private static void SynchronizeThreeThreads() { //Barrier hand off to each other (other barrier's threads do work between set and waitone) //Except last time when twoBarrier does not wait for OneDone int runCount = 2; int count = 0; oneBarrier = new Barrier(0, (b) => { Console.WriteLine("one done"); OneDone.Set(); TwoDone.WaitOne(); Console.WriteLine("one starting"); }); twoBarrier = new Barrier(0, (b) => { Console.WriteLine("two done"); TwoDone.Set(); count++; if (count != runCount) { OneDone.WaitOne(); Console.WriteLine("two starting"); } }); //Create tasks sorted into two groups List oneTasks = new List() { new Task(() => One(runCount)), new Task(() => One(runCount)), new Task(() => One(runCount)) }; List twoTasks = new List() { new Task(() => Two(runCount)), new Task(() => TwoAlt(runCount)) }; oneBarrier.AddParticipants(oneTasks.Count); twoBarrier.AddParticipants(twoTasks.Count); //Start Tasks. Ensure oneBarrier does work before twoBarrier oneTasks.ForEach(task => task.Start()); OneDone.WaitOne(); twoTasks.ForEach(task => task.Start()); //Wait for all Tasks to finish oneTasks.ForEach(task => task.Wait()); twoTasks.ForEach(task => task.Wait()); Console.WriteLine("done"); } static void One(int runCount) { for (int i = 0; i <= runCount; i++) { Thread.Sleep(100); Console.WriteLine("One " + i.ToString()); oneBarrier.SignalAndWait(); } } static void Two(int runCount) { for (int i = 0; i <= runCount; i++) { Thread.Sleep(500); Console.WriteLine("Two " + i.ToString()); twoBarrier.SignalAndWait(); } } static void TwoAlt(int runCount) { for (int i = 0; i <= runCount; i++) { Thread.Sleep(10); Console.WriteLine("TwoAlt " + i.ToString()); twoBarrier.SignalAndWait(); } }