c#.net 4.5 async / multithread?
我正在编写一个从网页上抓取数据的C#控制台应用程序。
此应用程序将访问大约8000个网页并刮取数据(每页上的数据格式相同)。
我现在没有异步方法,也没有multithreading。
但是,我需要它更快。 它只使用了大约3%-6%的CPU,我想是因为它花时间等待下载html。(WebClient.DownloadString(url))
这是我程序的基本流程
DataSet alldata; foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with WebClient.DownloadString // and scrapes the data into several datatables which it returns as a dataset. DataSet dataForOnePage = ScrapeData(url); //merge each table in dataForOnePage into allData } // PushAllDataToSql(alldata);
我一直试图multithreading,但不知道如何正确开始。 我正在使用.net 4.5并且我的理解是异步并且在4.5中等待使得这更容易编程但我仍然有点迷失。
我的想法是继续制作这条线异步的新线程
DataSet dataForOnePage = ScrapeData(url);
然后每个人完成,跑
//merge each table in dataForOnePage into allData
任何人都可以指出我正确的方向如何在.net 4.5 c#中使该行异步,然后我的合并方法运行完成?
谢谢。
编辑:这是我的ScrapeData方法:
public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid) { var dsPageData = new DataSet(); // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT string url = @"https://domain.com?&id=" + pageid + @"restofurl"; string html = webClient.DownloadString(url); var doc = new HtmlDocument(); doc.LoadHtml(html ); // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData return dsPageData ; }
如果你想使用async
和await
关键字(虽然你没有,但它们确实在.NET 4.5中更容易),你首先想要改变你的ScrapeData
方法以返回一个Task
实例使用async
关键字,如下:
async Task ScrapeDataAsync(Uri url) { // Create the HttpClientHandler which will handle cookies. var handler = new HttpClientHandler(); // Set cookies on handler. // Await on an async call to fetch here, convert to a data // set and return. var client = new HttpClient(handler); // Wait for the HttpResponseMessage. HttpResponseMessage response = await client.GetAsync(url); // Get the content, await on the string content. string content = await response.Content.ReadAsStringAsync(); // Process content variable here into a data set and return. DataSet ds = ...; // Return the DataSet, it will return Task . return ds; }
请注意,您可能希望远离WebClient
类,因为它在异步操作中不支持Task
。 在.NET 4.5中更好的选择是HttpClient
类 。 我选择使用上面的HttpClient
。 另外,请查看HttpClientHandler
类 ,特别是CookieContainer
属性 ,您将使用该属性向每个请求发送cookie。
但是,这意味着您很可能不得不使用await
关键字等待另一个异步操作,在这种情况下,很可能是下载页面。 您必须定制下载数据的调用以使用异步版本并await
这些调用。
一旦完成,你通常会调用await
,但是在这种情况下你不能这样做,因为你会await
一个变量。 在这种情况下,您正在运行循环,因此每次迭代都会重置变量。 在这种情况下,最好将Task
存储在一个数组中,如下所示:
DataSet alldata = ...; var tasks = new List>(); foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with // WebClient.DownloadString // and scrapes the data into several datatables which // it returns as a dataset. tasks.Add(ScrapeDataAsync(url)); }
将数据合并到allData
。 为此,您希望在返回的Task
实例上调用ContinueWith
方法 ,并执行将数据添加到allData
的任务:
DataSet alldata = ...; var tasks = new List>(); foreach(var url in the8000urls) { // ScrapeData downloads the html from the url with // WebClient.DownloadString // and scrapes the data into several datatables which // it returns as a dataset. tasks.Add(ScrapeDataAsync(url).ContinueWith(t => { // Lock access to the data set, since this is // async now. lock (allData) { // Add the data. } }); }
然后,您可以使用Task
类上的WhenAll
方法等待所有任务,并await
:
// After your loop. await Task.WhenAll(tasks); // Process allData
但是,请注意您有一个foreach
,而WhenAll
采用IEnumerable
实现。 这是一个很好的指标,它适合使用LINQ,它是:
DataSet alldata; var tasks = from url in the8000Urls select ScrapeDataAsync(url).ContinueWith(t => { // Lock access to the data set, since this is // async now. lock (allData) { // Add the data. } }); await Task.WhenAll(tasks); // Process allData
如果您愿意,也可以选择不使用查询语法,在这种情况下无关紧要。
请注意,如果包含方法未标记为async
(因为您在控制台应用程序中并且必须在应用程序终止之前等待结果),那么您只需在调用WhenAll
时调用返回的Task
上的Wait
方法 :
// This will block, waiting for all tasks to complete, all // tasks will run asynchronously and when all are done, then the // code will continue to execute. Task.WhenAll(tasks).Wait(); // Process allData.
也就是说,重点是,您希望将Task
实例收集到一个序列中,然后在处理allData
之前等待整个序列。
但是,如果可以的话,我建议在将数据合并到allData
之前尝试处理数据; 除非数据处理需要整个 DataSet
,否则通过处理返回时获得的数据,可以获得更多性能提升,而不是等待所有数据返回。
您还可以使用TPL Dataflow ,它非常适合此类问题。
在这种情况下,您构建一个“数据流网格”,然后您的数据流过它。
这个实际上更像是管道而不是“网格”。 我要分三步:从URL下载(字符串)数据; 将(字符串)数据解析为HTML,然后解析为DataSet
; 并将DataSet
合并到主DataSet
。
首先,我们创建将进入网格的块:
DataSet allData; var downloadData = new TransformBlock( async pageid => { System.Net.WebClient webClient = null; var url = "https://domain.com?&id=" + pageid + "restofurl"; return await webClient.DownloadStringTaskAsync(url); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, }); var parseHtml = new TransformBlock( html => { var dsPageData = new DataSet(); var doc = new HtmlDocument(); doc.LoadHtml(html); // HTML Agility parsing return dsPageData; }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, }); var merge = new ActionBlock( dataForOnePage => { // merge dataForOnePage into allData });
然后我们将三个块链接在一起以创建网格:
downloadData.LinkTo(parseHtml); parseHtml.LinkTo(merge);
接下来,我们开始将数据泵入网格:
foreach (var pageid in the8000urls) downloadData.Post(pageid);
最后,我们等待网格中的每个步骤完成(这也将干净地传播任何错误):
downloadData.Complete(); await downloadData.Completion; parseHtml.Complete(); await parseHtml.Completion; merge.Complete(); await merge.Completion;
TPL Dataflow的优点在于您可以轻松控制每个部分的并行程度。 目前,我已将下载和解析块设置为Unbounded
,但您可能希望限制它们。 合并块使用默认的最大并行度1,因此合并时不需要锁定。
我建议阅读我对async
/ await
合理完整介绍 。
首先,让所有东西都异步,从较低级别的东西开始:
public static async Task ScrapeDataAsync(string pageid) { CookieAwareWebClient webClient = ...; var dsPageData = new DataSet(); // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT string url = @"https://domain.com?&id=" + pageid + @"restofurl"; string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false); var doc = new HtmlDocument(); doc.LoadHtml(html); // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData return dsPageData; }
然后您可以按如下方式使用它(使用与LINQ的async
):
DataSet alldata; var tasks = the8000urls.Select(async url => { var dataForOnePage = await ScrapeDataAsync(url); //merge each table in dataForOnePage into allData }); await Task.WhenAll(tasks); PushAllDataToSql(alldata);
并使用我的AsyncEx库中的AsyncContext
,因为这是一个控制台应用程序 :
class Program { static int Main(string[] args) { try { return AsyncContext.Run(() => MainAsync(args)); } catch (Exception ex) { Console.Error.WriteLine(ex); return -1; } } static async Task MainAsync(string[] args) { ... } }
而已。 无需锁定或延续或任何此类。
我相信你不需要async
并await
这里的东西。 他们可以帮助您在需要将工作转移到非GUI线程的桌面应用程序中。 在我看来,在你的情况下使用Parallel.ForEach
方法会更好。 像这样的东西:
DataSet alldata; var bag = new ConcurrentBag(); Parallel.ForEach(the8000urls, url => { // ScrapeData downloads the html from the url with WebClient.DownloadString // and scrapes the data into several datatables which it returns as a dataset. DataSet dataForOnePage = ScrapeData(url); // Add data for one page to temp bag bag.Add(dataForOnePage); }); //merge each table in dataForOnePage into allData from bag PushAllDataToSql(alldata);