We are evaluating Siddhi as an embedded CEP processor for our application. During scale testing, we found that as the number of rules grows, the time required to insert an event for each unique ID increases significantly. For example:
- Create 10 rules (using a window and a partition keyed on ID).
- Load 1,000 unique entries at a time and track the time. Note that as you approach 100K unique entries, insert performance degrades from milliseconds to many seconds. The more rules you have, the worse this gets.
- Now load the "next" timestamp for each of those same records: the insert time stays flat, regardless of ID.
Here is a test that reproduces the issue:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class ScaleSiddhiTest {

    private SiddhiManager siddhiManager = new SiddhiManager();

    @Test
    public void testWindow() throws InterruptedException {
        // Build 20 identical rules (alternating between metric1 and metric2),
        // all inside a partition keyed on id. This generates the same plan as
        // writing the 20 queries out by hand.
        StringBuilder planBuilder = new StringBuilder();
        planBuilder.append("@Plan:name('MetricPlan') \n");
        planBuilder.append("define stream metricStream (id string, timestamp long, metric1 double, metric2 double); \n");
        planBuilder.append("partition with (id of metricStream) begin \n");
        for (int i = 0; i < 20; i++) {
            String metric = (i % 2 == 0) ? "metric1" : "metric2";
            String ruleName = (i % 2 == 0 ? "Metric1" : "Metric2") + "-rule" + i;
            planBuilder.append("from metricStream#window.externalTime(timestamp, 300000) \n")
                    .append("select id, avg(").append(metric).append(") as value, '")
                    .append(ruleName).append("' as ruleName \n")
                    .append("having value > -1.000000 \n")
                    .append("insert into outputStream; \n");
        }
        planBuilder.append("end;");

        // Generating runtime
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(planBuilder.toString());

        AtomicInteger counter = new AtomicInteger();

        // Adding callback to retrieve output events from query
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                counter.addAndGet(events.length);
            }
        });

        // Starting event processing
        executionPlanRuntime.start();

        // Retrieving InputHandler to push events into Siddhi
        InputHandler inputHandler = executionPlanRuntime.getInputHandler("metricStream");

        int numOfUniqueItems = 10000;
        IntStream.range(0, 2).forEach(curMinute -> {
            long iterationStartTime = System.currentTimeMillis();
            AtomicLong lastStart = new AtomicLong(System.currentTimeMillis());
            IntStream.range(0, numOfUniqueItems).forEach(id -> {
                try {
                    // id is declared as a string in the stream definition,
                    // so convert it before sending
                    inputHandler.send(TimeUnit.MINUTES.toMillis(curMinute),
                            new Object[]{String.valueOf(id), TimeUnit.MINUTES.toMillis(curMinute), 10.0, 20.0});
                    if (id > 0 && id % 1000 == 0) {
                        long ls = lastStart.get();
                        long curTime = System.currentTimeMillis();
                        lastStart.set(curTime);
                        System.out.println("It took " + (curTime - ls) + " ms to load the last 1000 entities. Num Alarms So Far: " + counter.get());
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            System.out.println("It took " + (System.currentTimeMillis() - iterationStartTime) + "ms to load the last " + numOfUniqueItems);
        });

        // Shutting down the runtime
        executionPlanRuntime.shutdown();
        siddhiManager.shutdown();
    }
}
Here are my questions:

- What are we doing wrong here that could be causing the initial-load performance problem?
- Are there any recommendations for working around it?
Update: Based on the suggested answer below, I updated the test to use group by instead of a partition. The initial load per object shows the same growth, if not worse. Specifically, I changed the rules to:
@Plan:name('MetricPlan')
define stream metricStream (id string, timestamp long, metric1 double,metric2 double);
from metricStream#window.externalTime(timestamp, 300000)
select id, avg(metric1) as value, 'Metric1-rule0' as ruleName
group by id
having value>-1.000000
insert into outputStream;
...
Below are the resulting outputs for Group By vs. Partition By. Both show the same growth during the initial load.
Group By Load Results
Load 10K Items - Group By
It took 3098 ms to load the last 1000 entities. Num Alarms So Far: 20020
It took 2507 ms to load the last 1000 entities. Num Alarms So Far: 40020
It took 5993 ms to load the last 1000 entities. Num Alarms So Far: 60020
It took 4878 ms to load the last 1000 entities. Num Alarms So Far: 80020
It took 6079 ms to load the last 1000 entities. Num Alarms So Far: 100020
It took 8466 ms to load the last 1000 entities. Num Alarms So Far: 120020
It took 11840 ms to load the last 1000 entities. Num Alarms So Far: 140020
It took 12634 ms to load the last 1000 entities. Num Alarms So Far: 160020
It took 14779 ms to load the last 1000 entities. Num Alarms So Far: 180020
It took 87053ms to load the last 10000
Load Same 10K Items - Group By
It took 31 ms to load the last 1000 entities. Num Alarms So Far: 220020
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 240020
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 260020
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 280020
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 300020
It took 20 ms to load the last 1000 entities. Num Alarms So Far: 320020
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 340020
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 360020
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 380020
It took 202ms to load the last 10000
Partition By Load Results
Load 10K Items - Partition By
It took 1148 ms to load the last 1000 entities. Num Alarms So Far: 20020
It took 1870 ms to load the last 1000 entities. Num Alarms So Far: 40020
It took 1393 ms to load the last 1000 entities. Num Alarms So Far: 60020
It took 1745 ms to load the last 1000 entities. Num Alarms So Far: 80020
It took 2040 ms to load the last 1000 entities. Num Alarms So Far: 100020
It took 2108 ms to load the last 1000 entities. Num Alarms So Far: 120020
It took 3068 ms to load the last 1000 entities. Num Alarms So Far: 140020
It took 2798 ms to load the last 1000 entities. Num Alarms So Far: 160020
It took 3532 ms to load the last 1000 entities. Num Alarms So Far: 180020
It took 23363ms to load the last 10000
Load Same 10K Items - Partition By
It took 39 ms to load the last 1000 entities. Num Alarms So Far: 220020
It took 21 ms to load the last 1000 entities. Num Alarms So Far: 240020
It took 30 ms to load the last 1000 entities. Num Alarms So Far: 260020
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 280020
It took 35 ms to load the last 1000 entities. Num Alarms So Far: 300020
It took 26 ms to load the last 1000 entities. Num Alarms So Far: 320020
It took 25 ms to load the last 1000 entities. Num Alarms So Far: 340020
It took 34 ms to load the last 1000 entities. Num Alarms So Far: 360020
It took 48 ms to load the last 1000 entities. Num Alarms So Far: 380020
It took 343ms to load the last 10000
This type of growth almost implies that when an ID is not found on load, it is compared against every other ID rather than being looked up via a hash or similar. Hence the linear growth we see as the number of unique IDs increases.
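If the suspicion above is right, the cost gap is the classic linear-scan vs. hash-lookup difference. The following Siddhi-free Java sketch (purely illustrative; these helper methods are hypothetical and not part of any Siddhi API) counts how much work each strategy does as the number of unique IDs grows:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LookupGrowthDemo {

    // Inserting n unique ids with a linear scan: each insert compares the new
    // id against every id already stored, so total comparisons grow ~ n^2 / 2.
    static long linearScanComparisons(int n) {
        List<String> ids = new ArrayList<>();
        long comparisons = 0;
        for (int i = 0; i < n; i++) {
            String id = "id-" + i;
            for (String existing : ids) {   // O(size) scan per insert
                comparisons++;
                if (existing.equals(id)) {
                    break;                  // never hit here: all ids are unique
                }
            }
            ids.add(id);
        }
        return comparisons;
    }

    // Inserting n unique ids into a hash set: one constant-time operation per
    // insert, so total work grows ~ n.
    static long hashInsertOps(int n) {
        Set<String> ids = new HashSet<>();
        long ops = 0;
        for (int i = 0; i < n; i++) {
            ids.add("id-" + i);
            ops++;
        }
        return ops;
    }

    public static void main(String[] args) {
        System.out.println("linear scan, 1K ids:  " + linearScanComparisons(1_000));   // 499500
        System.out.println("linear scan, 10K ids: " + linearScanComparisons(10_000));  // 49995000
        System.out.println("hash set, 10K ids:    " + hashInsertOps(10_000));          // 10000
    }
}
```

Going from 1K to 10K unique IDs multiplies the linear-scan work by ~100x while the hash-based work only grows 10x, which matches the shape of the per-1000-entity timings above.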
1 Answer
Yes, this behavior is expected. When you use a partition keyed on ID, a new partition instance is created for every new unique ID, so if your partition is large, creating it can take more time. The second pass processes faster because the partitions for those unique IDs have already been created.
In your case, I don't think using a partition is the ideal solution. Partitions are only useful when you have inner streams or use non-time-based windows.
For example:

partition with (id of metricStream)
begin
    from metricStream ... insert into #TempStream;
    from #TempStream ... select ... insert into outputStream;
end;
If you only want to group time-based aggregations, use the group by keyword:

from metricStream#window.externalTime(timestamp, 300000)
select id, avg(metric1) as value, 'Metric1-rule18' as ruleName
group by id
having value > -1.000000
insert into outputStream;
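To apply that change across all 20 rules of the original test, the group-by variant of the plan can be generated programmatically (a sketch under the same assumptions as the test above; the builder class is hypothetical helper code, not a Siddhi API):

```java
public class GroupByPlanBuilder {

    // Builds the flat (non-partitioned) plan from the update: the same 20
    // rules, alternating between metric1 and metric2, each using "group by id"
    // instead of "partition with (id of metricStream)".
    static String buildPlan(int numRules) {
        StringBuilder plan = new StringBuilder();
        plan.append("@Plan:name('MetricPlan') \n");
        plan.append("define stream metricStream (id string, timestamp long, metric1 double, metric2 double); \n");
        for (int i = 0; i < numRules; i++) {
            String metric = (i % 2 == 0) ? "metric1" : "metric2";
            String ruleName = (i % 2 == 0 ? "Metric1" : "Metric2") + "-rule" + i;
            plan.append("from metricStream#window.externalTime(timestamp, 300000) \n")
                .append("select id, avg(").append(metric).append(") as value, '")
                .append(ruleName).append("' as ruleName \n")
                .append("group by id \n")
                .append("having value > -1.000000 \n")
                .append("insert into outputStream; \n");
        }
        return plan.toString();
    }

    public static void main(String[] args) {
        // The resulting string can be passed straight to
        // siddhiManager.createExecutionPlanRuntime(...) in the test above.
        System.out.println(buildPlan(20));
    }
}
```

This keeps the benchmark loop in the test unchanged; only the plan string passed to createExecutionPlanRuntime differs between the partition and group-by runs.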