首页 文章

在群集模式下读取文本文件时出现异常

提问于
浏览
2

我使用spark读取了一个文本文件并将其保存在JavaRDD中,并尝试打印保存在RDD中的数据 . 我正在一个拥有主服务器和两个从服务器的集群中运行我的代码 . 但是我得到异常,例如, containers exceeds thresholds ,而迭代RDD . 代码在独立模式下运行完美 .

我的代码:

SparkContext sc = new SparkContext("spark://master.com:7077","Spark-Phoenix");
JavaSparkContext jsc = new JavaSparkContext(sc);

JavaRDD<String> trs_testing = jsc.textFile("hdfs://master.com:9000/Table/sample");


//using iterator
Iterator<String> iStr= trs_testing.toLocalIterator();

while(iStr.hasNext()){ //here I am getting exception    
    System.out.println("itr next : " + iStr.next());        
}   

//using foreach()
trs_testing.foreach(new VoidFunction<String>() {//here I am getting exception
        private static final long serialVersionUID = 1L;

        @Override public void call(String line) throws Exception {
            System.out.println(line);       
        }           
});

//using collect()
for(String line:trs_testing.collect()){//here I am getting exception 
    System.out.println(line);
}

//using foreachPartition()
trs_testing.foreachPartition(new VoidFunction<Iterator<String>>() {//here I am getting exception 
        private static final long serialVersionUID = 1L;

        @Override public void call(Iterator<String> arg0) throws Exception {          
            while (arg0.hasNext()) {            
                String line = arg0.next();
                System.out.println(line);
            }
    }
});

例外:

ERROR TaskSchedulerImpl在master.com上丢失执行程序0:远程RPC客户端已取消关联 . 可能由于容器超出阈值或网络问题 . 检查驱动程序日志以获取WARN消息 . ERROR TaskSchedulerImpl在slave1.com上丢失执行程序1:远程RPC客户端已取消关联 . 可能由于容器超出阈值或网络问题 . 检查驱动程序日志以获取WARN消息 . ERROR TaskSchedulerImpl在master.com上丢失执行程序2:远程RPC客户端已取消关联 . 可能由于容器超出阈值或网络问题 . 检查驱动程序日志以获取WARN消息 . ERROR TaskSchedulerImpl在slave2.com上丢失执行程序3:远程RPC客户端已取消关联 . 可能由于容器超出阈值或网络问题 . 检查驱动程序日志以获取WARN消息 . ERROR TaskSetManager阶段0.0中的任务0失败了4次;中止作业线程“main”中的异常org.apache.spark.SparkException:作业因阶段失败而中止:阶段0.0中的任务0失败4次,最近失败:阶段0.0中丢失任务0.3(TID 3,slave1.com) :ExecutorLostFailure(执行程序3退出由其中一个正在运行的任务导致)原因:远程RPC客户端已取消关联 . 可能由于容器超出阈值或网络问题 . 检查驱动程序日志以获取WARN消息 . 驱动程序堆栈跟踪:在org.apache.spark.scheduler.DAGScheduler.org $阿帕奇$火花$ $调度$$ DAGScheduler failJobAndIndependentStages(DAGScheduler.scala:1454)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1441)在scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:申请(1442 DAGScheduler.scala): 59)位于org.apache.spark.scheduler.DAGScheduler上的org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)中的scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) anonfun $ handleTaskSetFailed $ 1.适用(DAGScheduler.scala:811)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.适用(DAGScheduler.scala:811)在scala.Option.foreach(Option.scala:257 )org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler . 阶:1667)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)在org.apache.spark.util .EventLoop $$匿名$ 1.run(EventLoop.scala:48)在org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1890 )org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)at org.apache.spark.rdd.RDD $$ anonfun $ take $ 1.适用(RDD.scala:1324)在org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:151)在org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:112) org.apache.spark.rdd.RDD.withScope(RDD.scala:358)org.apache.spark.rdd.RDD.take(RDD.scala:1298)atg.apache.spark.rdd.RDD $$ anonfun $ first $ 1.apply(RDD.scala:1338)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperation Scope.scala:151)位于org.apache的org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:112)atg.apache.spark.rdd.RDD.withScope(RDD.scala:358) . spark.rdd.RDD.first(RDD.scala:1337)at com.test.java.InsertE.main(InsertE.java:147)

2 回答

  • 0

    我得到了解决方案 . 我正在通过我的机器执行代码,而我的主机和从机在远程服务器上运行 . 我将我的代码导出到远程服务器,最终能够处理数据 .

  • 0

    在本地系统/独立模式下运行Spark作业时,所有数据都将在同一台计算机上,因此您将能够迭代并打印数据 .

    当Spark作业在集群模式/环境中运行时,数据将被拆分为片段并分发到集群中的所有计算机(RDD - 弹性分布式数据集) . 因此,要以这种方式打印,您必须使用 foreach() 函数 .

    Try this:

    trs_testing.foreach(new VoidFunction<String>(){ 
              public void call(String line) {
                  System.out.println(line);
              }
    });
    

相关问题