缓存异步操作

我正在寻找一种缓存异步操作结果的优雅方法。

我首先有一个像这样的同步方法:

public String GetStuff(String url) { WebRequest request = WebRequest.Create(url); using (var response = request.GetResponse()) using (var sr = new StreamReader(response.GetResponseStream())) return sr.ReadToEnd(); } 

然后我让它异步:

 public async Task GetStuffAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

然后我决定我应该缓存结果,所以我不需要经常在外面查询:

 ConcurrentDictionary _cache = new ConcurrentDictionary(); public async Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, await GetStuffInternalAsync(url)); } private async Task GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

然后我读了一篇关于缓存Task如何更好的文章(观看video),因为创建它们很昂贵:

 ConcurrentDictionary<String, Task> _cache = new ConcurrentDictionary<String, Task>(); public Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync(url)); } private async Task GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

现在的问题是,如果请求失败(例如:HTTP 401),缓存将包含一个失败的Task ,我将不得不重置应用程序,因为无法重新发送请求。

是否有一种优雅的方式使用ConcurrentDictionary只缓存成功的任务并仍然具有primefaces行为?

首先,你的两种方法都是错误的,因为它们不会为你节省任何请求(尽管第二种方法至少可以节省你的时间)。

您的第一个代码( await代码)执行此操作:

  1. 提出要求。
  2. 等待请求完成。
  3. 如果缓存中已存在结果,请忽略请求的结果。

你的第二个代码删除了第2步,因此速度更快,但你仍然在做很多不必要的请求。

你应该做的是使用带有委托的GetOrAdd()的重载 :

 public Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync); } 

这并不能完全消除被忽略的请求的可能性,但确实使它们的可能性降低了。 (为此,你可以尝试取消你知道被忽略的请求,但我不认为这是值得的。)


现在问你的实际问题。 我认为你应该做的是使用AddOrUpdate()方法 。 如果该值尚未存在,请添加它。 如果它在那里,如果它出现故障则更换它:

 public Task GetStuffAsync(String url) { return _cache.AddOrUpdate( url, GetStuffInternalAsync, (u, task) => { if (task.IsCanceled || task.IsFaulted) return GetStuffInternalAsync(u); return task; }); } 

将这些失败的任务保留为负缓存实际上是合理的(并且取决于您的设计和性能,至关重要)。 否则,如果一个url总是失败,那么一次又一次地使用它会破坏使用缓存的意义。

您需要的是一种不时清除缓存的方法。 最简单的方法是使用一个替换ConcurrentDictionarry实例的计时器。 更强大的解决方案是构建您自己的LruDictionary或类似的东西。

我已经为MemoryCache创建了一个包装器,它基本上缓存了Lazy>对象,并且可以解决以下所有问题:

  • 不会启动并行或不必要的操作来获取值。 多个呼叫站点或线程可以等待来自缓存的相同值。
  • 失败的任务不会被缓存。 (没有负面缓存。)
  • 缓存用户无法从缓存中获取无效结果,即使该值在await期间无效也是如此。

我的博客中进一步解释了该解决方案, GitHub上提供了完整的工作代码。

这是一种缓存异步操作结果的方法,可以保证没有缓存未命中。

在接受的答案中,如果在循环中多次请求相同的URL(取决于SynchronizationContext),或者从多个线程请求Web请求将被发送出去,直到有一个响应被缓存,此时缓存将开始获取用过的。

下面的方法为每个唯一键创建一个SemaphoreSlim对象。 这将防止长时间运行的异步操作对同一个键运行多次,同时允许它同时针对不同的键运行。 显然,保留SemaphoreSlim对象以防止缓存未命中的开销很大,因此根据用例可能不值得。 但是,如果保证没有缓存未命中比这更重要。

 private readonly ConcurrentDictionary _keyLocks = new ConcurrentDictionary(); private readonly ConcurrentDictionary _cache = new ConcurrentDictionary(); public async Task GetSomethingAsync(string key) { string value; // get the semaphore specific to this key var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1)); await keyLock.WaitAsync(); try { // try to get value from cache if (!_cache.TryGetValue(key, out value)) { // if value isn't cached, get it the long way asynchronously value = await GetSomethingTheLongWayAsync(); // cache value _cache.TryAdd(key, value); } } finally { keyLock.Release(); } return value; } 

编辑:正如评论中提到的@mtkachenko,可以在此方法的开头执行额外的缓存检查,以潜在地跳过锁获取步骤。

这项工作对我来说:

 ObjectCache _cache = MemoryCache.Default; static object _lockObject = new object(); public Task GetAsync(string cacheKey, Func> func, TimeSpan? cacheExpiration = null) where T : class { var task = (T)_cache[cacheKey]; if (task != null) return task; lock (_lockObject) { task = (T)_cache[cacheKey](cacheKey); if (task != null) return task; task = func(); Set(cacheKey, task, cacheExpiration); task.ContinueWith(t => { if (t.Status != TaskStatus.RanToCompletion) _cache.Remove(cacheKey); }); } return task; } 

另一个简单的方法是将Lazy扩展为AsyncLazy ,如下所示:

 public class AsyncLazy : Lazy> { public AsyncLazy(Func> taskFactory, LazyThreadSafetyMode mode) : base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode) { } public TaskAwaiter GetAwaiter() { return Value.GetAwaiter(); } } 

然后你可以这样做:

 private readonly ConcurrentDictionary> _cache = new ConcurrentDictionary>(); public async Task GetStuffAsync(string url) { return await _cache.GetOrAdd(url, new AsyncLazy( () => GetStuffInternalAsync(url), LazyThreadSafetyMode.ExecutionAndPublication)); }