
When the Kafka broker restarts, all messages in the out queue fail to be delivered to the broker


I am using the Kafka C++ client librdkafka, following the example at https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker. While it is running, I restart the broker. Sometimes no messages fail to be delivered; sometimes about 100,000 messages fail. I have queue.buffering.max.messages=100000, so it seems that all the messages in the out queue were lost? The error is: RdKafka::Message delivery report: Local: Unknown partition.

I found new problems: (1) sometimes about 200 messages are sent to the broker twice; (2) sometimes a message has already been delivered to the broker, but dr_cb() is still called and reports that this message failed to be delivered. I am trying to figure out whether the problem is on the broker side or the client side. Has anyone seen similar issues? What I actually need is reliable transfer and delivery reports between the client and the broker. I am considering switching to the C client now, but I am not sure whether this problem would happen there as well...

The broker's log is:

[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

My producer configuration is:

Global config:
client.id = rdkafka
metadata.broker.list = localhost:9092
message.max.bytes = 4000000
receive.message.max.bytes = 100000000
metadata.request.timeout.ms = 900000
topic.metadata.refresh.interval.ms = -1
topic.metadata.refresh.fast.cnt = 10
topic.metadata.refresh.fast.interval.ms = 250
topic.metadata.refresh.sparse = false
socket.timeout.ms = 300000
socket.send.buffer.bytes = 0
socket.receive.buffer.bytes = 0
socket.keepalive.enable = false
socket.max.fails = 10
broker.address.ttl = 300000
broker.address.family = any
statistics.interval.ms = 0
error_cb = 0x5288a60
stats_cb = 0x5288ba0
log_cb = 0x54942a0
log_level = 6
socket_cb = 0x549e6c0
open_cb = 0x54acf90
opaque = 0x9167898
internal.termination.signal = 0
queued.min.messages = 100000
queued.max.messages.kbytes = 1000000
fetch.wait.max.ms = 100
fetch.message.max.bytes = 1048576
fetch.min.bytes = 1
fetch.error.backoff.ms = 500
queue.buffering.max.messages = 100000
queue.buffering.max.ms = 1000
message.send.max.retries = 10
retry.backoff.ms = 100
compression.codec = none
batch.num.messages = 1000
delivery.report.only.error = true

Topic config:
request.required.acks = 1
enforce.isr.cnt = 0
request.timeout.ms = 5000
message.timeout.ms = 300000
produce.offset.report = false
auto.commit.enable = true
auto.commit.interval.ms = 60000
auto.offset.reset = largest
offset.store.path = .
offset.store.sync.interval.ms = -1
offset.store.method = file
consume.callback.max.messages = 0

The consumer output is:

[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

Any suggestions are welcome. Thanks.

1 Answer

  • 0

    In async mode, the client has to handle this kind of problem itself. I don't know how to guarantee that messages in the out queue reach the broker with 100% certainty; what we can do is take care of the messages in the out queue ourselves. If delivery fails, dr_cb() is called; in that function, put the failed message back into the out queue and try again. Maybe this is not the best way, but it is what I am using now.
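    The re-enqueue idea in this answer can be sketched without librdkafka at all: a mock produce path where the delivery callback pushes failed messages back onto the out queue, bounded by a retry limit. All names below are illustrative, not librdkafka API; in real code the re-enqueue would happen inside your RdKafka::DeliveryReportCb implementation.

    ```cpp
    #include <deque>
    #include <iostream>
    #include <string>

    // Illustrative model of the answer's approach: a message that fails
    // delivery is put back on the out queue from the delivery callback,
    // up to max_retries attempts. This is the pattern, not a real producer.
    struct Msg {
        std::string payload;
        int attempts = 0;
    };

    class RetryingProducer {
    public:
        explicit RetryingProducer(int max_retries) : max_retries_(max_retries) {}

        void enqueue(const std::string& payload) { out_.push_back({payload, 0}); }

        // try_send stands in for the real transport; it returns false while
        // the broker is down. delivery_report() plays the role of dr_cb().
        template <typename SendFn>
        void run(SendFn try_send) {
            while (!out_.empty()) {
                Msg m = out_.front();
                out_.pop_front();
                delivery_report(m, try_send(m));
            }
        }

        int delivered = 0;
        int dropped = 0;

    private:
        void delivery_report(Msg m, bool ok) {
            if (ok) {
                ++delivered;
            } else if (++m.attempts < max_retries_) {
                out_.push_back(m);  // re-enqueue on failure, as the answer suggests
            } else {
                ++dropped;          // give up after max_retries_ attempts
            }
        }

        std::deque<Msg> out_;
        int max_retries_;
    };

    int main() {
        RetryingProducer p(/*max_retries=*/5);
        for (int i = 0; i < 3; ++i) p.enqueue("msg" + std::to_string(i));

        // Simulated broker restart: the first two send attempts fail,
        // then the broker comes back and every later attempt succeeds.
        int calls = 0;
        p.run([&](const Msg&) { return ++calls > 2; });

        std::cout << p.delivered << " delivered, " << p.dropped << " dropped\n";
        return 0;
    }
    ```

    Note that this reproduces the duplicate problem from the question: if a send actually succeeded but its report was lost, re-enqueueing produces a second copy, which is why it gives at-least-once rather than exactly-once delivery.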
