I am running a Flume agent on Ubuntu Linux. It runs fine at first, but after about a day it always stops. Here is the Flume configuration:

nginx.channels=ch-spooling ch-tail
    nginx.sources=spooling-source tail-source
    nginx.sinks=hdfs-spooling kafka-tail

    nginx.channels.ch-spooling.type=file
    nginx.channels.ch-spooling.checkpointDir=/home/hadoop/flume/file-channel/ch-spooling/checkpoint
    nginx.channels.ch-spooling.dataDirs=/home/hadoop/flume/file-channel/ch-spooling/data
    nginx.channels.ch-spooling.capacity=1000
    nginx.channels.ch-spooling.transactionCapacity=100
    nginx.channels.ch-spooling.capacity=100000

    nginx.channels.ch-tail.type=file
    nginx.channels.ch-tail.checkpointDir=/home/hadoop/flume/file-channel/ch-tail/checkpoint
    nginx.channels.ch-tail.dataDirs=/home/hadoop/flume/file-channel/ch-tail/data
    nginx.channels.ch-tail.capacity=1000
    nginx.channels.ch-tail.transactionCapacity=100
    nginx.channels.ch-tail.capacity=100000

    nginx.sources.spooling-source.type=spooldir
    nginx.sources.spooling-source.channels=ch-spooling
    nginx.sources.spooling-source.spoolDir=/usr/local/nginx/logs/flume_logs
    nginx.sources.spooling-source.fileHeader=true
    nginx.sources.spooling-source.fileHeaderKey=file
    nginx.sources.spooling-source.basenameHeader=true
    nginx.sources.spooling-source.basenameHeaderKey=basename
    nginx.sources.spooling-source.deletePolicy=never

    nginx.sources.spooling-source.consumeOrder=oldest
    nginx.sources.spooling-source.recursiveDirectorySearch=false
    nginx.sources.spooling-source.batchSize=100
    nginx.sources.spooling-source.inputCharset=UTF-8

    nginx.sources.spooling-source.decodeErrorPolicy=IGNORE
    nginx.sources.spooling-source.selector.type=replicating
    nginx.sources.spooling-source.interceptors=i1 i2
    nginx.sources.spooling-source.interceptors.i1.type=timestamp
    nginx.sources.spooling-source.interceptors.i2.type=host
    nginx.sources.spooling-source.interceptors.i2.useIP=true
    nginx.sources.spooling-source.interceptors.i2.hostHeader=host

    nginx.sources.tail-source.type=TAILDIR
    nginx.sources.tail-source.channels=ch-tail
    nginx.sources.tail-source.filegroups=hnrapi hnrapierror
    nginx.sources.tail-source.filegroups.hnrapi=/usr/local/nginx/logs/hnr-api.access.log
    nginx.sources.tail-source.filegroups.hnrapierror=/usr/local/nginx/logs/error.log
    nginx.sources.tail-source.positionFile=/home/hadoop/flume/file-channel/ch-tail/taildir_position.json
    nginx.sources.tail-source.headers.hnrapi.topic=hnrapi
    nginx.sources.tail-source.headers.hnrapierror.topic=hnrapierror
    nginx.sources.tail-source.skipToEnd=true
    nginx.sources.tail-source.interceptors=i1 i2
    nginx.sources.tail-source.interceptors.i1.type=timestamp
    nginx.sources.tail-source.interceptors.i2.type=host
    nginx.sources.tail-source.interceptors.i2.useIP=true
    nginx.sources.tail-source.interceptors.i2.hostHeader=host

    nginx.sinks.hdfs-spooling.channel=ch-spooling
    nginx.sinks.hdfs-spooling.type=hdfs
    nginx.sinks.hdfs-spooling.hdfs.fileType=DataStream
    nginx.sinks.hdfs-spooling.hdfs.writeFormat=Text

    nginx.sinks.hdfs-spooling.hdfs.path=hdfs://namenode1:9000/flume/nginx/%Y-%m-%d

    nginx.sinks.hdfs-spooling.hdfs.filePrefix=%{basename}.[%{host}]
    nginx.sinks.hdfs-spooling.hdfs.fileSuffix=
    nginx.sinks.hdfs-spooling.hdfs.inUseSuffix=.tmp
    nginx.sinks.hdfs-spooling.hdfs.rollInterval=0
    nginx.sinks.hdfs-spooling.hdfs.rollSize=1073741824
    nginx.sinks.hdfs-spooling.hdfs.rollCount=0
    nginx.sinks.hdfs-spooling.hdfs.idleTimeout=60

    nginx.sinks.kafka-tail.channel=ch-tail
    nginx.sinks.kafka-tail.type=org.apache.flume.sink.kafka.KafkaSink
    nginx.sinks.kafka-tail.kafka.bootstrap.servers=192.47.180.63:9192,192.27.125.123:9192,192.27.124.96:9192
    nginx.sinks.kafka-tail.flumeBatchSize=32
    nginx.sinks.kafka-tail.kafka.producer.acks=1
    nginx.sinks.kafka-tail.useFlumeEventFormat=false
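(As an aside while reading the config above: both file-channel sections assign `capacity` twice, first 1000 and then 100000. Flume loads the file through Java `Properties` semantics, so the last assignment wins and the effective capacity is 100000. A throwaway sketch for spotting such duplicate keys, not a Flume tool:)

```python
from collections import Counter

def find_duplicate_keys(properties_text):
    """Return property keys that are assigned more than once in a
    .properties-style config (comments and blank lines ignored)."""
    keys = []
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        keys.append(line.split('=', 1)[0].strip())
    return [key for key, count in Counter(keys).items() if count > 1]

config = """
nginx.channels.ch-tail.capacity=1000
nginx.channels.ch-tail.transactionCapacity=100
nginx.channels.ch-tail.capacity=100000
"""
print(find_duplicate_keys(config))  # ['nginx.channels.ch-tail.capacity']
```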

I collect the nginx logs with a taildir source and a spooldir source: the taildir source feeds Kafka, and the spooldir source feeds HDFS.

Here is part of the Flume log:

2017-03-06 18:17:54,302 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.flume.sink.kafka.SinkCallback.onCompletion(KafkaSink.java:456)] Elapsed time for send: 13
2017-03-06 18:18:14,230 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:18:14,408 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hadoop/flume/file-channel/ch-tail/checkpoint/checkpoint, elements to sync = 8
2017-03-06 18:18:14,415 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1488782535982, queueSize: 0, queueHead: 1642
2017-03-06 18:18:14,420 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hadoop/flume/file-channel/ch-tail/data/log-16 position: 391719 logWriteOrderID: 1488782535982
2017-03-06 18:18:14,420 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.Log.removeOldLogs(Log.java:1115)] Files currently in use: [16]
2017-03-06 18:18:14,484 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:18:44,130 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:18:44,420 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:18:44,484 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:19:14,230 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:19:14,421 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:19:14,485 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:19:44,131 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:19:44,421 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:19:44,485 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:19:54,577 (PollableSourceRunner-TaildirSource-tail-source) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /usr/local/nginx/logs/hnr-api.access.log, inode: 2237741, pos: 3183
2017-03-06 18:19:59,307 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:187)] event #0
2017-03-06 18:19:59,310 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.flume.sink.kafka.SinkCallback.onCompletion(KafkaSink.java:455)] Acked message partition:1 ofset:122
2017-03-06 18:19:59,310 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.flume.sink.kafka.SinkCallback.onCompletion(KafkaSink.java:456)] Elapsed time for send: 3
2017-03-06 18:20:14,231 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:20:14,421 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hadoop/flume/file-channel/ch-tail/checkpoint/checkpoint, elements to sync = 1
2017-03-06 18:20:14,427 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1488782535987, queueSize: 0, queueHead: 1642
2017-03-06 18:20:14,432 (Log-BackgroundWorker-ch-tail) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hadoop/flume/file-channel/ch-tail/data/log-16 position: 392302 logWriteOrderID: 1488782535987
2017-03-06 18:20:14,432 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.Log.removeOldLogs(Log.java:1115)] Files currently in use: [16]
2017-03-06 18:20:14,485 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:20:43,431 (Log-BackgroundWorker-ch-spooling) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:20:44,433 (Log-BackgroundWorker-ch-tail) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)] Checkpoint not required
2017-03-06 18:20:44,485 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes
2017-03-06 18:20:46,224 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:78)] Stopping lifecycle supervisor 13
2017-03-06 18:20:46,227 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.FileChannel.stop(FileChannel.java:324)] Stopping FileChannel ch-tail { dataDirs: [/home/hadoop/flume/file-channel/ch-tail/data] }...
2017-03-06 18:20:46,227 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hadoop/flume/file-channel/ch-tail/checkpoint/checkpoint, elements to sync = 0
2017-03-06 18:20:46,233 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1488782535988, queueSize: 0, queueHead: 1642
2017-03-06 18:20:46,238 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hadoop/flume/file-channel/ch-tail/data/log-16 position: 392302 logWriteOrderID: 1488782535988
2017-03-06 18:20:46,238 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.Log.shutdownWorker(Log.java:868)] Attempting to shutdown background worker.
2017-03-06 18:20:46,238 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.LogFile$Writer.close(LogFile.java:384)] Closing /home/hadoop/flume/file-channel/ch-tail/data/log-16
2017-03-06 18:20:46,238 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.LogFile$RandomReader.close(LogFile.java:520)] Closing RandomReader /home/hadoop/flume/file-channel/ch-tail/data/log-16
2017-03-06 18:20:46,243 (agent-shutdown-hook) [INFO - org.apache.flume.channel.file.LogFile$RandomReader.close(LogFile.java:520)] Closing RandomReader /home/hadoop/flume/file-channel/ch-tail/data/log-15
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:149)] Component type: CHANNEL, name: ch-tail stopped
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.start.time == 1488782534266
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.stop.time == 1488795646249
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.capacity == 100000
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.current.size == 0
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.event.put.attempt == 834
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.event.put.success == 834
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.event.take.attempt == 3559
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: ch-tail. channel.event.take.success == 834
2017-03-06 18:20:46,249 (agent-shutdown-hook) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop(PollingPropertiesFileConfigurationProvider.java:84)] Configuration provider stopping
2017-03-06 18:20:46,249 (agent-shutdown-hook) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop(PollingPropertiesFileConfigurationProvider.java:96)] Configuration provider stopped
2017-03-06 18:20:46,249 (agent-shutdown-hook) [DEBUG - org.apache.flume.SinkRunner.stop(SinkRunner.java:104)] Waiting for runner thread to exit
2017-03-06 18:20:46,249 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:155)] Interrupted while processing an event. Exiting.
2017-03-06 18:20:46,249 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:171)] Polling runner exiting. Metrics:{ name:null counters:{runner.interruptions=1, runner.backoffs.consecutive=12, runner.backoffs=2689} }
2017-03-06 18:20:46,250 (agent-shutdown-hook) [INFO - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-03-06 18:20:46,250 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
2017-03-06 18:20:46,250 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:220)] Removed sensor with name connections-closed:client-id-producer-1
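What stands out to me in the log is that every entry from 18:20:46 onward runs on the `agent-shutdown-hook` thread, which suggests the JVM received a termination request (a signal or `System.exit`) rather than crashing mid-flight. To isolate those entries from a long log file I used a small throwaway helper like this (my own sketch, not part of Flume):

```python
def shutdown_entries(log_text):
    """Collect the log lines that were emitted on the
    agent-shutdown-hook thread, i.e. during an orderly JVM shutdown."""
    return [line for line in log_text.splitlines()
            if '(agent-shutdown-hook)' in line]

# Minimal sample made from two of the log lines above.
sample = (
    "2017-03-06 18:20:44,485 (conf-file-poller-0) [DEBUG - ...] "
    "Checking file:/home/hadoop/flume/conf/flume-conf.properties for changes\n"
    "2017-03-06 18:20:46,224 (agent-shutdown-hook) [INFO - "
    "org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:78)] "
    "Stopping lifecycle supervisor 13\n"
)
for entry in shutdown_entries(sample):
    print(entry)
```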

Can someone please help me? Thanks.