首页 文章

使用.Net HttpListener进行多线程处理

提问于
浏览
25

我有一个监听器(HttpListener):

// Create the listener, register its single prefix and start it.
listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();

// HandleRequests runs on its own thread.
listenerThread = new Thread(new ThreadStart(HandleRequests));
listenerThread.Start();

我正在处理请求:

// Accept loop: while the listener is listening, start one asynchronous accept
// and block until it has completed before starting the next one.
private void HandleRequests()
{
    while (true)
    {
        if (!listener.IsListening)
        {
            break;
        }

        IAsyncResult pendingAccept = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
        pendingAccept.AsyncWaitHandle.WaitOne();
    }
}

/// <summary>
/// Completion callback for <c>BeginGetContext</c>: finishes the asynchronous accept
/// and then handles the request.
/// </summary>
/// <param name="ar">
/// Async result whose <c>AsyncState</c> is the <see cref="HttpListener"/> that was
/// passed to <c>BeginGetContext</c>.
/// </param>
private void ListenerCallback(IAsyncResult ar)
{
    // Direct cast: a state object of the wrong type fails right here with
    // InvalidCastException instead of surfacing later as a NullReferenceException.
    var listener = (HttpListener)ar.AsyncState;

    HttpListenerContext context;
    try
    {
        context = listener.EndGetContext(ar);
    }
    catch (HttpListenerException)
    {
        // The listener was stopped while this accept was pending; there is no request.
        return;
    }
    catch (ObjectDisposedException)
    {
        // The listener was closed while this accept was pending; there is no request.
        return;
    }

    //do some stuff
}

我想用这样的方式写 void Stop()

  • 它将阻塞,直到所有当前处理的请求结束(即等待所有线程到"do some stuff") .

  • 虽然它将等待已经开始的请求,但它将不允许任何更多的请求(即在 ListenerCallback 的开头返回) .

  • 之后它将调用 listener.Stop()(此时 listener.IsListening 变为 false) .

这样的 Stop() 应该怎么写?

EDIT :您对此解决方案有何看法?安全吗?

// Shuts the server down: raises the stopping flag, waits on resetEvent, then stops the listener.
// ListenerCallback sets resetEvent again when numberOfRequests drops back to zero.
public void Stop() 
{
    // Same lock ListenerCallback takes before it tests isStopping.
    // NOTE(review): lock (this) lets outside code contend for the same monitor;
    // a private lock object would be safer.
    lock (this)
    {
        isStopping = true;
    }
    resetEvent.WaitOne(); //initially set to true
    listener.Stop();
}

// Accept callback that cooperates with Stop():
//  - once isStopping is set it returns immediately, so no new request is started;
//  - otherwise it counts the request as in flight and un-signals resetEvent, which
//    Stop() waits on; the last request to finish signals the event again.
private void ListenerCallback(IAsyncResult ar)
{
    // NOTE(review): lock (this) is kept because Stop() uses the same monitor;
    // a private lock object shared by both methods would be safer.
    lock (this)
    {
        if (isStopping)
            return;

        resetEvent.Reset();
        numberOfRequests++;
    }

    try
    {
        // Direct cast: a state object of the wrong type fails here with
        // InvalidCastException rather than as a NullReferenceException below.
        var listener = (HttpListener)ar.AsyncState;

        var context = listener.EndGetContext(ar);

        //do some stuff
    }
    finally
    {
        // Must run even when the request handling throws: otherwise the counter
        // never returns to zero and Stop() blocks forever on resetEvent.
        lock (this)
        {
            if (--numberOfRequests == 0)
                resetEvent.Set();
        }
    }
}

