我想从Dstream中的rdds中取样 . 由于Dstream没有 sample()
转换并且它是一系列rdds所以我这样做是为了从Dstream中取样并在其上应用一个wordcount:
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
使用此代码,Spark启动但没有发生任何事情 . 我不知道为什么 rdd.sample()
不能这样工作?使用 foreachRDD
,我们可以访问流中的每个rdd,所以我认为现在我们可以使用特定于rdd的转换 .
2 回答
使用
transform
而不是foreachRDD
. 此外,您的代码中存在拼写错误 .使用
transform
: