我有一个从SQL数据库到Elasticsearch的管道,如下所示:
-
使用logstash-input-jdbc从SQL数据库输入
-
各种事件的各种过滤和变异
-
使用logstash-filter-aggregate根据group_id属性聚合事件
-
使用logstash-output-elasticsearch将聚合事件输出到Elasticsearch
实际上,这条管道的吞吐量非常低 . 我知道这是由于聚合步骤(执行一些相对繁重的处理),我想使用几个线程/进程来提高性能(允许我使用多个核心) .
但是,logstash-filter-aggregate插件不支持多个过滤器工作程序 - 可能是因为它无法保证应该组合到一个聚合事件中的事件将由同一个工作程序处理 .
我目前的解决方案是运行几个logstash实例,其中每个实例从SQL数据库中选择某个group_id子集 . 但是,这有很多开销 . 有没有更好的方法来使用logstash-filter-aggregate的多个核心?
1 回答
那里你有点洞 .
Aggregate
是需要序列化事件流的过滤器之一,因为它将需要的状态视为一个或多个事件 . 每当您需要序列化时,您的吞吐量将仅限于单个核心,以确保过滤器工作人员能够看到所需的所有事件 . 与数据库一样,解决此问题的方法是对数据集进行分片 . 你已经发现了 .实际上,分片是解决此问题的最佳单阶段logstash解决方案 .
如果你想去那里,有一个多阶段的解决方案 . 那是 Build 第二个管道 . 它会像这样工作:
第一个管道接收事件并将相关事件标记为相关事件,但不执行聚合 .
第一个管道输出到
elasticsearch
,就像你一样 .第二个管道使用
elasticsearch
输入来查询看起来像是未聚合事件的事件 .聚合层聚合完整的事件(包含所有部分)
聚合事件被刷新到Elasticsearch .
或者,您可以使用非logstash方法在ElasticSearch中执行聚合 .