首页 文章

Linux上的Java BlockingQueue延迟很高

提问于
浏览
25

我正在使用BlockingQueue:s(尝试使用ArrayBlockingQueue和LinkedBlockingQueue)在我正在处理的应用程序中的不同线程之间传递对象 . 性能和延迟在这个应用程序中相对重要,所以我很好奇使用BlockingQueue在两个线程之间传递对象需要多长时间 . 为了衡量这一点,我写了一个简单的程序,有两个线程(一个消费者和一个 生产环境 者),我让 生产环境 者将时间戳(使用System.nanoTime())传递给消费者,参见下面的代码 .

我记得在某个论坛上的某个地方读过,对于试过这个的人来说花了大约10微秒(不知道操作系统和硬件是什么),所以当我花了大约30微秒时,我并不感到惊讶Windows 7机箱(英特尔E7500核心2双核CPU,2.93GHz),同时在后台运行许多其他应用程序 . 然而,当我在速度更快的Linux服务器(两个Intel X5677 3.46GHz四核CPU,运行Debian 5和内核2.6.26-2-amd64)上进行相同的测试时,我感到非常惊讶 . 我预计延迟会低于我的Windows框,但相反它会高得多 - 约75 - 100微秒!两个测试都是使用Sun的Hotspot JVM版本1.6.0-23完成的 .

有没有其他人在Linux上做过类似的测试?或者有人知道为什么Linux上的速度会慢得多(硬件更好),与Windows相比,Linux上的线程切换是否会慢得多?如果是这种情况,看起来Windows实际上更适合某种应用程序 . 任何帮助我理解相对较高的数字的帮助都非常感谢 .

Edit:
在DaveC的评论之后,我还做了一个测试,我将JVM(在Linux机器上)限制为单个核心(即在同一核心上运行的所有线程) . 这大大改变了结果 - 延迟降至20微秒以下,即比Windows机器上的结果更好 . 我还做了一些测试,我将 生产环境 者线程限制为一个核心,将消费者线程限制为另一个核心(尝试将它们放在同一个套接字和不同的套接字上),但这似乎没有帮助 - 延迟仍然是〜75微秒 . 顺便说一句,这个测试应用程序几乎就是我在执行测试时在机器上运行的所有应用程序 .

有谁知道这些结果是否有意义?如果 生产环境 者和消费者在不同的核心上运行,它真的应该慢得多吗?任何输入都非常感谢 .

Edited again (6 January):
我尝试了对代码和运行环境的不同更改:

  • 我将Linux内核升级到2.6.36.2(从2.6.26.2开始) . 内核升级后,测量时间变为60微秒,变化非常小,从升级前的75-100开始 . 为 生产环境 者和消费者线程设置CPU关联性没有任何影响,除非将它们限制在同一个核心 . 在同一核心上运行时,测得的延迟为13微秒 .

  • 在原始代码中,我让 生产环境 者在每次迭代之间进入休眠1秒钟,以便给消费者足够的时间来计算经过的时间并将其打印到控制台 . 如果我删除对Thread.sleep()的调用,而是让 生产环境 者和消费者在每次迭代中调用barrier.await()(消费者在将经过的时间打印到控制台后调用它),则测量的延迟从60微秒至10微秒以下 . 如果在同一核心上运行线程,则延迟低于1微秒 . 任何人都可以解释为什么这会显着减少延迟?我的第一个猜测是,这个改变产生了生成器在消费者调用queue.take()之前调用queue.put()的效果,所以消费者永远不必阻止,但在玩了一个修改版本的ArrayBlockingQueue后,我发现这个猜测是假的 - 消费者确实阻止了 . 如果您有其他猜测,请告诉我 . (顺便说一句,如果我让 生产环境 者同时调用Thread.sleep()和barrier.await(),则延迟保持在60微秒) .

  • 我也尝试了另一种方法 - 而不是调用queue.take(),我调用了queue.poll(),超时为100微秒 . 这会将平均延迟降低到10微秒以下,但当然会占用更多的CPU(但繁忙等待的CPU密集程度可能更低) .

Edited again (10 January) - Problem solved:
ninjalj建议延迟~60微秒是因为CPU不得不从更深的睡眠状态中醒来 - 而且他是完全正确的!在BIOS中禁用C状态后,延迟减少到<10微秒 . 这就解释了为什么我在上面的第2点获得了更好的延迟 - 当我更频繁地发送对象时,CPU保持足够忙,不能进入更深的睡眠状态 . 非常感谢所有花时间阅读我的问题并在此分享您的想法的人!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

