使用.Net HttpListener进行multithreading处理

我有一个听众:

listener = new HttpListener(); listener.Prefixes.Add(@"http://+:8077/"); listener.Start(); listenerThread = new Thread(HandleRequests); listenerThread.Start(); 

我正在处理请求:

 private void HandleRequests() { while (listener.IsListening) { var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); context.AsyncWaitHandle.WaitOne(); } } private void ListenerCallback(IAsyncResult ar) { var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff } 

我想以这样的方式写出void Stop()

  1. 它将阻塞,直到所有当前处理的请求结束(即等待所有线程“执行某些操作”)。
  2. 虽然它将等待已经启动的请求,但它将不允许任何更多的请求(即在ListenerCallback的开头返回)。
  3. 之后它将调用listener.Stop()listener.IsListening变为false)。

怎么会写?

编辑 :您对此解决方案有何看法? 安全吗?

 public void Stop() { lock (this) { isStopping = true; } resetEvent.WaitOne(); //initially set to true listener.Stop(); } private void ListenerCallback(IAsyncResult ar) { lock (this) { if (isStopping) return; resetEvent.Reset(); numberOfRequests++; } var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff lock (this) { if (--numberOfRequests == 0) resetEvent.Set(); } } 

为了完整性,如果您管理自己的工作线程,它会是什么样子:

 class HttpServer : IDisposable { private readonly HttpListener _listener; private readonly Thread _listenerThread; private readonly Thread[] _workers; private readonly ManualResetEvent _stop, _ready; private Queue _queue; public HttpServer(int maxThreads) { _workers = new Thread[maxThreads]; _queue = new Queue(); _stop = new ManualResetEvent(false); _ready = new ManualResetEvent(false); _listener = new HttpListener(); _listenerThread = new Thread(HandleRequests); } public void Start(int port) { _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); _listener.Start(); _listenerThread.Start(); for (int i = 0; i < _workers.Length; i++) { _workers[i] = new Thread(Worker); _workers[i].Start(); } } public void Dispose() { Stop(); } public void Stop() { _stop.Set(); _listenerThread.Join(); foreach (Thread worker in _workers) worker.Join(); _listener.Stop(); } private void HandleRequests() { while (_listener.IsListening) { var context = _listener.BeginGetContext(ContextReady, null); if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) return; } } private void ContextReady(IAsyncResult ar) { try { lock (_queue) { _queue.Enqueue(_listener.EndGetContext(ar)); _ready.Set(); } } catch { return; } } private void Worker() { WaitHandle[] wait = new[] { _ready, _stop }; while (0 == WaitHandle.WaitAny(wait)) { HttpListenerContext context; lock (_queue) { if (_queue.Count > 0) context = _queue.Dequeue(); else { _ready.Reset(); continue; } } try { ProcessRequest(context); } catch (Exception e) { Console.Error.WriteLine(e); } } } public event Action ProcessRequest; } 

那么有几种方法可以解决这个问题…这是一个简单的例子,它使用信号量来跟踪正在进行的工作,以及在所有工人完成时引发的信号。 这应该给你一个基本的想法来工作。

下面的解决方案并不理想,理想情况下我们应该在调用BeginGetContext之前获取信号量。 这使关闭变得更加困难,所以我选择使用这种更简化的方法。 如果我这样做是为了’真实’,我可能会编写自己的线程管理而不是依赖ThreadPool。 这将允许更可靠的关闭。

