I need to write a large dataset to a CSV file. Here is my sample code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lower;

public class TestUdf3 {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "F:\\JAVA\\winutils");
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        List<Row> manufactuerSynonymData = new ArrayList<Row>();

        try {
            SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

            // Load the source CSV file
            HashMap<String, String> options = new HashMap<String, String>();
            options.put("header", "true");
            options.put("path", "D:\\xls\\Source25K.csv");
            Dataset<Row> SourcePropertSet = sqlContext.load("com.databricks.spark.csv", options);

            // Build the synonym dictionary rows from a properties file
            Resource resource = new ClassPathResource("/ActaulManufacturerSynonym.properties");
            Properties allProperties = PropertiesLoaderUtils.loadProperties(resource);
            StructType schemaManufactuerSynonymDictionary = new StructType(new StructField[] {
                    new StructField("ManufacturerSynonymSource", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("ManufacturerSynonymTarget", DataTypes.StringType, false, Metadata.empty())
            });

            Set<String> setuniqueManufacturerEntries = allProperties.stringPropertyNames();
            Row individualRowEntry;
            for (String individualManufacturerEntry : setuniqueManufacturerEntries) {
                individualRowEntry = RowFactory.create(individualManufacturerEntry,
                        allProperties.getProperty(individualManufacturerEntry));
                manufactuerSynonymData.add(individualRowEntry);
            }

            Dataset<Row> SynonaymList = spark
                    .createDataFrame(manufactuerSynonymData, schemaManufactuerSynonymDictionary)
                    .withColumn("ManufacturerSynonymSource", lower(col("ManufacturerSynonymSource")));
            SynonaymList.show(90, false);

            // UDF that tests whether the source string matches the synonym pattern
            UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
                private static final long serialVersionUID = -5239951370238629896L;

                @Override
                public Boolean call(String t1, String t2) throws Exception {
                    return t1.matches(t2);
                }
            };
            spark.udf().register("contains", contains, DataTypes.BooleanType);

            // UDF that replaces the matched synonym with the target term
            UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() {
                private static final long serialVersionUID = -2882956931420910207L;

                @Override
                public String call(String t1, String t2, String t3) throws Exception {
                    return t1.replaceAll(t2, t3);
                }
            };
            spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

            // Join on the "contains" UDF, then rewrite the manufacturer column
            Dataset<Row> joined = SourcePropertSet
                    .join(SynonaymList, callUDF("contains",
                            SourcePropertSet.col("manufacturersource"),
                            SynonaymList.col("ManufacturerSynonymSource")))
                    .withColumn("ManufacturerSource", callUDF("replaceWithTerm",
                            SourcePropertSet.col("manufacturersource"),
                            SynonaymList.col("ManufacturerSynonymSource"),
                            SynonaymList.col("ManufacturerSynonymTarget")));
            joined.show(54000);

            joined.repartition(1).select("*").write().format("com.databricks.spark.csv")
                    .option("delimiter", ",")
                    .option("header", "true")
                    .option("treatEmptyValuesAsNulls", "true")
                    .option("nullValue", "")
                    .save("D:\\xls\\synonym.csv");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

In the code above, instead of displaying the output in the console with the statement:

joined.show(54000,false);

I need to write it directly to a CSV file.
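
What I am aiming for is roughly the following (a minimal sketch, assuming Spark 2.x, whose built-in CSV data source replaces the com.databricks.spark.csv package for writing; the output path is a placeholder, and Spark writes part files into that directory rather than a single synonym.csv):

// Sketch, assuming Spark 2.x: the built-in csv() writer replaces
// format("com.databricks.spark.csv"); the path names an output directory
// of part files, not a single file.
joined.repartition(1)
      .write()
      .option("header", "true")
      .option("nullValue", "")
      .csv("D:\\xls\\synonym");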

It gives me a runtime exception:

1. save("D:\xls\synonym.csv");

org.apache.spark.SparkException: Job aborted.

Caused by:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$apply$2: (string, string) => boolean)

2. return t1.matches(t2);

java.lang.NullPointerException

Caused by:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$apply$2: (string, string) => boolean)
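
My guess is that the UDF fails when manufacturersource or ManufacturerSynonymSource is null for some row, so t1.matches(t2) throws. Here is a null-guarded variant of the contains UDF I am considering (this is an assumption on my part, not a confirmed fix):

// Assumption: the NullPointerException comes from null column values
// reaching t1.matches(t2), so treat a null on either side as "no match".
UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
    private static final long serialVersionUID = -5239951370238629896L;

    @Override
    public Boolean call(String t1, String t2) throws Exception {
        if (t1 == null || t2 == null) {
            return false; // skip rows with missing values instead of throwing
        }
        return t1.matches(t2);
    }
};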

Can anyone suggest how to write a large dataset directly to a CSV file?