我正在使用Apache Camel来消费来自kafka主题的消息,然后处理消息,同时处理如果发生异常,我将该消息重定向到另一个kafka主题并在单独的路由中处理该消息 . 所以我的路线如下 .
from(“kafka1”) . process(“someProcessor”) . end(); onException(Throwable.class).process(exchange - > {exchange.getIn() . setBody(“带有错误详情的消息”)}) . to(“kafka2”);
上面的代码实际上是在同一个kafka(kafka1)中发送错误消息 .
我通过在onException进程中设置exchange.getIn() . setHeader(KafkaConstants.TOPIC,“kafka2”))来解决这个问题 . 这是预期的行为吗?为什么它会忽略kafka2并使用kafka1呢?
1)使用的骆驼版本 - 2.14.0
2)Kafka endpoints URL -
消费者-
from("kafka:" + ("kafka.broker") + "?topic="
+ ("offer.kafka.topic")
+ "&zookeeperHost=" + ("kafka.zookeeper.host")
+ "&zookeeperPort=" + ("kafka.zookeeper.port")
+ "&groupId=" + ("offer.kafka.group.id")
+ "&consumerStreams=" + ("kafka.streams")
+ "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
+ "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("kafka.auto.offset.reset")
+ "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
+ "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
.routeId("offerEventRoute").to("direct:offerEventRoute");
制片人 -
to("kafka:" + ("error.kafka.broker") + "?topic="
+ ("error.kafka.topic")
+ "&zookeeperHost=" + ("error.kafka.zookeeper.host")
+ "&zookeeperPort=" + ("error.kafka.zookeeper.port")
+ "&groupId=" + ("error.kafka.group.id")
+ "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("auto.offset.reset")
+ "&messageSendMaxRetries=" + ("error.max.retries")
+ "&serializerClass=kafka.serializer.StringEncoder"
);
2 回答
您需要在 生产环境 者kafka endpoints 中将bridgeEndPoint设置为true . 否则,它会在交换标头中查找主题名称,并将其用作 生产环境 者的主题名称 .
默认情况下为false .
你能否提供一些关于代码的更多细节,比如
1)使用的骆驼版本
2)您的Kafka endpoints URL .
您是否有机会在 endpoints URL中使用“bridgeEndpoint”属性 .