具有动态maxCount的SemaphoreSlim

我遇到了一个问题,我需要限制对另一个Web服务器的调用次数。 它会有所不同,因为服务器是共享的,也许它可能有更多或更少的容量。

我在考虑使用SemaphoreSlim类,但没有公共属性来更改最大计数。

我应该将我的SemaphoreSlim类包装在另一个将处理最大计数的类中吗? 有没有更好的方法?

编辑:

这是我正在尝试的:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Semaphore { class Program { static SemaphoreSlim _sem = new SemaphoreSlim(10,10000); static void Main(string[] args) { int max = 15; for (int i = 1; i <= 50; i++) { new Thread(Enter).Start(new int[] { i, max}); } Console.ReadLine(); max = 11; for (int i = 1; i <= 50; i++) { new Thread(Enter).Start(new int[] { i, max }); } } static void Enter(object param) { int[] arr = (int[])param; int id = arr[0]; int max = arr[1]; try { Console.WriteLine(_sem.CurrentCount); if (_sem.CurrentCount <= max) _sem.Release(1); else { _sem.Wait(1000); Console.WriteLine(id + " wants to enter"); Thread.Sleep((1000 * id) / 2); // can be here at Console.WriteLine(id + " is in!"); // Only three threads } } catch(Exception ex) { Console.WriteLine("opps ", id); Console.WriteLine(ex.Message); } finally { _sem.Release(); } } } } 

问题:

1-_sem.Wait(1000)应该取消执行超过1000ms的线程,不是吗?

2 – 我是否有使用发布/等待的想法?

您无法更改最大计数,但您可以创建具有非常高的最大计数的SemaphoreSlim ,并保留其中一些。 看到这个构造函数 。

所以我们假设绝对最大并发调用数是100,但最初你希望它是25.你初始化你的信号量:

 SemaphoreSlim sem = new SemaphoreSlim(25, 100); 

因此25是可以同时服务的请求数。 你保留了其他75。

如果您想增加允许的数量,只需调用Release(num) 。 如果你打电话给Release(10) ,那么这个数字会变为35。

现在,如果要减少可用请求的数量,则必须多次调用WaitOne 。 例如,如果要从可用计数中删除10:

 for (var i = 0; i < 10; ++i) { sem.WaitOne(); } 

这有可能阻塞,直到其他客户端释放信号量。 也就是说,如果您允许35个并发请求,并且您希望将其减少到25,但是已经有35个客户端具有活动请求,那么WaitOne将阻塞,直到客户端调用Release ,并且循环将不会终止,直到10个客户端发布。

  1. 获取信号量。
  2. 将容量设置为比您需要的容量高得多的容量。
  3. 将初始容量设置为您希望的实际最大容量。
  4. 将信号量发给别人使用。

此时,您可以等待所需的信号量(没有相应的释放调用)来降低容量。 您可以多次释放信号量(没有相应的等待呼叫)以增加有效容量。

如果这是你正在做的事情,你可以创建自己的信号量类,组成一个SemaphoreSlim并封装这个逻辑。 如果您的代码已经释放信号量而没有先等待它,那么这个组合也是必不可少的; 使用您自己的类,您可以确保此类版本是no-ops。 (也就是说,你应该避免把自己置于那个位置,真的。)

好的,我可以解决我的问题看单声道项目。

 // SemaphoreSlim.cs // // Copyright (c) 2008 Jérémie "Garuma" Laval // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // // using System; using System.Diagnostics; using System.Threading.Tasks; namespace System.Threading { public class SemaphoreSlimCustom : IDisposable { const int spinCount = 10; const int deepSleepTime = 20; private object _sync = new object(); int maxCount; int currCount; bool isDisposed; public int MaxCount { get { lock (_sync) { return maxCount; } } set { lock (_sync) { maxCount = value; } } } EventWaitHandle handle; public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue) { } public SemaphoreSlimCustom (int initialCount, int maxCount) { if (initialCount < 0 || initialCount > maxCount || maxCount < 0) throw new ArgumentOutOfRangeException ("The initialCount argument is negative, initialCount is greater than maxCount, or maxCount is not positive."); this.maxCount = maxCount; this.currCount = initialCount; this.handle = new ManualResetEvent (initialCount > 0); } public void Dispose () { Dispose(true); } protected virtual void Dispose (bool disposing) { isDisposed = true; } void CheckState () { if (isDisposed) throw new ObjectDisposedException ("The SemaphoreSlim has been disposed."); } public int CurrentCount { get { return currCount; } } public int Release () { return Release(1); } public int Release (int releaseCount) { CheckState (); if (releaseCount < 1) throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1"); // As we have to take care of the max limit we resort to CAS int oldValue, newValue; do { oldValue = currCount; newValue = (currCount + releaseCount); newValue = newValue > maxCount ? maxCount : newValue; } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue); handle.Set (); return oldValue; } public void Wait () { Wait (CancellationToken.None); } public bool Wait (TimeSpan timeout) { return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None); } public bool Wait (int millisecondsTimeout) { return Wait (millisecondsTimeout, CancellationToken.None); } public void Wait (CancellationToken cancellationToken) { Wait (-1, cancellationToken); } public bool Wait (TimeSpan timeout, CancellationToken cancellationToken) { CheckState(); return Wait ((int)timeout.TotalMilliseconds, cancellationToken); } public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken) { CheckState (); if (millisecondsTimeout < -1) throw new ArgumentOutOfRangeException ("millisecondsTimeout", "millisecondsTimeout is a negative number other than -1"); Stopwatch sw = Stopwatch.StartNew(); Func stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout; do { bool shouldWait; int result; do { cancellationToken.ThrowIfCancellationRequested (); if (stopCondition ()) return false; shouldWait = true; result = currCount; if (result > 0) shouldWait = false; else break; } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result); if (!shouldWait) { if (result == 1) handle.Reset (); break; } SpinWait wait = new SpinWait (); while (Thread.VolatileRead (ref currCount) <= 0) { cancellationToken.ThrowIfCancellationRequested (); if (stopCondition ()) return false; if (wait.Count > spinCount) { int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds; int timeout = millisecondsTimeout < 0 ? deepSleepTime : Math.Min (Math.Max (diff, 1), deepSleepTime); handle.WaitOne (timeout); } else wait.SpinOnce (); } } while (true); return true; } public WaitHandle AvailableWaitHandle { get { return handle; } } public Task WaitAsync () { return Task.Factory.StartNew (() => Wait ()); } public Task WaitAsync (CancellationToken cancellationToken) { return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken); } public Task WaitAsync (int millisecondsTimeout) { return Task.Factory.StartNew (() => Wait (millisecondsTimeout)); } public Task WaitAsync (TimeSpan timeout) { return Task.Factory.StartNew (() => Wait (timeout)); } public Task WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken) { return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken); } public Task WaitAsync (TimeSpan timeout, CancellationToken cancellationToken) { return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken); } } }