无论如何这里是完整的例子:

 class TestHttp { static void Main() { using (HttpServer srvr = new HttpServer(5)) { srvr.Start(8085); Console.WriteLine("Press [Enter] to quit."); Console.ReadLine(); } } } class HttpServer : IDisposable { private readonly int _maxThreads; private readonly HttpListener _listener; private readonly Thread _listenerThread; private readonly ManualResetEvent _stop, _idle; private readonly Semaphore _busy; public HttpServer(int maxThreads) { _maxThreads = maxThreads; _stop = new ManualResetEvent(false); _idle = new ManualResetEvent(false); _busy = new Semaphore(maxThreads, maxThreads); _listener = new HttpListener(); _listenerThread = new Thread(HandleRequests); } public void Start(int port) { _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); _listener.Start(); _listenerThread.Start(); } public void Dispose() { Stop(); } public void Stop() { _stop.Set(); _listenerThread.Join(); _idle.Reset(); //aquire and release the semaphore to see if anyone is running, wait for idle if they are. _busy.WaitOne(); if(_maxThreads != 1 + _busy.Release()) _idle.WaitOne(); _listener.Stop(); } private void HandleRequests() { while (_listener.IsListening) { var context = _listener.BeginGetContext(ListenerCallback, null); if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) return; } } private void ListenerCallback(IAsyncResult ar) { _busy.WaitOne(); try { HttpListenerContext context; try { context = _listener.EndGetContext(ar); } catch (HttpListenerException) { return; } if (_stop.WaitOne(0, false)) return; Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); context.Response.SendChunked = true; using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) { tw.WriteLine("

Hello World

"); for (int i = 0; i < 5; i++) { tw.WriteLine("

{0} @ {1}

", i, DateTime.Now); tw.Flush(); Thread.Sleep(1000); } tw.WriteLine(""); } } finally { if (_maxThreads == 1 + _busy.Release()) _idle.Set(); } } }

我已经在我的问题的EDIT部分咨询了我的代码,我决定接受它进行一些修改:

 public void Stop() { lock (locker) { isStopping = true; } resetEvent.WaitOne(); //initially set to true listener.Stop(); } private void ListenerCallback(IAsyncResult ar) { lock (locker) //locking on this is a bad idea, but I forget about it before { if (isStopping) return; resetEvent.Reset(); numberOfRequests++; } try { var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff } finally //to make sure that bellow code will be executed { lock (locker) { if (--numberOfRequests == 0) resetEvent.Set(); } } } 

只需调用listener.Stop()即可。 这不会终止已经建立的任何连接,但会阻止任何新连接。

这使用BlockingCollection类型的队列来为请求提供服务。 它可以原样使用。 您应该从这个派生一个类并覆盖Response。

 using System; using System.Collections.Concurrent; using System.Net; using System.Text; using System.Threading; namespace Service { class HttpServer : IDisposable { private HttpListener httpListener; private Thread listenerLoop; private Thread[] requestProcessors; private BlockingCollection messages; public HttpServer(int threadCount) { requestProcessors = new Thread[threadCount]; messages = new BlockingCollection(); httpListener = new HttpListener(); } public virtual int Port { get; set; } = 80; public virtual string[] Prefixes { get { return new string[] {string.Format(@"http://+:{0}/", Port )}; } } public void Start(int port) { listenerLoop = new Thread(HandleRequests); foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix ); listenerLoop.Start(); for (int i = 0; i < requestProcessors.Length; i++) { requestProcessors[i] = StartProcessor(i, messages); } } public void Dispose() { Stop(); } public void Stop() { messages.CompleteAdding(); foreach (Thread worker in requestProcessors) worker.Join(); httpListener.Stop(); listenerLoop.Join(); } private void HandleRequests() { httpListener.Start(); try { while (httpListener.IsListening) { Console.WriteLine("The Linstener Is Listening!"); HttpListenerContext context = httpListener.GetContext(); messages.Add(context); Console.WriteLine("The Linstener has added a message!"); } } catch(Exception e) { Console.WriteLine (e.Message); } } private Thread StartProcessor(int number, BlockingCollection messages) { Thread thread = new Thread(() => Processor(number, messages)); thread.Start(); return thread; } private void Processor(int number, BlockingCollection messages) { Console.WriteLine ("Processor {0} started.", number); try { for (;;) { Console.WriteLine ("Processor {0} awoken.", number); HttpListenerContext context = messages.Take(); Console.WriteLine ("Processor {0} dequeued message.", number); Response (context); } } catch { } Console.WriteLine ("Processor {0} terminated.", number); } public virtual void Response(HttpListenerContext context) { SendReply(context, new StringBuilder("NULLThis site not yet implementd.") ); } public static void SendReply(HttpListenerContext context, StringBuilder responseString ) { byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); context.Response.ContentLength64 = buffer.Length; System.IO.Stream output = context.Response.OutputStream; output.Write(buffer, 0, buffer.Length); output.Close(); } } } 

这是如何使用它的示例。 无需使用事件或任何锁定块。 BlockingCollection解决了所有这些问题。

 using System; using System.Collections.Concurrent; using System.IO; using System.Net; using System.Text; using System.Threading; namespace Service { class Server { public static void Main (string[] args) { HttpServer Service = new QuizzServer (8); Service.Start (80); for (bool coninute = true; coninute ;) { string input = Console.ReadLine ().ToLower(); switch (input) { case "stop": Console.WriteLine ("Stop command accepted."); Service.Stop (); coninute = false; break; default: Console.WriteLine ("Unknown Command: '{0}'.",input); break; } } } } }