
Spark-Cassandra Connector: Failed to open native connection to Cassandra

I am new to Spark and Cassandra. While trying to submit a Spark job, I get an error connecting to Cassandra.

Details:

Versions:

Spark: 1.3.1 (built for Hadoop 2.6 or later: spark-1.3.1-bin-hadoop2.6)
Cassandra: 2.0
Spark-Cassandra-Connector: 1.3.0-M1
Scala: 2.10.5

Spark and Cassandra are running on a virtual cluster. Cluster details:

Spark Master : 192.168.101.13
Spark Slaves : 192.168.101.11 and 192.168.101.12
Cassandra Nodes: 192.168.101.11 (seed node) and 192.168.101.12

I am trying to submit the job from my client machine (a laptop), 172.16.0.6. After googling this error, I made sure I can ping every machine in the cluster (the Spark master/slaves and the Cassandra nodes) from the client machine, and I disabled the firewall on all machines. But I am still stuck on this error.

cassandra.yaml

listen_address: 192.168.101.11 (192.168.101.12 on the other Cassandra node)
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: 192.168.101.11 (192.168.101.12 on the other Cassandra node)
rpc_port: 9160
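
Independently of Spark, the native transport can also be exercised directly with the Cassandra Java driver that is already on the classpath. A minimal sketch of such a check, assuming the same contact point, port and credentials used elsewhere in this post:

import com.datastax.driver.core.Cluster

// Open a plain driver connection to the seed node on the native transport port (9042)
val cluster = Cluster.builder()
  .addContactPoint("192.168.101.11")
  .withPort(9042)
  .withCredentials("cassandra", "cassandra")
  .build()
val session = cluster.connect()
// If this prints both nodes, the native transport itself is reachable from this machine
println(cluster.getMetadata.getAllHosts)
session.close()
cluster.close()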

I am trying to run a minimal sample job:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

To submit the job, I use spark-shell (pasting the code into the Spark shell):

spark-shell --jars "/home/ameya/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.3.0-M1/spark-cassandra-connector_2.10-1.3.0-M1.jar","/home/ameya/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.5/cassandra-driver-core-2.1.5.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar","/home/ameya/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.5/cassandra-clientutil-2.1.5.jar","/home/ameya/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.3/cassandra-thrift-2.1.3.jar","/home/ameya/.m2/repository/org/joda/joda-convert/1.2/joda-convert-1.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar" --master spark://192.168.101.13:7077 --conf spark.cassandra.connection.host=192.168.101.11 --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=cassandra

The error I get:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details
**java.io.IOException: Failed to open native connection to Cassandra at {192.168.101.11}:9042**
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:181)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:76)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:49)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:148)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:118)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:833)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
    at $iwC$$iwC$$iwC.<init>(<console>:54)
    at $iwC$$iwC.<init>(<console>:56)
    at $iwC.<init>(<console>:58)
    at <init>(<console>:60)
    at .<init>(<console>:64)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
**Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.101.11:9042 (com.datastax.driver.core.TransportException: [/192.168.101.11:9042] Connection has been closed))**
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1236)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
    ... 71 more

Can anyone point out what I am doing wrong here?

5 Answers

  • 0

    You did not specify spark.cassandra.connection.host; by default Spark assumes that the Cassandra host is the same as the Spark master node.

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._

    // Build a SparkConf that tells the connector where Cassandra lives
    val conf = new SparkConf()
      .setAppName("Cassandra Demo")
      .setMaster(master)  // e.g. "spark://192.168.101.13:7077"
      .set("spark.cassandra.connection.host", "192.168.101.11")
    val sc = new SparkContext(conf)

    val rdd = sc.cassandraTable("test", "words")
    rdd.toArray.foreach(println)
    

    If you have set the seed node correctly in cassandra.yaml, it should work.

  • 4

    I struggled with this overnight and finally found a combination that works. I am writing it down for anyone who runs into a similar problem.

    First, it is a version issue with the cassandra-driver-core dependency, but tracking down the exact combination that works took me a lot of time.

    Second, this is the combination that worked for me:

    • Spark 1.6.2 with Hadoop 2.6, Cassandra 2.1.5 (Ubuntu 14.04, Java 1.8)

    • In build.sbt (sbt assembly, scalaVersion := "2.10.5"), use the following dependencies (a fuller build.sbt sketch follows below):

    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0", "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5"

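    A minimal build.sbt along those lines might look like this (scoping spark-core as "provided" is an assumption; the version numbers are the ones listed above):

    scalaVersion := "2.10.5"

    libraryDependencies ++= Seq(
      "org.apache.spark"       %% "spark-core"                % "1.6.2" % "provided",
      "com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.0",
      "com.datastax.cassandra" %  "cassandra-driver-core"     % "2.1.5"
    )
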
    Third, let me clarify my frustration. With spark-cassandra-connector 1.5.0 I could run the assembly with spark-submit on the same machine, against a remote Cassandra, using --master "local[2]" without any problem; any combination of connector 1.5.0 or 1.6.0 with Cassandra 2.0, 2.1, 2.2 or 3.4 worked fine. However, if I submitted the job to the cluster from that same machine (a NodeManager) using --master yarn --deploy-mode cluster, I always ran into the problem: Failed to open native connection to Cassandra at {192.168.122.12}:9042

    What is happening here? Could anyone from DataStax take a look at this? My only guess is that it has to do with the "cql version", which should match the version of the Cassandra cluster.

    Does anyone know a better solution?

  • 6

    Problem solved. It was caused by some messed-up dependencies. Instead of specifying the dependent jars individually, I built a single jar with all the dependencies and passed that to spark-submit.
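
    As a sketch of that route: with the sbt-assembly plugin added to project/plugins.sbt (the plugin version below is an assumption), the connector declared as a normal dependency and Spark itself scoped as "provided", running sbt assembly produces one fat jar that replaces the long --jars list passed to spark-shell / spark-submit.

    // project/plugins.sbt (sbt-assembly plugin; version is an assumption)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")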

  • 2

    Finally got it working:

    Steps (gathered into a cassandra.yaml sketch after the list):

    • Set listen_address to the private IP of the EC2 instance.

    • Do not set any broadcast_address.

    • Set rpc_address to 0.0.0.0.

    • Set broadcast_rpc_address to the public IP of the EC2 instance.
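
    Put together, the relevant cassandra.yaml fragment looks roughly like this (the IPs are placeholders for the instance's actual private and public addresses):

    listen_address: <private IP of this instance>
    # broadcast_address: left unset
    rpc_address: 0.0.0.0
    broadcast_rpc_address: <public IP of this instance>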

  • 2

    This is a problem with the version of the cassandra-driver-core jar dependency.

    The Cassandra version in use is 2.0
    The cassandra-driver-core jar version provided is 2.1.5
    

    The driver jar should match the version of Cassandra that is running.

    In this case, the included jar file should be cassandra-driver-core-2.0.0.jar
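
    In sbt terms, one way to enforce that is to pin the driver to the 2.0 line (the exact 2.0.x patch level is an assumption; use whichever matches the cluster):

    // build.sbt: force cassandra-driver-core onto the 2.0 line to match the Cassandra 2.0 cluster
    dependencyOverrides += "com.datastax.cassandra" % "cassandra-driver-core" % "2.0.0"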
    