4 回答

  • 3

    如果延迟是关键的并且您不需要严格的FIFO语义,那么您可能需要考虑JSR-166的LinkedTransferQueue . 它可以消除,以便相反的操作可以交换值而不是在队列数据结构上进行同步 . 这种方法有助于减少争用,实现并行交换,并避免线程休眠/唤醒惩罚 .

  • 6

    您的测试不是衡量队列切换延迟的一个好方法,因为您有一个线程读取队列,该队列在再次执行之前同步写入 System.out (执行字符串和长连接) . 要正确测量这一点,您需要将此活动移出此线程,并在获取线程中尽可能少地工作 .

    你最好只在接受者中进行计算(然后现在)并将结果添加到其他集合中,该集合由另一个输出结果的线程定期排干 . 我倾向于通过添加到通过AtomicReference访问的适当规定的数组支持结构来实现这一点(因此报告线程只需要在该引用上使用该存储结构的另一个实例的getAndSet来获取最新批次的结果;例如make 2列表,将一个设置为活动,每个xsa线程唤醒并交换主动和被动的线程) . 然后,您可以报告某些分布而不是每个结果(例如,十分位数范围),这意味着您不会在每次运行时生成大量日志文件并获取为您打印的有用信息 .

    FWIW我同意Peter Lawrey所说的时间,如果延迟真的很关键,那么你需要考虑忙于等待适当的cpu亲和力(即将核心专用于该线程)

    EDIT after Jan 6

    如果我删除对Thread.sleep()的调用,而是让 生产环境 者和消费者在每次迭代中调用barrier.await()(消费者在将经过的时间打印到控制台后调用它),则测量的延迟会减少从60微秒到10微秒以下 . 如果在同一核心上运行线程,则延迟低于1微秒 . 任何人都可以解释为什么这会显着减少延迟?

    您正在查看 java.util.concurrent.locks.LockSupport#park (和相应的 unpark )和 Thread#sleep 之间的区别 . 大多数j.u.c. stuff Build 在 LockSupport 上(通常通过_235960_提供或直接提供),这个(在Hotspot中)解析为 sun.misc.Unsafe#park (和 unpark ),这往往最终落在pthread(posix线程)lib的手中 . 通常 pthread_cond_broadcast 唤醒, pthread_cond_waitpthread_cond_timedwaitBlockingQueue#take 之类的东西 .

    我可以't say I'曾经看过 Thread#sleep 实际上是如何实现的(但是我会想到它会导致它被调度程序以比pthread信令机制更激进的方式降级,这就是什么帐户对于延迟差异 .

  • 0

    如果可以,我会使用一个ArrayBlockingQueue . 当我使用它时,Linux上的延迟在8-18微秒之间 . 有些注意事项 .

    • 成本主要是唤醒线程所需的时间 . 当你唤醒一个线程时,它的数据/代码不会在缓存中,所以你会发现,如果你计算线程唤醒后发生的事情的时间比你重复运行相同的事情要长2-5倍 .

    • 某些操作使用OS调用(例如锁定/循环障碍),这些操作在低延迟情况下通常比繁忙等待更昂贵 . 我建议你努力等待你的制作人而不是使用CyclicBarrier . 您也可以忙着等待您的消费者,但这在真实系统上可能会非常昂贵 .

  • 1

    @Peter Lawrey

    某些操作使用OS调用(例如锁定/循环障碍)

    那些不是OS(内核)调用 . 通过简单的CAS实现(在x86上也带有免费的内存栅栏)

    还有一个:除非你知道为什么(你使用它),否则不要使用ArrayBlockingQueue .

    @OP:看看ThreadPoolExecutor,它提供了出色的 生产环境 者/消费者框架 .

    编辑如下:

    要减少延迟(限制繁忙等待),将队列更改为SynchronousQueue,在启动使用者之前添加以下内容

    ...
    consumerThread.setPriority(Thread.MAX_PRIORITY);
    consumerThread.start();
    

    这是你能得到的最好的 .


    Edit2:这里有同步 . 队列 . 而不是打印结果 .

    package t1;
    
    import java.math.BigDecimal;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.SynchronousQueue;
    
    public class QueueTest {
    
        static final int RUNS = 250000;
    
        final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();
    
        int sleep = 1000;
    
        long[] results  = new long[0];
        public void start(final int runs) throws Exception {
            results = new long[runs];
            final CountDownLatch barrier = new CountDownLatch(1);
            Thread consumerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    barrier.countDown();
                    try {
    
                        for(int i = 0; i < runs; i++) {                        
                            results[i] = consume(); 
    
                        }
                    } catch (Exception e) {
                        return;
                    } 
                }
            });
            consumerThread.setPriority(Thread.MAX_PRIORITY);
            consumerThread.start();
    
    
            barrier.await();
            final long sleep = this.sleep;
            for(int i = 0; i < runs; i++) {
                try {                
                    doProduce(sleep);
    
                } catch (Exception e) {
                    return;
                }
            }
        }
    
        private void doProduce(final long sleep) throws InterruptedException {
            produce();
        }
    
        public void produce() throws InterruptedException {
            queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
        }
    
        public long consume() throws InterruptedException {
            long t = queue.take();
            long now = System.nanoTime();
            return now-t;
        }
    
        public static void main(String[] args) throws Throwable {           
            QueueTest test = new QueueTest();
            System.out.println("Starting + warming up...");
            // Run first once, ignoring results
            test.sleep = 0;
            test.start(15000);//10k is the normal warm-up for -server hotspot
            // Run again, printing the results
            System.gc();
            System.out.println("Starting again...");
            test.sleep = 1000;//ignored now
            Thread.yield();
            test.start(RUNS);
            long sum = 0;
            for (long elapsed: test.results){
                sum+=elapsed;
            }
            BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);        
            System.out.printf("Avg: %1.3f micros%n", elapsed); 
        }
    }
    

相关问题