首页 文章

Pyspark directStreams foreachRdd 始终具有空的 RDD

提问于
浏览
1

我一直在尝试从 Kafka 主题中读取我的数据,并将其写入一个镶木地板文件中。到目前为止,除了 foreachRdd 函数之外,我一切正常。当我在 dstream 上使用地图时,我可以看到数据,但是使用 foreachRdd 的下一步,Rdd 始终为空,我不知道为什么。

我的环境是运行 Kafka 和 Spark 独立的 Ubuntu。我正在使用 pyspark shell。我是 python 的新手,所以我仍然对语法感到迷惑,不知道这是否是我的问题所在。

任何帮助或见识将不胜感激。

这是我粘贴在 pyspark shell 中的代码的副本

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *             
import json

kafkaBroker = 'localhost:9092'
consumer_group = 'spark-streaming'
topic = 'test'
batchTimeDur=5

ssc = StreamingContext(sc, batchTimeDur)
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBroker})

#change string to json string
lines = directKafkaStream.map(lambda v: json.loads(v[1]))

# show what is in the stream
lines.map(lambda x: 'rec in this line: %s\n' % x).pprint()

# save lines to file
lines.foreachRDD(lambda x: saveAsParquet(x))

def saveAsParquet(rdd):
    print('in save a parquet')
    if not rdd.isEmpty:
        df = sqlContext.createDataFrame(rdd, buildSchema())
        #df.write.parquet('file:///vagrant/streamed-parquet', mode='overwrite')
        print('  writing file')
        df.write.parquet('file:///vagrant/streamed-parquet', mode='append')
    print('return save as parquet')
    return rdd

ssc.start()

1 回答

  • 2

    RDD.isEmpty是方法,而不是属性,因此根据语言定义,在布尔值上下文中rdd.isEmpty将被评估为 true:

    以下值将解释为 false:“ False”,“ None”,所有类型的数字零以及空字符串和容器(包括字符串,元组,列表,字典,集合和 Frozensets)。所有其他值均解释为 true。

    随后if not rdd.isEmpty将为假。

    你应该:

    if not rdd.isEmpty(): 
        ...
    

相关问题