我们正在对从MySQL收集的kafka数据进行流式传输 . 现在,一旦完成所有分析,我想将我的数据直接保存到Hbase . 我已经通过spark结构化的流媒体文档,但找不到Hbase的任何接收器 . 我用来读取 Kafka 数据的代码如下 .
val records = spark.readStream.format("kafka").option("subscribe", "kaapociot").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("startingOffsets", "earliest").load
val jsonschema = StructType(Seq(StructField("header", StringType, true),StructField("event", StringType, true)))
val uschema = StructType(Seq(
StructField("MeterNumber", StringType, true),
StructField("Utility", StringType, true),
StructField("VendorServiceNumber", StringType, true),
StructField("VendorName", StringType, true),
StructField("SiteNumber", StringType, true),
StructField("SiteName", StringType, true),
StructField("Location", StringType, true),
StructField("timestamp", LongType, true),
StructField("power", DoubleType, true)
))
val DF_Hbase = records.selectExpr("cast (value as string) as Json").select(from_json($"json",schema=jsonschema).as("data")).select("data.event").select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")
最后,我想在hbase中保存DF_Hbase数据帧 .
3 回答
即使使用pyspark,这种方法也适用于我:https://github.com/hortonworks-spark/shc/issues/205
我将名为HBaseSinkProvider.scala的文件添加到
shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase
并构建它,该示例工作正常这是示例,如何使用(scala):
以及我如何在python中使用它的一个例子:
1-将这些库添加到您的项目中:
2-将此特性添加到您的代码中:
3-将它用于你的逻辑:
您在处理来自Kafka的数据吗?或者只是将它泵送到HBase?要考虑的选项是使用Kafka Connect . 这为您提供了一种基于配置文件的方法,用于将Kafka与其他系统(包括HBase)集成 .