理解多任务场景下的 async/await 模式

目前，我很难理解如何使用 async/await 模式进行多任务处理。为了测试一个具体场景，我编写了以下代码来弄清一些基础知识：

/// <summary>
/// WPF window for the ticket-sale experiment: each button click resets the shared
/// counter to 10 and starts ten tasks that each decrement it once.
/// </summary>
public partial class MainWindow : Window
{
    // Shared ticket counter. It is decremented from Task.Run work items without
    // any synchronization.
    private int global_int = 10;

    public MainWindow()
    {
        InitializeComponent();
    }

    /// <summary>
    /// Runs <see cref="Calculate"/> via Task.Run and awaits its completion.
    /// </summary>
    /// <returns>
    /// The value of the shared counter read after the awaited work item finished.
    /// Other work items may have changed it in the meantime.
    /// </returns>
    public async Task<int> RunAsyncTask()
    {
        // The return type must be Task<int>: an "async Task" method cannot return a value.
        await Task.Run(() => Calculate());
        return global_int;
    }

    /// <summary>
    /// Decrements the shared counter, prints it, and returns the current value.
    /// </summary>
    private int Calculate()
    {
        // NOTE: "--global_int" is a non-atomic read-modify-write. Several work items
        // started by Task.Run can execute it concurrently, so updates can be lost and
        // the printed values can repeat or appear out of order. This race is left in
        // place deliberately because it is the behavior this sample demonstrates;
        // Interlocked.Decrement(ref global_int) would make the decrement atomic.
        Console.WriteLine("Ticket count: " + --global_int);
        return global_int;
    }

    /// <summary>
    /// Click handler: resets the counter, starts ten tasks, waits for all of them,
    /// and prints the final counter value.
    /// </summary>
    private async void Start_Button_Click(object sender, RoutedEventArgs e)
    {
        List<Task> list = new List<Task>();

        Console.WriteLine("\nReseting: " );
        global_int = 10;

        for (int i = 0; i < 10; i++)
        {
            list.Add(RunAsyncTask());
        }

        // Task.WhenAll accepts any IEnumerable<Task>; no array copy is needed.
        await Task.WhenAll(list);

        Console.WriteLine("\nFinished: " + global_int);
    }
}

理念/目标:

10 个客户，10 张门票，每个客户购买一张票，最后应该没有剩余的票。

问题:

当我运行代码时，并不总是得到相同的结果（我期望最后总是剩 0 张票）。问题究竟出在哪里？

那么，我该如何编写代码，才能让结果始终相同？

输出1:

 Reseting: Ticket count: 9 Ticket count: 8 Ticket count: 8 Ticket count: 7 Ticket count: 5 Ticket count: 6 Ticket count: 4 Ticket count: 3 Ticket count: 2 Ticket count: 1 Finished: 1 

输出2:

 Reseting: Ticket count: 9 Ticket count: 8 Ticket count: 7 Ticket count: 6 Ticket count: 5 Ticket count: 4 Ticket count: 3 Ticket count: 2 Ticket count: 1 Ticket count: 0 Finished: 0 

