
Siddhi send performance issue - embedded

We are evaluating Siddhi as an embedded CEP processor for our application. During scale testing we found that, as the number of rules grows, the time needed to insert events increases significantly for each unique ID. For example:

  • Create 10 rules (using windows and partitioning by ID).

  • Load 1000 unique entries at a time and track the elapsed time. Note that as you approach 100K unique entries, insert performance degrades from milliseconds to many seconds, and the more rules you have, the more this time grows.

  • Now load the "next" timestamp for every record: insert times stay constant regardless of ID.

Here is a code file that reproduces this:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class ScaleSiddhiTest {

    private SiddhiManager siddhiManager = new SiddhiManager();

    @Test
    public void testWindow() throws InterruptedException {

        // Build the 20 near-identical rules programmatically: even-numbered
        // rules aggregate metric1, odd-numbered rules aggregate metric2.
        StringBuilder planBuilder = new StringBuilder(
                "@Plan:name('MetricPlan') \n" +
                "define stream metricStream (id string, timestamp long, metric1 double, metric2 double); \n" +
                "partition with (id of metricStream) begin \n\n");
        for (int rule = 0; rule < 20; rule++) {
            String metric = (rule % 2 == 0) ? "metric1" : "metric2";
            String label = (rule % 2 == 0) ? "Metric1" : "Metric2";
            planBuilder.append("from metricStream#window.externalTime(timestamp, 300000) \n")
                    .append("select id, avg(").append(metric).append(") as value, '")
                    .append(label).append("-rule").append(rule).append("' as ruleName\n")
                    .append("having value>-1.000000 \n")
                    .append("insert into outputStream;\n\n");
        }
        planBuilder.append("end;");
        String plan = planBuilder.toString();


        // Generating runtime
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(plan);

        AtomicInteger counter = new AtomicInteger();

        // Adding callback to retrieve output events from query
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                counter.addAndGet(events.length);
            }
        });

        // Starting event processing
        executionPlanRuntime.start();

        // Retrieving InputHandler to push events into Siddhi
        InputHandler inputHandler = executionPlanRuntime.getInputHandler("metricStream");

        int numOfUniqueItems = 10000;

        IntStream.range(0, 2).forEach(curMinute->{
            long iterationStartTime =  System.currentTimeMillis();
            AtomicLong lastStart = new AtomicLong(System.currentTimeMillis());
            IntStream.range(0, numOfUniqueItems).forEach(id->{
                try {
                    // The stream defines id as string, so convert the int before sending
                    inputHandler.send(TimeUnit.MINUTES.toMillis(curMinute), new Object[]{String.valueOf(id), TimeUnit.MINUTES.toMillis(curMinute), 10.0, 20.0});
                    if( id > 0 && id % 1000 == 0 ){
                        long ls = lastStart.get();
                        long curTime = System.currentTimeMillis();
                        lastStart.set(curTime);
                        System.out.println("It took " + (curTime - ls) + " ms to load the last 1000 entities. Num Alarms So Far: " + counter.get());
                    }
                } catch (Exception e ){
                    throw new RuntimeException(e);
                }
            });
            System.out.println("It took " + (System.currentTimeMillis() - iterationStartTime) + "ms to load the last " + numOfUniqueItems);
        });

        // Shutting down the runtime
        executionPlanRuntime.shutdown();

        siddhiManager.shutdown();
    }

}

Here are my questions:

  • What are we doing incorrectly here that could be causing the initial-load performance problem?

  • What suggestions are there to resolve this problem?

Update: Following the suggested answer below, I updated the test to use group by instead of a partition. The initial load of each object shows the same growth, if not worse. Specifically, I changed the rules to:

@Plan:name('MetricPlan') 
define stream metricStream (id string, timestamp long, metric1 double,metric2 double); 

from metricStream#window.externalTime(timestamp, 300000) 
select id, avg(metric1) as value, 'Metric1-rule0' as ruleName
group by id 
having value>-1.000000 
insert into outputStream;

...

Below is the resulting output for Group By vs. Partition By. Both show the growth on initial load.

Group By Load Results

Load 10K Items - Group By        
It took 3098 ms to load the last 1000 entities. Num Alarms So Far: 20020        
It took 2507 ms to load the last 1000 entities. Num Alarms So Far: 40020        
It took 5993 ms to load the last 1000 entities. Num Alarms So Far: 60020        
It took 4878 ms to load the last 1000 entities. Num Alarms So Far: 80020        
It took 6079 ms to load the last 1000 entities. Num Alarms So Far: 100020        
It took 8466 ms to load the last 1000 entities. Num Alarms So Far: 120020        
It took 11840 ms to load the last 1000 entities. Num Alarms So Far: 140020        
It took 12634 ms to load the last 1000 entities. Num Alarms So Far: 160020        
It took 14779 ms to load the last 1000 entities. Num Alarms So Far: 180020        
It took 87053ms to load the last 10000        

Load Same 10K Items - Group By        
It took 31 ms to load the last 1000 entities. Num Alarms So Far: 220020        
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 240020        
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 260020        
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 280020        
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 300020        
It took 20 ms to load the last 1000 entities. Num Alarms So Far: 320020        
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 340020        
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 360020        
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 380020        
It took 202ms to load the last 10000

Partition By Load Results

Load 10K Items - Partition By        
It took 1148 ms to load the last 1000 entities. Num Alarms So Far: 20020        
It took 1870 ms to load the last 1000 entities. Num Alarms So Far: 40020        
It took 1393 ms to load the last 1000 entities. Num Alarms So Far: 60020        
It took 1745 ms to load the last 1000 entities. Num Alarms So Far: 80020        
It took 2040 ms to load the last 1000 entities. Num Alarms So Far: 100020        
It took 2108 ms to load the last 1000 entities. Num Alarms So Far: 120020        
It took 3068 ms to load the last 1000 entities. Num Alarms So Far: 140020        
It took 2798 ms to load the last 1000 entities. Num Alarms So Far: 160020        
It took 3532 ms to load the last 1000 entities. Num Alarms So Far: 180020        
It took 23363ms to load the last 10000        

Load Same 10K Items - Partition By        
It took 39 ms to load the last 1000 entities. Num Alarms So Far: 220020        
It took 21 ms to load the last 1000 entities. Num Alarms So Far: 240020        
It took 30 ms to load the last 1000 entities. Num Alarms So Far: 260020        
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 280020        
It took 35 ms to load the last 1000 entities. Num Alarms So Far: 300020        
It took 26 ms to load the last 1000 entities. Num Alarms So Far: 320020        
It took 25 ms to load the last 1000 entities. Num Alarms So Far: 340020        
It took 34 ms to load the last 1000 entities. Num Alarms So Far: 360020        
It took 48 ms to load the last 1000 entities. Num Alarms So Far: 380020        
It took 343ms to load the last 10000

This kind of growth almost implies that, when an ID that has not been seen before is loaded, it is compared against every other ID rather than being looked up via a hash or similar. That would explain the linear growth we see as the number of unique IDs increases.
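The suspected behavior, an O(n) membership check instead of an O(1) hash lookup for previously unseen IDs, would produce exactly this kind of linear growth. A toy sketch (purely illustrative; this is not Siddhi's actual internals) that counts comparisons:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LinearVsHashSketch {
    public static void main(String[] args) {
        // Count how many key comparisons a linear membership test needs as the
        // population of known ids grows, versus simply inserting into a hash set.
        List<String> linear = new ArrayList<>();
        Set<String> hashed = new HashSet<>();
        int comparisons = 0;
        for (int i = 0; i < 10_000; i++) {
            String id = "id-" + i;
            // Linear scan: every id already seen is compared against the new one,
            // and since the new id is never present, the whole list is scanned.
            for (String known : linear) {
                comparisons++;
                if (known.equals(id)) break;
            }
            linear.add(id);
            hashed.add(id); // hash insert cost stays near-constant as the set grows
        }
        System.out.println("linear comparisons=" + comparisons); // 0+1+...+9999 = 49995000
        System.out.println("hash size=" + hashed.size());
    }
}
```

The 49,995,000 comparisons for just 10K ids mirror the shape of the timings above: each batch of 1000 new ids costs more than the previous one.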

1 Answer


    Yes, this behavior is expected. When you partition by ID, a new partition instance is created for every new ID, so if your partitions are large, creating them can take more time. The second pass is processed much faster because the partitions for those unique IDs have already been created.

    In your case I don't think a partition is the ideal solution. Partitions are only useful when you have inner streams or use non-time-based windows.

    For example:

        partition with (id of metricStream) begin
            from metricStream ... insert into #TempStream;
            from #TempStream ... select ... insert into outputStream;
        end;

    If you only want to group time-based aggregations, use the group by keyword:

        from metricStream#window.externalTime(timestamp, 300000)
        select id, avg(metric1) as value, 'Metric1-rule18' as ruleName
        group by id
        having value>-1.000000
        insert into outputStream;
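The per-key lazy instantiation the answer describes can be sketched as follows. This is a hypothetical stand-in, not Siddhi's actual classes: the first event for an ID pays the cost of materializing every rule in the partition block, and later events for the same ID reuse the instance.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {
    // Hypothetical stand-in for a per-key partition instance.
    static class PartitionInstance {
        final String key;
        PartitionInstance(String key) {
            this.key = key;
            // In a real engine, every query in the partition block would be
            // instantiated here, so creation cost grows with the rule count.
        }
    }

    private final Map<String, PartitionInstance> instances = new HashMap<>();

    PartitionInstance route(String id) {
        // First event for an id pays the creation cost; later events reuse it.
        return instances.computeIfAbsent(id, PartitionInstance::new);
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch();
        PartitionInstance a1 = sketch.route("id-1"); // created on first sight
        PartitionInstance a2 = sketch.route("id-1"); // reused thereafter
        System.out.println("reused=" + (a1 == a2));  // prints reused=true
    }
}
```

This matches the measured results: the first pass over 10K unique IDs is slow because 10K instances are built, while the second pass over the same IDs completes in milliseconds.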
