C#中的带宽限制

我正在开发一个程序,在后台不断发送数据流,我想让用户为上传和下载限制设置上限。

我已经阅读了令牌桶和漏桶算法的相关资料,后者似乎更符合需求,因为这里的目标不是最大化利用网络带宽,而是尽可能不引人注意。

然而,我对如何实现这一点有点不确定。一种自然的方法是扩展抽象的 Stream 类,这样就能很方便地包装现有的流;但是,这难道不需要额外的线程,以便在接收数据的同时发送数据(漏桶)吗?如果有其他实现方式的建议,我将不胜感激。

此外,虽然我可以控制程序接收的数据量,但在 C# 层面进行带宽限制的效果如何?计算机是否仍会接收数据并将其缓存起来(从而实际上抵消了限制效果),还是会等到我请求接收更多数据时才接收?

编辑:我感兴趣的是限制传入和传出数据,我无法控制流的另一端。

请参阅有关ThrottledStream类的文章。 它应该符合您的需求。

我提出了arul提到的ThrottledStream-Class的不同实现。 我的版本使用WaitHandle和具有1s间隔的Timer:

/// <summary>
/// Creates a stream wrapper that limits throughput on <paramref name="parentStream"/>.
/// </summary>
/// <param name="parentStream">The stream that actually performs the I/O.</param>
/// <param name="maxBytesPerSecond">Byte budget per one-second window.</param>
public ThrottledStream(Stream parentStream, int maxBytesPerSecond = int.MaxValue)
{
    MaxBytesPerSecond = maxBytesPerSecond;
    parent = parentStream;
    processed = 0;

    // The byte budget is refilled once per second by resettimer_Elapsed.
    resettimer = new System.Timers.Timer();
    resettimer.Interval = 1000;
    resettimer.Elapsed += resettimer_Elapsed;
    resettimer.Start();
}

/// <summary>
/// Charges <paramref name="bytes"/> against the current one-second budget and
/// blocks the caller until the next timer tick once the budget is used up.
/// </summary>
/// <param name="bytes">Number of bytes about to be (or just) transferred.</param>
protected void Throttle(int bytes)
{
    // The timer callback resets the counter from another thread, so update it atomically.
    if (Interlocked.Add(ref processed, bytes) >= maxBytesPerSecond)
    {
        // The timer signals the handle on every tick, even when nobody is waiting.
        // Clear such a stale signal first; otherwise WaitOne would return at once
        // and the limit would not be enforced for this call.
        wh.Reset();
        wh.WaitOne();
    }
}

/// <summary>
/// Timer callback: starts a new one-second window and wakes a blocked caller.
/// </summary>
private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
{
    Interlocked.Exchange(ref processed, 0);
    wh.Set();
}

每当超过带宽限制时,线程将一直休眠到下一秒开始。无需计算最佳休眠时长。

