
Order of events with chained keyBy on the same key


From this link I understand that the order of events coming from some inDataStream is preserved per key in the resulting outDataStream:

outDataStream = inDataStream.keyBy(...)
    .timeWindow(...)
    .reduce(...)

So, for example, if we get the following events as input from inDataStream (where we keyBy on the key):

(1,key1),(2,key1),(3,key2),(4,key1),(5,key2)

then outDataStream will preserve the order among the events of key1 and among the events of key2. So the following can never occur as the result in outDataStream:

(2,key1),(1,key1),(3,key2),(4,key1),(5,key2)

(because 1 and 2 are switched).
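This per-key ordering can be illustrated with a toy model in plain Scala (no Flink involved; the channel routing below is only a sketch of what keyBy does, not Flink's actual hashing): records are routed to a parallel channel by key hash, and within one channel the relative order is the arrival order.

```scala
// Toy model of keyBy routing (plain Scala, not Flink): every record with the
// same key hashes to the same channel, so the per-key order is preserved.
val events = Seq((1, "key1"), (2, "key1"), (3, "key2"), (4, "key1"), (5, "key2"))
val parallelism = 2

// Route each event to a channel by key hash, the way keyBy picks a partition.
// Seq.groupBy keeps the original encounter order inside each group.
val channels: Map[Int, Seq[(Int, String)]] =
  events.groupBy { case (_, key) => math.abs(key.hashCode) % parallelism }

// Within every channel, the ids of "key1" events appear in their arrival order.
val key1OrderPreserved = channels.values.forall { ch =>
  val ids = ch.collect { case (id, "key1") => id }
  ids == ids.sorted
}
```

Since both events of the same key always land on the same channel, swapping (1,key1) and (2,key1) would require reordering inside a single FIFO channel, which does not happen.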

Am I right so far? Then, if we chain another keyBy/process, we get the output in the same order again, right? Since we are just applying the same guarantee once more. Because the order of events with the same key is crucial for us, and to make sure we are on the same page, I made a simplified version of what we have:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector

// incoming events. family is used for keyBy grouping.
case class Event(id: Int, family: String, value: Double)
// the aggregation of events
case class Aggregation(latestId: Int, family: String, total: Double)

// simply adding events into the total aggregation
object AggFunc extends AggregateFunction[Event, Aggregation, Aggregation] {
  override def createAccumulator() = Aggregation(-1, null, 0.0)
  override def add(e: Event, acc: Aggregation) = Aggregation(e.id, e.family, e.value + acc.total)
  override def getResult(acc: Aggregation) = acc
  // required by the AggregateFunction interface; only invoked for merging windows
  override def merge(a: Aggregation, b: Aggregation) = Aggregation(b.latestId, b.family, a.total + b.total)
}

object ProcessFunc extends ProcessFunction[Aggregation, String] {
  override def processElement(agg: Aggregation, ctx: ProcessFunction[Aggregation, String]#Context, out: Collector[String]) =
    out.collect(s"Received aggregation combined with event ${agg.latestId}. New total=${agg.total}")
}

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  // incoming events from a source have 2 families: "A" and "B"
  env.fromElements(Event(1, "A", 6.0), Event(2, "B", 4.0), Event(3, "A", -2.0), Event(4, "B", 3.0),
      Event(5, "A", 8.0), Event(6, "B", 1.0), Event(7, "A", -10.0))
    .keyBy(_.family)
    .timeWindow(Time.seconds(1))
    .trigger(CountTrigger.of(1)) // FIRE on every incoming event for immediate aggregation and ProcessFunc application
    .aggregate(AggFunc)
    .keyBy(_.family)
    .process(ProcessFunc)
    .print()

  env.execute()
}

So, for such events entering the first keyBy in this order: is it guaranteed, for any operator parallelism and cluster deployment, that the sink (here print()) will always receive the following aggregations of family "A", and in this order (though possibly interleaved with aggregations of family "B")?

"Received aggregation combined with event 1. New total=6.0"
"Received aggregation combined with event 3. New total=4.0"
"Received aggregation combined with event 5. New total=12.0"
"Received aggregation combined with event 7. New total=2.0"

Is this correct?

2 Answers

  • 0

    Flink only guarantees order within a parallel partition, i.e., it does not communicate across partitions or hold back data to guarantee order.

    This means, if you have the following operators:

    map(Mapper1).keyBy(0).map(Mapper2)
    

    and run it with a parallelism of 2, i.e.,

    Mapper1(1) -\-/- Mapper2(1)
                 X
    Mapper1(2) -/-\- Mapper2(2)
    

    then all records from Mapper1(1) with the same key will arrive in order at Mapper2(1) or Mapper2(2), depending on the key. The same holds, of course, for all records with the same key coming from Mapper1(2).

    Hence, as soon as records with the same key are distributed across multiple partitions (here Mapper1(1) and Mapper1(2)), there is no ordering guarantee across records from different partitions, but only for records within the same partition.

    If order matters, you can either reduce the parallelism to 1, or implement your operators with event-time semantics and leverage watermarks to reason about the out-of-orderness of the records.
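The event-time approach the answer mentions can be sketched in plain Scala (this is only a model of the idea, not Flink's API; `Rec`, `onElement`, and `onWatermark` are illustrative names): buffer incoming records in a priority queue ordered by timestamp, and release them, in timestamp order, only once the watermark has passed them.

```scala
import scala.collection.mutable

// Hypothetical sketch of an order-restoring operator based on watermarks.
case class Rec(ts: Long, payload: String)

// min-heap on timestamp: the earliest buffered record is always at the head
val buffer = mutable.PriorityQueue.empty[Rec](Ordering.by[Rec, Long](_.ts).reverse)

// buffer every arriving record, regardless of arrival order
def onElement(r: Rec): Unit = buffer.enqueue(r)

// when the watermark advances, emit everything at or before it, in ts order
def onWatermark(wm: Long): Seq[Rec] = {
  val out = mutable.ArrayBuffer.empty[Rec]
  while (buffer.nonEmpty && buffer.head.ts <= wm) out += buffer.dequeue()
  out.toSeq
}
```

A watermark of t asserts that no record with a timestamp <= t is still in flight, so everything released by onWatermark is in final event-time order even if it arrived out of order.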

  • 2

    I don't believe you can safely assume that the absolute order of stream elements will be maintained with parallelism > 1.

    Also, I don't think you can assume ordering once you hit the aggregation operator. The output of the aggregation operator is based on internal window timers, and it shouldn't be assumed that keys are maintained in any particular order.

    If you need ordering, then I think your best option is to sort the data after it comes out.
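Sorting after the fact can be as simple as collecting the emitted aggregations and ordering them by a monotone field such as the event id (the `Out` case class here is a hypothetical stand-in for the Aggregation results in the question):

```scala
// Hypothetical downstream fix-up: collect results, then sort by event id,
// which is monotone in the original arrival order of each family's events.
case class Out(id: Int, family: String, total: Double)

val results = Seq(Out(3, "A", 4.0), Out(1, "A", 6.0), Out(5, "A", 12.0))
val ordered = results.sortBy(_.id)
```

This trades latency for order: you can only sort once you know no earlier element is still on its way, which is exactly what watermarks formalize.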
