我想在给定的密钥和公共窗口上加入 three or more 数据流或表 . 但是我不知道如何正确编写代码 . 官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/给出了下面的例子,但是它只加入了两个数据流,那么如何在给定的密钥和公共窗口上加入 three or more 数据流呢?
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
我试图找出我首先使用公共窗口加入两个数据流,并使用结果数据流将第三个数据流加入公共窗口?但是,当我们将TimeCharacteristic设置为事件时间时,这三个数据流的事件时间的语义似乎会发生变化 .
==================
对于FlinK Table API和SQL,同样的问题,如何在给定键和公共窗口上加入 three or more 表?官方文件https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html只是给出了单表的下面的例子 .
Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
" TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
" SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
我尝试编写如下的SQL来连接给定键和公共窗口上的三个表,但我不认为它是正确的 .
String SQL = "SELECT" +
" grades.user1 , SUM(salaries.amount) FROM grades " +
" INNER JOIN salaries ON grades.user1 = salaries.user1 " +
" INNER JOIN person ON grades.user1 = person.user1 "+
"GROUP BY grades.user1, TUMBLE(grades.proctime, INTERVAL '5' SECOND) "
那么,通过datastrem API或Flink Table API / SQL在给定密钥和公共窗口上连接三个或更多数据流/表的正确方法是什么?
_1892593_于6/16/2018提出更清楚的问题 .
对于Flink SQL,我需要的,就像下面的Pseudocode一样,是连接三个表和一个共同的TumblingEventTimeWindow,也就是说DataStream API的替代版本,无论如何由Flink SQL表示,也意味着从三个表连接所有事件,发生在同一个TumblingEventTimeWindow中 .
SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))
似乎连接功能也在下面的Flink设计文档中提到:“事件时间翻滚窗口的Stream-Stream连接:加入两个流的元组,它们处于相同的翻滚事件时间窗口中”,我不知道是否Flink SQL已经实现了这种类型的Flink SQL连接功能 .
https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#
1 回答
很难对你的问题给出明确的答案,因为你需要的连接的语义不明确 . DataStream API的窗口化连接实现的语义与Table API / SQL的窗口连接不同 .
在DataStream API上,您可以简单地定义另一个连接,如下所示:
由于Flink实现了标准SQL,您可以像往常一样定义三个表的连接:
窗口范围(
A.ts BETWEEN B.ts - X AND B.ts + Y)
可以根据需要定义 .