我正在为Elasticsearch编写一个Kafka Sink连接器 .
我在SinkTask类中实现了start,put,flush,close方法 .
但是,我不知道Kafka Sink Connector的行为究竟是什么 .
如果Connect Worker通过 put()
方法重复执行从Kafka Broker获取SinkRecord的所有任务,在内部处理它,然后将数据发送到Elasticsearch,我想知道何时在内部操作偏移提交并且它与 flush()
方法相关联 .
另外,我想知道这种重复性工作的顺序是否固定 . 例如,可以在put完成之前刷新或提交吗?
我正在开发一个连接,它从远程代理接收数据并将数据放入另一个远程服务器的elasticsearch . 在这种情况下,我们正在测试如果运行连接的服务器的网络暂时断开连接会发生什么 . 我不明白Kafka Connect的工作原理 .
如果您知道在这种情况下邮件可能丢失或重复,请询问解释 .
谢谢 .
1 回答
Connect工作者消耗的偏移量应更新
__consumer_offsets
内部Kafka主题 .只要您正在监视连接器的
/status
endpoints 以获取非故障状态,并且您可以在您正在使用的Kafka主题的保留期内重新启动它,那么应该几乎没有数据丢失 .另外,正如评论中指出kafka-connect-elasticsearch已经存在,您可以检查该提交和刷新语义的代码 . 如果你做叉子,请把它放在PR帮助社区;)
此外,Logstash具有Kafka输入和Elasticsearch输出,因此如果您正在运行ELK堆栈,那么这可能是比编写自己的连接器更好的选择