是否有IEnumerable实现只迭代它的源(例如LINQ)一次

提供的items是q LINQ表达式的结果:

 var items = from item in ItemsSource.RetrieveItems() where ... 

假设每个项目的生成需要一些不可忽略的时间。

有两种操作模式:

  1. 使用foreach将允许开始使用集合开头的项目,而不是最终可用的项目。 但是,如果我们想稍后再次处理相同的集合,我们将不得不复制保存它:

     var storedItems = new List(); foreach(var item in items){ Process(item); storedItems .Add(item); } // Later foreach(var item in storedItems){ ProcessMore(item); } 

    因为如果我们只是做了foreach(... in items)那么temsSource.RetrieveItems()将再次被调用。

  2. 我们可以在前面使用.ToList() ,但这会迫使我们在开始处理第一个项目之前等待检索最后一个项目。

问题 :是否存在IEnumerable实现,它将像常规LINQ查询结果一样首次迭代,但会在进程中实现,以便第二个foreach将迭代存储的值?

一个有趣的挑战,所以我必须提供自己的解决方案。 事实上,我的解决方案现在很有趣,因为版本3.版本2是我根据Servy的反馈进行的简化。 然后我意识到我的解决方案有很大的缺点。 如果缓存的可枚举的第一个枚举没有完成,则不会进行缓存。 许多LINQ扩展(如FirstTake只会枚举足够的可枚举来完成工作,我不得不更新到版本3以使其与缓存一起工作。

问题是关于可枚举的后续枚举,它不涉及并发访问。 不过我决定让我的解决方案线程安全。 它增加了一些复杂性和一些开销,但应该允许在所有场景中使用该解决方案。

 public static class EnumerableExtensions { public static IEnumerable Cached(this IEnumerable source) { if (source == null) throw new ArgumentNullException("source"); return new CachedEnumerable(source); } } class CachedEnumerable : IEnumerable { readonly Object gate = new Object(); readonly IEnumerable source; readonly List cache = new List(); IEnumerator enumerator; bool isCacheComplete; public CachedEnumerable(IEnumerable source) { this.source = source; } public IEnumerator GetEnumerator() { lock (this.gate) { if (this.isCacheComplete) return this.cache.GetEnumerator(); if (this.enumerator == null) this.enumerator = source.GetEnumerator(); } return GetCacheBuildingEnumerator(); } public IEnumerator GetCacheBuildingEnumerator() { var index = 0; T item; while (TryGetItem(index, out item)) { yield return item; index += 1; } } bool TryGetItem(Int32 index, out T item) { lock (this.gate) { if (!IsItemInCache(index)) { // The iteration may have completed while waiting for the lock. if (this.isCacheComplete) { item = default(T); return false; } if (!this.enumerator.MoveNext()) { item = default(T); this.isCacheComplete = true; this.enumerator.Dispose(); return false; } this.cache.Add(this.enumerator.Current); } item = this.cache[index]; return true; } } bool IsItemInCache(Int32 index) { return index < this.cache.Count; } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } } 

扩展名使用如下( sequenceIEnumerable ):

 var cachedSequence = sequence.Cached(); // Pulling 2 items from the sequence. foreach (var item in cachedSequence.Take(2)) // ... // Pulling 2 items from the cache and the rest from the source. foreach (var item in cachedSequence) // ... // Pulling all items from the cache. foreach (var item in cachedSequence) // ... 

如果枚举了枚举的部分枚举,则会有轻微的泄漏(例如cachedSequence.Take(2).ToList()ToList使用的枚举器将被cachedSequence.Take(2).ToList()但是底层的源枚举器不会被丢弃。这是因为前2个如果对后续项目的请求进行了缓存,则会缓存项目并保持源枚举器处于活动状态。在这种情况下,源枚举器仅在易于进行垃圾收集时清理(与可能的大缓存同时进行)。

查看Reactive Extentsions库 – 有一个MemoizeAll()扩展,它将在访问IEnumerable后缓存这些项目,并存储它们以供将来访问。

请参阅Bart De Smet MemoizeAll 这篇博文,了解MemoizeAll和其他Rx方法。

编辑 :这实际上现在可以在单独的Interactive Extensions包中找到 – 可以从NuGet或Microsoft Download获得 。

 public static IEnumerable SingleEnumeration(this IEnumerable source) { return new SingleEnumerator(source); } private class SingleEnumerator : IEnumerable { private CacheEntry cacheEntry; public SingleEnumerator(IEnumerable sequence) { cacheEntry = new CacheEntry(sequence.GetEnumerator()); } public IEnumerator GetEnumerator() { if (cacheEntry.FullyPopulated) { return cacheEntry.CachedValues.GetEnumerator(); } else { return iterateSequence(cacheEntry).GetEnumerator(); } } IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); } } private static IEnumerable iterateSequence(CacheEntry entry) { using (var iterator = entry.CachedValues.GetEnumerator()) { int i = 0; while (entry.ensureItemAt(i) && iterator.MoveNext()) { yield return iterator.Current; i++; } } } private class CacheEntry { public bool FullyPopulated { get; private set; } public ConcurrentQueue CachedValues { get; private set; } private static object key = new object(); private IEnumerator sequence; public CacheEntry(IEnumerator sequence) { this.sequence = sequence; CachedValues = new ConcurrentQueue(); } ///  /// Ensure that the cache has an item a the provided index. If not, take an item from the /// input sequence and move to the cache. /// /// The method is thread safe. ///  /// True if the cache already had enough items or /// an item was moved to the cache, /// false if there were no more items in the sequence. public bool ensureItemAt(int index) { //if the cache already has the items we don't need to lock to know we //can get it if (index < CachedValues.Count) return true; //if we're done there's no race conditions hwere either if (FullyPopulated) return false; lock (key) { //re-check the early-exit conditions in case they changed while we were //waiting on the lock. //we already have the cached item if (index < CachedValues.Count) return true; //we don't have the cached item and there are no uncached items if (FullyPopulated) return false; //we actually need to get the next item from the sequence. if (sequence.MoveNext()) { CachedValues.Enqueue(sequence.Current); return true; } else { FullyPopulated = true; return false; } } } } 

所以这已被编辑(基本上)以支持multithreading访问。 有几个线程可以请求项目,并且逐个项目,它们将被缓存。 它不需要等待整个序列被迭代以使其返回缓存的值。 以下示例程序演示了这一点:

 private static IEnumerable interestingIntGenertionMethod(int maxValue) { for (int i = 0; i < maxValue; i++) { Thread.Sleep(1000); Console.WriteLine("actually generating value: {0}", i); yield return i; } } public static void Main(string[] args) { IEnumerable sequence = interestingIntGenertionMethod(10) .SingleEnumeration(); int numThreads = 3; for (int i = 0; i < numThreads; i++) { int taskID = i; Task.Factory.StartNew(() => { foreach (int value in sequence) { Console.WriteLine("Task: {0} Value:{1}", taskID, value); } }); } Console.WriteLine("Press any key to exit..."); Console.ReadKey(true); } 

你真的需要看到它来了解这里的力量。 一旦单个线程强制生成下一个实际值,所有剩余的线程都可以立即打印该生成的值,但如果没有要打印的线程的未缓存值,它们将全部等待。 (显然线程/线程池调度可能导致一个任务花费更长的时间来打印它的值而不是需要。)