首页 文章

来自KafkaConsumer的NiFi Flowfile属性

提问于
浏览
1

我一直在尝试从Spark Streaming中的Kafka消息访问NiFi Flowfile属性 . 我使用Java作为语言 .

场景是NiFI使用GetSFTP处理器从FTP位置读取二进制文件,并使用publishKafka处理器将byte []消息发布到Kafka . 这些byte []属性使用Spark Streaming作业转换为ASCII数据,这些解码的ASCII写入Kafka进行进一步处理,并使用NiFi处理器保存到HDFS .

我的问题是我无法跟踪二进制文件名和解码的ASCII文件 . 我必须在我的解码ASCII中添加一个 Headers 部分(用于文件名,文件大小,记录计数等),但我无法弄清楚如何从KafkaConsumer对象的NiFi Flowfile访问文件名 . 有没有办法可以使用标准的NiFi处理器来做到这一点?或者,请分享任何其他建议,以实现此功能 . 谢谢 .

1 回答

  • 1

    所以你的数据流是:

    FTP - > NiFi - > Kafka - > Spark Streaming - > Kafka - > NiFi - > HDFS?

    目前Kafka在每条消息上都没有元数据属性(虽然我相信这可能会出现在Kafka 0.11中),因此当NiFi向主题发布消息时,它当前无法传递带有消息的流文件属性 .

    您必须构建某种类型的包装器数据格式(可能是JSON或Avro),其中包含您需要的其他属性的原始内容,以便您可以将整个内容作为一条消息的内容发布到Kafka .

    另外,我不清楚你在Spark流媒体工作中到底在做什么,但是有没有理由你不能在NiFi中做那个部分?它听起来并不像涉及窗口或连接的任何复杂,所以你可能会简化一些事情并让NiFi进行解码,然后将NiFi写入Kafka和HDFS .

相关问题