首页 文章

是否可以从Spark-streaming检查点恢复广播值

提问于
浏览
0

我使用hbase-spark来记录我的火花流项目中的pv / uv . 然后,当我杀死应用程序并重新启动它时,我在检查点恢复时遇到以下异常:

16/03/02 10:17:21错误HBaseContext:无法从广播java.lang.ClassCastException获取getConfig:[B不能在com.paitao.xmlife.contrib.hbase.HBaseContext中强制转换为org.apache.spark.SerializableWritable .getConf(HBaseContext.scala:645)at com.paitao.xmlife.contrib.hbase.HBaseContext.com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ hbaseForeachPartition(HBaseContext.scala:627)at com.paitao.xmlife . contrib.hbase.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457)at com.paitao.xmlife.contrib.hbase.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457)at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD . scala:898)在org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD.scala:898)org.apache.spark.SparkContext $$ anonfun $ runJob $ 5 . apply(SparkContext.scala:1839)在org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1839)org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)at org.apache.spark.scheduler.Task.run(Task.scala:88)at org .apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:214)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor) .java:617)在java.lang.Thread.run(Thread.java:745)

我检查了HBaseContext的代码,它使用广播来存储HBase配置 .

class HBaseContext(@transient sc: SparkContext,
               @transient config: Configuration,
               val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

    @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
    @transient var tmpHdfsConfiguration: Configuration = config
    @transient var appliedCredentials = false
    @transient val job = Job.getInstance(config)

    TableMapReduceUtil.initCredentials(job)
    // <-- broadcast for HBaseConfiguration here !!!
    var broadcastedConf = sc.broadcast(new SerializableWritable(config))
    var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
    ...

当checkpoint-recover时,它试图在其getConf函数中访问此广播值:

if (tmpHdfsConfiguration == null) {
  try {
    tmpHdfsConfiguration = configBroadcast.value.value
  } catch {
    case ex: Exception => logError("Unable to getConfig from broadcast", ex)
  }
}

然后引发异常 . 我的问题是:是否有可能在spark应用程序中从检查点恢复广播值?我们还有其他一些解决方案来恢复后的 Value 吗?

感谢您的任何反馈!

2 回答

  • 1

    目前,这是Spark的一个已知错误 . 贡献者一直在调查这个问题,但没有取得任何进展 .

    这是我的解决方法:我不是将数据加载到广播变量并广播到所有 Actuator ,而是让每个 Actuator 将数据本身加载到单个对象中 .

    顺便说一句,请按照此问题进行更改https://issues.apache.org/jira/browse/SPARK-5206

  • 1

    遵循以下方法

    • 创建火花上下文 .

    • 初始化广播变量 .

    • 使用上面的spark上下文创建带有检查点目录的流上下文,并传递初始化的广播变量 .

    当流检测作业开始且检查点目录中没有数据时,它将初始化广播变量 .

    当重新启动流时,它将从检查点目录恢复广播变量 .

相关问题