
Why does my Spark job get stuck on Kafka streaming?


Output after the Spark job is submitted to the Spark cluster running inside a Kubernetes cluster created by minikube:

----------------- RUNNING ----------------------
[Stage 0:>                                                          (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:>                                                          (0 + 0) / 2]

Information from the Spark web UI:

foreachRDD at myfile.scala:49 (+details)
org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code:

println("----------------- RUNNING ----------------------");
    eventsStream.foreachRDD { rdd =>
        println("xxxxxxxxxxxxxxxxxxxxx")
        //println(rdd.count());
    if( !rdd.isEmpty )
    {
      println("yyyyyyyyyyyyyyyyyyyyyyy")
        val df = sqlContext.read.json(rdd);
        df.registerTempTable("data");

        val rules = rulesSource.rules();
        var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
        rules.foreach { rule =>
        ...
        }

        sqlContext.dropTempTable("data")
    }
    else
    {
        println("-------");
        println("NO DATA");
        println("-------");
    }
}

Any ideas? Thanks.

UPDATE

My Spark job runs fine in a Docker container with standalone Spark. But when it is submitted to the Spark cluster inside the Kubernetes cluster, it gets stuck on the Kafka stream. I don't understand why.

The YAML file for the Spark master comes from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name: spark-master
        image: spark-2.1.0-bin-hadoop2.6 
        imagePullPolicy: "IfNotPresent"
        name: spark-master
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
         - "/bin/bash"
         - "-c"
         - "--"
        args:
         - './start-master.sh ; sleep infinity'

1 Answer


The logs would help diagnose the problem.

Basically, you cannot create or use another RDD inside an RDD operation; i.e., rdd1.map { rdd2.count() } is not valid.
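
To make that concrete, here is a minimal sketch (hypothetical rdd1/rdd2 on a local master, not taken from the question) contrasting the invalid nested form with the usual driver-side alternative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical RDDs, purely to illustrate the point above.
    val sc = new SparkContext(new SparkConf().setAppName("nested-rdd-demo").setMaster("local[2]"))
    val rdd1 = sc.parallelize(Seq("a", "b", "c"))
    val rdd2 = sc.parallelize(1 to 100)

    // Invalid: rdd2 is referenced inside a task that runs on an executor.
    // RDD transformations and actions can only be invoked by the driver,
    // so this fails at runtime.
    // rdd1.map { x => (x, rdd2.count()) }

    // Valid: run the action on the driver first, then use the plain value in the task.
    val rdd2Count = rdd2.count()
    val enriched = rdd1.map { x => (x, rdd2Count) }
    println(enriched.collect().mkString(", "))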

See how the RDD is converted to a DataFrame once the sqlContext implicits are imported:

    import sqlContext.implicits._

    eventsStream.foreachRDD { rdd =>
        println("yyyyyyyyyyyyyyyyyyyyyyy")

        val df = rdd.toDF();
        df.registerTempTable("data");
        .... //Your logic here.
        sqlContext.dropTempTable("data")
    }
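
For completeness, a minimal self-contained sketch of where that import has to live: sqlContext must be a stable identifier in scope before import sqlContext.implicits._. The streaming source, column name, and batch interval below are placeholders, not details from the question:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ToDfExample {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("ToDfExample").setMaster("local[2]"), Seconds(5))
        val sqlContext = new SQLContext(ssc.sparkContext)
        import sqlContext.implicits._ // enables rdd.toDF()

        // Placeholder source; in the question this would be the Kafka DStream.
        val lines = ssc.socketTextStream("localhost", 9999)

        lines.foreachRDD { rdd =>
          if (!rdd.isEmpty) {
            val df = rdd.toDF("value")
            df.registerTempTable("data")
            sqlContext.sql("SELECT COUNT(*) FROM data").show()
            sqlContext.dropTempTable("data")
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }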
    
