How to use Azure TableClient 2.0 BeginExecuteQuerySegmented

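I am trying to fetch all entries in a table asynchronously, but I can't figure out how to work with the continuation token. I suspect I need to take my anonymous method, convert it to a delegate, and then call it recursively with the continuation token.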

How can I take the following code, make the call asynchronous, and retrieve all entries with the new API?

    Task GetAllTableEntries(CloudTable tbl, string[] urls, string name, CancellationToken token)
    {
        TableRequestOptions reqOptions = new TableRequestOptions() { };
        OperationContext ctx = new OperationContext() { ClientRequestID = "" };
        object state = null;

        // Register Cancelation Token
        ICancellableAsyncResult result = null;

        TableQuery qry = new TableQuery();
        TableContinuationToken tok = null;

        result = tbl.BeginExecuteQuerySegmented(qry, tok, reqOptions, ctx, (o) =>
        {
            var response = (o.AsyncState as CloudTable).EndExecuteQuerySegmented(o);
            Console.WriteLine("Found " + response.Results.Count + " records");

            // The following code was used in the previous version of the SDK
            //
            // // add first segment of data
            // pageData.CompletedList.AddRange(
            //     from wu in response.Results
            //     select new CompletedWorkUnit(wu));
            //
            // // continue fetching segments to complete page
            // while (response.HasMoreResults)
            // {
            //     response = response.GetNext();
            //     pageData.CompletedList.AddRange(
            //         from wu in response.Results
            //         select new CompletedWorkUnit(wu));
            // }
            //
            // // set continuation token for next page request
            // pageData.ContinuationToken = response.ContinuationToken;
            // evt.Set();
        }, state);

        // Add cancellation token according to guidance from Table Client 2.0 Breaking Changes blog entry
        token.Register((o) => result.Cancel(), state);

Please try this:

    static void ExecuteQuery()
    {
        TableContinuationToken token = null;
        TableRequestOptions reqOptions = new TableRequestOptions() { };
        OperationContext ctx = new OperationContext() { ClientRequestID = "" };
        long totalEntitiesRetrieved = 0;

        while (true)
        {
            CloudTable table = cloudTableClient.GetTableReference("MyTable");
            TableQuery query = (new TableQuery()).Take(100);
            System.Threading.ManualResetEvent evt = new System.Threading.ManualResetEvent(false);

            var result = table.BeginExecuteQuerySegmented(query, token, reqOptions, ctx, (o) =>
            {
                var response = (o.AsyncState as CloudTable).EndExecuteQuerySegmented(o);

                // A null continuation token means there are no more segments
                token = response.ContinuationToken;

                int recordsRetrieved = response.Count();
                totalEntitiesRetrieved += recordsRetrieved;
                Console.WriteLine("Records retrieved in this attempt = " + recordsRetrieved +
                                  " | Total records retrieved = " + totalEntitiesRetrieved);
                evt.Set();
            }, table);

            // Block until the callback has processed the current segment
            evt.WaitOne();

            if (token == null)
            {
                break;
            }
        }
    }

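One thing I noticed is that if I execute a query that returns dynamic table entities, I get an error related to DateTimeOffset. That is why I ended up creating a temporary entity.

Hope this helps.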

Here is another alternative, this time using the TPL (Task Parallel Library). The full source code is available here

    /* The following overload of ExecuteQuerySegmentedAsync is executed like this */
    CloudTableClient client = acct.CreateCloudTableClient();
    CloudTable tableSymmetricKeys = client.GetTableReference("SymmetricKeys5");

    // "query" was not declared in the original snippet; an unfiltered query is assumed here
    TableQuery query = new TableQuery();
    TableContinuationToken token = new TableContinuationToken() { };
    TableRequestOptions opt = new TableRequestOptions() { };
    OperationContext ctx = new OperationContext() { ClientRequestID = "ID" };
    CancellationToken cancelToken = new CancellationToken();
    List<Task> taskList = new List<Task>();

    while (true)
    {
        Task<TableQuerySegment<DynamicTableEntity>> task3 =
            tableSymmetricKeys.ExecuteQuerySegmentedAsync(query, token, opt, ctx, cancelToken);

        // Run the method (blocks until the segment has arrived)
        task3.Wait();

        token = task3.Result.ContinuationToken;
        Console.WriteLine("Records retrieved in this attempt = " + task3.Result.Count());

        if (token == null)
        {
            break;
        }
        else
        {
            // persist token
            // token.WriteXml()
        }
    }

    // Overload #4
    public static Task<TableQuerySegment<DynamicTableEntity>> ExecuteQuerySegmentedAsync(
        this CloudTable tbl,
        TableQuery query,
        TableContinuationToken continuationToken,
        TableRequestOptions opt,
        OperationContext ctx,
        CancellationToken token)
    {
        ICancellableAsyncResult result = null;

        // The table itself is passed as the async state so it can be recovered below
        if (opt == null && ctx == null)
            result = tbl.BeginExecuteQuerySegmented(query, continuationToken, null, tbl);
        else
            result = tbl.BeginExecuteQuerySegmented(query, continuationToken, opt, ctx, null, tbl);

        // Add cancellation token according to guidance from Table Client 2.0 Breaking Changes blog entry
        var cancellationRegistration = token.Register(result.Cancel);

        return Task.Factory.FromAsync(result, iAsyncResult =>
        {
            CloudTable currentTable = iAsyncResult.AsyncState as CloudTable;

            //cancellationRegistration.Dispose();
            return currentTable.EndExecuteQuerySegmented(result);
        });
    }
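The wrapper above relies on `Task.Factory.FromAsync` to turn a Begin/End pair into a task. As a minimal sketch of that same technique without the storage SDK, the snippet below applies it to `Stream.BeginRead`/`Stream.EndRead` on a `MemoryStream`. The stream is only a stand-in for the table call, and `ApmToTaskDemo` and `ReadAsTask` are hypothetical names chosen for illustration.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ApmToTaskDemo
{
    // Wraps the Begin/End pair of a stream read in a Task<int>.
    // FromAsync calls BeginRead with the three arguments and invokes
    // EndRead when the operation completes.
    public static Task<int> ReadAsTask(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    public static void Main()
    {
        byte[] data = Encoding.UTF8.GetBytes("hello");
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[16];

            // Blocking on Result keeps the demo short; real code would await.
            int read = ReadAsTask(stream, buffer).Result;
            Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, read));
        }
    }
}
```

Passing the Begin and End methods themselves, rather than an already started `IAsyncResult`, lets `FromAsync` register its own completion callback instead of waiting on the result's wait handle.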

This person wrote some ExecuteQueryAsync extension methods that wrap the segmented methods:

https://github.com/glueckkanja/tasync/blob/master/StorageExtensions.cs
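I have not reproduced the linked file here. The general idea of wrapping segmented calls can be sketched independently of the SDK: keep fetching segments until the continuation token comes back null. In the sketch below, `SegmentedQuery`, `ReadAllAsync` and `fetchSegment` are hypothetical names, and the in-memory dictionary merely simulates a table with three segments. With the table client, `fetchSegment` would presumably call a segmented method such as the `ExecuteQuerySegmentedAsync` extension shown earlier and return the segment's results together with its `ContinuationToken`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SegmentedQuery
{
    // Hypothetical helper: calls fetchSegment once per segment and
    // stops when the returned continuation token is null.
    public static async Task<List<T>> ReadAllAsync<T, TToken>(
        Func<TToken, CancellationToken, Task<Tuple<IList<T>, TToken>>> fetchSegment,
        CancellationToken cancel) where TToken : class
    {
        var all = new List<T>();
        TToken token = null; // null means "start from the first segment"
        do
        {
            cancel.ThrowIfCancellationRequested();
            Tuple<IList<T>, TToken> segment = await fetchSegment(token, cancel);
            all.AddRange(segment.Item1);
            token = segment.Item2; // null once the last segment has been read
        } while (token != null);
        return all;
    }

    public static void Main()
    {
        // In-memory stand-in for a table: three segments keyed by token.
        var pages = new Dictionary<string, Tuple<IList<int>, string>>
        {
            { "start", Tuple.Create<IList<int>, string>(new List<int> { 1, 2 }, "p2") },
            { "p2",    Tuple.Create<IList<int>, string>(new List<int> { 3, 4 }, "p3") },
            { "p3",    Tuple.Create<IList<int>, string>(new List<int> { 5 }, null) }
        };

        Func<string, CancellationToken, Task<Tuple<IList<int>, string>>> fetch =
            (token, ct) => Task.FromResult(pages[token ?? "start"]);

        List<int> all = ReadAllAsync(fetch, CancellationToken.None).Result;
        Console.WriteLine("Total records retrieved = " + all.Count);
    }
}
```

Because the loop awaits each segment instead of blocking on a `ManualResetEvent`, no thread is held while a request is in flight, and cancellation is checked before every round trip.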

This is an alternative implementation of @Gaurav Mantri's code.

    public class AzureTableQueryState
    {
        public CloudTable CloudTable { get; set; }
        public TableContinuationToken Token { get; set; }
        public ManualResetEvent Evt { get; set; } // i'm guessing that's what this is
        // any other variables you were using
        public int TotalEntitiesRetrieved { get; set; }
    }

    // snip....

    // Created once so the running total survives across segments
    AzureTableQueryState queryState = new AzureTableQueryState();
    queryState.CloudTable = tableSymmetricKeys;
    queryState.TotalEntitiesRetrieved = 0;

    while (true)
    {
        // Initialize variables
        TableQuery query = (new TableQuery()).Take(100);
        queryState.Evt = new System.Threading.ManualResetEvent(false);

        AsyncCallback asyncCallback = (iAsyncResult) =>
        {
            AzureTableQueryState state = iAsyncResult.AsyncState as AzureTableQueryState;
            var response = state.CloudTable.EndExecuteQuerySegmented(iAsyncResult);
            state.Token = response.ContinuationToken;

            int recordsRetrieved = response.Results.Count;
            state.TotalEntitiesRetrieved += recordsRetrieved;
            Console.WriteLine("Records retrieved in this attempt = " + recordsRetrieved +
                              " | Total records retrieved = " + state.TotalEntitiesRetrieved);
            state.Evt.Set();
        };

        // Run the method. The state object (not the table) is passed as the
        // async state; otherwise the cast in the callback would yield null.
        var result = tableSymmetricKeys.BeginExecuteQuerySegmented(
            query, queryState.Token, opt, ctx, asyncCallback, queryState);

        // Add cancellation token according to guidance from Table Client 2.0 Breaking Changes blog entry
        cancelToken.Register((o) => result.Cancel(), null);

        queryState.Evt.WaitOne();

        if (queryState.Token == null)
        {
            break;
        }
        else
        {
            // persist token
            // queryState.Token.WriteXml()
        }
    }