我们开始通过将消息发布到Kafka主题来整合来自应用程序的事件日志数据 . 虽然我们可以直接从应用程序写入Kafka,但我们选择将其视为一般问题并使用Flume代理 . 这提供了一些灵活性:如果我们想从服务器捕获其他东西,我们可以只是拖尾不同的源并发布到不同的Kafka主题 .
我们创建了一个Flume代理配置文件来拖尾日志并发布到Kafka主题:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /var/log/some_log.log
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = some_log
tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20
不幸的是,消息本身并没有指定生成它们的主机 . 如果我们在多个主机上运行应用程序并且发生错误,我们无法确定哪个主机生成了该消息 .
我注意到,如果Flume直接写入HDFS,我们可以use a Flume interceptor写入特定的HDFS位置 . 虽然我们可能会对Kafka做类似的事情,即为每个服务器创建一个新主题,但这可能会变得难以处理 . 我们最终会有数千个主题 .
当Flume发布到Kafka主题时,它能否附加/包含原始主机的主机名?
2 回答
如果您正在使用
exec
源,则不会阻止您运行智能命令以将主机名添加到日志文件内容前面 .注意:如果命令使用管道之类的东西,你还需要像这样指定shell:
消息看起来像这样:
...
frb.hi.inet
我们主人的名字 .您可以创建一个自定义TCP源,它读取客户端地址并将其添加到标头中 .
flume-conf.properties可以配置为:
我发送了一条测试消息来测试它,它看起来像:
我在github上传了这个项目