首页 文章

Java中的UDP DatagramSocket线程的高CPU使用率

提问于
浏览
0

我正在运行一个多线程java服务器应用程序,其中包括在3个不同的线程上从3个不同的组播源(端口)接收UDP数据包 .

它运行在最近的双插槽redhat机箱上(总共8个核心(4 x 2 cpu),没有超线程) .

“top”命令显示cpu使用率为250~300% . shift-H显示2个线程,使用率约为99%,1个为70% . 快速线程jstack分析显示这些线程对应于我的UDP处理线程 .

考虑到CPU速度与UDP消息速率(大约300 msg /秒,大约250字节的有效负载),我对CPU使用水平感到有些惊讶,我正在研究这个问题 . 有趣的是,第三个线程(对应较低的CPU使用率)具有较低的数据速率(50~100 msg / s)

我已经包含了一些调试代码来测量花费大部分时间的地方,它似乎是在DatagramSocket的“receive()”方法中:

_running    = true;
_buf        = new byte[300];
_packet     = new DatagramPacket(_buf, _buf.length);

while(_running) {
    try {
        long t0 = System.nanoTime();
        _inSocket.receive(_packet);
        long t1 = System.nanoTime();
        this.handle(_packet);
        long t2 = System.nanoTime();
        long waitingAndReceiveTime = t1-t0;
        long handleTime = t2-t1;
        _logger.info("{} : {} : update : {} : {}", t1, _port, waitingAndReceiveTime, handleTime);
    }
    catch(Exception e) {
        _logger.error("Exception while receiving multicast packet", e);
    }
}

handleTime平均值为4000ns,速度非常快,无法负责CPU的使用 . waitingAndReceiveTime要高得多,从大约30,000ns到几ms . 我知道该方法是阻塞的,所以时间包括时间阻塞和接收时间 .

我有几个问题:

  • 我怀疑某事奇怪吗?

  • 我在想"receive()"是阻塞的,它不应该"waste" CPU周期,所以等待部分不应该对高CPU使用负责,对吧?

  • 有没有办法分割时间阻塞的测量值,以及接收方法中接收数据报的时间?

  • 什么可能导致这种高CPU使用率?

EDIT :我使用了中断合并参数,将rx-usecs置于0,将rx-frames置于10.我现在可以看到以下内容:

  • UPD消息确实出现在10个组中:对于每个组,第一个消息具有LONG waitingAndReceiveTime(> = 1ms),并且以下9个waitingAndReceiveTime要短得多(~2000ns) . (handleTime是一样的)

  • CPU使用率降低!对于2个第一线程,下降到大约55% .

仍然不知道如何解决这个问题

1 回答

  • 0

    不是真的回答但是:

    有一件事我可以向你保证,这不是Java代码 . 我在Python中做了一个多线程的UDP服务器,它做了同样的事情,CPU使用率在3到4秒内跳到100% . 我猜它确实与UDP本身有关,因为我还制作了一个多线程TCP服务器,它几乎没有达到CPU使用率的10% .

    这是代码:

    import socket
    from _thread import*
    import threading
    import time
    def threaded(s,serverIP,serverPort):
        while True:
            try:
                d = s.recvfrom(128)
                data = d[0]
                addr = d[1]
                message= str(data)
                if (message== "b'1'"):
                    time.sleep(5)
                s.sendto(str.encode(message) , addr)
                print(message)
            except:
                break
        s.close()
    
    def Main():
        serverPort = 11000
        serverIP= "127.0.0.1"
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.bind((serverIP, serverPort))
    
        while True:
            start_new_thread(threaded, (s,serverIP,serverPort))
        s.close)
    
    if __name__ == '__main__':
        Main()
    

    注意:

    如果你找到了答案,请告诉我 . 祝好运 .

相关问题