The Spark code below doesn't seem to do anything with the file example.txt
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new org.apache.spark.SparkConf()
  .setMaster("local")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g")
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("C:\\example.txt")
dataFile.print()
ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
I am trying to use dataFile.print() to print the first 10 elements of the file. Some of the generated output:
15/03/12 12:23:53 INFO JobScheduler: Started JobScheduler
15/03/12 12:23:54 INFO FileInputDStream: Finding new files took 105 ms
15/03/12 12:23:54 INFO FileInputDStream: New files at time 1426163034000 ms:
15/03/12 12:23:54 INFO JobScheduler: Added jobs for time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Starting job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
-------------------------------------------
Time: 1426163034000 ms
-------------------------------------------
15/03/12 12:23:54 INFO JobScheduler: Finished job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Total delay: 0.157 s for time 1426163034000 ms (execution: 0.006 s)
15/03/12 12:23:54 INFO FileInputDStream: Cleared 0 old files that were older than 1426162974000 ms:
15/03/12 12:23:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:55 INFO FileInputDStream: Finding new files took 2 ms
15/03/12 12:23:55 INFO FileInputDStream: New files at time 1426163035000 ms:
15/03/12 12:23:55 INFO JobScheduler: Added jobs for time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Starting job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
-------------------------------------------
Time: 1426163035000 ms
-------------------------------------------
15/03/12 12:23:55 INFO JobScheduler: Finished job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Total delay: 0.011 s for time 1426163035000 ms (execution: 0.001 s)
15/03/12 12:23:55 INFO MappedRDD: Removing RDD 1 from persistence list
15/03/12 12:23:55 INFO BlockManager: Removing RDD 1
15/03/12 12:23:55 INFO FileInputDStream: Cleared 0 old files that were older than 1426162975000 ms:
15/03/12 12:23:55 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:56 INFO FileInputDStream: Finding new files took 3 ms
15/03/12 12:23:56 INFO FileInputDStream: New files at time 1426163036000 ms:
The format of example.txt:
gdaeicjdcg,194,155,98,107
jhbcfbdigg,73,20,122,172
ahdjfgccgd,28,47,40,178
afeidjjcef,105,164,37,53
afeiccfdeg,29,197,128,85
aegddbbcii,58,126,89,28
fjfdbfaeid,80,89,180,82
As the documentation for print states:
/** Print the first ten elements of each RDD generated in this DStream. This is an output operator, so this DStream will be registered as an output stream and there materialized. */
Does this mean that 0 RDDs were generated for this stream? With Apache Spark, to see the contents of an RDD you would use its collect function. Is there a similar method for streams? In short, how do I print the contents of the stream to the console?
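For what it's worth, DStream has no collect of its own, but foreachRDD hands each micro-batch over as a plain RDD, so collect can be applied there. A minimal sketch, assuming the dataFile stream above and batches small enough to collect to the driver:

dataFile.foreachRDD { (rdd, time) =>
  // Each micro-batch arrives as an ordinary RDD, so Spark core
  // operations such as count() and collect() apply directly.
  println(s"Batch at $time: ${rdd.count()} records")
  rdd.collect().foreach(println)
}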
Update:
I updated the code based on @0x0FFF's comment. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html does not seem to give an example of reading from the local file system. Isn't this as common as using Spark core, where there are explicit examples of reading data from a file?
Here is the updated code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g")
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("file:///c:/data/")
dataFile.print()
ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
But the output is the same. When I add new files to the c:\\data dir (with the same format as the existing data files), they are not processed. I assume dataFile.print should print the first 10 lines to the console?
Update 2:
Perhaps this is related to the fact that I am running this code in a Windows environment?
2 Answers
You are misunderstanding the use of textFileStream. Here is the description from the Spark documentation:
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat).
So, first, you should pass it a directory, and second, that directory must be reachable from the node running the receiver, so HDFS is the better choice for this purpose. Then, when you put a new file into this directory, it will be processed by print() and its first 10 lines will be printed.
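A minimal sketch of that setup (the directory path is illustrative): monitor a directory rather than a single file, and only add files to it after the stream has started:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))
// Pass a directory, not a file; textFileStream only picks up files
// that appear in the directory after the stream has started.
val lines = ssc.textFileStream("hdfs:///data/incoming/")
lines.print()
ssc.start()
// From another shell, once the stream is running:
//   hdfs dfs -put example.txt /data/incoming/
ssc.awaitTermination()

Update: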
My code: here is a custom receiver I wrote that listens for data in a specified directory.
One thing I want to note is that a file needs to be processed within a time <= the configured batchDuration. In the example it is set to 10 seconds, but if the time the receiver needs to process the files exceeds 10 seconds, then some data files will not be processed. I'm open to correction on this point. Here is how the custom receiver is implemented:
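The receiver below is a minimal sketch of that idea rather than a drop-in implementation, built on Spark's standard Receiver API: a background thread polls the directory, pushes each file's lines into Spark with store(), and deletes the file so it is not read twice. The class name, path handling, and one-second poll interval are illustrative assumptions:

import java.io.File
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a directory-polling receiver (details are assumptions):
// reads every file found in `dir`, stores its lines, then deletes it.
class DirectoryReceiver(dir: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns immediately.
    new Thread("Directory Receiver") {
      override def run(): Unit = poll()
    }.start()
  }

  def onStop(): Unit = {} // the polling loop exits once isStopped() is true

  private def poll(): Unit = {
    while (!isStopped()) {
      val files = Option(new File(dir).listFiles()).getOrElse(Array.empty[File])
      for (f <- files if f.isFile) {
        val src = Source.fromFile(f)
        try src.getLines().foreach(store) finally src.close()
        f.delete() // remove the file so it is only processed once
      }
      Thread.sleep(1000) // poll interval; must keep up with the batch duration
    }
  }
}

// Usage: ssc.receiverStream(new DirectoryReceiver("c:/data")).print()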
More information at: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html and https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html