我正在使用kafka和flink . 在一个简单的程序中,我使用了flinks FlinkKafkaConsumer09,为它分配了组ID .
根据Kafka的行为,当我在相同的主题上运行2个消费者时,它应该像消息队列一样工作 . 我认为这应该是这样的:如果发送给Kafka的2条消息,每个或一个flink程序将完全处理2条消息两次(比如总共2行输出) .
但实际结果是,每个程序将收到2条消息 .
我曾尝试使用kafka服务器下载附带的消费者客户端 . 它以记录的方式工作(处理了2条消息) .
我尝试在flink程序的相同主要功能中使用2个kafka消费者 . 完全处理了4条消息 .
我还尝试运行2个flink实例,并为每个实例分配了kafka使用者的相同程序 . 4条消息 .
有任何想法吗?这是我期望的输出:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
这是我总是得到的错误输出:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
以下是代码段:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink1 says: " + value;
}
}).print();
env.execute();
}
我试图运行它两次以及另一种方式:为Main函数中的每一个创建2个数据流和env.execute() .
2 回答
今天在Flink用户邮件列表上有一个非常相似的问题,但我找不到在此发布的链接 . 所以这里是答案的一部分:
也许这可以为你澄清一些事情 .
此外,还有一篇关于与Flink和Kafka合作的博客文章可能对您有所帮助(https://data-artisans.com/blog/kafka-flink-a-practical-how-to) .
因为没有太多使用flink kafka消费者的group.id而不是对zookeeper进行抵消 . 就flink kafka消费者而言,是否存在任何偏移监控方式 . 我可以看到有一种方法[在消费者群体/消费者偏移检查器的帮助下]用于控制台消费者,但不能用于flink kafka消费者 .
我们想看看我们的flink kafka消费者是如何落后/滞后于kafka主题大小[在给定时间点主题中的消息总数],可以将它放在分区级别 .