5 回答

  • 2

    为了完整性,如果您管理自己的工作线程,它会是什么样子:

    /// <summary>
    /// Minimal HTTP server built on <see cref="HttpListener"/> that manages its own threads:
    /// one listener thread accepts requests and queues them, and a fixed number of worker
    /// threads dequeue them and raise <see cref="ProcessRequest"/>.
    /// </summary>
    class HttpServer : IDisposable
    {
        private readonly HttpListener _listener;
        private readonly Thread _listenerThread;
        private readonly Thread[] _workers;
        // _stop: set once by Stop(). _ready: set while the queue may contain work.
        private readonly ManualResetEvent _stop, _ready;
        private readonly Queue<HttpListenerContext> _queue;

        /// <param name="maxThreads">Number of worker threads created by <see cref="Start"/>.</param>
        public HttpServer(int maxThreads)
        {
            _workers = new Thread[maxThreads];
            _queue = new Queue<HttpListenerContext>();
            _stop = new ManualResetEvent(false);
            _ready = new ManualResetEvent(false);
            _listener = new HttpListener();
            _listenerThread = new Thread(HandleRequests);
        }

        /// <summary>Starts listening on the given port and launches the listener and worker threads.</summary>
        /// <param name="port">TCP port used in the <c>http://+:{port}/</c> prefix.</param>
        public void Start(int port)
        {
            _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
            _listener.Start();
            _listenerThread.Start();

            for (int i = 0; i < _workers.Length; i++)
            {
                _workers[i] = new Thread(Worker);
                _workers[i].Start();
            }
        }

        /// <summary>Stops the server; equivalent to <see cref="Stop"/>.</summary>
        public void Dispose()
        { Stop(); }

        /// <summary>
        /// Signals shutdown, waits for the listener thread and every worker to finish,
        /// then stops the listener. Safe to call when <see cref="Start"/> never ran or failed.
        /// </summary>
        public void Stop()
        {
            _stop.Set();

            // Thread.Join throws ThreadStateException on a thread that was never started,
            // which happens when Dispose() runs without Start() or after Start() failed.
            if ((_listenerThread.ThreadState & System.Threading.ThreadState.Unstarted) == 0)
                _listenerThread.Join();

            foreach (Thread worker in _workers)
            {
                // Slots stay null until Start() has created the worker.
                if (worker != null)
                    worker.Join();
            }

            _listener.Stop();
        }

        // Listener thread: keeps one asynchronous accept outstanding and exits as soon as _stop is set.
        private void HandleRequests()
        {
            while (_listener.IsListening)
            {
                var context = _listener.BeginGetContext(ContextReady, null);

                // Index 0 is _stop: leave the loop when shutdown is requested.
                if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                    return;
            }
        }

        // Accept callback: completes the accept and hands the context to the workers.
        private void ContextReady(IAsyncResult ar)
        {
            HttpListenerContext context;
            try
            {
                context = _listener.EndGetContext(ar);
            }
            catch (HttpListenerException)
            {
                // Listener was stopped while the accept was pending.
                return;
            }
            catch (ObjectDisposedException)
            {
                // Listener was closed while the accept was pending.
                return;
            }

            lock (_queue)
            {
                _queue.Enqueue(context);
                _ready.Set();
            }
        }

        // Worker thread: processes queued contexts while _ready is signaled; exits when only _stop is.
        private void Worker()
        {
            // _ready comes first so that WaitAny keeps returning 0 while the queue still
            // has work, even after _stop is set: queued requests are drained before exit.
            WaitHandle[] wait = new[] { _ready, _stop };
            while (0 == WaitHandle.WaitAny(wait))
            {
                HttpListenerContext context;
                lock (_queue)
                {
                    if (_queue.Count > 0)
                        context = _queue.Dequeue();
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }

                try
                {
                    // Snapshot the delegate: the event is null when nobody has subscribed.
                    Action<HttpListenerContext> handler = ProcessRequest;
                    if (handler != null)
                    {
                        handler(context);
                    }
                    else
                    {
                        // No handler registered: answer instead of leaving the client hanging.
                        context.Response.StatusCode = (int)HttpStatusCode.NotImplemented;
                        context.Response.Close();
                    }
                }
                catch (Exception e) { Console.Error.WriteLine(e); }
            }
        }

        /// <summary>Raised on a worker thread for every accepted request.</summary>
        public event Action<HttpListenerContext> ProcessRequest;
    }
    
  • 57

    那么有几种方法可以解决这个问题...这是一个简单的例子,它使用信号量来跟踪正在进行的工作,以及在所有 Worker 完成时引发的信号 . 这应该给你一个基本的想法来工作 .

    下面的解决方案并不理想,理想情况下我们应该在调用BeginGetContext之前获取信号量 . 这使关闭变得更加困难,所以我选择使用这种更简化的方法 . 如果我这样做是为了'真实',我可能会编写自己的线程管理而不是依赖ThreadPool . 这将允许更可靠的关闭 .

    无论如何这里是完整的例子:

    // Console host for the sample: runs an HttpServer with 5 threads on port 8085
    // until the user presses Enter, then disposes it.
    class TestHttp
    {
        static void Main()
        {
            HttpServer srvr = new HttpServer(5);
            try
            {
                srvr.Start(8085);
                Console.WriteLine("Press [Enter] to quit.");
                Console.ReadLine();
            }
            finally
            {
                // Same clean-up a using statement performs.
                if (srvr != null)
                {
                    ((IDisposable)srvr).Dispose();
                }
            }
        }
    }
    
    
    /// <summary>
    /// Sample HTTP server on top of <see cref="HttpListener"/> that handles requests in the
    /// accept callback. A semaphore with <c>maxThreads</c> slots tracks requests in flight, and
    /// <c>_idle</c> is signaled by the request that returns the semaphore to its full count.
    /// </summary>
    class HttpServer : IDisposable
    {
        private readonly int _maxThreads;
        private readonly HttpListener _listener;
        private readonly Thread _listenerThread;
        private readonly ManualResetEvent _stop, _idle;
        private readonly Semaphore _busy;

        /// <param name="maxThreads">Maximum number of requests handled at the same time.</param>
        public HttpServer(int maxThreads)
        {
            _maxThreads = maxThreads;
            _stop = new ManualResetEvent(false);
            _idle = new ManualResetEvent(false);
            _busy = new Semaphore(maxThreads, maxThreads);
            _listener = new HttpListener();
            _listenerThread = new Thread(HandleRequests);
        }

        /// <summary>Starts listening on the given port and launches the listener thread.</summary>
        /// <param name="port">TCP port used in the <c>http://+:{port}/</c> prefix.</param>
        public void Start(int port)
        {
            _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
            _listener.Start();
            _listenerThread.Start();
        }

        /// <summary>Stops the server; equivalent to <see cref="Stop"/>.</summary>
        public void Dispose()
        { Stop(); }

        /// <summary>
        /// Signals shutdown, waits for the listener thread and for requests in flight,
        /// then stops the listener. Safe to call when <see cref="Start"/> never ran or failed.
        /// </summary>
        public void Stop()
        {
            _stop.Set();

            // Thread.Join throws ThreadStateException on a thread that was never started,
            // which happens when Dispose() runs without Start() or after Start() failed.
            if ((_listenerThread.ThreadState & System.Threading.ThreadState.Unstarted) == 0)
                _listenerThread.Join();

            _idle.Reset();

            // Acquire and release the semaphore to see if anyone is running; wait for idle if they are.
            // Release() returns the previous count, so 1 + previous == _maxThreads means all slots are free.
            _busy.WaitOne();
            if (_maxThreads != 1 + _busy.Release())
                _idle.WaitOne();

            _listener.Stop();
        }

        // Listener thread: keeps one asynchronous accept outstanding and exits as soon as _stop is set.
        private void HandleRequests()
        {
            while (_listener.IsListening)
            {
                var context = _listener.BeginGetContext(ListenerCallback, null);

                // Index 0 is _stop: leave the loop when shutdown is requested.
                if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                    return;
            }
        }

        // Accept callback: takes a semaphore slot, completes the accept and writes the demo response.
        private void ListenerCallback(IAsyncResult ar)
        {
            _busy.WaitOne();
            try
            {
                HttpListenerContext context;
                try
                { context = _listener.EndGetContext(ar); }
                catch (HttpListenerException)
                { return; } // listener stopped while the accept was pending
                catch (ObjectDisposedException)
                { return; } // listener closed while the accept was pending

                if (_stop.WaitOne(0, false))
                {
                    // Shutdown already requested: drop the connection instead of
                    // leaving the accepted context open with no response.
                    context.Response.Abort();
                    return;
                }

                Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl);
                context.Response.SendChunked = true;
                using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
                {
                    tw.WriteLine("<html><body><h1>Hello World</h1>");
                    for (int i = 0; i < 5; i++)
                    {
                        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now);
                        tw.Flush();
                        Thread.Sleep(1000); // demo only: simulates a slow, chunked response
                    }
                    tw.WriteLine("</body></html>");
                }
            }
            finally
            {
                // The request that brings the semaphore back to its full count signals idle.
                if (_maxThreads == 1 + _busy.Release())
                    _idle.Set();
            }
        }
    }
    
  • 4

    我重新检查了问题 EDIT 部分中自己的代码,并决定在做一些修改后采用它:

    // Shuts the server down: raises the stopping flag, waits on resetEvent, then stops the listener.
    // ListenerCallback sets resetEvent again when numberOfRequests drops back to zero.
    public void Stop() 
    {
        // Same lock ListenerCallback takes before it tests isStopping, so a callback
        // either sees the flag or has already counted itself as in flight.
        lock (locker)
        {
            isStopping = true;
        }
        resetEvent.WaitOne(); //initially set to true
        listener.Stop();
    }
    
    // Accept callback that cooperates with Stop(): it refuses new work once isStopping
    // is set, and otherwise counts the request as in flight until it has finished.
    private void ListenerCallback(IAsyncResult ar)
    {
        lock (locker) // a dedicated lock object; the earlier version locked on this
        {
            if (isStopping)
            {
                return;
            }

            // Un-signal the event Stop() waits on and count this request.
            resetEvent.Reset();
            numberOfRequests++;
        }

        try
        {
            HttpListener source = ar.AsyncState as HttpListener;
            HttpListenerContext accepted = source.EndGetContext(ar);

            //do some stuff
        }
        finally
        {
            // Runs even when the handling above throws, so the counter always comes back down.
            lock (locker)
            {
                numberOfRequests--;
                if (numberOfRequests == 0)
                {
                    resetEvent.Set();
                }
            }
        }
    }
    
  • 0

    只需调用 listener.Stop() 即可 . 这不会终止任何已经建立的连接,但会阻止任何新连接 .

  • 0

    这使用BlockingCollection类型的队列来为请求提供服务 . 它可以原样使用 . 您应该从这个派生一个类并覆盖Response .

    using System;
    using System.Collections.Concurrent;
    using System.Net;
    using System.Text;
    using System.Threading;
    
    namespace Service
    {
        /// <summary>
        /// HTTP server that accepts requests on one listener thread and serves them on a fixed
        /// number of processor threads, using a <see cref="BlockingCollection{T}"/> as the queue.
        /// Derive from this class and override <see cref="Response"/> to produce real replies.
        /// </summary>
        class HttpServer : IDisposable
        {
            private HttpListener httpListener;
            private Thread listenerLoop;
            private Thread[] requestProcessors;
            private BlockingCollection<HttpListenerContext> messages;

            /// <param name="threadCount">Number of processor threads created by <see cref="Start"/>.</param>
            public HttpServer(int threadCount)
            {
                requestProcessors = new Thread[threadCount];
                messages = new BlockingCollection<HttpListenerContext>();
                httpListener = new HttpListener();
            }

            /// <summary>Port used to build <see cref="Prefixes"/>; overwritten by <see cref="Start"/>.</summary>
            public virtual int Port { get; set; } = 80;

            /// <summary>URI prefixes registered with the listener when the server starts.</summary>
            public virtual string[] Prefixes
            {
                get { return new string[] {string.Format(@"http://+:{0}/", Port )}; }
            }

            /// <summary>Starts the listener thread and the processor threads.</summary>
            /// <param name="port">Port to listen on; stored in <see cref="Port"/>.</param>
            public void Start(int port)
            {
                // The argument used to be ignored, so the server always listened on Port's old value.
                Port = port;

                listenerLoop = new Thread(HandleRequests);

                foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );

                listenerLoop.Start();

                for (int i = 0; i < requestProcessors.Length; i++)
                {
                    requestProcessors[i] = StartProcessor(i, messages);
                }
            }

            /// <summary>Stops the server; equivalent to <see cref="Stop"/>.</summary>
            public void Dispose() { Stop(); }

            /// <summary>
            /// Closes the queue, waits for the processors to drain it, then stops the listener
            /// and waits for the listener thread. Safe to call when <see cref="Start"/> never ran.
            /// </summary>
            public void Stop()
            {
                messages.CompleteAdding();

                // Slots and listenerLoop are null until Start() has run.
                foreach (Thread worker in requestProcessors) worker?.Join();

                httpListener.Stop();
                listenerLoop?.Join();
            }

            // Listener thread: starts the listener, then blocks in GetContext and queues each request.
            private void HandleRequests()
            {
                httpListener.Start();
                try 
                {
                    while (httpListener.IsListening)
                    {
                        Console.WriteLine("The Linstener Is Listening!");
                        HttpListenerContext context = httpListener.GetContext();

                        try
                        {
                            messages.Add(context);
                        }
                        catch (InvalidOperationException)
                        {
                            // Stop() already completed the queue: no processor will ever see this
                            // request, so drop the connection instead of leaking the context.
                            context.Response.Abort();
                            break;
                        }
                        Console.WriteLine("The Linstener has added a message!");
                    }
                }
                catch(Exception e)
                {
                    // GetContext throws when Stop() shuts the listener down; that ends the loop.
                    Console.WriteLine (e.Message);
                }
            }

            // Creates and starts one processor thread.
            private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
            {
                Thread thread = new Thread(() => Processor(number, messages));
                thread.Start();
                return thread;
            }

            // Processor thread: serves queued requests until the queue is completed and empty.
            private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
            {
                Console.WriteLine ("Processor {0} started.", number);
                for (;;)
                {
                    Console.WriteLine ("Processor {0} awoken.", number);

                    // TryTake with an infinite timeout returns false once the collection is
                    // completed and empty, so no exception is needed to end the loop.
                    HttpListenerContext context;
                    if (!messages.TryTake(out context, Timeout.Infinite))
                        break;

                    Console.WriteLine ("Processor {0} dequeued message.", number);
                    try
                    {
                        Response (context);
                    }
                    catch (Exception e)
                    {
                        // One failing request must not silently terminate the processor thread.
                        Console.Error.WriteLine ("Processor {0} failed: {1}", number, e);
                    }
                }

                Console.WriteLine ("Processor {0} terminated.", number);
            }

            /// <summary>Produces the reply for one request; the default sends a placeholder page.</summary>
            public virtual void Response(HttpListenerContext context)
            {
                SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>") );
            }

            /// <summary>Writes <paramref name="responseString"/> as UTF-8 to the response and closes it.</summary>
            public static void SendReply(HttpListenerContext context, StringBuilder responseString )
            {
                byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
                context.Response.ContentLength64 = buffer.Length;

                // using closes the stream even when Write throws (e.g. the client disconnected).
                using (System.IO.Stream output = context.Response.OutputStream)
                {
                    output.Write(buffer, 0, buffer.Length);
                }
            }
        }
    }
    

    这是如何使用它的示例 . 无需使用事件或任何锁定块 . BlockingCollection解决了所有这些问题 .

    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Net;
    using System.Text;
    using System.Threading;
    
    namespace Service
    {
      // Console host: starts a QuizzServer and reads commands from standard input until "stop".
      class Server
      {
        public static void Main (string[] args)
        {
            HttpServer server = new QuizzServer (8);
            server.Start (80);
            for (bool keepRunning = true; keepRunning ;)
            {
                string input = Console.ReadLine ();
                if (input == null)
                {
                    // End of input (closed or redirected stdin): ReadLine returns null, which
                    // used to crash with a NullReferenceException. Shut down cleanly instead.
                    server.Stop ();
                    break;
                }

                // Invariant lower-casing: command matching must not depend on the current culture.
                input = input.ToLowerInvariant ();
                switch (input)
                {
                    case "stop":
                        Console.WriteLine ("Stop command accepted.");
                        server.Stop ();
                        keepRunning = false;
                        break;
                    default:
                        Console.WriteLine ("Unknown Command: '{0}'.",input);
                        break;
                }
            }
        }
      }
    }
    

相关问题