我使用spark 1.3.1和Python 2.7
这是我第一次使用Spark Streaming .
我尝试使用spark streaming从文件中读取数据的代码示例 .
这是示例的链接:https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
我的代码如下: `conf = (SparkConf() .setMaster("local") .setAppName("My app") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 1) lines = ssc.textFileStream('../inputs/2.txt') counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda x: (x, 1))\ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()`
2.txt文件的内容如下:
a1 b1 c1 d1 e1 f1 g1
a2 b2 c2 d2 e2 f2 g2
a3 b3 c3 d3 e3 f3 g3
我希望与文件内容相关的内容将在控制台中,但没有任何内容 . 除了这样的文字之外什么都没有:
-------------------------------------------
Time: 2015-09-03 15:08:18
-------------------------------------------
和Spark的日志 .
我做错了吗?否则为什么它不起作用?
3 回答
我发现了问题!
我想问题出在文件系统行为上 . 我用的是mac .
如果我只是复制它,我的程序没有看到文件 . 我的程序看到了该文件,但它是空的,当我在此文件夹中创建文件并在此之后输入数据 .
最后,如果我创建文件并将其复制到扫描目录,我的程序会看到文件和内部的任何内容,并在未扫描目录的时间段内执行 .
同样在问题文本中的代码我扫描文件,但我应该扫描目录 .
我遇到了类似的问题,但我意识到,一旦我设置Streaming运行,streamingcontext就会从新文件中获取数据 . 一旦流式传输完成,它仅提取新放置在源目录中的数据 .
实际上,pyspark文档非常明确:
textFileStream(目录)
如果您使用jupyter notebook执行此问题,则需要在批处理层中运行该程序,然后使用jupyter将文本文件上载到指定的文档 .