我有一个假脱机目录,其中存在所有json文件,传入文件将每秒添加到此目录中,并且我必须反序列化传入的json文件并获取requires字段并将其附加到HDFS目录中 .
我做的是我创建了一个flume conf文件,其中将来自假脱机目录的文件作为源并使用1 Sink将json文件直接放入HDFS .
我必须在Sink之前将这个json变成结构格式并将其放入HDFS . 最重要的是,它不是Twitter数据 . 我必须实施纯粹的Flume .
我使用下面的水槽配置来完成工作:
agent_slave_1.channels.fileChannel1_1.type = file
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir
agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text
agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1
agent_slave_1.sinks = hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1
但我不知道如何使用反序列化器 .
有人可以帮我理解如何反序化Incomming Json文件吗?如果我需要在java中编写任何代码,请帮助我,我需要使用什么接口?如果可能的话给出一些提示 .
1 回答
最好的猜测是编写一个自定义拦截器,将您的JSON转换为所需的HDFS格式 . 它还具有填充可在hdfs路径中使用的标头的好处 .
以下是配置拦截器的方法:
这个类看起来像这样:
不要忘记将这个类打包在jar中并将其放入flume的lib目录中 .