首页 文章

Spark 1.6 kafka在dataproc py4j错误上流式传输

提问于
浏览
4

我收到以下错误:

Py4JError(u'An在调用o73.createDirectStreamWithoutMessageHandler时发生错误.Trace:\ npy4j.Py4JException:方法createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext,类java.util.HashMap,类java . util.HashSet,类java.util.HashMap])不存在\ n \ tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\ n \ tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344 )\ n \ tat py4j.Gateway.invoke(Gateway.java:252)\ n \ tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\ n \ tat py4j.commands.CallCommand.execute(CallCommand.java) :79)\ n \ tat py4j.GatewayConnection.run(GatewayConnection.java:209)\ n \ tat java.lang.Thread.run(Thread.java:745)\ n \ n',)

我正在使用spark-streaming-kafka-assembly_2.10-1.6.0.jar(它出现在我所有节点master上的/ usr / lib / hadoop / lib /文件夹中)

(编辑)实际错误是:java.lang.NoSuchMethodError:org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava / lang / String;)Ljava / lang / String;

这是由于错误的hadoop版本造成的 . 因此,应该使用正确的hadoop版本编译spark:

mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package

这将导致外部/ kafka-assembly / target文件夹中的jar .

1 回答

  • 1

    使用图像版本1,我已经成功运行了pyspark streaming / kafka example wordcount

    在每个示例中,“ad-kafka-inst”是我的测试kafka实例,带有“测试”主题 .

    • 使用没有初始化操作的集群:
    $ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test
    
    • 对完整的kafka程序集使用初始化操作:

    • 下载/解压缩spark-1.6.0.tgz

    • 构建:

    $ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package
    
    • 将spark-streaming-kafka-assembly_2.10-1.6.0.jar上传到新的GCS存储桶(例如MYBUCKET) .

    • 在同一GCS存储桶中创建以下初始化操作(例如,gs://MYBUCKET/install_spark_kafka.sh):

    $ #!/bin/bash
    
    gsutil cp gs://MY_BUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/
    chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar
    
    • 使用上述初始化操作启动集群:
    $ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh
    
    • 开始流式字数:
    $ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test
    

相关问题