首页 文章

OSGi PushStream很慢

提问于
浏览
3

在尝试OSGi PushStream库时,我觉得它真的很慢 . 我创建了两个使用PushStream执行相同操作的方法,另一个使用简单的BlockingQueue(请参阅下面的代码),结果如下:

Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.

为什么PushStream会变慢?我做错了什么?

代码

使用PushStream:

public class TestPush{

    @Test
    public void testPushStream() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
              psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();

        psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });

        final Promise<Long> nbEvent = psp.createStream(source).count();

        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();

        System.out.println("PushStream needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getValue() + " events.");
    }

使用ArrayBlockingQueue:

@Test
    public void testBlockingQueue() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final Executor e = Executors.newFixedThreadPool(1);
        final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
        final Deferred<Integer> nbEvent = pf.deferred();

        e.execute( () -> {
            try {
                Integer i = 0;
                Integer last = 0;
                do {
                    i = abq.take();

                    if (i == 0) {
                        startD.resolve(Instant.now());
                    } else if (i != -1) {
                        last = i;
                    }
                }
                while (i != -1);
                endD.resolve(Instant.now());
                nbEvent.resolve(last + 1);
            }
            catch (final InterruptedException exception) {
                exception.printStackTrace();
            }
        });

        for (int i = 0; i < 1000; i++) {
            abq.put(i);
        }
        abq.put(-1);

        System.out.println("Queue needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
    }
}

1 回答

  • 10

    这是一个有趣的问题:)

    为什么PushStream会变慢?我做错了什么?

    感谢您不仅仅假设PushStream实现很糟糕 . 在这种情况下,它更慢,因为(可能没有意识到)你问它!

    第1部分 - 缓冲

    默认情况下,PushStream是缓冲的 . 这意味着它们包含一个队列,在处理事件之前将事件放入其中 . 因此,缓冲会做一些对吞吐速度产生负面影响的事情 .

    • 它在管道中添加了一个额外的队列/出队步骤

    • 它在事件处理中添加了一个额外的线程切换

    • 缓冲区的默认策略是返回与缓冲区已满的相关的背压 .

    在这种情况下,绝大多数减速是由于背压 . 当您使用 psp.createStream(source) 创建流时,它将设置一个32个元素的缓冲区和一个基于缓冲区大小的线性背压策略,当满的时候返回一秒,当它有一个项目时返回31毫秒 . 值得注意的是,每个元素31毫升可以增加30秒!

    重要的是,SimplePushEventSource始终承认来自添加到其中的使用者的压力请求 . 这意味着您可以尽可能快地将事件抽入SimplePushEventSource,但它们只会按照管道请求的速度提供 .

    如果我们从您正在创建的推送流中删除缓冲,那么我们将进行以下测试:

    @Test
    public void testPushStream2() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
              psp.buildSimpleEventSource(Integer.class)
              .withQueuePolicy(QueuePolicyOption.BLOCK)
              .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    运行它(在我的机器上)的结果是:

    PushStream needs 39 milliseconds to process 1000 events.
    

    这显然更接近你的预期,但它仍然明显变慢 . 请注意,我们可能仍然有一些缓冲,但调整了PushbackPolicy . 这会给我们带来更快的吞吐量,但不会像这样快 .

    第2部分 - 管道长度

    接下来要注意的是你正在使用 onClose() 处理程序 . 这为您的推送流管道添加了一个额外的阶段 . 您实际上可以将onClose移动为promise的结果,减少管道的长度(您只需要运行一次) .

    @Test
    public void testPushStream3() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
                psp.buildSimpleEventSource(Integer.class)
                .withQueuePolicy(QueuePolicyOption.BLOCK)
                .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        psp.buildStream(source).unbuffered().build().forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
                .onResolve(() -> endD.resolve( Instant.now()));
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
                + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
                + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    这个版本的结果(在我的机器上)是:

    PushStream needs 21 milliseconds to process 1000 events.
    

    第3部分 - 多路复用传送

    "raw array blocking queue"示例和PushStream示例之间的关键区别在于您实际创建了两个PushStream . 第一个用于捕获开始时间,第二个用于计算事件 . 这会强制SimplePushEventSource在多个使用者之间复用事件 .

    如果我们将行为折叠到单个管道中以便SimplePushEventSource可以使用快速路径传递,该怎么办?

    @Test
    public void testPushStream4() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
                psp.buildSimpleEventSource(Integer.class)
                .withQueuePolicy(QueuePolicyOption.BLOCK)
                .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
                .filter(i -> {
                    if (i == 0) {
                        startD.resolve( Instant.now() );
                    }
                    return true;
                })
                .count()
                .onResolve(() -> endD.resolve( Instant.now()));
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
                + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
                + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    这个版本的结果(在我的机器上)是:

    PushStream needs 3 milliseconds to process 1000 events.
    

    摘要

    PushStreams是一种快速有效的消费异步到达事件的方法,但了解哪种缓冲行为适合您的应用程序非常重要 . 如果您想要快速迭代一大堆数据,那么您需要小心设置,因为缓冲默认值是针对不同的用例而设计的!

相关问题