首页 文章

20使用SocketAsyncEventArgs每秒接收

提问于
浏览
9

使用SocketAsyncEventArgs开发TCP服务器,它是Windows服务的异步方法 . 我在Main的开头有这两行代码:

ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);

并且都返回true(记录返回值) . 现在2000到3000个客户端开始向此服务器发送消息,它开始接受连接(我计算连接数,它是预期的 - 有一个连接池) . 服务器进程的线程数将增长到~2050到~3050 . 到现在为止还挺好!

现在有一个Received方法,在ReceiveAsync返回true或SocketAsyncEventArgs的Completed事件后调用 .

问题就此开始了:无论客户端连接多少以及发送的消息数量多少,接收的消息最多只能在一秒钟内调用20次!随着客户数量的增加,这个数字(20)下降到~10 .

环境:TCP服务器和客户端正在同一台计算机上进行模拟 . 我在两台机器上测试了代码,一台有2核CPU和4GB RAM,另一台有8核CPU和12GB RAM . (还没有)数据丢失,有时我在每次接收操作中都会收到多条消息 . 没关系 . 但是如何才能增加接收操作的数量?

关于实现的附加说明:代码很大,包含许多不同的逻辑 . 总体描述如下:我有一个SocketAsyncEventArgs用于接受新连接 . 它很棒 . 现在,对于每个新接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据 . 我把这个(为接收创建的SocketAsyncEventArgs)放在一个池中 . 它不会被重用,但它的UserToken被用于跟踪连接;例如那些断开连接的连接或那些7分钟内没有发送任何数据的连接将被关闭和处理(SocketAsyncEventArgs的AcceptSocket将被关闭(两者),关闭和处理,SocketAsyncEventArgs对象本身也是如此) . 这是一个执行这些任务的Sudo类,但所有其他逻辑和日志记录以及错误检查和其他任何内容都被删除,以使其简单明了(也许更容易发现有问题的代码):

class Sudo
{
    Socket _listener;
    int _port = 8797;

    public Sudo()
    {
        var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
        _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(ipEndPoint);

        _listener.Listen(100);

        Accept(null);
    }

    void Accept(SocketAsyncEventArgs acceptEventArg)
    {
        if (acceptEventArg == null)
        {
            acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += AcceptCompleted;
        }
        else acceptEventArg.AcceptSocket = null;

        bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;

        if (!willRaiseEvent) Accepted(acceptEventArg);
    }

    void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        Accepted(e);
    }

    void Accepted(SocketAsyncEventArgs e)
    {
        var acceptSocket = e.AcceptSocket;
        var readEventArgs = CreateArg(acceptSocket);

        var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);

        Accept(e);

        if (!willRaiseEvent) Received(readEventArgs);
    }

    SocketAsyncEventArgs CreateArg(Socket acceptSocket)
    {
        var arg = new SocketAsyncEventArgs();
        arg.Completed += IOCompleted;

        var buffer = new byte[64 * 1024];
        arg.SetBuffer(buffer, 0, buffer.Length);

        arg.AcceptSocket = acceptSocket;

        arg.SocketFlags = SocketFlags.None;

        return arg;
    }

    void IOCompleted(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                Received(e);
                break;
            default: break;
        }
    }

    void Received(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
        {
            // Kill(e);
            return;
        }

        var bytesList = new List<byte>();
        for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);

        var bytes = bytesList.ToArray();

        Process(bytes);

        ReceiveRest(e);

        Perf.IncOp();
    }

    void ReceiveRest(SocketAsyncEventArgs e)
    {
        e.SocketFlags = SocketFlags.None;
        for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
        e.SetBuffer(0, e.Buffer.Length);

        var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
        if (!willRaiseEvent) Received(e);
    }

    void Process(byte[] bytes) { }
}

1 回答

  • 4

    它减速的原因是因为这些线程中的每一个都需要上下文切换,这是一个相对昂贵的操作 . 您添加的线程越多,CPU的百分比就越多地仅用于上下文切换而不是实际代码 .

    你以一种相当奇怪的方式击中了每个客户端的单线程瓶颈 . 服务器端异步的重点是减少线程数 - 每个客户端没有一个线程,但理想情况下,系统中每个逻辑处理器只有一个或两个 .

    您发布的异步代码看起来很好,所以我只能猜测您的 Process 方法有一些容易被忽视的非异步,阻塞其中的I / O,即数据库或文件访问 . 当I / O阻塞时,.NET线程池检测到这一点并自动旋转一个新线程 - 这里基本上是失控的,而 Process 中的I / O是一个瓶颈 .

    异步管道确实需要100%异步才能从中获得任何显着的好处 . Half-in将让您编写复杂的代码,其性能与简单的同步代码一样差 .

    如果你绝对不能使 Process 方法完全异步,你可能会有一些运气伪造它 . 让一些东西在队列中等待由一个有限大小的小型线程池进行处理 .

相关问题