首页 文章

Flume流式传输gz文件

提问于
浏览
0

我有一个包含很多gzip文件的文件夹 . 每个gzip文件都包含xml文件 . 我曾使用flume将文件流式传输到HDFS . 以下是我的配置文件:

agent1.sources = src
agent1.channels = ch
agent1.sinks = sink

agent1.sources.src.type = spooldir
agent1.sources.src.spoolDir = /home/tester/datafiles
agent1.sources.src.channels = ch
agent1.sources.src.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder

agent1.channels.ch.type = memory
agent1.channels.ch.capacity = 1000
agent1.channels.ch.transactionCapacity = 1000

agent1.sinks.sink.type = hdfs
agent1.sinks.sink.channel = ch
agent1.sinks.sink.hdfs.path = /user/tester/datafiles
agent1.sinks.sink.hdfs.fileType = CompressedStream
agent1.sinks.sink.hdfs.codeC = gzip
agent1.sinks.sink.hdfs.fileSuffix = .gz
agent1.sinks.sink.hdfs.rollInterval = 0
agent1.sinks.sink.hdfs.rollSize = 122000000
agent1.sinks.sink.hdfs.rollCount = 0
agent1.sinks.sink.hdfs.idleTimeout = 1
agent1.sinks.sink.hdfs.batchSize = 1000

在将文件流式传输到HDFS之后,我使用Spark使用以下代码读取它:

df = sparkSession.read.format('com.databricks.spark.xml').options(rowTag='Panel', compression='gzip').load('/user/tester/datafiles')

但我有问题要阅读它 . 如果我手动将一个gzip文件上传到HDFS文件夹并重新运行上面的Spark代码,它就可以毫无问题地读取它 . 我不确定是不是因为水槽 .

我尝试下载由flume流式传输的文件并解压缩,当我查看内容时,它不再显示xml格式,这是一些不可读的字符 . 谁能让我对此有所了解?谢谢 .

1 回答

  • 0

    我想你是在做这个 Wrong!!! 为什么?

    看到你有一个来源 "Non Split-able" ZIP . 你可以将它们部分地作为记录记录,如果你不解压缩,你会得到一个GZIPInputStream,你得到了水槽来源 .

    在读取GZIP输入流作为输入记录后,您将已经ziped的流保存到另一个GZIP流中,因为您选择了压缩的接收器类型 .

    所以你在HDFS中的Gzip里面有Zipped Streamed . :)

    我建议在cron中安排一个脚本从本地到HDFS进行复制将解决您的问题 .

相关问题