我在Kafka上做Spark Streaming . 流式传输作业开始正常并运行几个小时后才会遇到以下问题:
17/05/18 03:44:47错误执行程序:阶段1864.0中的任务8.0中的异常(TID 27968)java.lang.AssertionError:断言失败:无法获取spark-executor-c10f4ea9-a1c6-4a9f-b87f的记录-8d6ff66e10a5 madlytics-rt_1 3 1150964759在org.apache上的org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)的scala.Predef $ .assert(Predef.scala:170)中查询60000后.spark.streaming.kafka010.KafkaRDD $ KafkaRDDIterator.next(KafkaRDD.scala:227)at org.apache.spark.streaming.kafka010.KafkaRDD $ KafkaRDDIterator.next(KafkaRDD.scala:193)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:409)at at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala:462)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection.Iterator $$ anon $ 13.hasNext( Iterator.scala:461)在scala.collection.Iterator $$ anon $ 13 . hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection .Iterator $$ anon $ 13.hasNext(Iterator.scala:461)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408)at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala :461)在org.apache.spark的org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)的scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:408) . 在org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)的org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:)中的shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63): 47)atg.apache.spark.scheduler.Task.run(Task.scala:86)at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274)at java.util.concurrent.ThreadPoolExecutor java.util.concurrent.ThreadPoolExec中的.runWorker(ThreadPoolExecutor.java:1142) utor $ Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)
此外,我按照此处的建议适当增加了 heartbeat.interval.ms
, session.timeout.ms
和 request.timeout.ms
的值:https://issues.apache.org/jira/browse/SPARK-19275
以下是一些相关的配置:
batch.interval = 60s
spark.streaming.kafka.consumer.poll.ms = 60000
session.timeout.ms = 60000 (default: 30000)
heartbeat.interval.ms = 6000 (default: 3000)
request.timeout.ms = 90000 (default: 40000)
此外,Kafka集群是一个5节点,我正在阅读的主题有15个分区 . 下面列出了其他一些Kafka配置:
num.network.threads=8
num.io.threads=8
任何帮助都感激不尽 . 谢谢 .
2 回答
根据我的经验,这种特殊的失败是 Kafka 集群的一个过载症状 . 通常的嫌疑人总是GC世界停止和线程挨饿 .
最重要的是,表面上 Kafka 的一切都可能很好,但也许不是 .
在添加分区后,它是否花费了大量时间重新 balancer ?或者,由于您执行的所有负载测试,它是否保持了一个巨大的偏移主题?
我曾经发生的事情是表面上的集群很好,但是这个超时就出现了 . 在一个全新甚至更小的集群上,这个问题就消失了 .
我使用一个非常明显的简单配置更改解决了这个问题,但我花了一些时间才意识到如何处理这样的默认(错误)配置 .
主要问题是Spark config
spark.streaming.kafka.consumer.poll.ms
(KafkaRDD中的默认 512 ms)或spark.network.timeout
(默认为120sec,如果未设置spark.streaming.kafka.consumer.poll.ms
)始终小于Kafka consumerrequest.timeout.ms
(Kafka newconsumerapi中默认 305000 ms)...因此火花轮询总是超时之前超时发生在Kafka消费者请求/民意调查(当Kafka主题中没有可用的记录时) .只需将
spark.streaming.kafka.consumer.poll.ms
增加到大于Kafkarequest.timeout.ms
的值即可 . 同时将Kafka consumermax.poll.interval.ms
调整为始终小于request.timeout.ms
.Q.E.D和祝你好运 .