
How to keep the original file basename with a Flume FTP source agent


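I have configured a Flume agent that reads from an FTP server and sends the files to an HDFS sink. My main problem is that I want to store the files in HDFS under their original file names. I tried the Spooling Directory (spooldir) source. It works and stores the files in HDFS under their basenames, but the Flume agent breaks in two cases: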

1) If a file is written to after it has been placed in the spooling directory, Flume prints an error to its log file and stops processing.

2) If a file name is reused later, Flume prints an error to its log file and stops processing.
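Both failure modes are documented limitations of the Spooling Directory source. If going back to it is ever an option, the sketch below shows a common mitigation. It is not a fix for the FTP source. It assumes, hypothetically, that whatever uploads the files first writes them with a `.tmp` suffix and renames them once they are complete. The spool path `/DATA/spool` is only an example.

```properties
# Sketch only: spooldir source that skips in-progress files and deletes finished ones.
# Assumption: the uploader writes "name.tmp" and renames it to "name" when complete.
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /DATA/spool
# Ignore files that are still being written ([.] avoids properties-file escaping of "\.")
agent.sources.r1.ignorePattern = ^.*[.]tmp$
# Delete files once ingested instead of renaming them to *.COMPLETED,
# so a later file with the same name should not collide with an old one
agent.sources.r1.deletePolicy = immediate
# Put the original file name into the "basename" event header
agent.sources.r1.basenameHeader = true
```

This only helps if the writer never touches a file after the final rename. Appending to a file that is already spooled remains unsupported.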

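In short, the spooldir source does not fit my use case. Does anyone know how to make the FTP source keep the file name, so that the HDFS sink can then store each file separately under its own name?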

Here is my agent configuration:

agent.sources = r1
agent.channels = c1
agent.sinks = k

#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true

#configure sink k
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
# property names are case-sensitive: "hdfs.batchsize" would be silently ignored
agent.sinks.k.hdfs.batchSize = 1000000
agent.sinks.k.hdfs.fileType = DataStream

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000

agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
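With `rollCount`, `rollInterval` and `rollSize` all set to 0, the HDFS sink never rolls a file on its own. A file opened for one basename can therefore stay open, still carrying the in-use `.tmp` suffix, until the agent stops or the sink's open-file limit is reached. The sketch below shows one way to get closed files. It assumes a 60-second quiet period per file is acceptable. That value and the suffix are only examples.

```properties
# Close an HDFS file after 60 seconds without new events for it
agent.sinks.k.hdfs.idleTimeout = 60
# Optional: give closed files a recognisable extension (example value)
agent.sinks.k.hdfs.fileSuffix = .txt
```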

2 Answers


    I have just pushed a fix for this to the flume ftp GitHub project:

    KR, Philippe

    Is there a trick to fix the fact that the % property is missing?
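    You can check whether the source actually sets the `basename` header by temporarily swapping the HDFS sink for Flume's logger sink and inspecting the headers it prints. If the header is absent, `%{basename}` in the HDFS sink has nothing to substitute. The sketch below uses the question's component names and replaces the existing `type` line rather than adding a second one.

```properties
# Temporary debugging sink: logs each event's headers and the start of its body
agent.sinks.k.type = logger
agent.sinks.k.channel = c1
```

    Start the agent with `-Dflume.root.logger=INFO,console` so the log goes to the console. If no `basename=...` entry appears in the printed headers, the installed source version is probably not setting it.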


    As I said, following this code update (Fix for Flume FTP source), this is how I use the % variables:

    ##############
    # COMPONENTS #
    ##############
    myPrj.sources = source_01
    myPrj.channels = channel_01
    myPrj.sinks = sink_01
    
    ############
    # BINDINGS #
    ############
    myPrj.sources.source_01.channels = channel_01
    myPrj.sinks.sink_01.channel = channel_01
    
    ###########
    # CHANNEL #
    ###########
    myPrj.channels.channel_01.type = memory
    myPrj.channels.channel_01.capacity = 10000
    myPrj.channels.channel_01.transactionCapacity = 10000
    
    ##########
    # SOURCE #
    ##########
    myPrj.sources.source_01.type = org.keedio.flume.source.ftp.source.Source
    
    myPrj.sources.source_01.client.source = ftp
    myPrj.sources.source_01.name.server = 127.0.0.1
    myPrj.sources.source_01.user = myPrj
    myPrj.sources.source_01.password = myPrj
    myPrj.sources.source_01.port = 21
    
    #myPrj.sources.source_01.security.enabled = true
    #myPrj.sources.source_01.security.cipher = TLS
    #myPrj.sources.source_01.security.certificate.enabled = true
    #myPrj.sources.source_01.path.keystore = /path/to/keystore
    #myPrj.sources.source_01.store.pass = the_keyStore_password
    
    myPrj.sources.source_01.run.discover.delay = 5000
    myPrj.sources.source_01.flushlines = false
    myPrj.sources.source_01.chunk.size = 33554432
    
    myPrj.sources.source_01.folder = /home/foo/app-flume-ftp-hdfs
    myPrj.sources.source_01.file.name = flume-ftp-hdfs.ser
    
    myPrj.sources.source_01.fileHeader = true
    myPrj.sources.source_01.basenameHeader = true
    
    # Deserializer
    myPrj.sources.source_01.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
    myPrj.sources.source_01.deserializer.maxBlobLength = 33554432
    
    ########
    # SINK #
    ########
    #myPrj.sinks.sink_01.type = logger
    myPrj.sinks.sink_01.type = hdfs
    myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H/%M/%{basename}
    myPrj.sinks.sink_01.hdfs.filePrefix = FTP_SOURCE
    myPrj.sinks.sink_01.hdfs.useLocalTimeStamp = true
    myPrj.sinks.sink_01.hdfs.rollCount = 0
    myPrj.sinks.sink_01.hdfs.rollInterval = 0
    myPrj.sinks.sink_01.hdfs.batchSize = 100
    
    # Data compressed
    #myPrj.sinks.sink_01.hdfs.rollSize = 33554432
    #myPrj.sinks.sink_01.hdfs.codeC = gzip
    #myPrj.sinks.sink_01.hdfs.fileType = CompressedStream
    
    # Data no compressed
    myPrj.sinks.sink_01.hdfs.rollSize = 33554432
    myPrj.sinks.sink_01.hdfs.fileType = DataStream
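    The configuration above puts `%{basename}` in `hdfs.path`, so every source file gets its own HDFS directory and the files inside it are named `FTP_SOURCE...`. An alternative is to keep time-based directories and carry the original name in the file name itself. The sketch below does that, using the same component names and the same assumption that the source sets the `basename` header.

```properties
# Time-bucketed directory only
myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H
# Use the original file name as the HDFS file prefix
myPrj.sinks.sink_01.hdfs.filePrefix = %{basename}
```

    The HDFS sink still appends its own counter, plus any `hdfs.fileSuffix`, after the prefix. The result is `<basename>.<counter>` rather than exactly the original name.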
    
