首页 文章

基于kafka分区的结构化流式读取

提问于
浏览
0

我正在使用spark结构化Streaming来读取来自Kafka主题的传入消息并根据传入消息写入多个镶木桌面所以我创建了一个readStream,因为Kafka源是常见的,并且每个镶木地板表在循环中创建单独的写入流 . 这工作正常但读取流正在创建瓶颈,因为每个writeStream都会创建一个readStream,并且无法缓存已读取的数据帧 .

val kafkaDf=spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.servers)
      .option("subscribe", conf.topics)
      //  .option("earliestOffset","true")
      .option("failOnDataLoss",false)
      .load()

foreach table   {  
//filter the data from source based on table name
//write to parquet
 parquetDf.writeStream.format("parquet")
        .option("path", outputFolder + File.separator+ tableName)
        .option("checkpointLocation", "checkpoint_"+tableName)
        .outputMode("append")
        .trigger(Trigger.Once())
       .start()
}

现在,每个写入流都在创建一个新的消费者组,并从Kafka读取整个数据,然后进行过滤并写入Parquet . 这会产生巨大的开销 . 为了避免这种开销,我可以将Kafka主题分区为具有与表数一样多的分区,然后读取流应仅从给定分区读取 . 但我没有看到将分区详细信息指定为Kafka读取流的一部分的方法 .

1 回答

  • 1

    如果数据量不是那么高,编写自己的接收器,收集每个微批次的数据,那么你应该能够缓存该数据帧并写入不同的位置,但需要一些调整,但它会工作

相关问题