首页 文章

将Spark Dataframe保存到couchbase

提问于
浏览
0

我想将spark数据帧数据写入couchbase . 为此,我试图按如下方式进行: -

double[] val=new double[3]; 
SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local").set("com.couchbase.nodes", "url_of_couchbase").set("com.couchbase.bucket.bucket_name", "password"));
SQLContext sql = new SQLContext(sc);
DataFrame df = sql.read().json("sample.json");
df.registerTempTable("sample");

DataFrame men=sql.sql("select mean(imp_recall_interval) from sample");
Row[] r=men.collect();
val[0]=Double.parseDouble(r[0].toString().replace("[", "").replace("]", "").trim());
JsonDocument doc1=JsonDocument.create("docId", JsonObject.create().put("mean", val[0]));
System.out.println("Data Saved");
JsonArrayDocument jrd=JsonArrayDocument.create("imp_recall_timeinterval_mean_median_sd", JsonArray.from("more", "content", "in", "here"));

但是,当我试图将这些并行化时,我无法做到这一点 .

sc.parrallelize(Seq(doc1,jrd));

请告诉我如何将这些数据保存到couchbase . 或者请指定其他方法,我也可以通过它创建创建并在Couchbase中保存文档

1 回答

  • 0

    试试这个 .

    import java.util.ArrayList;
    import java.util.List;
    import com.couchbase.spark.japi.CouchbaseDocumentRDD;
    import com.couchbase.client.java.document.AbstractDocument;
    
    
    JavaSparkContext jsc = new JavaSparkContext(sc);
    SQLContext sql = new SQLContext(jsc);
    
    JsonDocument doc1;
    JsonArrayDocument jrd;
    
    List<AbstractDocument> list = new ArrayList<AbstractDocument>();
    list.add(doc1);
    list.add(jrd);
    
    JavaRDD<AbstractDocument> jRDD = jsc.parallelize(list);
    CouchbaseDocumentRDD<AbstractDocument> cbRDD = CouchbaseDocumentRDD.couchbaseDocumentRDD(jRDD);
    cbRDD.saveToCouchbase();
    

相关问题