I ran into this problem while integrating Spark with Kafka from Python. I have already changed all of the Spark libraries, but I am still facing the same issue.

Here is the error message:

Traceback (most recent call last):
  File "D:/Work/kafka_wordcount.py", line 18, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 70, in createStream
  File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
  File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.createStream.
: java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:81)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:151)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createStream(KafkaUtils.scala:555)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more
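
For reference, the call at line 18 that the traceback points to is the receiver-based variant of the API, which takes a ZooKeeper quorum and a topic-to-thread-count dict rather than a broker list. A minimal sketch of that variant (the zkQuorum value here is an assumption for illustration; the real value is not shown in the post):

zkQuorum = "localhost:2181"  # assumed ZooKeeper address, not taken from the post
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})  # {topic: receiver thread count}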

Please suggest a solution. Here is the source code:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    brokers, topic = sys.argv[1:]  # broker list and topic come from the command line
    # Direct (receiver-less) stream; each record arrives as a (key, value) tuple
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": brokers})
    lines = kvs.map(lambda x: x[1])  # keep only the message value
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
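
For illustration, the same split/count logic can be checked on a plain RDD, independent of the Kafka connector. This is a minimal sketch (not part of the original script), assuming a local pyspark install:

from pyspark import SparkContext

sc = SparkContext(appName="WordCountSanityCheck")
sample = sc.parallelize(["spark kafka spark"])
counts = (sample.flatMap(lambda line: line.split(" "))
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
print(counts.collect())  # something like [('kafka', 1), ('spark', 2)]; order may vary
sc.stop()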

I submit the Spark-Kafka job from Python as follows:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 kafkacount.py localhost:9092 test_spark
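
For comparison, here is a sketch of the same submit with the --packages coordinate pinned to whichever Spark is actually being launched (the traceback's paths show a spark-2.2.0-bin-hadoop2.7 install, while the versions listed below say 1.6.1). These artifact coordinates are an assumption about the published 0-8 connector, not taken from the original command:

# if the spark-submit on the PATH is the 2.2.0 install seen in the traceback
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafkacount.py localhost:9092 test_spark

# if it is really Spark 1.6.1, the pre-2.x connector coordinate would apply
spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.1 kafkacount.py localhost:9092 test_spark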

Spark version: 1.6.1
Python: 2.6
Kafka client: 0.10

I have already changed both the Spark and Kafka libraries, but I am still not getting any output.