
Spark Structured Streaming integration with HBase

We are streaming Kafka data collected from MySQL. Once all the analysis is done, I want to save the data directly to HBase. I have gone through the Spark Structured Streaming documentation but could not find any sink for HBase. The code I use to read the Kafka data is below.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val records = spark.readStream.format("kafka")
  .option("subscribe", "kaapociot")
  .option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667")
  .option("startingOffsets", "earliest")
  .load

val jsonschema = StructType(Seq(StructField("header", StringType, true), StructField("event", StringType, true)))

val uschema = StructType(Seq(
  StructField("MeterNumber", StringType, true),
  StructField("Utility", StringType, true),
  StructField("VendorServiceNumber", StringType, true),
  StructField("VendorName", StringType, true),
  StructField("SiteNumber", StringType, true),
  StructField("SiteName", StringType, true),
  StructField("Location", StringType, true),
  StructField("timestamp", LongType, true),
  StructField("power", DoubleType, true)))

val DF_Hbase = records.selectExpr("cast(value as string) as json")
  .select(from_json($"json", jsonschema).as("data"))
  .select("data.event")
  .select(from_json($"event", uschema).as("mykafkadata"))
  .select("mykafkadata.*")

Finally, I want to save the DF_Hbase DataFrame to HBase.

3 Answers

  • 0

    This approach works for me, even with PySpark: https://github.com/hortonworks-spark/shc/issues/205

    package HBase
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.execution.streaming.Sink
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.datasources.hbase._
    
    class HBaseSink(options: Map[String, String]) extends Sink with Logging {
      // String with HBaseTableCatalog.tableCatalog
      private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")
    
      override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {   
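        // data is a streaming (micro-batch) DataFrame and cannot be written directly;
        // recreating it from its RDD and schema yields a plain batch DataFrame that
        // the batch SHC data source below can save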
        val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
        df.write
          .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
            HBaseTableCatalog.newTable -> "5"))
          .format("org.apache.spark.sql.execution.datasources.hbase").save()
      }
    }
    
    class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
      def createSink(
                      sqlContext: SQLContext,
                      parameters: Map[String, String],
                      partitionColumns: Seq[String],
                      outputMode: OutputMode): Sink = {
        new HBaseSink(parameters)
      }
    
      def shortName(): String = "hbase"
    }
    

    I added a file named HBaseSinkProvider.scala to shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase, built the project, and the example works fine.
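
    The build step is not shown in the original answer; assuming the standard Maven build of the shc repository, it could look like this (run from the repo root, then put the resulting shc-core jar on the streaming job's classpath):

    mvn clean package -DskipTests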

    Here is an example of how to use it (Scala):

    inputDF.
       writeStream.
       queryName("hbase writer").
       format("HBase.HBaseSinkProvider").
       option("checkpointLocation", checkPointProdPath).
       option("hbasecat", catalog).
       outputMode(OutputMode.Update()).
       trigger(Trigger.ProcessingTime(30.seconds)).
       start
    

    And an example of how I use it in Python:

    inputDF \
        .writeStream \
        .outputMode("append") \
        .format('HBase.HBaseSinkProvider') \
        .option('hbasecat', catalog_kafka) \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .start()
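
    In both snippets, the catalog value (catalog_kafka in the Python one) is an SHC HBaseTableCatalog JSON string that maps DataFrame columns to HBase columns. A minimal sketch for the schema in the question follows; the namespace, table name and "cf1" column family are assumptions, and the remaining string fields would be mapped the same way:

    val catalog = """{
        "table":{"namespace":"default", "name":"meter_readings"},
        "rowkey":"MeterNumber",
        "columns":{
          "MeterNumber":{"cf":"rowkey", "col":"MeterNumber", "type":"string"},
          "Utility":{"cf":"cf1", "col":"Utility", "type":"string"},
          "timestamp":{"cf":"cf1", "col":"timestamp", "type":"long"},
          "power":{"cf":"cf1", "col":"power", "type":"double"}
        }
      }"""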
    
  • 4

    1 - Add these libraries to your project:

    "org.apache.hbase" % "hbase-client" % "2.0.1"
          "org.apache.hbase" % "hbase-common" % "2.0.1"
    

    2 - Add this trait to your code:

    import java.util.concurrent.ExecutorService

    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
    import org.apache.hadoop.hbase.security.User
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.sql.ForeachWriter

    trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

      val tableName: String
      val hbaseConfResources: Seq[String]

      def pool: Option[ExecutorService] = None
      def user: Option[User] = None

      private var hTable: Table = _
      private var connection: Connection = _

      override def open(partitionId: Long, version: Long): Boolean = {
        connection = createConnection()
        hTable = getHTable(connection)
        true
      }

      def createConnection(): Connection = {
        val hbaseConfig = HBaseConfiguration.create()
        hbaseConfResources.foreach(hbaseConfig.addResource)
        ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
      }

      def getHTable(connection: Connection): Table = {
        connection.getTable(TableName.valueOf(tableName))
      }

      override def process(record: RECORD): Unit = {
        val put = toPut(record)
        hTable.put(put)
      }

      override def close(errorOrNull: Throwable): Unit = {
        hTable.close()
        connection.close()
      }

      def toPut(record: RECORD): Put
    }
    

    3 - Use it in your logic:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    val ds = .... // anyDataset[WhatEverYourDataType]

    val query = ds.writeStream
      .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
        override val tableName: String = "hbase-table-name"
        // your cluster config files; assumed here to be on the classpath (resources)
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(record: WhatEverYourDataType): Put = {
          val key = .....
          val columnFamilyName: String = ....
          val columnName: String = ....
          val columnValue = ....

          val p = new Put(Bytes.toBytes(key))
          // add columns ...
          p.addColumn(Bytes.toBytes(columnFamilyName),
            Bytes.toBytes(columnName),
            Bytes.toBytes(columnValue))

          p
        }
      })
      .start()

    query.awaitTermination()
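
    For the data from the question, a concrete writer could look roughly like the sketch below. The MeterReading case class, the table name, the column family and the choice of MeterNumber plus timestamp as row key are assumptions for illustration, not part of the original answer:

    case class MeterReading(MeterNumber: String, timestamp: Long, power: Double)

    // meterReadings is assumed to be a Dataset[MeterReading], e.g.
    // DF_Hbase.select("MeterNumber", "timestamp", "power").as[MeterReading]
    val query = meterReadings.writeStream
      .foreach(new HBaseForeachWriter[MeterReading] {
        override val tableName: String = "meter_readings"
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(r: MeterReading): Put = {
          // row key: meter number + timestamp, so readings of one meter stay together
          val p = new Put(Bytes.toBytes(s"${r.MeterNumber}_${r.timestamp}"))
          p.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("power"), Bytes.toBytes(r.power))
          p
        }
      })
      .start()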
    
  • 2

    Are you processing the data from Kafka, or just pumping it into HBase? One option to consider is Kafka Connect. It gives you a configuration-file-based approach for integrating Kafka with other systems, including HBase.
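
    A minimal sketch of what such a sink configuration could look like follows. Only name, connector.class, tasks.max and topics are standard Kafka Connect properties; the connector class and the HBase-specific settings are placeholders that depend on which HBase sink connector you deploy:

    # hypothetical HBase sink connector config -- adapt to the connector you actually use
    name=hbase-sink
    connector.class=<your HBase sink connector class>
    tasks.max=1
    topics=kaapociot
    # connector-specific settings go here, e.g. the HBase ZooKeeper quorum and
    # the table / column family mapping (property names vary by connector)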
