首页 文章

Kafka - > Flink DataStream - > MongoDB

提问于
浏览
8

我想设置Flink,以便将数据流从Apache Kafka转换并重定向到MongoDB . 出于测试目的,我 Build 在flink-streaming-connectors.kafka示例(https://github.com/apache/flink)之上 .

Faf正在为Kafka流提供正确的红色,我可以映射它们等等,但是当我想将每个收到的和转换后的消息保存到MongoDB时会出现问题 . 我发现的关于MongoDB集成的唯一例子是来自github的flink-mongodb-test . 不幸的是,它使用静态数据源(数据库),而不是数据流 .

我相信MongoDB应该有一些DataStream.addSink实现,但显然没有 .

实现它的最佳方法是什么?我是否需要编写自定义接收器功能或者我可能缺少某些东西?也许它应该以不同的方式完成?

我没有任何解决方案,所以任何建议将不胜感激 .

下面是一个例子,我正在获得什么作为输入以及我需要存储为输出 .

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>

Flink: DataStream.map({
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如您在本示例中所看到的,我主要使用Flink进行Kafka的消息流缓冲和一些基本的解析 .

2 回答

  • 3

    Flink目前没有Streaming MongoDB接收器 .

    但是,有两种方法可以将数据写入MongoDB:

    • 使用Flink的 DataStream.write() 调用 . 它允许您使用任何OutputFormat(来自Batch API)和流 . 使用Flink的HadoopOutputFormatWrapper,您可以使用官方的MongoDB Hadoop连接器

    • 自己实施水槽 . 使用Streaming API实现接收器非常简单,我确信MongoDB有一个很好的Java客户端库 .

    两种方法都不提供任何复杂的加工保证 . 但是,当您使用Flink与Kafka(并启用检查点)时,您将具有至少一次语义:在错误情况下,数据将再次流式传输到MongoDB接收器 . 如果您正在进行幂等更新,则重做这些更新不应导致任何不一致 .

    如果你真的需要MongoDB的一次性语义,你应该提交JIRA in Flink并与社区讨论如何实现它 .

  • 3

    作为Robert Metzger的替代方案,您可以将结果再次写入Kafka,然后使用其中一个维护的kafka连接器删除MongoDB数据库中主题的内容 .

    Kafka - > Flink - > Kafka - > Mongo / Anything

    通过这种方法,您可以保留“至少一次语义”的行为 .

相关问题