首页 文章

Flink Streaming - 在Windows中应用功能

提问于
浏览
4

我也是flink和流媒体的新手 . 我想为每个分区应用一个特定的功能到流的每个窗口(使用事件时间) . 到目前为止我所做的是:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

我想要做的是对每个窗口批处理的每个分区应用一般函数,可能应用复杂的处理算法或类似的东西 . 我已经看到该方法适用于DataStream API,但我不明白它是如何工作的 . 在Flink API中,它表示它在Scala中的使用方式如下:

inputStream.apply { WindowFunction }

有人可以解释一下apply方法的用途或使用方法吗? Scala中的一个例子是可取的 . apply方法是否符合我的要求?

3 回答

  • 0

    因此,根据您想要进行的计算类型,基本上有两个可能的方向 . 使用: fold / reduce / aggregate 或更通用的,您已经提到过 - apply . 所有这些都适用于Windows的钥匙 .

    至于 apply ,它是一种非常通用的计算方法 . 最基本的版本(在Scala中)将是:

    def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]
    

    其中function有4个参数:

    • 窗口的键(记住你正在使用keyedStream)

    • 窗口(您可以从中提取窗口的开始或结束)

    • 分配给此特定窗口和键的元素

    • 您应该向其发出处理结果的收集器

    必须记住,这个版本必须保持每个元素处于状态,直到窗口被发出 . 更好的内存性能解决方案是使用带有preAgreggator的版本,该版本在触发上述功能之前执行一些计算 .

    在这里你可以看到一个预先聚合的简短片段:

    val stream: DataStream[(String,Int)] =   ...
    
    stream.keyBy(_._1)
          .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
          .apply((e1, e2) => (e1._1, e1._2 + e2._2),
                 (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                    out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
          })
    

    它会计算会话窗口中密钥的出现次数 .

    所以基本上如果你不需要窗口的元信息,我会坚持 fold \ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _考虑应用某种预先聚合,如果这还不够,请看一下最通用的 apply .

    有关更完整的示例,您可以查看here .

  • 7

    就我而言,您可以将map / flatmap / keyBy函数调用应用于有状态窗口数据 val inputStream ,以便更改数据 . 所以,如果你要创造

    class DoSthWithYourStream {...}

    在哪里定义方法和输入数据限制,然后您可以创建另一个值:

    val inputStreamChanged = inputStream .map( a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream())

    Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc

    如果你想使用CEP,我认为最好的选择是利用CEP pattern API

    val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end")

    val patternStream = CEP.pattern(inputStream, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))

  • 0

    事实证明它需要一点Scala魔法 . 到目前为止我所做的是:

    val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_))
    
        def processPartition(key: String, window: TimeWindow,
                             batch: Iterable[EventStream],
                             out: Collector[Long]): Unit =  {..}
    

    从我的实验中,processPartition方法在整个批处理上应用了一个“键分区”的功能(批处理将只包含具有相同键的元素) . 我从Java API中获取了此方法的参数 . 如果任何人都可以详细说明apply函数及其工作方式,那将会非常有用 .

相关问题