
How to save Spark streaming data in Cassandra

build.sbt: Below are the contents of the build.sbt file


val sparkVersion = "1.6.3"
scalaVersion := "2.10.5"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion)
libraryDependencies += "datastax" % "spark-cassandra-connector" % "1.6.3-s_2.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion // was "1.1.0", which conflicts with the 1.6.3 artifacts above

Command to initialize the shell: below is the command I used to initialize the shell

/usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1 --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar

Note: I specified the jar explicitly here because SBT was unable to fetch the spark-streaming-kafka library required when creating the kafkaStream in a later section

Import required libraries:

This section lists the libraries to import for the various steps of the REPL session

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

Setting up Spark Streaming Configuration:

Here we set up the configuration required for the Spark streaming context


val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
conf.set("spark.driver.allowMultipleContexts", "true") // required because shell startup has already created a SparkContext with its own configuration
conf.setMaster("local") // then we assign those configurations locally

Creation of SparkStreamingContext using above configurations: using the configuration defined above, we create a Spark streaming context as follows

val ssc = new StreamingContext(conf, Seconds(1)) // Seconds(...) is the batch interval at which data is fetched

Creating a Kafka stream using above Spark Streaming Context aka SSC: here ssc is the Spark streaming context created above, "localhost:2181" is the ZooKeeper quorum, "spark-streaming-consumer-group" is the consumer group, and Map("test3" -> 5) is a Map of (topic -> number of partitions)

val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("test3" -> 5)).map(_._2)

Note: the values obtained when printing the kafkaStream object with kafkaStream.print() are shown below


85052,19,960.00,0,2017-08-29 14:52:41,17,VISHAL_GWY01_HT1,26,VISHAL_GTWY17_PRES_01,1,2,4                                                             
85053,19,167.00,0,2017-08-29 14:52:41,17,VISHAL_GWY01_HT1,25,VISHAL_GTWY1_Temp_01,1,2,4                                                              
85054,19,960.00,0,2017-08-29 14:52:41,17,VISHAL_GWY01_HT1,26,VISHAL_GTWY17_PRES_01,1,2,4                                                             
85055,19,167.00,0,2017-08-29 14:52:54,17,VISHAL_GWY01_HT1,25,VISHAL_GTWY1_Temp_01,1,2,4                                                              
85056,19,960.00,0,2017-08-29 14:52:54,17,VISHAL_GWY01_HT1,26,VISHAL_GTWY17_PRES_01,1,2,4                                                             
85057,19,167.00,0,2017-08-29 14:52:55,17,VISHAL_GWY01_HT1,25,VISHAL_GTWY1_Temp_01,1,2,4                                                              
85058,19,960.00,0,2017-08-29 14:52:55,17,VISHAL_GWY01_HT1,26,VISHAL_GTWY17_PRES_01,1,2,4                                                             

17/09/02 18:25:25 INFO JobScheduler: Finished job streaming job 1504376716000 ms.0 from job set of time 1504376716000 ms                             
17/09/02 18:25:25 INFO JobScheduler: Total delay: 9.661 s for time 1504376716000 ms (execution: 0.021 s)                                             
17/09/02 18:25:25 INFO JobScheduler: Starting job streaming job 1504376717000 ms.0 from job set of time 1504376717000 ms

Transforming the kafkaStream and saving in Cassandra:


kafkaStream.foreachRDD( rdd => {
  if (!rdd.isEmpty()) {
    rdd.map( line => {
      val arr = line.split(",")
      (arr(0), arr(1), arr(2), arr(3), arr(4), arr(5), arr(6), arr(7), arr(8), arr(9), arr(10), arr(11))
    }).saveToCassandra("test", "sensorfeedVals", SomeColumns(
      "tableid", "ccid", "paramval", "batVal", "time", "gwid", "gwhName", "snid", "snhName", "snStatus", "sd", "MId"))
  } else {
    println("No records to save")
  }
})

Start ssc:

With ssc.start you can start streaming

The problems faced here are:

1. The stream contents are printed only after I type exit/Ctrl+C.
2. Whenever I use ssc.start, does streaming start immediately? There is no time to enter ssc.awaitTermination.
3. The main problem: when I try to save normally with the program below,

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

I am able to save to Cassandra, but whenever I try to save using the logic shown under "Transforming the kafkaStream and saving in Cassandra:" above, I cannot extract each value from the string and save it into the individual columns of the Cassandra table!

3 Answers

  • 1

    java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.cql.CassandraConnector

    indicates that the classpath has not been set up correctly for your application. Make sure to use the --packages option when launching the application, as described in the SCC docs.
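
    For example, for a standalone application the equivalent spark-submit invocation would look roughly like this (MyApp and my-app.jar are placeholder names):

    spark-submit --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1 --class MyApp my-app.jar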

    As for your other questions:

    You don't need awaitTermination in the REPL, because the REPL does not exit immediately after the streaming context is started. That call is for an application which may otherwise have no further instructions keeping the main thread from exiting.

    start will begin streaming immediately.
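
    To make the awaitTermination point concrete, a minimal standalone application would look roughly like this (a sketch; StreamingApp is an illustrative name):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // In an application (unlike the REPL), the main thread would exit right
    // after ssc.start() unless we block on ssc.awaitTermination().
    object StreamingApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(1))
        // ... define streams and their output operations here ...
        ssc.start()            // streaming starts immediately
        ssc.awaitTermination() // block until the job is stopped or fails
      }
    }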

  • 1

    A line or two of code related to the context was causing this problem!

    I found the solution while working through the topic of contexts!

    Here I was running multiple contexts, but they were independent of each other.

    I had initialized the shell with the following command:

    /usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1 --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar

    So when the shell started, a Spark context carrying the DataStax connector properties was initialized.

    Later I created some configurations and used them to create a Spark streaming context. Using that context I created the kafkaStream. This kafkaStream carried only the properties of the SSC, not of the SC, and that is where the problem storing to Cassandra arose.

    I tried to solve it as shown below, and succeeded!


    val sc = new SparkContext(new SparkConf().setAppName("Spark-Kafka-Streaming").setMaster("local[*]").set("spark.cassandra.connection.host", "127.0.0.1"))
    val ssc = new StreamingContext(sc, Seconds(10))
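
    With the StreamingContext now built on the single SparkContext that carries the connector settings, the stream and save logic from the question work unchanged (a sketch reusing the question's own calls):

    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("test3" -> 5)).map(_._2)

    kafkaStream.foreachRDD( rdd => {
      if (!rdd.isEmpty()) {
        rdd.map(_.split(","))
          .map(arr => (arr(0), arr(1), arr(2), arr(3), arr(4), arr(5),
                       arr(6), arr(7), arr(8), arr(9), arr(10), arr(11)))
          .saveToCassandra("test", "sensorfeedVals", SomeColumns(
            "tableid", "ccid", "paramval", "batVal", "time", "gwid",
            "gwhName", "snid", "snhName", "snStatus", "sd", "MId"))
      }
    })
    ssc.start()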
    

    Thanks to everyone who came forward to support! Let me know if there are better ways to achieve this!

  • 0

    A very simple approach is to convert the RDD to a DataFrame inside the foreachRDD API and save it to Cassandra using the SparkSQL-Cassandra datasource API. Below is a simple code snippet in which I save Twitter tweets to a Cassandra table.

    stream.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        val data = rdd.filter(status => status.getLang.equals("en")).map(status => TweetsClass(status.getId,
          status.getCreatedAt.toGMTString(),
          status.getUser.getLocation,
          status.getText)).toDF()
        //Save the data to Cassandra
        data.write.
          format("org.apache.spark.sql.cassandra").
          options(Map(
            "table" -> "sentiment_tweets",
            "keyspace" -> "My Keyspace",
            "cluster" -> "My Cluster")).mode(SaveMode.Append).save()
    
      }
    })
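
    For this snippet to compile, a case class and the SQL implicits must already be in scope. Something along these lines is assumed (TweetsClass and its fields are illustrative):

    import org.apache.spark.sql.{SQLContext, SaveMode}

    // Assumed prerequisites for the snippet above (illustrative names/fields):
    case class TweetsClass(id: Long, createdAt: String, location: String, text: String)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope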
    
