I am trying to create an input stream in Spark from a Kafka topic with the commands below, but I get an error.

This is my first attempt at Spark Streaming with Kafka.

Version details:

  • Spark version: spark-2.2.0-bin-hadoop2.7

  • Kafka version: kafka_2.11-0.11.0.0

  • Zookeeper version: zookeeper-3.4.10

  • Spark Streaming jar: spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar

(Zookeeper and Kafka are up and running. I can create producers and consumers from the Kafka console.)
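
As a side check, the Scala build of this Spark install can be confirmed from PySpark itself, which is relevant because the assembly jar name carries a Scala version suffix. This is only a diagnostic sketch meant to run in its own PySpark session: the ScalaVersionCheck name is made up, and it relies on the non-public _jvm py4j gateway, so treat the exact call as an assumption.

from pyspark import SparkContext

# Diagnostic sketch (run in a separate PySpark session, not in the notebook below):
# ask the JVM which Scala version Spark was compiled against.
# scala.util.Properties.versionString() returns e.g. "version 2.11.x".
sc = SparkContext("local[2]", "ScalaVersionCheck")
print(sc._jvm.scala.util.Properties.versionString())
sc.stop()  # release the context when done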

PySpark notebook commands:

import os

# Pull in the Kafka 0-8 assembly at launch time (note the Scala suffix in the coordinate).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.10:2.2.0 pyspark-shell'

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Local streaming context with a 5 second batch interval.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)

topics = ["kafka-test"]
kafkaParams = {"bootstrap.servers": "localhost:9092"}

# This is the call that fails with the error below.
kafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
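
For reference, a typical continuation once createDirectStream succeeds would look roughly like this (a minimal word-count sketch, not part of the failing call; it assumes the default UTF-8 string decoding of the 0-8 direct stream, so each record arrives as a (key, value) pair of strings):

# Word count over the message values, then start the streaming job.
lines = kafkaStream.map(lambda kv: kv[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()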

Error message:

Py4JJavaError: An error occurred while calling o26.createDirectStreamWithoutMessageHandler.
: java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at kafka.utils.Pool.<init>(Pool.scala:28)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
    at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:59)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:364)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more