首页 文章

使用flume反序列化Json文件并沉入HDFS

提问于
浏览
1

我有一个假脱机目录,其中存在所有json文件,传入文件将每秒添加到此目录中,并且我必须反序列化传入的json文件并获取requires字段并将其附加到HDFS目录中 .

我做的是我创建了一个flume conf文件,其中将来自假脱机目录的文件作为源并使用1 Sink将json文件直接放入HDFS .

我必须在Sink之前将这个json变成结构格式并将其放入HDFS . 最重要的是,它不是Twitter数据 . 我必须实施纯粹的Flume .

我使用下面的水槽配置来完成工作:

agent_slave_1.channels.fileChannel1_1.type = file 
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir

agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent_slave_1.sinks =  hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1

但我不知道如何使用反序列化器 .

有人可以帮我理解如何反序化Incomming Json文件吗?如果我需要在java中编写任何代码,请帮助我,我需要使用什么接口?如果可能的话给出一些提示 .

1 回答

  • 1

    最好的猜测是编写一个自定义拦截器,将您的JSON转换为所需的HDFS格式 . 它还具有填充可在hdfs路径中使用的标头的好处 .

    以下是配置拦截器的方法:

    agent_slave_1.sources.source1_1.interceptors = my_intercptor
    agent_slave_1.sources.source1_1.interceptors.my_intercptor.type = com.mycompany.MyInteceptor
    

    这个类看起来像这样:

    public class MyInteceptor implements Interceptor, Interceptor.Builder {
    
        private MyInteceptor interceptor;
    
        @Override
        public void initialize() {
    
    
        }
    
        @Override
        public Event intercept(Event event) {
            String bjson = event.getBody()));
            // decode your json, e.g. Jackson
            MyDecodedJsonObject record; // pseudo class
            event.getHeaders().put("timestamp", record.getTimestamp().toString());
            String newBody = record.getA() + "\t" + record.getB();
            event.setBody(newBody.getBytes())
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
    
            for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) {
                Event next = intercept(iterator.next());
                if (next == null) {
                    iterator.remove();
                }
            }
            return events;
        }
    
        @Override
        public void close() {
    
    
        }
    
        @Override
        public Interceptor build() {
            return interceptor;
        }
    
        @Override
        public void configure(Context context) {
    
            interceptor = new MyInteceptor();
        }
    
    }
    

    不要忘记将这个类打包在jar中并将其放入flume的lib目录中 .

相关问题