
Unable to fetch data from Cassandra using Spark (Java)


I am new to Cassandra and Spark, and I am trying to fetch data from the database using Spark, working in Java. The problem is that no exception is thrown and no error occurs, but I still cannot fetch any data. My code is below:

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("Spark-Cassandra Integration");
    sparkConf.setMaster("local[4]");
    sparkConf.set("spark.cassandra.connection.host", "stagingHost22");
    sparkConf.set("spark.cassandra.connection.port", "9042");

    sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
    sparkConf.set("spark.cassandra.read.timeout_ms", "200000");

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    String keySpaceName = "testKeySpace";
    String tableName = "testTable";

    CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);

    final ArrayList<CassandraRow> dataList = new ArrayList<CassandraRow>();
    JavaRDD<String> userRDD = cassandraRDD.map(new Function<CassandraRow, String>() {

        private static final long serialVersionUID = -165799649937652815L;

        public String call(CassandraRow row) throws Exception {
            System.out.println("Inside RDD call");
            dataList.add(row);
            return "test";
        }
    });
    System.out.println("data Size -" + dataList.size());
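A note on the symptom: in Spark, map is a lazy transformation, so the function passed to it does not run until an action (such as collect() or count()) is invoked on the resulting RDD; and even then the function runs on executors, so mutating a driver-side dataList is unreliable. The laziness half of this can be illustrated with plain JDK streams (a minimal analogy only, not Spark itself):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyMapDemo {
    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();

        // Like Spark's map(), Stream.map() is a lazy intermediate operation:
        // defining the pipeline does not run the lambda.
        Stream<String> mapped = Stream.of("a", "b", "c")
                .map(s -> { sideEffects.add(s); return s.toUpperCase(); });

        System.out.println("before terminal op: " + sideEffects.size()); // prints 0

        // Only a terminal operation (in Spark terms: an action) actually
        // executes the mapping function.
        List<String> upper = mapped.collect(Collectors.toList());

        System.out.println("after terminal op: " + sideEffects.size()); // prints 3
    }
}
```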

The Cassandra and Spark Maven dependencies are:

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.sparkjava</groupId>
        <artifactId>spark-core</artifactId>
        <version>2.5.4</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>
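As an aside on the list above: com.sparkjava:spark-core is the Spark web micro-framework, not Apache Spark, and spark-cassandra-connector_2.10 2.0.0-M3 is built against Apache Spark 2.0.x, whereas spark-core_2.10 1.4.0 and spark-sql_2.10 1.3.0 are much older. A version-aligned sketch (the 2.0.2 version is an assumption; verify against the connector's compatibility table):

```xml
<!-- Sketch: align Apache Spark with the 2.0.x connector. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>2.0.0-M3</version>
</dependency>
```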

The host stagingHost22 definitely has Cassandra data for the keyspace testKeySpace and the table testTable. See the query output below:

cqlsh:testKeySpace> select count(*) from testTable;

     count
    -------
        34

    (1 rows)

Can anyone suggest what I am missing here?

Thanks in advance.

Warm regards,

Vibhav

