首页 文章

Flink CEP不具有确定性

提问于
浏览
1

我在没有集群的情况下在本地运行以下代码:

val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
  println("Found: " + (patternName, eventMap))
  count.incrementAndGet()
})

env.execute()
println(count)

我的数据是CSV文件,格式如下(user,val):

1,1
1,2
1,3
2,1
2,2
2,3
...

我试图检测模式的事件 event(val=1) -> event(val=2) -> event(val=3) . 当我在一个大输入流上运行它时,我知道在流中存在一定数量的事件,我得到的事件计数不一致,几乎总是小于系统中的事件数 . 如果我这样做 env.setParallelism(1) (就像我在代码的第3行中所做的那样),就会检测到所有事件 .

我假设问题是当并行度> 1时,多个线程正在处理流中的事件,这意味着当一个线程有 event(val=1) -> event(val=2) 时, event(val=3) 可能会被发送到另一个线程,并且可能无法检测到整个模式 .

这里有什么我想念的吗?我不能丢失流中的任何模式,但将并行性设置为1似乎打败了使用像Flink这样的系统来检测事件的目的 .

更新:

我尝试使用以下方法键入流:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))

虽然这可以防止不同用户的事件相互干扰:

1,1
2,2
1,3

这并不妨碍Flink不按顺序向节点发送事件,这意味着非确定性仍然存在 .

2 回答

  • 0

    您是否考虑过使用userid键入流(您的第一个值)? Flink保证一个密钥的所有事件都到达同一个处理节点 . 当然,如果你想检测每个用户val = 1-> val = 2-> val = 3的模式,这只会有所帮助 .

  • 0

    最有可能的问题在于在map运算符之后应用keyBy运算符 .

    所以,而不是:

    val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
    

    应该有:

    val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...)
    

    我知道这是一个老问题,但也许对某人有帮助 .

相关问题