完整实现:

 /// <summary>
 /// A <see cref="Stream"/> wrapper that limits how many bytes are read from or
 /// written to the wrapped stream per one-second window.
 /// </summary>
 /// <remarks>
 /// A timer starts a new window every second. A caller that uses up the budget of
 /// the current window is blocked until the next window begins. Buffers larger than
 /// the budget are transferred in pieces so that a single large call cannot bypass
 /// the limit. The wrapped stream is not disposed together with this wrapper.
 /// </remarks>
 public class ThrottledStream : Stream
 {
     #region Properties

     private int maxBytesPerSecond;

     /// <summary>
     /// Number of bytes that are allowed per second.
     /// </summary>
     /// <exception cref="ArgumentException">The value is smaller than 1.</exception>
     public int MaxBytesPerSecond
     {
         get { return maxBytesPerSecond; }
         set
         {
             if (value < 1)
             {
                 throw new ArgumentException("MaxBytesPerSecond has to be >0");
             }

             maxBytesPerSecond = value;
         }
     }

     #endregion

     #region Private Members

     // Bytes charged in the current window; long so that it cannot wrap around.
     private long processed;
     private readonly System.Timers.Timer resettimer;

     // Manual-reset so that one tick releases every blocked caller, not just one.
     private readonly ManualResetEvent wh = new ManualResetEvent(false);
     private readonly Stream parent;
     private volatile bool disposed;

     #endregion

     /// <summary>
     /// Creates a new stream with a bandwidth cap.
     /// </summary>
     /// <param name="parentStream">The stream that actually performs the I/O.</param>
     /// <param name="maxBytesPerSecond">Byte budget per one-second window.</param>
     /// <exception cref="ArgumentNullException"><paramref name="parentStream"/> is null.</exception>
     /// <exception cref="ArgumentException"><paramref name="maxBytesPerSecond"/> is smaller than 1.</exception>
     public ThrottledStream(Stream parentStream, int maxBytesPerSecond = int.MaxValue)
     {
         // Validate before the timer is created so a failed construction leaks nothing.
         if (parentStream == null)
         {
             throw new ArgumentNullException("parentStream");
         }

         MaxBytesPerSecond = maxBytesPerSecond;
         parent = parentStream;
         processed = 0;

         resettimer = new System.Timers.Timer();
         resettimer.Interval = 1000;
         resettimer.Elapsed += resettimer_Elapsed;
         resettimer.Start();
     }

     /// <summary>
     /// Charges <paramref name="bytes"/> against the current window and blocks the
     /// caller until the next timer tick once the budget is used up.
     /// </summary>
     /// <param name="bytes">Number of bytes about to be (or just) transferred.</param>
     protected void Throttle(int bytes)
     {
         // The timer callback resets the counter from another thread, so update it atomically.
         if (Interlocked.Add(ref processed, bytes) < maxBytesPerSecond)
         {
             return;
         }

         // The handle stays signalled after a tick nobody waited for; clear that
         // stale signal so the wait below really lasts until the next tick.
         wh.Reset();

         // Dispose sets the flag before signalling, so checking it after Reset
         // guarantees a caller is never parked once the timer is gone.
         if (disposed)
         {
             return;
         }

         wh.WaitOne();
     }

     /// <summary>
     /// Timer callback: starts a new window and releases all blocked callers.
     /// </summary>
     private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
     {
         Interlocked.Exchange(ref processed, 0L);
         wh.Set();
     }

     #region Stream-Overrides

     /// <summary>
     /// Stops the reset timer and releases callers blocked in <see cref="Throttle"/>.
     /// Stream.Close() routes through this method, so Close is not overridden.
     /// </summary>
     protected override void Dispose(bool disposing)
     {
         if (disposing)
         {
             disposed = true;
             resettimer.Elapsed -= resettimer_Elapsed;
             resettimer.Stop();
             resettimer.Dispose();

             // No further ticks will arrive; wake anyone still waiting. The handle is
             // left undisposed on purpose because a late timer callback or a waking
             // caller may still touch it.
             wh.Set();
         }

         base.Dispose(disposing);
     }

     public override bool CanRead
     {
         get { return parent.CanRead; }
     }

     public override bool CanSeek
     {
         get { return parent.CanSeek; }
     }

     public override bool CanWrite
     {
         get { return parent.CanWrite; }
     }

     public override void Flush()
     {
         parent.Flush();
     }

     public override long Length
     {
         get { return parent.Length; }
     }

     public override long Position
     {
         get { return parent.Position; }
         set { parent.Position = value; }
     }

     /// <summary>
     /// Reads from the wrapped stream and charges the bytes actually returned.
     /// At most one window's budget is requested per call, so fewer bytes than
     /// <paramref name="count"/> may be returned.
     /// </summary>
     public override int Read(byte[] buffer, int offset, int count)
     {
         int allowed = Math.Min(count, maxBytesPerSecond);
         int read = parent.Read(buffer, offset, allowed);
         Throttle(read);
         return read;
     }

     public override long Seek(long offset, SeekOrigin origin)
     {
         return parent.Seek(offset, origin);
     }

     public override void SetLength(long value)
     {
         parent.SetLength(value);
     }

     /// <summary>
     /// Writes to the wrapped stream in pieces no larger than one window's budget,
     /// throttling before each piece.
     /// </summary>
     public override void Write(byte[] buffer, int offset, int count)
     {
         int position = offset;
         int remaining = count;

         // do/while so that a zero-length (or invalid) request still reaches the
         // parent exactly once, as a plain pass-through would.
         do
         {
             int chunk = Math.Min(remaining, maxBytesPerSecond);
             Throttle(chunk);
             parent.Write(buffer, position, chunk);
             position += chunk;
             remaining -= chunk;
         }
         while (remaining > 0);
     }

     #endregion
 }

