首页 文章

couchbase火花连接器DCP从最后位置恢复

提问于
浏览
1

我有这个火花应用程序:

val conf = new SparkConf().setMaster("local[*]")
                           .setAppName("StreamingSample")
                           .set("com.couchbase.bucket.test", "")
                           .set("com.couchbase.nodes", "test-machine")
 val ssc = new StreamingContext(conf, Seconds(5))

 ssc.couchbaseStream(from = FromNow, to = ToInfinity)
    .filter(!_.isInstanceOf[Snapshot]) // Don't print snapshots, just mutations and deletions
    .checkpoint(Seconds(2))
    .foreachRDD(rdd => {
  val om: Broadcast[ObjectMapper] = ScalaObjectMapper.getInstance(rdd.sparkContext)

  rdd.foreach {
    case m: Mutation =>

      val content: Map[String, Object] = om.value.readValue(m.content, classOf[Map[String, Object]])

      content("objectType") match {
        case "o" => println("o")
        case "c" => println("c")
        case "s" => println("s")
        case unsupportedType => println("unsupported")
      }


    case m: Deletion => println("delete")
  }

})

当恢复火花失败时我如何从上一个位置恢复?

1 回答

  • 1

    不幸的是,当前连接器版本(1.2.1)只能从开头或从当前位置(流的末尾)流式传输 . 因此,在您的示例中,您别无选择,只能将 FromNow 更改为 FromBeginning ,然后跳过(在代码中)超过您已经看到的所有消息,直到您赶上 .

    客户团队目前正在开发一个能够记住状态的新实现,因此您将能够从流中的特定点进行恢复 .

相关问题