Storm KafkaSpout stops consuming messages from Kafka topic

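My problem is that the Storm KafkaSpout stops consuming messages from the Kafka topic after running for a while. With debug enabled in Storm, I get log output like this: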

2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer$TaskInfo@2c35b34f"] [#object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x798f1e35 "[__ack-count = ]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms=0.0, write_pos=5411461, read_pos=5411461, overflow=0, arrival_rate_secs=0.0, capacity=1024, population=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x7cdec8eb "[__complete-latency = ]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x658fc59 "[__skipped-max-spout = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms=4790.5, write_pos=2468305, read_pos=2468304, overflow=0, arrival_rate_secs=0.20874647740319383, capacity=1024, population=1}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x73648c7e "[kafkaPartition = {Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPICallCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMax=null, Partition{host=slave103:9092, topic=packet, partition=12}/lostMessageCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMean=null, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPIMessageCount=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x4e43df61 "[kafkaOffset = {packet/totalLatestCompletedOffset=154305947, packet/partition_12/spoutLag=82472754, packet/totalEarliestTimeOffset=233919465, packet/partition_12/earliestTimeOffset=233919465, packet/partition_12/latestEmittedOffset=154307691, packet/partition_12/latestTimeOffset=236778701, packet/totalLatestEmittedOffset=154307691, packet/partition_12/latestCompletedOffset=154305947, packet/totalLatestTimeOffset=236778701, packet/totalSpoutLag=82472754}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x49fe816b "[__transfer-count = {__ack_init=0, default=0, __metrics=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x1315a68c "[__emit-count = {__ack_init=0, default=0, __metrics=0}]"]]]
2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
2016-07-05 03:59:25.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
2016-07-05 03:59:25.946 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]

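My test topology is very simple: one KafkaSpout and one counter bolt. While the topology works normally, the value between FOR and TUPLE is a positive number; once the topology stops consuming messages, the value becomes negative. So I am curious what causes the "Processing received message FOR -2 TUPLE" lines, and how to fix the problem.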

By the way, my test environment is:

OS: Red Hat Enterprise Linux Server release 7.0 (Maipo)
Kafka: 0.10.0.0
Storm: 1.0.1

2 Answers

  • -1

    With help from the Storm mailing list, I was able to tune the KafkaSpout and resolve the issue. The following settings work for me.

    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
    config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
    config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
    config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
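
    For readers who want to experiment with these values outside a topology, here is a minimal sketch of the same four settings on a plain map. It assumes the string keys behind the `Config` constants above (`topology.max.spout.pending` and so on) and that the executor buffer sizes have to be powers of two, as the Storm `Config` documentation states. The class and method names (`SpoutTuningSketch`, `buildConf`, `isPowerOfTwo`) are made up for illustration and are not part of Storm.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper, not part of Storm: builds the tuning values from this
    // answer as a plain map keyed by Storm's configuration key strings.
    final class SpoutTuningSketch {

        // Assumption: executor queue sizes must be a power of two.
        static boolean isPowerOfTwo(int n) {
            return n > 0 && (n & (n - 1)) == 0;
        }

        static Map<String, Object> buildConf(int maxSpoutPending, int bufferSize) {
            if (!isPowerOfTwo(bufferSize)) {
                throw new IllegalArgumentException("buffer size must be a power of two: " + bufferSize);
            }
            Map<String, Object> conf = new HashMap<>();
            // Upper bound on un-acked tuples in flight per spout task.
            conf.put("topology.max.spout.pending", maxSpoutPending);
            // Disable automatic backpressure, as in the answer above.
            conf.put("topology.backpressure.enable", false);
            // Per-executor receive and send queue sizes.
            conf.put("topology.executor.receive.buffer.size", bufferSize);
            conf.put("topology.executor.send.buffer.size", bufferSize);
            return conf;
        }

        public static void main(String[] args) {
            // Values taken from the answer: 2048 pending tuples, 16384-slot buffers.
            System.out.println(buildConf(2048, 16384));
        }
    }
    ```

    In a real topology the same entries would go into the `Config` object that is passed when the topology is submitted; the map form only makes the values easy to inspect.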
    

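    I tested by sending bursts of 20k-50k messages, pausing 1 second between bursts. Each message was 2048 bytes.

    I am running a 3-node cluster; my topology has 4 spouts and the topic has 64 partitions.

    It is still working after 200M messages....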

  • 2
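    • Check that the producer is actually writing to the topic you expect.

    • Make sure the spout can reach Kafka at the network level. You can check this with the telnet command.

    • Can the spout reach ZooKeeper? Check this with telnet as well.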

    Source: KafkaSpout is not receiving anything from Kafka
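
    If telnet is not installed on the worker nodes, the two connectivity checks can be approximated with a plain TCP connect. The following is a rough sketch, not a Kafka or ZooKeeper client: it only shows that the port accepts connections. `slave103:9092` is the broker from the question's log, while `zk-host:2181` is a placeholder for your ZooKeeper address; the class and method names are made up for illustration.

    ```java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    // Hypothetical helper: a telnet-style check that a TCP port accepts connections.
    final class ReachabilityCheck {

        // Returns true if a TCP connection to host:port succeeds within timeoutMs.
        static boolean canConnect(String host, int port, int timeoutMs) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
                return true;
            } catch (IOException e) {
                // Covers refused connections, timeouts, and unresolvable host names.
                return false;
            }
        }

        public static void main(String[] args) {
            // Broker address from the question's log.
            System.out.println("kafka reachable: " + canConnect("slave103", 9092, 3000));
            // Placeholder address; replace with your ZooKeeper host and port.
            System.out.println("zookeeper reachable: " + canConnect("zk-host", 2181, 3000));
        }
    }
    ```

    Run it from the machine that hosts the spout's worker, since that is the network path that matters.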

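    If all three of the above hold, then:

    Kafka keeps a fixed retention window for each topic. Once the retention limit is reached, it drops messages from the tail (the oldest end) of the log.

    So what may be happening here is that data is pushed to Kafka faster than the consumer can consume it.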

    Source: Storm-kafka spout not fast enough to process the information
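
    The offsets in the question's kafkaOffset metric look consistent with this explanation. Here is a small worked check using the numbers from the log, under the assumption that earliestTimeOffset is the oldest offset the broker still retains and latestTimeOffset is the newest one. The class and method names are made up for illustration, and this is a sanity check rather than a definitive diagnosis.

    ```java
    // Hypothetical helper: arithmetic on the offsets reported in the question's log.
    final class OffsetCheck {

        // Lag = newest offset on the broker minus the last offset the spout completed.
        static long spoutLag(long latestTimeOffset, long latestCompletedOffset) {
            return latestTimeOffset - latestCompletedOffset;
        }

        // True when the spout's position is older than the oldest retained message,
        // i.e. the messages it wants next may already have been deleted.
        static boolean fellBehindRetention(long earliestTimeOffset, long latestEmittedOffset) {
            return latestEmittedOffset < earliestTimeOffset;
        }

        public static void main(String[] args) {
            long earliestTimeOffset = 233919465L;    // packet/partition_12/earliestTimeOffset
            long latestTimeOffset = 236778701L;      // packet/partition_12/latestTimeOffset
            long latestEmittedOffset = 154307691L;   // packet/partition_12/latestEmittedOffset
            long latestCompletedOffset = 154305947L; // packet/partition_12/latestCompletedOffset

            // Prints 82472754, the spoutLag value reported in the log.
            System.out.println(spoutLag(latestTimeOffset, latestCompletedOffset));
            // Prints true: the spout is about 79.6 million offsets behind the earliest offset.
            System.out.println(fellBehindRetention(earliestTimeOffset, latestEmittedOffset));
        }
    }
    ```

    If the second check is true for your partitions, the spout is asking for offsets that may no longer exist, so either retention needs to grow or the topology needs to consume faster.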
