首页 文章

Datastax DSE Cassandra,Spark,Shark,Standalone Programm

提问于
浏览
2

我使用Datastax Enterprise 4.5 . 我希望我的配置正确,我就像在datastax网站上解释的那样 . 我可以使用Windows服务写入Cassandra数据库,这可以工作,但我无法使用where函数查询Spark .

我用“./dse cassandra -k -t”(在/ bin文件夹中)启动Cassandra节点(只有一个用于测试目的),因此hadoop和spark都在运行 . 我可以毫无问题地写入 Cassandra .

所以当'where'不是RowKey时,你不能在Cassandra查询中使用'where'子句 . 所以我需要使用Spark / Shark . 我可以使用shark(./dse shark)开始并使用我需要的所有查询,但我需要在Scala或Java中编写一个Standalone程序 .

所以我尝试了这个链接:https://github.com/datastax/spark-cassandra-connector

我可以查询一个简单的语句,如:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

这样做效果很好,但如果我要求更多行或计数:

println(rdd.count)
rdd.toArray.foreach(println)

然后我得到这个例外:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

当我在Java中尝试这个时,我遇到了同样的问题 . 有谁知道这个问题?我不知道数据库配置是否正确或scala / Javaprogram是否正常工作 . 也许一些港口被封锁但7077和4040是开放的 .

旁注:如果我在Cassandra DB上启动spark,我可以执行以下查询:

sc.cassandraTable("test","words").select("word").toArray.foreach(println)

但是,如果我使用“where”子句,如:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

我得到这个例外:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

你知道为什么?我以为我可以在火花中使用where子句?

谢谢!

2 回答

  • 2
    All masters are unresponsive!
    

    意味着您尝试连接的IP实际上并不受spark限制 . 所以这基本上是一个网络配置错误 . 扫描以查看7077上正在侦听的接口,并确保连接到正确的接口 .

    至于第二个问题, where 运算符暗示你将对该子句进行谓词下推 . 目前,您无法使用主键执行此操作 . 如果你想在一个主键上 where 你可以做 filter 来完成它,但你不会看到很好的性能,因为这将进行整个表扫描 .

  • 0

    到目前为止这是我的解决方案 . 这不是我所有问题的答案,但它适用于我,我想分享给你 .

    我使用hive jdbc驱动程序访问带有Java的SharkServer . 这个怎么运作:

    启动sharkserver: bin/dse shark --service sharkserver -p <port>

    Maven的依赖关系:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>0.13.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>0.20.2</version>
    </dependency>
    

    Java代码:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    public class HiveJdbcClient {
      private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
    
      public static void main(String[] args) throws SQLException {
        try {
          Class.forName(driverName);
        } catch (ClassNotFoundException e) {
          e.printStackTrace();
          System.exit(1);
        }
        Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", "");
        Statement stmt = con.createStatement();
        String sql;
        ResultSet res;
    
    
    
        sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'";
        res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString("name"));
       }
     }
    }
    

相关问题