c#.net 4.5 async / multithread?

我正在编写一个从网页上抓取数据的C#控制台应用程序。

此应用程序将访问大约8000个网页并刮取数据(每页上的数据格式相同)。

我现在没有异步方法,也没有multithreading。

但是,我需要它更快。 它只使用了大约3%-6%的CPU,我想是因为它花时间等待下载html。(WebClient.DownloadString(url))

这是我程序的基本流程

DataSet alldata; foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with WebClient.DownloadString // and scrapes the data into several datatables which it returns as a dataset. DataSet dataForOnePage = ScrapeData(url); //merge each table in dataForOnePage into allData } // PushAllDataToSql(alldata); 

我一直试图multithreading,但不知道如何正确开始。 我正在使用.net 4.5并且我的理解是异步并且在4.5中等待使得这更容易编程但我仍然有点迷失。

我的想法是继续制作这条线异步的新线程

 DataSet dataForOnePage = ScrapeData(url); 

然后每个人完成,跑

 //merge each table in dataForOnePage into allData 

任何人都可以指出我正确的方向如何在.net 4.5 c#中使该行异步,然后我的合并方法运行完成?

谢谢。

编辑:这是我的ScrapeData方法:

 public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid) { var dsPageData = new DataSet(); // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT string url = @"https://domain.com?&id=" + pageid + @"restofurl"; string html = webClient.DownloadString(url); var doc = new HtmlDocument(); doc.LoadHtml(html ); // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData return dsPageData ; } 

如果你想使用asyncawait关键字(虽然你没有,但它们确实在.NET 4.5中更容易),你首先想要改变你的ScrapeData方法以返回一个Task实例使用async关键字,如下:

 async Task ScrapeDataAsync(Uri url) { // Create the HttpClientHandler which will handle cookies. var handler = new HttpClientHandler(); // Set cookies on handler. // Await on an async call to fetch here, convert to a data // set and return. var client = new HttpClient(handler); // Wait for the HttpResponseMessage. HttpResponseMessage response = await client.GetAsync(url); // Get the content, await on the string content. string content = await response.Content.ReadAsStringAsync(); // Process content variable here into a data set and return. DataSet ds = ...; // Return the DataSet, it will return Task. return ds; } 

请注意,您可能希望远离WebClient类,因为它在异步操作中不支持Task 。 在.NET 4.5中更好的选择是HttpClient类 。 我选择使用上面的HttpClient 。 另外,请查看HttpClientHandler类 ,特别是CookieContainer属性 ,您将使用该属性向每个请求发送cookie。

但是,这意味着您很可能不得不使用await关键字等待另一个异步操作,在这种情况下,很可能是下载页面。 您必须定制下载数据的调用以使用异步版本并await这些调用。

一旦完成,你通常会调用await ,但是在这种情况下你不能这样做,因为你会await一个变量。 在这种情况下,您正在运行循环,因此每次迭代都会重置变量。 在这种情况下,最好将Task存储在一个数组中,如下所示:

 DataSet alldata = ...; var tasks = new List>(); foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with // WebClient.DownloadString // and scrapes the data into several datatables which // it returns as a dataset. tasks.Add(ScrapeDataAsync(url)); } 

将数据合并到allData 。 为此,您希望在返回的Task实例上调用ContinueWith方法 ,并执行将数据添加到allData的任务:

 DataSet alldata = ...; var tasks = new List>(); foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with // WebClient.DownloadString // and scrapes the data into several datatables which // it returns as a dataset. tasks.Add(ScrapeDataAsync(url).ContinueWith(t => { // Lock access to the data set, since this is // async now. lock (allData) { // Add the data. } }); } 

然后,您可以使用Task类上的WhenAll方法等待所有任务,并await

 // After your loop. await Task.WhenAll(tasks); // Process allData 

但是,请注意您有一个foreach ,而WhenAll采用IEnumerable实现。 这是一个很好的指标,它适合使用LINQ,它是:

 DataSet alldata; var tasks = from url in the8000Urls select ScrapeDataAsync(url).ContinueWith(t => { // Lock access to the data set, since this is // async now. lock (allData) { // Add the data. } }); await Task.WhenAll(tasks); // Process allData 

如果您愿意,也可以选择不使用查询语法,在这种情况下无关紧要。

请注意,如果包含方法未标记为async (因为您在控制台应用程序中并且必须在应用程序终止之前等待结果),那么您只需在调用WhenAll时调用返回的Task上的Wait方法 :

 // This will block, waiting for all tasks to complete, all // tasks will run asynchronously and when all are done, then the // code will continue to execute. Task.WhenAll(tasks).Wait(); // Process allData. 

也就是说,重点是,您希望将Task实例收集到一个序列中,然后在处理allData之前等待整个序列。

但是,如果可以的话,我建议在将数据合并到allData之前尝试处理数据; 除非数据处理需要整个 DataSet ,否则通过处理返回获得的数据,可以获得更多性能提升,而不是等待所有数据返回。

您还可以使用TPL Dataflow ,它非常适合此类问题。

在这种情况下,您构建一个“数据流网格”,然后您的数据流过它。

这个实际上更像是管道而不是“网格”。 我要分三步:从URL下载(字符串)数据; 将(字符串)数据解析为HTML,然后解析为DataSet ; 并将DataSet合并到主DataSet

首先,我们创建将进入网格的块:

 DataSet allData; var downloadData = new TransformBlock( async pageid => { System.Net.WebClient webClient = null; var url = "https://domain.com?&id=" + pageid + "restofurl"; return await webClient.DownloadStringTaskAsync(url); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, }); var parseHtml = new TransformBlock( html => { var dsPageData = new DataSet(); var doc = new HtmlDocument(); doc.LoadHtml(html); // HTML Agility parsing return dsPageData; }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, }); var merge = new ActionBlock( dataForOnePage => { // merge dataForOnePage into allData }); 

然后我们将三个块链接在一起以创建网格:

 downloadData.LinkTo(parseHtml); parseHtml.LinkTo(merge); 

接下来,我们开始将数据泵入网格:

 foreach (var pageid in the8000urls) downloadData.Post(pageid); 

最后,我们等待网格中的每个步骤完成(这也将干净地传播任何错误):

 downloadData.Complete(); await downloadData.Completion; parseHtml.Complete(); await parseHtml.Completion; merge.Complete(); await merge.Completion; 

TPL Dataflow的优点在于您可以轻松控制每个部分的并行程度。 目前,我已将下载和解析块设置为Unbounded ,但您可能希望限制它们。 合并块使用默认的最大并行度1,因此合并时不需要锁定。

我建议阅读我对async / await合理完整介绍 。

首先,让所有东西都异步,从较低级别的东西开始:

 public static async Task ScrapeDataAsync(string pageid) { CookieAwareWebClient webClient = ...; var dsPageData = new DataSet(); // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT string url = @"https://domain.com?&id=" + pageid + @"restofurl"; string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false); var doc = new HtmlDocument(); doc.LoadHtml(html); // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData return dsPageData; } 

然后您可以按如下方式使用它(使用与LINQ的async ):

 DataSet alldata; var tasks = the8000urls.Select(async url => { var dataForOnePage = await ScrapeDataAsync(url); //merge each table in dataForOnePage into allData }); await Task.WhenAll(tasks); PushAllDataToSql(alldata); 

并使用我的AsyncEx库中的AsyncContext ,因为这是一个控制台应用程序 :

 class Program { static int Main(string[] args) { try { return AsyncContext.Run(() => MainAsync(args)); } catch (Exception ex) { Console.Error.WriteLine(ex); return -1; } } static async Task MainAsync(string[] args) { ... } } 

而已。 无需锁定或延续或任何此类。

我相信你不需要asyncawait这里的东西。 他们可以帮助您在需要将工作转移到非GUI线程的桌面应用程序中。 在我看来,在你的情况下使用Parallel.ForEach方法会更好。 像这样的东西:

  DataSet alldata; var bag = new ConcurrentBag(); Parallel.ForEach(the8000urls, url => { // ScrapeData downloads the html from the url with WebClient.DownloadString // and scrapes the data into several datatables which it returns as a dataset. DataSet dataForOnePage = ScrapeData(url); // Add data for one page to temp bag bag.Add(dataForOnePage); }); //merge each table in dataForOnePage into allData from bag PushAllDataToSql(alldata);