我有一个连接到kafka经纪人的Pyspark笔记本,并创建一个名为temp的spark writeStream . Kafka主题中的数据值是json格式,但我不知道如何创建一个可以实时解析这些数据的spark sql表 . 我知道的唯一方法是创建表的副本,将其转换为RDD或DF,并将值解析为另一个RDD和DF . 是否有可能在写入流时实时处理?
Code:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","hoteth") \
.option("startingOffsets", "earliest") \
.load()
ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()
Output:
+----+--------------------+--------------------+
| key| value| timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+
1 回答
我可以解决这个问题的一种方法就是像在Hive HQL中一样只是横向查看json_tuple . 我仍然在寻找一种解决方案,它可以直接从流中解析数据,这样就不需要额外的处理时间来解析使用查询 .