I'm trying to use Spark Streaming to consume a Kafka message queue. Inside the foreachRDD operation I read HBase in the RDD's foreach action, based on the messages pulled from Kafka, but I'm getting an error that looks like some kind of deadlock. Can anyone help me figure out what the problem is? The code details are below.

KafkaUtils.createStream(streamingContext, kafkazk, kafkaGroup, kafkaTopic.split(",").map(_.trim -> 1).toMap)
  .foreachRDD(_.processRDD(new CQRequestContext(null, new DBRequestContext(prefix, dbName, tableName, null, null, null))))
streamingContext.start()
streamingContext.awaitTermination()
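
For context, _.processRDD(...) works because processRDD is added to the RDD through an implicit wrapper; the sketch below only shows that wiring (the signature is simplified here, and the real body is the processRDD method further down).

import org.apache.spark.rdd.RDD

// Assumed wiring only: an implicit class that adds processRDD to RDD[(String, String)]
// (the key/value pairs coming from KafkaUtils.createStream); the actual body is the
// processRDD method listed below.
object CQStreamingFunctions {
  implicit class CQStreamingFunction(rdd: RDD[(String, String)]) {
    def processRDD(cQRequestContext: CQRequestContext): Unit = {
      // ... body shown below ...
    }
  }
}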

I do some HBase reads in each RDD's foreach action, and I create the HBase connection inside foreachPartition so that the number of connections stays small.

def processRDD(cQRequestContext: CQRequestContext) = {
  rdd.foreachPartition(iterator => {
    val connection = ConnectionFactory.createConnection(HbaseUtil.getHbaseConfiguration("FdsCQ"))
    val hTable = connection.getTable(TableName.valueOf("cqdev:sampleTestTable"))

    try {
      iterator.foreach(requestTuple => {
        //processAIPRequest(requestTuple._2, cQRequestContext, myTable)

        val p = new Put(Bytes.toBytes("dabao12345"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_CONTENT), Bytes.toBytes("content"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.PACKAGE_NOS), Bytes.toBytes("response"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.ABUSE_PACKAGES_NO), Bytes.toBytes("response"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_CODE), Bytes.toBytes("responseCode"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_STATUS), Bytes.toBytes("respone"))
        hTable.put(p)
      })
    } finally {
      //scanner.close
      hTable.close()
      connection.close()
    }
  })
}
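
For reference, HbaseUtil.getHbaseConfiguration used above is just a thin wrapper around HBaseConfiguration.create(); roughly what it does is sketched below (the quorum and client-port values here are placeholders, not the real cluster settings).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

object HbaseUtil {
  // appName is kept only to match the call site above; the values set here are
  // illustrative placeholders, not the actual cluster configuration
  def getHbaseConfiguration(appName: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf
  }
}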

In the end I get the error below. After debugging, it doesn't look like HBase is running out of connections: during the test I only put a single message into Kafka, and I get this error the very first time I try to create the HBase connection.

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1036)
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:222)
org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:849)
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:670)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
com.XXXXXXXX.func.CQStreamingFunction$$anonfun$processRDD$1.apply(CQStreamingFunction.scala:50)
com.XXXXXXXX.func.CQStreamingFunction$$anonfun$processRDD$1.apply(CQStreamingFunction.scala:49)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

The error above only happens when I run the Spark Streaming job in yarn-cluster mode on the YARN cluster. When I run it in local mode on my own machine, connecting to the remote HBase, everything works fine.

Environment details: CDH 5.5.2, Spark 1.5.0