我使用JMX作为度量报告器来获取Flink指标,但有没有办法将其作为终端输出?
我想为每个运营商绘制 numRecordsInPerSecond
进行性能分析,我该怎么办?
我已经看到了累加器的一些例子,但它没有给我正确的见解我如何进行Flink的性能分析 . 我会在这里举个例子
这是我的Flink程序的执行计划,我有多个问题,但我想问基本问题
-
如何测量每个运算符的延迟,然后将其相加以计算复杂事件的总延迟 .
-
如何衡量输出吞吐量?目前,我已经在select函数中编写了一些代码,它们计算了所见的复杂事件数和Flink引擎启动时间 . 这是最好的方法吗?
但基本问题仍然存在,即如何通过代码在终端输出中显示Flink metrics中提到的系统指标的输出,因为我想绘制图表以获得性能,而JMX的问题在于它显示了我需求的指标从某种意义上说,我在JMX控制台中单击该特定指标时会看到这些值,这不适合分析系统 .
P.S - 我发现在StackOverflow上有一个question用于计算吞吐量和延迟,答案是这样的
private static class MyMapper extends RichMapFunction<String, Object> {
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public Object map(String value) throws Exception {
this.meter.markEvent();
return value;
}
}
我在我的代码中添加了上面的类,但是没有看到任何输出,我也想知道这段代码将如何显示吞吐量或延迟,因为我们没有提到我们想要找到延迟的运算符?例如,我想在执行计划中间而不是在计划结束时找到运营商的吞吐量,上面的代码是否会为我做这些?
1 回答
对于Flink仪表板上列出的每个组件,您已经拥有所有延迟和每秒输入/输出数量,因此无需为每个组件计算每秒输入/输出的额外自定义计数器或度量标准 .
如果你想要实现自己的计数器/仪表,那么你需要这个代码,你必须将它映射到你所针对的任何组件 .