首页 文章

SparkStreaming Kafka:轮询60000后无法获得记录

提问于
浏览
3

我在Kafka上做Spark Streaming . 流式传输作业开始正常并运行几个小时后才会遇到以下问题:

17/05/18 03:44:47错误执行程序:阶段1864.0中的任务8.0中的异常(TID 27968)java.lang.AssertionError:断言失败:无法获取spark-executor-c10f4ea9-a1c6-4a9f-b87f的记录-8d6ff66e10a5 madlytics-rt_1 3 1150964759在org.apache上的org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)的scala.Predef $ .assert(Predef.scala:170)中查询60000后.spark.streaming.kafka010.KafkaRDD $ KafkaRDDIterator.next(KafkaRDD.scala:227)at org.apache.spark.streaming.kafka010.KafkaRDD $ KafkaRDDIterator.next(KafkaRDD.scala:193)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala:462)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection.Iterator $$ anon $ 13.hasNext( Iterator.scala:461)在scala.collection.Iterator $$ anon $ 13 . hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection .Iterator $$ anon $ 13.hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala :461)在org.apache.spark的org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)的scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408) . 在org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)的org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:)中的shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63): 47)atg.apache.spark.scheduler.Task.run(Task.scala:86)at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274)at java.util.concurrent.ThreadPoolExecutor java.util.concurrent.ThreadPoolExec中的.runWorker(ThreadPoolExecutor.java:1142) utor $ Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)

此外,我按照此处的建议适当增加了 heartbeat.interval.mssession.timeout.msrequest.timeout.ms 的值:https://issues.apache.org/jira/browse/SPARK-19275

以下是一些相关的配置:

batch.interval = 60s
spark.streaming.kafka.consumer.poll.ms = 60000
session.timeout.ms = 60000 (default: 30000)
heartbeat.interval.ms = 6000 (default: 3000)
request.timeout.ms = 90000 (default: 40000)

此外,Kafka集群是一个5节点,我正在阅读的主题有15个分区 . 下面列出了其他一些Kafka配置:

num.network.threads=8
num.io.threads=8

任何帮助都感激不尽 . 谢谢 .

2 回答

  • 0

    根据我的经验,这种特殊的失败是 Kafka 集群的一个过载症状 . 通常的嫌疑人总是GC世界停止和线程挨饿 .

    最重要的是,表面上 Kafka 的一切都可能很好,但也许不是 .

    在添加分区后,它是否花费了大量时间重新 balancer ?或者,由于您执行的所有负载测试,它是否保持了一个巨大的偏移主题?

    我曾经发生的事情是表面上的集群很好,但是这个超时就出现了 . 在一个全新甚至更小的集群上,这个问题就消失了 .

  • 0

    我使用一个非常明显的简单配置更改解决了这个问题,但我花了一些时间才意识到如何处理这样的默认(错误)配置 .

    主要问题是Spark config spark.streaming.kafka.consumer.poll.msKafkaRDD中的默认 512 ms)或 spark.network.timeout (默认为120sec,如果未设置 spark.streaming.kafka.consumer.poll.ms )始终小于Kafka consumer request.timeout.msKafka newconsumerapi中默认 305000 ms)...因此火花轮询总是超时之前超时发生在Kafka消费者请求/民意调查(当Kafka主题中没有可用的记录时) .

    只需将 spark.streaming.kafka.consumer.poll.ms 增加到大于Kafka request.timeout.ms 的值即可 . 同时将Kafka consumer max.poll.interval.ms 调整为始终小于 request.timeout.ms .

    Q.E.D和祝你好运 .

相关问题