首页 文章

是否可以使用Spark Streaming SQL实时解析Kafka主题中的JSON字符串?

提问于
浏览
1

我有一个连接到kafka经纪人的Pyspark笔记本,并创建一个名为temp的spark writeStream . Kafka主题中的数据值是json格式,但我不知道如何创建一个可以实时解析这些数据的spark sql表 . 我知道的唯一方法是创建表的副本,将其转换为RDD或DF,并将值解析为另一个RDD和DF . 是否有可能在写入流时实时处理?

Code:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe","hoteth") \
    .option("startingOffsets", "earliest") \
    .load()

ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()

Output:

+----+--------------------+--------------------+
| key|               value|           timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+

1 回答

  • 0

    我可以解决这个问题的一种方法就是像在Hive HQL中一样只是横向查看json_tuple . 我仍然在寻找一种解决方案,它可以直接从流中解析数据,这样就不需要额外的处理时间来解析使用查询 .

    spark.sql("""
        select value, v1.transaction,ticker,price
        from temp 
        lateral view json_tuple(value,"e","s","p") v1 as transaction, ticker,price
        limit 5
        """).show()
    

相关问题