首页 文章

Flink在timeWindow上应用函数

提问于
浏览
0

我正在做一个Flink项目 . 该项目的主要思想是读取JSON的数据流(网络日志),关联它们,并生成一个新的JSON,它是不同JSON信息的组合 .

此时,我能够读取JSON,生成KeyedStream(基于生成日志的机器),然后生成5秒的窗口流 .

我想要执行的下一步是对窗口使用apply函数并组合每个JSON的信息 . 我对如何做到有点困惑 .

我目前的代码如下:

DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

        @Override
        public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
                Collector<Tuple2<String, JSONObject>> arg3) throws Exception {


        }

.apply(new generateMetaAlert())部分正在抱怨下一个错误:

方法apply(WindowFunction,R,Tuple,TimeWindow>)在WindowedStream类型中,Tuple,TimeWindow>不适用于参数(MetaAlertGenerator.generateMetaAlert)

任何其他代码结构提案都与我编写的提案截然不同?

预先感谢您的帮助

1 回答

  • 1

    当您应用 keyBy 函数(不使用匿名类)时,自定义 WindowFunction (第3个字段)中的键的类型应为 Tuple ,因为编译器无法确定键的类型 . 此代码编译时没有错误(考虑到我试图用虚拟代码填充空白):

    public class Test {
    
        public Test() {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStream<String> events = env.readTextFile("datastream.log");
    
            DataStream<Tuple2<String, JSONObject>> MetaAlert
                    = events
                    .flatMap(new JSONParser())
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .apply(new GenerateMetaAlert());
    
        }
    
        public class JSONObject {
        }
    
        public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
    
            }
        }
    
        public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
            @Override
            public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
    
            }
        }
    
    }
    

    但最直接的方法是使用匿名类,以便保持 String 类型:

    DataStream<Tuple2<String, JSONObject>> MetaAlert
            = events
            .flatMap(new JSONParser())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                    // Your code here
                }
            });
    

    最后,如果你想保留这个类,但你也想保持你的键的类型,你可以实现一个 KeySelector

    public class Test {
    
        public Test() {
    
            DataStream<Tuple2<String, JSONObject>> MetaAlert
                    = events
                    .flatMap(new JSONParser())
                    .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                        @Override
                        public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                            return json.f0;
                        }
                    })
                    .timeWindow(Time.seconds(5))
                    .apply(new GenerateMetaAlert());
        }
    
        public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
    
            }
        }
    
    }
    

相关问题