I am new to Cassandra and Spark, and I am trying to fetch data from the database using Spark. I am using Java for this purpose. The problem is that no exception is thrown and no error occurs, but still no data is fetched. Please find my code below:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Spark-Cassandra Integration");
sparkConf.setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", "stagingHost22");
sparkConf.set("spark.cassandra.connection.port", "9042");
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

String keySpaceName = "testKeySpace";
String tableName = "testTable";

CassandraJavaRDD<CassandraRow> cassandraRDD =
        CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);

final ArrayList dataList = new ArrayList();
JavaRDD<String> userRDD = cassandraRDD.map(new Function<CassandraRow, String>() {
    private static final long serialVersionUID = -165799649937652815L;

    public String call(CassandraRow row) throws Exception {
        System.out.println("Inside RDD call");
        dataList.add(row);
        return "test";
    }
});

System.out.println("data Size -" + dataList.size());
The Cassandra and Spark Maven dependencies are:
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-mapping</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-extras</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.sparkjava</groupId>
    <artifactId>spark-core</artifactId>
    <version>2.5.4</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>2.0.0-M3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.3.0</version>
</dependency>
It is certain that the host stagingHost22 has Cassandra data with the keyspace testKeySpace and the table testTable. Please find the query output below:
cqlsh:testKeySpace> select count(*) from testTable;

 count
-------
    34

(1 rows)
Can anyone suggest what I am missing here?

Thanks in advance.

Warm regards,
Vibhav
1 Answer
Your current code does not perform any Spark action, so no data is loaded.
See the Spark documentation for the difference between transformations and actions in Spark: http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations
Also, adding CassandraRows to an ArrayList is usually not what you want when using the Cassandra connector. (In your code it cannot work anyway: the function passed to map() runs on the executors, so adding rows to a driver-side ArrayList has no effect on the driver.) I would suggest implementing a simple select first, following the Spark-Cassandra-Connector documentation. Once that works, you can extend the code as needed.
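A minimal sketch of such a simple select, reusing the host, keyspace, and table names from the question (it requires a reachable Cassandra cluster, so it is not runnable standalone). The key difference from the question's code is the call to collect(), which is an action and therefore actually triggers reading from Cassandra:

```java
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;

public class SimpleSelect {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Spark-Cassandra Integration")
                .setMaster("local[4]")
                .set("spark.cassandra.connection.host", "stagingHost22");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // cassandraTable(...) and map(...) are lazy transformations:
        // at this point nothing has been read from Cassandra yet.
        JavaRDD<String> rows = CassandraJavaUtil.javaFunctions(sc)
                .cassandraTable("testKeySpace", "testTable")
                .map(CassandraRow::toString);

        // collect() is an action: it runs the job on the executors and
        // returns the results to the driver. This replaces the broken
        // pattern of mutating a driver-side ArrayList inside map().
        List<String> result = rows.collect();
        System.out.println("data size = " + result.size());

        sc.stop();
    }
}
```

Note that collect() pulls the whole table to the driver, so for large tables prefer actions like count() or take(n) while experimenting.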
Check the following links for how to load data with the connector:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md