我一直在尝试从 Kafka 主题中读取我的数据,并将其写入一个镶木地板文件中。到目前为止,除了 foreachRdd 函数之外,我一切正常。当我在 dstream 上使用地图时,我可以看到数据,但是使用 foreachRdd 的下一步,Rdd 始终为空,我不知道为什么。
我的环境是运行 Kafka 和 Spark 独立的 Ubuntu。我正在使用 pyspark shell。我是 python 的新手,所以我仍然对语法感到迷惑,不知道这是否是我的问题所在。
任何帮助或见识将不胜感激。
这是我粘贴在 pyspark shell 中的代码的副本
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import json
kafkaBroker = 'localhost:9092'
consumer_group = 'spark-streaming'
topic = 'test'
batchTimeDur=5
ssc = StreamingContext(sc, batchTimeDur)
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBroker})
#change string to json string
lines = directKafkaStream.map(lambda v: json.loads(v[1]))
# show what is in the stream
lines.map(lambda x: 'rec in this line: %s\n' % x).pprint()
# save lines to file
lines.foreachRDD(lambda x: saveAsParquet(x))
def saveAsParquet(rdd):
print('in save a parquet')
if not rdd.isEmpty:
df = sqlContext.createDataFrame(rdd, buildSchema())
#df.write.parquet('file:///vagrant/streamed-parquet', mode='overwrite')
print(' writing file')
df.write.parquet('file:///vagrant/streamed-parquet', mode='append')
print('return save as parquet')
return rdd
ssc.start()
1 回答
RDD.isEmpty
是方法,而不是属性,因此根据语言定义,在布尔值上下文中rdd.isEmpty
将被评估为 true:随后
if not rdd.isEmpty
将为假。你应该: