首页 文章

从DStream RDD向批处理RDD添加新元素

提问于
浏览
3

使用Batch RDD连接/联合/组合DStream RDD的唯一方法是通过“transform”方法,该方法返回另一个DStream RDD,因此在微批结束时将其丢弃 .

有没有办法,例如union Dstream RDD with Batch RDD,它生成一个新的Batch RDD,其中包含DStream RDD和Batch RDD的元素 .

并且一旦以上述方式创建这样的批量RDD,其他DStream RDD可以将其用于例如:此时加入,结果可能是另一个DStream RDD

实际上,上述功能将导致批量RDD的元素定期更新(添加) - 其他元素将继续来自DStream RDD,这些RDD随每个微批量继续流入 . 新到达的DStream RDD也可以与之前更新的BAtch RDD连接并生成结果DStream RDD

使用updateStateByKey可以实现几乎相同的东西,但有没有办法按照此处的描述进行操作

1 回答

  • 4

    另一种方法是将批输入转换为DStream并将其与流输入结合 . 然后使用foreachRDD将其写出来,这是您对其他作业的批量输入 .

    val batch = sc.textFile(...)
    
     val ssc = new StreamingContext(sc, Seconds(30))
     val stream = ssc.textFileStream(...)
    
     import scala.collection.mutable
     val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch)
    
     val union = ssc.union(Seq(stream, batchStream))
    
     union.print()
    
     union.foreachRDD { rdd =>
       // Delete previous, or use SchemaRDD with .insertInto(, overwrite = true)
       rdd.saveTextFile(...)
     }
    
     ssc.start()
     ssc.awaitTermination()
    

相关问题