首页 文章

cassandra java驱动程序连接抛出noHostAvailableException

提问于
浏览
0

我有一个带有两个节点的cassandra集群 . 我已经设置了spark作业来从这个拥有3651568个密钥数量的cassandra集群进行查询 .

import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.cassandra
import org.apache.spark.sql.SparkSession

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "hostname)
val sc = new SparkContext(conf)

val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()
val studentsDF = spark.read.cassandraFormat("keyspacename", "tablename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)

我能够查询前1000行,但我无法找到从 1001th 行读取到第2000行的方法,以便我可以使用spark作业从Cassandra表中批量读取数据 .

按照我开始使用java驱动程序的建议

这里是完整的解释

我必须使用datastax java驱动程序从cassandra数据库查询..我正在使用datastax java驱动程序版本 cassandra-java-driver-3.5.1 和apache-cassandra版本 apache-cassandra-3.0.9 我已经尝试通过安装jar来解决依赖关系我还检查了yaml文件种子,listen_address,rpc_address是所有指向我的主机并且start_native_transport设置为true这是我 Build 与cassandra数据库的连接的java代码

import java.net.InetAddress;
  import com.datastax.driver.core.Metadata;
  import java.net.UnknownHostException;
  import com.datastax.driver.core.Cluster;
  import com.datastax.driver.core.Cluster.Builder;
  import com.datastax.driver.core.Session;
  import com.datastax.driver.core.ResultSet;
  import com.datastax.driver.core.Row;
public class Started {
    public void connect()
    {
     try
       {
         Cluster cluster;
         Session session;
         cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
       cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
         System.out.println("Connected to cluster:");
         session= cluster.connect("demo");
         Row row = session.execute("SELECT ename FROM demo.emp").one();
         System.out.println(row.getString("ename"));
         cluster.close();
        }
          catch (Exception e) {
              e.printStackTrace();
              }
           }
    public static void main(String[] args)
     {
       Started st = new Started();
       st.connect();
       }
          }

`

我在cassandra集群中只有一个节点,它已启动并运行 . 我能够在9042端口上cqlsh到它..到目前为止很好,但是当我运行我的java程序时,我收到此错误或异常消息...

Connected to cluster:
`

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
            at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
            at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
            at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
            at com.datastax.driver.core.Cluster.init(Cluster.java:160)
            at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
            at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
            at Started.connect(Started.java:22)
            at Started.main(Started.java:34)

`

任何人都可以请帮助!

2 回答

  • 0

    这是驱动程序兼容性的问题 . 最初我使用的是cassandra-java-driver-3.5.1和apache-cassandra-3.0.9 . 切换到cassandra-java-driver-3.0.8和apache-cassandra-3.0.9并安装几个jar文件: slf4j-log4j12-1.7.7.jarlog4j-1.2.17.jarnetty-all-4.0.39.Final.jar ..对我很好:)

  • 1

    这可能不适合Spark . 例如,显示只显示1000条记录,但不保证记录的顺序 . 多次调用可能会产生不同的结果 .

    在Spark中你最好的选择可能是将结果作为本地迭代器获取,如果你想翻阅它们,但这可能不是最好的做事方式 . Spark是一个用于处理远程群集上的数据的系统 . 这意味着在dataframe api中进行处理 .

    如果你真的只是想慢慢浏览记录,你可以使用 toLocalIterator 来获取批量回到你的驱动程序机器(不推荐) . 但是你可以通过使用Java Driver执行Select(*)来完成类似的事情 . 返回给您的结果集迭代器将在您逐步查看结果时自动分页结果 .

    使用Java驱动程序的分页的示例

    https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/

    ResultSet rs = session.execute("your query");
      for (Row row : rs) {
      // Process the row ...
      // By default this will only pull a new "page" of data from cassandra
      // when the previous page has been fully iterated through. See the
      // docs for more details    
    }
    

    使用Spark远程处理数据的示例

    RDD Docs for Cassandra Dataframe Docs for Cassandra // RDD API sparkContext.cassandraTable("ks","tab") . foreach(row => // processRow)

    //Dataframe API - although similar foreach is available here as well
    spark.read.format("org.apache.spark.sql.cassandra")
      .load()
      .select(//do some transforms)
      .write(//pickoutput of request)
    

    使用localIterator的示例,可能是最不相关的方法

    Why you might want to do this with an example

    // This reads all data in large blocks to executors, those blocks are then pulled one at a time back to the Spark Driver.
    sparkContext.cassandraTable("ks","tab").toLocalIterator
    

相关问题