我们有使用Avro的Kafka流 . 我需要使用python将它连接到Spark Stream .

我用下面的代码来做到这一点:

kvs = KafkaUtils.createDirectStream(ssc, topic, {'bootstrap.servers': brokers}, valueDecoder=decoder)

然后我得到了轰鸣声错误 .

调用o44.awaitTermination时发生错误 . 2018-10-11 15:58:01 INFO DAGScheduler:54 - 工作3失败:runRD在PythonRDD.scala:149,花了1.403049 s 2018-10-11 15:58:01 INFO JobScheduler:54 - 完成工作流工作1539253680000 ms.0来自作业时间1539253680000 ms 2018-10-11 15:58:01错误JobScheduler:91 - 运行作业流作业时出错1539253680000 ms.0 org.apache.spark.SparkException:Python引发了异常:Traceback (最近一次调用最后一次):文件“/XXXXXX/spark2/python/lib/pyspark.zip/pyspark/streaming/util.py”,第65行,在调用中r = self.func(t,* rdds)文件“/ XXXXXX / spark2 / python / lib / pyspark.zip / pyspark / streaming / dstream.py“,第171行,在takeAndPrint拍摄= rdd.take(num 1)文件”/XXXXXX/spark2/python/lib/pyspark.zip/ pyspark / rdd.py“,第1375行,取res = self.context.runJob(self,takeUpToNumLeft,p)文件”/XXXXXX/spark2/python/lib/pyspark.zip/pyspark/context.py“,第1013行,在runJob中sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(),mappedRDD._jrdd,partitions)文件“/ XXXXXX / spark2 / python / l ib / py4j-0.10.7-src.zip / py4j / java_gateway.py“,第1257行,在调用answer,self.gateway_client,self.target_id,self.name)文件”/ XXXXXX / spark2 / python / lib / py4j -0.10.7-src.zip/py4j/protocol.py“,第328行,格式为get_return_value(target_id,” . ,name),值)Py4JJavaError:调用z时发生错误:org.apache.spark.api .python.PythonRDD.runJob . :org.apache.spark.SparkException:作业由于阶段失败而中止:阶段3.0中的任务0失败4次,最近失败:阶段3.0中丢失任务0.3(TID 8,gen-CLUSTER_NODE, Actuator 2):org.apache .spark.SparkException:无法连接到主题TOPIC_NAME 1的领导者:java.nio.channels.ClosedChannelException

但是,在终端上显示此错误并终止进程之前,我可以使用波纹管代码打印RDD

kvs.pprint()

什么是领导者?我们怎么能过来这个呢?