首页 文章

如何使用spark-cassandra-connector连接火花和 Cassandra ?

提问于
浏览
0

你必须原谅我的noobness但我正在尝试设置一个连接到运行python脚本的cassandra的spark集群,目前我正在使用datastax enterprise在solr搜索模式下运行cassandra . 据我所知,为了使用datastax提供的spark-cassandra连接器,您必须在分析模式下运行cassandra(使用-k选项) . 目前我只使用dse spark版本才能使用它,为了使它工作,我按照下面的步骤操作:

  • 在分析模式下启动dse cassandra

  • 将$ PYTHONPATH env变量更改为/path/to/spark/dse/python:/path/to/spark/dse/python/lib/py4j-*.zip:$PYTHONPATH

  • 以root身份运行独立脚本 python test-script.py

此外,我使用单独的spark(不是dse版本)进行了另一个测试,尝试包含使驱动程序类可访问的java包,我做了:

  • 将spark.driver.extraClassPath = /path/to/spark-cassandra-connector-SNAPSHOT.jar添加到文件spark-defaults.conf 2.execute $SPARK_HOME/bin/spark-submit —packages com.datastax.spark:spark-cassandra...

我也尝试运行pyspark shell并测试sc是否有方法cassandraTable来查看驱动程序是否已加载但是没有用完,在这两种情况下我都收到以下错误消息:

AttributeError: 'SparkContext' object has no attribute 'cassandraTable'

我的目标是要解决我必须做的事情,使非dse spark版本与cassandra连接,并使用驱动程序提供的方法 .

我还想知道是否可以将dse spark-cassandra连接器与不与dse一起运行的cassandra节点一起使用 .

谢谢你的帮助

2 回答

  • 1

    以下是如何在非dse版本中将spark-shell连接到cassandra .

    spark-cassandra-connector jar复制到 spark/spark-hadoop-directory/jars/

    spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar
    

    在spark shell中执行这些命令

    sc.stop
    import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
    import  org.apache.spark.sql.cassandra._
    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)
    val csc = new CassandraSQLContext(sc)
    

    如果你的cassandra有密码设置等,你将不得不提供更多参数:)

  • 1

    我在一个独立的python脚本中使用了pyspark . 我没有't use DSE, I cloned cassandra-spark-connector from datastax'的github存储库,并使用datastax instrucctions进行编译 .

    为了能够访问spark内的spark接口,我将其复制到spark安装中的jars文件夹中 .

    我认为这对你也有好处:

    cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/
    

    你可以访问this,在那里我解释我自己设置环境的经验 .

    一旦spark可以访问Cassandra连接器,您就可以使用pyspark库作为包装器:

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext, SparkSession
    
    spark = SparkSession.builder \
      .appName('SparkCassandraApp') \
      .config('spark.cassandra.connection.host', 'localhost') \
      .config('spark.cassandra.connection.port', '9042') \
      .config('spark.cassandra.output.consistency.level','ONE') \
      .master('local[2]') \
      .getOrCreate()
    
    ds = sqlContext \
      .read \
      .format('org.apache.spark.sql.cassandra') \
      .options(table='tablename', keyspace='keyspace_name') \
      .load()
    
    ds.show(10)
    

    在这个example中,您可以看到整个脚本 .

相关问题