我有一个火花结构的蒸汽应用程序,我正在从 Kafka 读书 . 这是我的代码的基本结构 .
我创建了Spark会话 .
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
然后我从流中读到
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
在Kafka记录中,我将“值”转换为字符串 . 它从二进制转换为字符串 . 此时,数据框中有1列
val df = data_stream
.select($"value".cast("string") as "json")
基于预定义的模式,我尝试将JSON结构解析为列 . 但是,这里的问题是如果数据是“坏”,或者是不同的格式,那么它与定义的模式不匹配 . 因此,下一个数据帧(df2)将空值获取到列中 .
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
我希望能够从某个列(我用作数据库中的主键的那一行)中的“null”的行中删除df2,即忽略与模式不匹配的错误数据?
编辑:我有点能够完成这个,但不是我想要的方式 . 在我的过程中,我使用了一个使用 .foreach(writer)
进程的查询 . 这样做是打开与数据库的连接,处理每一行,然后关闭连接 . structured streaming的文档提到了此过程所需的必需品 . 在process方法中,我从每一行获取值并检查我的主键是否为null,如果为null,则不将其插入数据库 .
2 回答
Kafka将数据存储为原始字节数组格式 . 数据 生产环境 者和消费者需要就数据结构达成一致以进行处理 .
如果生成的消息格式发生变化,则消费者需要调整以读取相同的格式 . 当您的数据结构不断发展时,问题就出现了,您可能需要在消费者端兼容 .
通过Protobuff定义消息格式解决了这个问题 .
只需过滤掉您不想要的任何空值: