首页 文章

从Spark Streaming DataFrame中删除(损坏的)不适合架构的行(来自Kafka的传入JSON数据)

提问于
浏览
1

我有一个火花结构的蒸汽应用程序,我正在从 Kafka 读书 . 这是我的代码的基本结构 .

我创建了Spark会话 .

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

然后我从流中读到

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

在Kafka记录中,我将“值”转换为字符串 . 它从二进制转换为字符串 . 此时,数据框中有1列

val df = data_stream
    .select($"value".cast("string") as "json")

基于预定义的模式,我尝试将JSON结构解析为列 . 但是,这里的问题是如果数据是“坏”,或者是不同的格式,那么它与定义的模式不匹配 . 因此,下一个数据帧(df2)将空值获取到列中 .

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

我希望能够从某个列(我用作数据库中的主键的那一行)中的“null”的行中删除df2,即忽略与模式不匹配的错误数据?

编辑:我有点能够完成这个,但不是我想要的方式 . 在我的过程中,我使用了一个使用 .foreach(writer) 进程的查询 . 这样做是打开与数据库的连接,处理每一行,然后关闭连接 . structured streaming的文档提到了此过程所需的必需品 . 在process方法中,我从每一行获取值并检查我的主键是否为null,如果为null,则不将其插入数据库 .

2 回答

  • 0

    Kafka将数据存储为原始字节数组格式 . 数据 生产环境 者和消费者需要就数据结构达成一致以进行处理 .

    如果生成的消息格式发生变化,则消费者需要调整以读取相同的格式 . 当您的数据结构不断发展时,问题就出现了,您可能需要在消费者端兼容 .

    通过Protobuff定义消息格式解决了这个问题 .

  • 0

    只需过滤掉您不想要的任何空值:

    df2
      .filter(row => row("colName") != null)
    

相关问题