使用SocketAsyncEventArgs开发TCP服务器,它是Windows服务的异步方法 . 我在Main的开头有这两行代码:
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
并且都返回true(记录返回值) . 现在2000到3000个客户端开始向此服务器发送消息,它开始接受连接(我计算连接数,它是预期的 - 有一个连接池) . 服务器进程的线程数将增长到~2050到~3050 . 到现在为止还挺好!
现在有一个Received方法,在ReceiveAsync返回true或SocketAsyncEventArgs的Completed事件后调用 .
问题就此开始了:无论客户端连接多少以及发送的消息数量多少,接收的消息最多只能在一秒钟内调用20次!随着客户数量的增加,这个数字(20)下降到~10 .
环境:TCP服务器和客户端正在同一台计算机上进行模拟 . 我在两台机器上测试了代码,一台有2核CPU和4GB RAM,另一台有8核CPU和12GB RAM . (还没有)数据丢失,有时我在每次接收操作中都会收到多条消息 . 没关系 . 但是如何才能增加接收操作的数量?
关于实现的附加说明:代码很大,包含许多不同的逻辑 . 总体描述如下:我有一个SocketAsyncEventArgs用于接受新连接 . 它很棒 . 现在,对于每个新接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据 . 我把这个(为接收创建的SocketAsyncEventArgs)放在一个池中 . 它不会被重用,但它的UserToken被用于跟踪连接;例如那些断开连接的连接或那些7分钟内没有发送任何数据的连接将被关闭和处理(SocketAsyncEventArgs的AcceptSocket将被关闭(两者),关闭和处理,SocketAsyncEventArgs对象本身也是如此) . 这是一个执行这些任务的Sudo类,但所有其他逻辑和日志记录以及错误检查和其他任何内容都被删除,以使其简单明了(也许更容易发现有问题的代码):
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}
1 回答
它减速的原因是因为这些线程中的每一个都需要上下文切换,这是一个相对昂贵的操作 . 您添加的线程越多,CPU的百分比就越多地仅用于上下文切换而不是实际代码 .
你以一种相当奇怪的方式击中了每个客户端的单线程瓶颈 . 服务器端异步的重点是减少线程数 - 每个客户端没有一个线程,但理想情况下,系统中每个逻辑处理器只有一个或两个 .
您发布的异步代码看起来很好,所以我只能猜测您的
Process
方法有一些容易被忽视的非异步,阻塞其中的I / O,即数据库或文件访问 . 当I / O阻塞时,.NET线程池检测到这一点并自动旋转一个新线程 - 这里基本上是失控的,而Process
中的I / O是一个瓶颈 .异步管道确实需要100%异步才能从中获得任何显着的好处 . Half-in将让您编写复杂的代码,其性能与简单的同步代码一样差 .
如果你绝对不能使
Process
方法完全异步,你可能会有一些运气伪造它 . 让一些东西在队列中等待由一个有限大小的小型线程池进行处理 .