如果您想了解如何在仍然使用异步模式的同时以单线程方式调度任务,您可能会对此代码感兴趣。

 /// <summary>
 /// Console version of the ticket-sale experiment. All work items are started through
 /// a task scheduler limited to one concurrent worker, so the decrements of the shared
 /// counter run one after another while the calling code still uses async/await.
 /// </summary>
 class Program
 {
     // Shared ticket counter; only touched by work items run through SingleFactory.
     private static int global_int = 10;

     static void Main(string[] args)
     {
         // Blocks until all calculations are done (console entry point is synchronous).
         InitiateCalculations().Wait();
         Console.WriteLine("Finished: {0}", global_int);
     }

     // LimitedConcurrencyLevelTaskScheduler from
     // https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler
     /// <summary>
     /// Provides a task scheduler that ensures a maximum concurrency level while
     /// running on top of the thread pool.
     /// </summary>
     public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
     {
         /// <summary>
         /// Task factory bound to a scheduler instance created with a concurrency level of 1.
         /// </summary>
         public static TaskFactory SingleFactory { get; private set; }

         static LimitedConcurrencyLevelTaskScheduler()
         {
             SingleFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(1));
         }

         // Indicates whether the current thread is processing work items.
         [ThreadStatic]
         private static bool _currentThreadIsProcessingItems;

         // The list of tasks to be executed; protected by lock(_tasks).
         private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

         // The maximum concurrency level allowed by this scheduler.
         private readonly int _maxDegreeOfParallelism;

         // Number of worker delegates currently queued or running; protected by lock(_tasks).
         private int _delegatesQueuedOrRunning = 0;

         /// <summary>
         /// Creates a new instance with the specified degree of parallelism.
         /// </summary>
         /// <param name="maxDegreeOfParallelism">Maximum number of concurrent workers; must be at least 1.</param>
         /// <exception cref="ArgumentOutOfRangeException">
         /// Thrown when <paramref name="maxDegreeOfParallelism"/> is less than 1.
         /// </exception>
         public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
         {
             if (maxDegreeOfParallelism < 1)
             {
                 throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
             }

             _maxDegreeOfParallelism = maxDegreeOfParallelism;
         }

         /// <summary>Queues a task to the scheduler.</summary>
         protected sealed override void QueueTask(Task task)
         {
             // Add the task to the list of tasks to be processed. If there aren't enough
             // delegates currently queued or running to process tasks, schedule another.
             lock (_tasks)
             {
                 _tasks.AddLast(task);
                 if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                 {
                     ++_delegatesQueuedOrRunning;
                     NotifyThreadPoolOfPendingWork();
                 }
             }
         }

         /// <summary>
         /// Informs the ThreadPool that there's work to be executed for this scheduler.
         /// </summary>
         private void NotifyThreadPoolOfPendingWork()
         {
             ThreadPool.UnsafeQueueUserWorkItem(_ =>
             {
                 // Note that the current thread is now processing work items.
                 // This is necessary to enable inlining of tasks into this thread.
                 _currentThreadIsProcessingItems = true;
                 try
                 {
                     // Process all available items in the queue.
                     while (true)
                     {
                         Task item;
                         lock (_tasks)
                         {
                             // When there are no more items to be processed,
                             // note that we're done processing, and get out.
                             if (_tasks.Count == 0)
                             {
                                 --_delegatesQueuedOrRunning;
                                 break;
                             }

                             // Get the next item from the queue.
                             item = _tasks.First.Value;
                             _tasks.RemoveFirst();
                         }

                         // Execute the task we pulled out of the queue (outside the lock).
                         base.TryExecuteTask(item);
                     }
                 }
                 finally
                 {
                     // We're done processing items on the current thread.
                     _currentThreadIsProcessingItems = false;
                 }
             }, null);
         }

         /// <summary>Attempts to execute the specified task on the current thread.</summary>
         protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
         {
             // If this thread isn't already processing a task, we don't support inlining.
             if (!_currentThreadIsProcessingItems)
             {
                 return false;
             }

             // If the task was previously queued, it may only run inline when it can
             // still be removed from the queue.
             if (taskWasPreviouslyQueued)
             {
                 if (TryDequeue(task))
                 {
                     return base.TryExecuteTask(task);
                 }

                 return false;
             }

             return base.TryExecuteTask(task);
         }

         /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
         protected sealed override bool TryDequeue(Task task)
         {
             lock (_tasks)
             {
                 return _tasks.Remove(task);
             }
         }

         /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
         public sealed override int MaximumConcurrencyLevel
         {
             get { return _maxDegreeOfParallelism; }
         }

         /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
         /// <exception cref="NotSupportedException">Thrown when the queue lock cannot be taken immediately.</exception>
         protected sealed override IEnumerable<Task> GetScheduledTasks()
         {
             bool lockTaken = false;
             try
             {
                 Monitor.TryEnter(_tasks, ref lockTaken);
                 if (!lockTaken)
                 {
                     throw new NotSupportedException();
                 }

                 // Return a snapshot taken under the lock: the caller enumerates the result
                 // after the lock is released, while workers may still modify _tasks.
                 return new List<Task>(_tasks);
             }
             finally
             {
                 if (lockTaken)
                 {
                     Monitor.Exit(_tasks);
                 }
             }
         }
     }

     /// <summary>
     /// Starts <see cref="Calculate"/> on the single-concurrency task factory.
     /// </summary>
     /// <returns>A task whose result is the value returned by <see cref="Calculate"/>.</returns>
     public static Task<int> RunAsyncTask()
     {
         return LimitedConcurrencyLevelTaskScheduler.SingleFactory.StartNew<int>(Calculate);
     }

     /// <summary>
     /// Sleeps for 500 ms, decrements the shared counter, prints it together with the
     /// managed thread id, and returns the current counter value.
     /// </summary>
     private static int Calculate()
     {
         Thread.Sleep(500);
         Console.WriteLine("Ticket count: {0} Thread: {1}", --global_int, Thread.CurrentThread.ManagedThreadId);
         return global_int;
     }

     /// <summary>
     /// Resets the counter to 10, starts ten calculations, and awaits all of them.
     /// </summary>
     private static async Task InitiateCalculations()
     {
         List<Task<int>> list = new List<Task<int>>();

         Console.WriteLine("\nReseting: ");
         global_int = 10;

         for (int i = 0; i < 10; i++)
         {
             list.Add(RunAsyncTask());
         }

         await Task.WhenAll(list);
     }
 }
 --global_int 

这不是一个线程安全的操作。多个线程同时读取和写入 global_int，从而导致竞争条件。有一个很方便的类叫 Interlocked，可以保证简单的 int 操作具有原子性。将你的 Calculate 方法改为：

 // Interlocked.Decrement atomically decrements the counter and returns the new value,
 // so concurrent callers can no longer lose updates.
 Console.WriteLine("Ticket count: " + Interlocked.Decrement(ref global_int));