
Flume loses data when collecting online data into HDFS


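I am using flume-ng 1.5 to collect logs.

There are two agents in the data flow, each running on its own host.

Data is sent from agent1 to agent2.

The agents are composed as follows:

agent1: spooling directory source -> file channel -> avro sink
agent2: avro source -> file channel -> hdfs sink

However, it seems to lose about 1/1000 of the data when handling millions of records. To track down the problem, I tried the following steps:

  • Checked the agent logs: no errors or exceptions were found.

  • Checked the agents' monitoring metrics: the number of events put into the channel always equals the number taken from it.

  • Counted the records with a Hive query and, separately, with shell commands on the HDFS files: the two numbers are equal to each other and lower than the number of online records.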
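The count check in the last step can also be done on the source side, so that the number of records in the spooled files is known independently of Flume. The following is a minimal sketch, assuming one record per newline-terminated line; `count_records` is a hypothetical helper, not part of Flume. It counts raw bytes rather than decoded text, so the result does not depend on how any particular character is decoded.

```python
from pathlib import Path
from typing import Iterable


def count_records(paths: Iterable[Path]) -> int:
    """Count newline-delimited records across files without decoding them."""
    total = 0
    for path in paths:
        last_byte = b""
        with open(path, "rb") as handle:
            # Read in 1 MiB chunks so large log files are not loaded at once.
            for chunk in iter(lambda: handle.read(1 << 20), b""):
                total += chunk.count(b"\n")
                last_byte = chunk[-1:]
        # A final line without a trailing newline still counts as one record.
        if last_byte and last_byte != b"\n":
            total += 1
    return total
```

Running it over the files in the spool directory and comparing the total with the Hive count shows whether the gap already exists before the data reaches agent1's channel.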

agent1 configuration:

#agent
agent1.sources = src_spooldir
agent1.channels = chan_file
agent1.sinks = sink_avro

#source
agent1.sources.src_spooldir.type = spooldir
agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
agent1.sources.src_spooldir.interceptors=i1

#interceptors
agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
agent1.sources.src_spooldir.interceptors.i1.serializers=s1
agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt

#sink
agent1.sinks.sink_avro.type = avro
agent1.sinks.sink_avro.hostname = 10.235.2.212
agent1.sinks.sink_avro.port = 9910

#channel
agent1.channels.chan_file.type = file
agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
agent1.channels.chan_file.dataDirs = /data/flume/agent1/data

agent1.sources.src_spooldir.channels = chan_file
agent1.sinks.sink_avro.channel = chan_file
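To make the role of the `regex_extractor` interceptor above concrete, here is a rough Python sketch of what it does to each line. This is an illustration under assumptions, not Flume's own code: `extract_headers` is a hypothetical name, and it assumes the interceptor searches the event body for the pattern and stores the first capture group under the configured serializer name `dt`. The doubled backslashes in the properties file become single backslashes once the file is loaded.

```python
import re
from typing import Dict

# Same pattern as in the agent1 config, after properties-file unescaping.
DATE_PATTERN = re.compile(r"(\d{4}-\d{2}-\d{2}).*")


def extract_headers(line: str) -> Dict[str, str]:
    """Return the headers a date-extracting interceptor would add to a line."""
    match = DATE_PATTERN.search(line)
    if match is None:
        # No date in the line: no dt header is produced.
        return {}
    return {"dt": match.group(1)}
```

For example, `extract_headers("2014-07-01 12:00:00 GET /index")` yields `{"dt": "2014-07-01"}`, which is the value agent2 substitutes for `%{dt}` in `hdfs.path`. A line with no date yields no `dt` header, so it is worth checking whether such lines exist in the input when comparing counts per date directory.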

agent2 configuration:

# agent 
agent2.sources  = source1
agent2.channels = channel1 
agent2.sinks    = sink1 

# source
agent2.sources.source1.type     = avro
agent2.sources.source1.bind     = 10.235.2.212
agent2.sources.source1.port     = 9910

# sink
agent2.sinks.sink1.type= hdfs
agent2.sinks.sink1.hdfs.fileType = DataStream
agent2.sinks.sink1.hdfs.filePrefix = log
agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
agent2.sinks.sink1.hdfs.rollInterval = 600
agent2.sinks.sink1.hdfs.rollSize = 0
agent2.sinks.sink1.hdfs.rollCount = 0
agent2.sinks.sink1.hdfs.idleTimeout = 300
agent2.sinks.sink1.hdfs.round = true
agent2.sinks.sink1.hdfs.roundValue = 10
agent2.sinks.sink1.hdfs.roundUnit = minute

# channel
agent2.channels.channel1.type   = file
agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
agent2.channels.channel1.dataDirs = /data/flume/agent2/data

agent2.sinks.sink1.channel      = channel1
agent2.sources.source1.channels = channel1

Any suggestions are welcome!

1 Answer


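    There is a bug in the file line deserializer that shows up when it encounters certain UTF characters whose code points lie between U+10000 and U+10FFFF; in UTF-16 these are represented by two 16-bit code units called a surrogate pair.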
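    To illustrate the point about surrogate pairs, and to help find input lines that might trigger the problem, here is a minimal Python sketch. The function names are hypothetical and the file is assumed to be UTF-8 encoded; it only reports which lines contain characters above U+FFFF and does not reproduce the deserializer bug itself.

```python
from typing import Iterator, Tuple


def utf16_code_units(text: str) -> int:
    """Number of 16-bit code units needed to encode text in UTF-16."""
    return len(text.encode("utf-16-le")) // 2


def has_supplementary_chars(text: str) -> bool:
    """True if any code point lies in U+10000..U+10FFFF."""
    return any(ord(ch) >= 0x10000 for ch in text)


def lines_with_supplementary_chars(path: str) -> Iterator[Tuple[int, str]]:
    """Yield (line_number, line) for lines containing such characters."""
    # Assumes UTF-8 input; undecodable bytes are replaced rather than raised.
    with open(path, encoding="utf-8", errors="replace") as handle:
        for number, line in enumerate(handle, start=1):
            if has_supplementary_chars(line):
                yield number, line.rstrip("\n")
```

    A character such as U+1F600 takes two UTF-16 code units, while `a` or `中` takes one. If the lines reported by `lines_with_supplementary_chars` match the records missing from HDFS, that would support this explanation.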
