首页 文章

在接收器发生故障后,如何强制Flume-NG处理积压的事件?

提问于
浏览
7

我正在尝试设置Flume-NG从一堆服务器(主要运行Tomcat实例和Apache Httpd)收集各种日志,并将它们转储到5节点Hadoop集群上的HDFS中 . 设置如下所示:

Flume-Hadoop setup

每个应用程序服务器将相关日志转换为Exec Sources之一(每个日志类型一个:java,httpd,syslog),它们通过FileChannel传送到Avro接收器 . 在每台服务器上,不同的源,通道和接收器由一个代理管理 . 事件由位于Hadoop集群(也承载SecondaryNameNode和Jobtracker的节点)上的AvroSource获取 . 对于每个logtype,都有一个AvroSource侦听不同的端口 . 事件通过FileChannel进入HDFS接收器,使用FlumeEventAvro EventSerializer和Snappy压缩来保存事件 .

问题:管理HDFS接收器的Hadoop节点上的代理(再次,每个日志类型一个)在几个小时后失败,因为我们没有更改JVM的堆大小 . 从那时起,在该节点上的FileChannel中收集了大量事件,之后也在应用服务器上的FileChannel上收集了,因为Hadoop节点上的FileChannel达到了它的最大容量 . 当我修复问题时,我无法让Hadoop节点上的代理快速处理积压,以便恢复正常运行 . 文件通道在下沉之前保存事件的tmp目录的大小,一直在增长 . 此外,HDFS写入似乎真的很慢 . 有没有办法强制Flume在摄取新事件之前先处理积压?以下配置是否最佳?也许相关:写入HDFS的文件非常小,约为1-3 MB左右 . 对于64MB的HDFS默认块大小以及未来的MR操作,这当然不是最佳选择 . 我应该使用哪些设置来收集大到足以支持HDFS块大小的文件中的事件?我感觉Hadoop节点上的配置不对,我怀疑BatchSize,RollCount和相关参数的值是关闭的,但我不确定最佳值应该是什么 .

应用服务器上的配置示例:

agent.sources=syslogtail httpdtail javatail
agent.channels=tmpfile-syslog tmpfile-httpd tmpfile-java
agent.sinks=avrosink-syslog avrosink-httpd avrosink-java

agent.sources.syslogtail.type=exec
agent.sources.syslogtail.command=tail -F /var/log/messages
agent.sources.syslogtail.interceptors=ts
agent.sources.syslogtail.interceptors.ts.type=timestamp
agent.sources.syslogtail.channels=tmpfile-syslog
agent.sources.syslogtail.batchSize=1

...

agent.channels.tmpfile-syslog.type=file
agent.channels.tmpfile-syslog.checkpointDir=/tmp/flume/syslog/checkpoint
agent.channels.tmpfile-syslog.dataDirs=/tmp/flume/syslog/data

...

agent.sinks.avrosink-syslog.type=avro
agent.sinks.avrosink-syslog.channel=tmpfile-syslog
agent.sinks.avrosink-syslog.hostname=somehost
agent.sinks.avrosink-syslog.port=XXXXX
agent.sinks.avrosink-syslog.batch-size=1

Hadoop节点上的配置示例

agent.sources=avrosource-httpd avrosource-syslog avrosource-java
agent.channels=tmpfile-httpd tmpfile-syslog tmpfile-java
agent.sinks=hdfssink-httpd hdfssink-syslog hdfssink-java

agent.sources.avrosource-java.type=avro
agent.sources.avrosource-java.channels=tmpfile-java
agent.sources.avrosource-java.bind=0.0.0.0
agent.sources.avrosource-java.port=XXXXX

...

agent.channels.tmpfile-java.type=file
agent.channels.tmpfile-java.checkpointDir=/tmp/flume/java/checkpoint
agent.channels.tmpfile-java.dataDirs=/tmp/flume/java/data
agent.channels.tmpfile-java.write-timeout=10
agent.channels.tmpfile-java.keepalive=5
agent.channels.tmpfile-java.capacity=2000000

...

agent.sinks.hdfssink-java.type=hdfs
agent.sinks.hdfssink-java.channel=tmpfile-java
agent.sinks.hdfssink-java.hdfs.path=/logs/java/avro/%Y%m%d/%H
agent.sinks.hdfssink-java.hdfs.filePrefix=java-
agent.sinks.hdfssink-java.hdfs.fileType=DataStream
agent.sinks.hdfssink-java.hdfs.rollInterval=300
agent.sinks.hdfssink-java.hdfs.rollSize=0
agent.sinks.hdfssink-java.hdfs.rollCount=40000
agent.sinks.hdfssink-java.hdfs.batchSize=20000
agent.sinks.hdfssink-java.hdfs.txnEventMax=20000
agent.sinks.hdfssink-java.hdfs.threadsPoolSize=100
agent.sinks.hdfssink-java.hdfs.rollTimerPoolSize=10

1 回答

  • 8

    我在配置中看到的一些内容可能会导致问题:

    • 您的第一个代理商似乎有一个批量大小为1的avro接收器 . 您应该将其增加到至少100或更多 . 这是因为第二个代理上的avro源将提交到批量大小为1的通道 . 每次提交都会导致fsync,导致文件通道性能不佳 . exec源上的批处理大小也是1,导致该通道也很慢 . 您可以增加批量大小(或使用假脱机目录源 - 稍后将详细介绍) .

    • 您可以从同一通道读取多个HDFS接收器以提高性能 . 您应该确保每个接收器写入不同的目录或具有不同的“hdfs.filePrefix”,以便多个HDFS接收器不会尝试写入相同的文件 .

    • HDFS接收器的批量大小为20000,非常高,而callTimeout的默认值为10秒 . 如果要保留如此庞大的批量大小,则应增加“hdfs.callTimeout” . 我建议将批量大小减少到1000左右,并且超时约为15-20秒 . (注意,在当前批量大小,每个文件只保留2个批次 - 因此减少批量大小,增加rollInterval和timeOut)

    如果您使用tail -F,我建议您尝试使用新的Spool目录源 . 要使用此源,请将日志文件转出到假脱机目录源处理的目录 . 此源将仅处理不可变的文件,因此您需要将日志文件旋转出去 . 正如Flume用户指南中所述,使用tail -F和exec源存在问题 .

相关问题