首页 文章

自定义Flume拦截器:intercept()方法为同一事件多次调用

提问于
浏览
2

TL;DR

当Flume源无法将事务推送到管道中的下一个通道时,它是否始终为下一次尝试保留事件实例?

一般来说,拥有一个有状态的Flume拦截器是否安全,其中事件的处理取决于先前处理的事件?

Full problem description:

我正在考虑利用Apache Kafka提供的关于主题分区在消费者群体中的消费者之间分配以在现有的基于Flume的日志整合架构中执行流重复数据删除的方式的可能性 .

使用Kafka Source for Flume和自定义路由到Kafka主题分区,我可以确保每个应该进入相同逻辑“重复数据删除队列”的事件将由集群中的单个Flume代理处理(只要没有代理在集群内停止/启动) . 我使用定制的Flume拦截器进行以下设置:

[KafkaSource with deduplication interceptor] - >()MemoryChannel) - > [HDFSSink]

似乎当Flume Kafka源代码运行器无法将一批事件推送到内存通道时,作为批处理一部分的事件实例将再次传递给我的拦截器的 intercept() 方法 . 在这种情况下,很容易将标记(以Flume事件 Headers 的形式)添加到已处理事件,以区分实际重复项与已重新处理的失败批次中的事件 .

但是,我想知道是否有明确保证在下次尝试时保留失败事务中的事件实例,或者是否有可能从实际源(在本例中为Kafka)再次读取事件并重新读取从零建造 . 在这种情况下,我的拦截器会认为这些事件是重复的并丢弃它们,即使它们从未被传递到 Channels .

EDIT

这就是我的拦截器如何区分已处理的Event实例与未处理的事件:

public Event intercept(Event event) {
  Map<String,String> headers = event.getHeaders();
  // tagHeaderName is the name of the header used to tag events, never null
  if( !tagHeaderName.isEmpty() ) {
    // Don't look further if event was already processed...
    if( headers.get(tagHeaderName)!=null )
      return event;
    // Mark it as processed otherwise...
    else
      headers.put(tagHeaderName, "");
  }
  // Continue processing of event...
}

1 回答

  • 0

    我遇到了类似的问题:

    当接收器写入失败时,Kafka Source仍然保留已经由拦截器处理的数据 . 在下一次尝试中,这些数据将发送到拦截器,并一次又一次地进行处理 . 通过阅读KafkaSource的代码,我相信它是错误的 .

    我的拦截器将从原始消息中删除一些信息,并将修改原始消息 . 由于此错误,重试机制将永远不会按预期工作 .

    到目前为止,这并非易事 .

相关问题