首页 文章

带有textFileStream的Python Spark Streaming示例不起作用 . 为什么?

提问于
浏览
1

我使用spark 1.3.1和Python 2.7

这是我第一次使用Spark Streaming .

我尝试使用spark streaming从文件中读取数据的代码示例 .

这是示例的链接:https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py

我的代码如下: `conf = (SparkConf() .setMaster("local") .setAppName("My app") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 1) lines = ssc.textFileStream('../inputs/2.txt') counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda x: (x, 1))\ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()`

2.txt文件的内容如下:

a1 b1 c1 d1 e1 f1 g1
a2 b2 c2 d2 e2 f2 g2
a3 b3 c3 d3 e3 f3 g3

我希望与文件内容相关的内容将在控制台中,但没有任何内容 . 除了这样的文字之外什么都没有:

-------------------------------------------
Time: 2015-09-03 15:08:18
-------------------------------------------

和Spark的日志 .

我做错了吗?否则为什么它不起作用?

3 回答

  • 2

    我发现了问题!

    我想问题出在文件系统行为上 . 我用的是mac .

    如果我只是复制它,我的程序没有看到文件 . 我的程序看到了该文件,但它是空的,当我在此文件夹中创建文件并在此之后输入数据 .

    最后,如果我创建文件并将其复制到扫描目录,我的程序会看到文件和内部的任何内容,并在未扫描目录的时间段内执行 .

    同样在问题文本中的代码我扫描文件,但我应该扫描目录 .

  • 1

    我遇到了类似的问题,但我意识到,一旦我设置Streaming运行,streamingcontext就会从新文件中获取数据 . 一旦流式传输完成,它仅提取新放置在源目录中的数据 .

    实际上,pyspark文档非常明确:

    textFileStream(目录)

    Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be wrriten to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
    
  • 0

    如果您使用jupyter notebook执行此问题,则需要在批处理层中运行该程序,然后使用jupyter将文本文件上载到指定的文档 .

相关问题