基于@ 0xDEADBEEF的解决方案,我基于Rx调度程序创建了以下(可测试的)解决方案:

 /// <summary>
 /// A <see cref="Stream"/> wrapper that paces reads and writes on the wrapped
 /// stream to an average of <c>maxBytesPerSecond</c>, using an Rx scheduler for
 /// time so that tests can drive it with virtual time.
 /// </summary>
 /// <remarks>
 /// NOTE(review): the budget is measured against the time elapsed since
 /// construction, so a stream that sits idle accumulates credit and may
 /// afterwards transfer a burst without waiting.
 /// </remarks>
 public class ThrottledStream : Stream
 {
     private readonly Stream parent;
     private readonly int maxBytesPerSecond;
     private readonly IScheduler scheduler;
     private readonly IStopwatch stopwatch;
     private long processed;

     /// <summary>
     /// Creates a throttled wrapper around <paramref name="parent"/>.
     /// </summary>
     /// <param name="parent">The stream that actually performs the I/O.</param>
     /// <param name="maxBytesPerSecond">Average number of bytes allowed per second.</param>
     /// <param name="scheduler">Scheduler that provides the stopwatch and the delays.</param>
     /// <exception cref="ArgumentNullException">
     /// <paramref name="parent"/> or <paramref name="scheduler"/> is null.
     /// </exception>
     /// <exception cref="ArgumentOutOfRangeException">
     /// <paramref name="maxBytesPerSecond"/> is smaller than 1.
     /// </exception>
     public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler)
     {
         if (parent == null)
         {
             throw new ArgumentNullException("parent");
         }

         if (scheduler == null)
         {
             throw new ArgumentNullException("scheduler");
         }

         // Zero would make the target-time division in Throttle produce
         // infinity/NaN, and a negative value would silently disable throttling.
         if (maxBytesPerSecond < 1)
         {
             throw new ArgumentOutOfRangeException("maxBytesPerSecond", "MaxBytesPerSecond has to be >0");
         }

         this.maxBytesPerSecond = maxBytesPerSecond;
         this.parent = parent;
         this.scheduler = scheduler;
         stopwatch = scheduler.StartStopwatch();
         processed = 0;
     }

     /// <summary>
     /// Creates a throttled wrapper that uses <c>Scheduler.Immediate</c>.
     /// </summary>
     public ThrottledStream(Stream parent, int maxBytesPerSecond)
         : this(parent, maxBytesPerSecond, Scheduler.Immediate)
     {
     }

     /// <summary>
     /// Adds <paramref name="bytes"/> to the running total and blocks until the
     /// scheduler's clock has caught up with the time that total should have taken.
     /// </summary>
     /// <param name="bytes">Number of bytes about to be (or just) transferred.</param>
     protected void Throttle(int bytes)
     {
         processed += bytes;

         // Time at which this many bytes are due at the configured rate.
         var targetTime = TimeSpan.FromSeconds((double)processed / maxBytesPerSecond);
         var actualTime = stopwatch.Elapsed;
         var sleep = targetTime - actualTime;

         if (sleep > TimeSpan.Zero)
         {
             // Block the calling thread until the scheduler reports that the delay elapsed.
             using (var waitHandle = new AutoResetEvent(initialState: false))
             {
                 scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set());
                 waitHandle.WaitOne();
             }
         }
     }

     public override bool CanRead
     {
         get { return parent.CanRead; }
     }

     public override bool CanSeek
     {
         get { return parent.CanSeek; }
     }

     public override bool CanWrite
     {
         get { return parent.CanWrite; }
     }

     public override void Flush()
     {
         parent.Flush();
     }

     public override long Length
     {
         get { return parent.Length; }
     }

     public override long Position
     {
         get { return parent.Position; }
         set { parent.Position = value; }
     }

     /// <summary>
     /// Reads from the wrapped stream, then throttles by the bytes actually read.
     /// </summary>
     public override int Read(byte[] buffer, int offset, int count)
     {
         var read = parent.Read(buffer, offset, count);
         Throttle(read);
         return read;
     }

     public override long Seek(long offset, SeekOrigin origin)
     {
         return parent.Seek(offset, origin);
     }

     public override void SetLength(long value)
     {
         parent.SetLength(value);
     }

     /// <summary>
     /// Throttles by <paramref name="count"/>, then writes to the wrapped stream.
     /// </summary>
     public override void Write(byte[] buffer, int offset, int count)
     {
         Throttle(count);
         parent.Write(buffer, offset, count);
     }
 }

