Tag: task parallel library

为什么迭代GetConsumingEnumerable()不会完全清空底层的阻塞集合

在尝试创建简单管道时,使用任务并行库, BlockingCollection , ConcurrentQueue和GetConsumingEnumerable ,我有一个可量化且可重复的问题。 简而言之,将条目添加到一个线程的默认BlockingCollection (在引擎盖下依赖于ConcurrentQueue )并不能保证它们将从另一个调用该线程的线程中弹出BlockingCollection 。 GetConsumingEnumerable()方法。 我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上。 Timer1负责排队工作项……它使用一个名为_tracker的并发字典,以便它知道它已经添加到阻塞集合中的内容。 Timer2只记录BlockingCollection和_tracker的计数状态 START按钮启动Paralell.ForEach ,它只是遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。 STOP按钮停止Timer1防止更多条目被添加到阻塞集合中。 public partial class Form1 : Form { private int Counter = 0; private BlockingCollection _entries; private ConcurrentDictionary _tracker; private CancellationTokenSource _tokenSource; private TaskFactory _factory; public Form1() { _entries = new BlockingCollection(); _tracker = new ConcurrentDictionary(); _tokenSource = new CancellationTokenSource(); […]

使用线程处理队列的最有效方法

我有一个队列,其上放置了待处理的傅立叶变换请求(相对耗时的操作) – 在某些情况下,我们每秒可以获得数千个变换请求,因此它必须快速。 我正在升级旧代码以使用.net 4,以及移植到TPL。 我想知道处理这个队列的最有效(最快的吞吐量)方式是什么样的。 我想使用所有可用的核心。 目前我正在尝试使用BlockingCollection。 我创建了一个队列处理程序类,它产生了4个任务,这些任务阻塞了BlockingCollection并等待传入​​的工作。 然后他们处理该挂起的转换。 码: public class IncomingPacketQueue : IDisposable { BlockingCollection _packetQ = new BlockingCollection(); public IncomingPacketQueue(int workerCount) { for (int i = 0; i < workerCount; i++) { Task.Factory.StartNew(Consume); } } public void EnqueueSweep(IncomingPacket incoming) { _packetQ.Add(incoming); } private void Consume() { foreach (var sweep in _packetQ.GetConsumingEnumerable()) […]

Parallel ForEach的本地初始化如何工作?

我不确定在Parallel.ForEach中使用本地init函数,如msdn文章中所述: http : //msdn.microsoft.com/en-us/library/dd997393.aspx Parallel.ForEach(nums, // source collection () => 0, // method to initialize the local variable (j, loop, subtotal) => // method invoked by the loop on each iteration { subtotal += nums[j]; //modify local variable return subtotal; // value to be passed to next iteration },… ()=> 0如何初始化任何东西? 变量的名称是什么以及如何在循环逻辑中使用它?

捕获AggregateException

我试图抛出并捕获AggregateException。 我没有在C#上使用exception,但我发现的行为有点令人惊讶。 我的代码是: var numbers = Enumerable.Range(0, 20); try { var parallelResult = numbers.AsParallel() .Where(i => IsEven(i)); parallelResult.ForAll(e => Console.WriteLine(e)); } catch (AggregateException e) { Console.WriteLine(“There was {0} exceptions”, e.InnerExceptions.Count()); } 它正在调用函数IsEven private static bool IsEven(int i) { if (i % 10 == 0) throw new AggregateException(“i”); return i % 2 == 0; } 抛出AggregateException。 […]

如何使用.NET 4中的任务并行库链接异步操作?

我正在尝试以编程方式将C#4中的异步操作链接起来,例如写入给定的Stream对象。 我最初是“手动”执行此操作,将回调从一个操作挂钩到下一个操作,但我想我会尝试使用.NET 4任务并行库来省去重新发明并发轮的麻烦。 首先,我将异步调用包装在Tasks中,如下所示: public static Task CreateWriteTask(Stream stream, byte[] data) { return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, data, 0, data.Length, null); } Continuations使链接同步操作变得非常容易(如果您原谅不幸的方法名称): public static Task ChainFlush(Stream stream, Task precedingTask) { return precedingTask.ContinueWith(x => stream.Flush()); } 但是没有Task.ContinueWith方法的版本以与Task.ContinueWith相同的方式接受异步操作。 所以,假设我坚持使用TPL,我正在寻找这种方法的正确实现: public static Task ChainWrite(Stream stream, byte[] data, Task precedingTask) { //? }

队列或等待列表中的C#异步任务

我有一个像这样的异步任务: public async Task DoWork() { } 我现在有一个: List tmp = new List(); 我在哪里添加任务。 我开始这样的任务: foreach (Task t in tmp) { await t; } 现在我的问题: 什么是启动任务的最佳方式,同时只运行其中3个(直到其他人在等待)? 我想我需要像队列/等待名单一样的东西? 在队列启动后还应该可以添加更多任务。 我使用的是.NET 4.5。 谢谢你的任何建议

如何使两个SQL查询真正异步

我的问题是基于一个真正的项目问题,但我从来没有使用过System.Threading.Tasks库或执行任何涉及线程的严肃编程,所以我的问题可能是缺乏关于特定库的知识以及对异步的更普遍的误解真的意味着在编程方面。 所以我的真实情况就是这个 – 我需要获取有关用户的数据。 在我目前的情况下,它是财务数据,所以假设我需要特定用户的所有Accounts ,所有Deposits和所有Consignations 。 在我的情况下,这意味着查询每个属性的数百万条记录,并且每个查询本身相对较慢,但是获取Accounts比获取Deposits慢几倍。 所以我为我要使用的三种银行产品定义了三个类,当我想获取某些用户的所有银行产品的数据时,我做了类似这样的事情: List accounts = GetAccountsForClient(int clientId); List deposits = GetDepositsForClient(int clientId); List consignations = GetConsignationsForClient(int clientId); 所以问题从这里开始我需要同时获取所有这三个列表,因为我要将它们传递给我显示所有用户数据的视图。 但正如现在一样,执行是同步的(如果我在这里正确使用这个术语),那么收集所有三种产品的数据的总时间是: Total_Time = Time_To_Get_Accounts + Time_To_Get_Deposits + Time_To_Get_Consignations 这不好,因为每个查询都比较慢,所以总时间很长,而且, accounts查询比其他两个查询花费的时间要多得多,所以今天进入我脑海的想法是 – “如果我能做什么同时执行此查询“。 也许这是我对这个主题最大的误解,但对我来说,最接近这个想法是让它们异步,所以也许Total_Time不会是最慢查询的时间,但是会比所有三个查询的总和快得多。 由于我的代码很复杂,我创建了一个简单的用例,我认为,反映了我想要做的很好。 我有两种方法: public static async Task GetAccounts() { int total1 = 0; using (SqlConnection connection = new […]

在数据流网络中使用BufferBlock 的好处

我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制)。

在TryReceiveAll之后使用OutputAvailableAsync的BufferBlock死锁

在处理这个问题 的答案时 ,我写了这个片段: var buffer = new BufferBlock(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); buffer.Post(null); Console.WriteLine(“Post ” + buffer.Count); } }); var consumer = Task.Run(async () => { while (await buffer.OutputAvailableAsync()) { IList items; buffer.TryReceiveAll(out items); Console.WriteLine(“TryReceiveAll ” + buffer.Count); } }); await Task.WhenAll(consumer, producer); 生产者应该每100毫秒将项目发布到缓冲区,并且消费者应该清除缓冲区中的所有项目并异步地等待更多项目显示。 实际发生的是生产者清除所有项目一次,然后再也不会超出OutputAvailableAsync 。 如果我切换消费者逐个删除项目,它将作为例外工作: […]

如何使用C#4.0编写可伸缩的套接字服务器?

我想编写一个简单的套接字服务器,但是我希望它可以垂直扩展,例如,不是每个连接创建一个线程,也不是很长时间运行的任务,这可能会占用所有线程。 服务器接收包含查询的请求并流式传输任意大的结果。 我希望使用C#4中提供的技术和库来实现这一点的惯用方法,重点是简单的代码,而不是原始性能。 重新打开套接字服务器是可伸缩系统的有用部分。 如果要水平缩放,则有不同的技术。 如果您从未创建套接字服务器,则应该无法回答此问题。