首页 文章

Apache Flink - 事件时间窗口

提问于
浏览
0

我想在Apache flink中创建键控窗口,以便每个键的窗口在键的第一个事件到达后n分钟执行 . 是否可以使用事件时间特性来完成(因为处理时间取决于系统时钟,并且不确定第一个事件何时到达) . 如果可能的话,请向事件说明事件时间和水印的分配,并解释如何在n分钟后调用过程窗口功能 .

下面是代码的一部分,可以让您了解我目前正在做什么:

//Make keyed events so as to start a window for a key
            KeyedStream<SourceData, Tuple> keyedEvents = 
                    env.addSource(new MySource(configData),"JSON Source")
                    .assignTimestampsAndWatermarks(new MyTimeStamps())
                    .setParallelism(1)
                    .keyBy("service");


            //Start a window for windowTime time
            DataStream<ResultData> resultData=
                    keyedEvents
                    .timeWindow(Time.minutes(winTime))
                    .process(new ProcessEventWindow(configData))
                    .name("Event Collection Window")
                    .setParallelism(25);

那么,我如何分配事件时间和水印标记,使得窗口跟随第一个事件的事件时间作为起始点并在10分钟后执行(第一个事件的开始时间对于不同的键可以是不同的) . 任何帮助将非常感激 .

/------------ ( window of 10 minutes )
Streams          |------------ ( window of 10 minutes )
            \------------ ( window of 10 minutes )

编辑:我用于分配时间戳和水印的类

public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

    @Override
    public long extractTimestamp(SourceData element, long previousElementTimestamp) {

          //Will return epoch of currentTime
        return GlobalUtilities.getCurrentEpoch();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // TODO Auto-generated method stub
        //Will return epoch of currentTime + 10 minutes
        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
    }

}

2 回答

  • 0

    我刚才有一个关于事件时间窗口的类似问题 . 这是我的流的样子

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    //Consumer Setup
    
    val stream = env.addSource(consumer)
      .assignTimestampsAndWatermarks(new WMAssigner)
    
    // Additional Setup here
    
    stream
      .keyBy { data => data.findValue("service") }
      .window(TumblingEventTimeWindows.of(Time.minutes(10)))
      .process { new WindowProcessor }
    
      //Sinks go here
    

    我的WMAssigner类看起来像这样(注意:这允许1分钟的无序事件发生,如果你不想允许延迟,你可以扩展一个不同的Timestamp提取器):

    class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {
      override def extractTimestamp(element: ObjectNode): Long = {
        val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
        tsStr.toLong
      }
    }
    

    我想用于Watermarks的时间戳是data.ts字段 .

    我的WindowProcessor:

    class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
        val out = ""
        elements.foreach( value => {
          out = value.findValue("data").findValue("outData")
        }
        out.collect(out)
      }
    }
    

    如果有什么不清楚,请告诉我

  • 0

    我认为对于您的用例,最好使用ProcessFunction . 您可以做的是在第一个事件发生时注册EventTimeTimer . 比在 onTimer 方法中发出结果 .

    就像是:

    public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {
    
        @Override
        public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
            throws Exception {
    
            // retrieve the current aggregate
            ResultData current = state.value();
            if (current == null) {
                // first event arrived
                current = new ResultData();
                // register end of window
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
            }
    
            // update the state's aggregate
            current += value;
    
            // write the state back
            state.update(current);
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
            throws Exception {
    
            // get the state for the key that scheduled the timer
            ResultData result = state.value();
    
            out.collect(result);
    
            // reset the window state
            state.clear();
        }
    }
    

相关问题