首页 文章

使用Kafka的Streams API处理错误消息

提问于
浏览
21

我有一个基本的流处理流程,看起来像

master topic -> my processing in a mapper/filter -> output topics

我想知道处理“坏消息”的最佳方法 . 这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误) .

我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到“错误主题” . 然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master . 如果我让任何异常传播,则流似乎被卡住并且不再拾取消息 .

  • 这种方法被认为是最佳做法吗?

  • 有没有一种方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......

  • 在"bad message"上阻止 Kafka 干扰的替代方法是什么?

  • 有哪些替代错误处理方法?

为了完整性,这里是我的代码(伪ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, exception);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, exception);
             }
         }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

任何帮助非常感谢 .

2 回答

  • 19

    目前,Kafka Streams仅提供有限的错误处理功能 . 正在进行的工作是为了简化这一过程 . 目前,您的整体方法似乎是一个很好的方法 .

    关于处理de / serialization错误的一条评论:手动处理这些错误,需要你进行de / serialization "manually" . 这意味着,您需要为您的Streams应用程序的输入/输出主题配置 ByteArraySerde 以获取键和值,并添加执行de /序列化的 map() (即, KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> - 或者如果您还想要捕获,则添加其他方式序列化例外) . 否则,您不能 try-catch 反序列化异常 .

    使用您当前的方法,您"only"验证给定的字符串代表一个有效的文档 - 但可能是这样,消息本身已损坏,并且无法在源操作符中首先转换为 String . 因此,您实际上并未使用代码覆盖反序列化异常 . 但是,如果您确定反序列化异常永远不会发生,那么您的方法也是足够的 .

    Update

    此问题通过KIP-161解决,并将包含在下一版本1.0.0中 . 它允许您通过参数 default.deserialization.exception.handler 注册回调 . 每次反序列化期间发生异常时都会调用该处理程序,并允许您返回 DeserializationResponseCONTINUE - >删除记录以继续,或者 FAIL 这是默认值) .

    Update 2

    使用KIP-210(将成为Kafka 1.1的一部分),通过注册 ProductionExceptionHandler via config default.production.exception.handler 可以返回 CONTINUE ,也可以处理 生产环境 者方面的错误,类似于消费者部分 .

  • 20

    Update Mar 23, 2018: Kafka 1.0通过KIP-161为错误的错误消息("poison pills")提供了比我下面描述的更好更容易的处理 . 请参阅Kafka 1.0文档中的default.deserialization.exception.handler .

    这可能是我无法正确反序列化的消息[...]

    好的,我的答案主要关注(de)序列化问题,因为这可能是大多数用户处理的最棘手的场景 .

    [...]或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有那种瞬态错误) .

    同样的思考(用于反序列化)也可以应用于处理逻辑中的失败 . 在这里,大多数人倾向于倾向于下面的选项2(减去反序列化部分),但是YMMV .

    我正在考虑将所有处理/过滤代码包装在try catch中,如果引发异常,则路由到“错误主题” . 然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master . 如果我让任何异常传播,则流似乎被卡住并且不再拾取消息 . 这种方法被认为是最佳做法吗?

    是的,目前这是要走的路 . 本质上,两种最常见的模式是(1)跳过损坏的消息或(2)将损坏的记录发送到隔离主题,即死信队列 .

    有没有方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......

    是的,有办法处理这个问题,包括使用死信队列 . 然而,它(至少恕我直言)还不方便 . 如果你对如何有任何反馈API应该允许你处理这个 - 例如通过新的或更新的方法,配置设置(“如果序列化/反序列化失败将有问题的记录发送到此隔离主题”) - 请告诉我们 . :-)

    什么是阻止 Kafka 干扰“坏消息”的替代方法?有哪些替代错误处理方法?

    请参阅下面的示例 .

    FWIW,Kafka社区还在讨论添加一个新的CLI工具,允许您跳过已损坏的消息 . 但是,作为Kafka Streams API的用户,我认为您希望直接在代码中处理此类方案,并且仅作为最后的手段回退到CLI实用程序 .

    以下是Kafka Streams DSL处理损坏的记录/消息的一些模式,即"poison pills" . 这取自http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

    Option 1: Skip corrupted records with flatMap

    这可以说是大多数用户想要做的事情 .

    • 我们使用 flatMap 因为它允许您为每个输入记录输出零个,一个或多个输出记录 . 在记录损坏的情况下,我们不输出任何内容(零记录),从而忽略/跳过损坏的记录 .

    • 这种方法的好处与此处列出的其他方法相比:我们只需要手动反序列化一次记录一次!

    • 此方法的缺点: flatMap "marks"潜在数据重新分区的输入流,即如果您执行基于密钥的操作(如分组( groupBy / groupByKey )或之后加入),您的数据将在后台重新分区 . 由于这可能是一个代价高昂的步骤,我们不需要对键进行操作(因此将它们保存为 byte[] 格式的"raw"键),您可以从 flatMap 更改为 flatMapValues ,即使您加入也不会导致数据重新分区/ group /稍后聚合流 .

    代码示例:

    Serde<byte[]> bytesSerde = Serdes.ByteArray();
    Serde<String> stringSerde = Serdes.String();
    Serde<Long> longSerde = Serdes.Long();
    
    // Input topic, which might contain corrupted messages
    KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);
    
    // Note how the returned stream is of type KStream<String, Long>,
    // rather than KStream<byte[], byte[]>.
    KStream<String, Long> doubled = input.flatMap(
        (k, v) -> {
          try {
            // Attempt deserialization
            String key = stringSerde.deserializer().deserialize(inputTopic, k);
            long value = longSerde.deserializer().deserialize(inputTopic, v);
    
            // Ok, the record is valid (not corrupted).  Let's take the
            // opportunity to also process the record in some way so that
            // we haven't paid the deserialization cost just for "poison pill"
            // checking.
            return Collections.singletonList(KeyValue.pair(key, 2 * value));
          }
          catch (SerializationException e) {
            // log + ignore/skip the corrupted message
            System.err.println("Could not deserialize record: " + e.getMessage());
          }
          return Collections.emptyList();
        }
    );
    

    Option 2: dead letter queue with branch

    与选项1(忽略损坏的记录)相比,选项2通过将它们从“主”输入流中过滤掉并将它们写入隔离主题(想想:死信队列)来保留损坏的消息 . 缺点是,对于有效记录,我们必须支付两次手动反序列化费用 .

    KStream<byte[], byte[]> input = ...;
    
    KStream<byte[], byte[]>[] partitioned = input.branch(
        (k, v) -> {
          boolean isValidRecord = false;
          try {
            stringSerde.deserializer().deserialize(inputTopic, k);
            longSerde.deserializer().deserialize(inputTopic, v);
            isValidRecord = true;
          }
          catch (SerializationException ignored) {}
          return isValidRecord;
        },
        (k, v) -> true
    );
    
    // partitioned[0] is the KStream<byte[], byte[]> that contains
    // only valid records.  partitioned[1] contains only corrupted
    // records and thus acts as a "dead letter queue".
    KStream<String, Long> doubled = partitioned[0].map(
        (key, value) -> KeyValue.pair(
            // Must deserialize a second time unfortunately.
            stringSerde.deserializer().deserialize(inputTopic, key),
            2 * longSerde.deserializer().deserialize(inputTopic, value)));
    
    // Don't forget to actually write the dead letter queue back to Kafka!
    partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
    

    Option 3: Skip corrupted records with filter

    我只提到这个是完整的 . 此选项看起来像是选项1和2的混合,但比其中任何一个都差 . 与选项1相比,您必须为有效记录支付手动反序列化费用两次(不好!) . 与选项2相比,您将无法在死信队列中保留损坏的记录 .

    KStream<byte[], byte[]> validRecordsOnly = input.filter(
        (k, v) -> {
          boolean isValidRecord = false;
          try {
            bytesSerde.deserializer().deserialize(inputTopic, k);
            longSerde.deserializer().deserialize(inputTopic, v);
            isValidRecord = true;
          }
          catch (SerializationException e) {
            // log + ignore/skip the corrupted message
            System.err.println("Could not deserialize record: " + e.getMessage());
          }
          return isValidRecord;
        }
    );
    KStream<String, Long> doubled = validRecordsOnly.map(
        (key, value) -> KeyValue.pair(
            // Must deserialize a second time unfortunately.
            stringSerde.deserializer().deserialize(inputTopic, key),
            2 * longSerde.deserializer().deserialize(inputTopic, value)));
    

    任何帮助非常感谢 .

    我希望我能帮忙 . 如果是,我将非常感谢您就如何改进Kafka Streams API以便以比现在更好/更方便的方式处理故障/异常的反馈 . :-)

相关问题