和一些只需要几毫秒的测试:

 /// <summary>
 /// Copying 1 MiB out of a throttled source at one eighth of its size per second
 /// must finish only once the virtual clock reaches eight seconds.
 /// </summary>
 [TestMethod]
 public void ShouldThrottleReading()
 {
     byte[] content = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
     int bytesPerSecond = content.Length / 8;
     var scheduler = new TestScheduler();
     var source = new ThrottledStream(new MemoryStream(content), bytesPerSecond, scheduler);
     var target = new MemoryStream();

     var copy = source.CopyToAsync(target);

     // No virtual time has passed yet.
     copy.Wait(10).Should().BeFalse();

     // Half way through.
     long halfWay = TimeSpan.FromSeconds(4).Ticks;
     scheduler.AdvanceTo(halfWay);
     copy.Wait(10).Should().BeFalse();

     // One tick short of the deadline.
     long deadline = TimeSpan.FromSeconds(8).Ticks;
     scheduler.AdvanceTo(deadline - 1);
     copy.Wait(10).Should().BeFalse();

     // Exactly at the deadline the copy completes.
     scheduler.AdvanceTo(deadline);
     copy.Wait(10).Should().BeTrue();
 }

 /// <summary>
 /// Copying 1 MiB into a throttled target at one eighth of its size per second
 /// must finish only once the virtual clock reaches eight seconds.
 /// </summary>
 [TestMethod]
 public void ShouldThrottleWriting()
 {
     byte[] content = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
     int bytesPerSecond = content.Length / 8;
     var scheduler = new TestScheduler();
     var source = new MemoryStream(content);
     var target = new ThrottledStream(new MemoryStream(), bytesPerSecond, scheduler);

     var copy = source.CopyToAsync(target);

     // No virtual time has passed yet.
     copy.Wait(10).Should().BeFalse();

     // Half way through.
     long halfWay = TimeSpan.FromSeconds(4).Ticks;
     scheduler.AdvanceTo(halfWay);
     copy.Wait(10).Should().BeFalse();

     // One tick short of the deadline.
     long deadline = TimeSpan.FromSeconds(8).Ticks;
     scheduler.AdvanceTo(deadline - 1);
     copy.Wait(10).Should().BeFalse();

     // Exactly at the deadline the copy completes.
     scheduler.AdvanceTo(deadline);
     copy.Wait(10).Should().BeTrue();
 }