同时运行x个Web请求

我们公司有一个Web服务,我想通过我自己的C#中的HTTPWebRequest客户端发送XML文件(存储在我的驱动器上)。 这已经有效了。 Web服务同时支持5个同步请求(一旦服务器上的处理完成,我就从Web服务获得响应)。 每个请求处理大约需要5分钟。

投掷太多请求(> 5)会导致我的客户超时。 此外,这可能导致服务器端的错误和不连贯的数据。 在服务器端进行更改不是一个选项(来自不同的供应商)。

现在,我的Webrequest客户端将发送XML并使用Webrequest等待响应result.AsyncWaitHandle.WaitOne();

但是,这样,虽然Web服务支持5,但是一次只能处理一个请求。我尝试使用BackgroundworkerThreadpool但是它们同时创建了太多请求,这使得它们对我没用。 有什么建议,怎么可以解决这个问题? 用5个线程创建我自己的Threadpool ? 有什么建议,如何实现呢?

简单的方法是创建5个线程(除了:这是一个奇数!),它使用BlockingCollection的xml文件。

就像是:

 var bc = new BlockingCollection(); for ( int i = 0 ; i < 5 ; i++ ) { new Thread( () => { foreach ( var xml in bc.GetConsumingEnumerable() ) { // do work } } ).Start(); } bc.Add( xml_1 ); bc.Add( xml_2 ); ... bc.CompleteAdding(); // threads will end when queue is exhausted 

如果您使用的是.Net 4,这看起来非常适合Parallel.ForEach() 。 您可以设置其MaxDegreeOfParallelism ,这意味着您可以保证一次不再处理任何项目。

 Parallel.ForEach(items, new ParallelOptions { MaxDegreeOfParallelism = 5 }, ProcessItem); 

这里, ProcessItem是一种通过访问您的服务器并阻塞直到处理完成来处理一个项目的方法。 如果你愿意,你可以使用lambda代替。

创建自己的五个线程的线程池并不棘手 – 只需创建描述请求的并发对象队列,并有五个线程循环执行任务。 添加AutoResetEvent,您可以确保在没有需要处理的请求时它们不会疯狂地旋转。

将响应返回到正确的调用线程可能会很棘手。 如果你的代码的其余部分是如何工作的,那么我会采用不同的方法创建一个限制器,它有点像监视器,但允许5个同时线程而不是只有一个:

 private static class RequestLimiter { private static AutoResetEvent _are = new AutoResetEvent(false); private static int _reqCnt = 0; public ResponseObject DoRequest(RequestObject req) { for(;;) { if(Interlocked.Increment(ref _reqCnt) <= 5) { //code to create response object "resp". Interlocked.Decrement(ref _reqCnt); _are.Set(); return resp; } else { if(Interlocked.Decrement(ref _reqCnt) >= 5)//test so we don't end up waiting due to race on decrementing from finished thread. _are.WaitOne(); } } } } 

您可以编写一个小帮助器方法,它会阻塞当前线程,直到所有线程完成执行给定的操作委托。

 static void SpawnThreads(int count, Action action) { var countdown = new CountdownEvent(count); for (int i = 0; i < count; i++) { new Thread(() => { action(); countdown.Signal(); }).Start(); } countdown.Wait(); } 

然后使用BlockingCollection (线程安全集合)来跟踪您的xml文件。 通过使用上面的帮助方法,您可以编写如下内容:

 static void Main(string[] args) { var xmlFiles = new BlockingCollection(); // Add some xml files.... SpawnThreads(5, () => { using (var web = new WebClient()) { web.UploadFile(xmlFiles.Take()); } }); Console.WriteLine("Done"); Console.ReadKey(); } 

更新

更好的方法是上传文件异步,这样就不会浪费资源来为IO任务使用线程。

你可以再写一个辅助方法:

 static void SpawnAsyncs(int count, Action action) { var countdown = new CountdownEvent(count); for (int i = 0; i < count; i++) { action(countdown); } countdown.Wait(); } 

并使用它像:

 static void Main(string[] args) { var urlXML = new BlockingCollection>(); urlXML.Add(Tuple.Create("http://someurl.com", "filename")); // Add some more to collection... SpawnAsyncs(5, c => { using (var web = new WebClient()) { var current = urlXML.Take(); web.UploadFileCompleted += (s, e) => { // some code to mess with e.Result (response) c.Signal(); }; web.UploadFileAsyncAsync(new Uri(current.Item1), current.Item2); } }); Console.WriteLine("Done"); Console.ReadKey(); }