首页 文章

如何通过datastrem API或Flink Table API / SQL在给定密钥和公共窗口上连接三个或更多数据流/表?

提问于
浏览
0

我想在给定的密钥和公共窗口上加入 three or more 数据流或表 . 但是我不知道如何正确编写代码 . 官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/给出了下面的例子,但是它只加入了两个数据流,那么如何在给定的密钥和公共窗口上加入 three or more 数据流呢?

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

我试图找出我首先使用公共窗口加入两个数据流,并使用结果数据流将第三个数据流加入公共窗口?但是,当我们将TimeCharacteristic设置为事件时间时,这三个数据流的事件时间的语义似乎会发生变化 .

==================

对于FlinK Table API和SQL,同样的问题,如何在给定键和公共窗口上加入 three or more 表?官方文件https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html只是给出了单表的下面的例子 .

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

我尝试编写如下的SQL来连接给定键和公共窗口上的三个表,但我不认为它是正确的 .

String SQL = "SELECT" +
            " grades.user1  , SUM(salaries.amount)   FROM grades " +
            " INNER JOIN salaries ON   grades.user1 =   salaries.user1 " +
            " INNER JOIN person ON   grades.user1 =   person.user1 "+
             "GROUP BY grades.user1, TUMBLE(grades.proctime,  INTERVAL '5' SECOND) "

那么,通过datastrem API或Flink Table API / SQL在给定密钥和公共窗口上连接三个或更多数据流/表的正确方法是什么?

_1892593_于6/16/2018提出更清楚的问题 .

对于Flink SQL,我需要的,就像下面的Pseudocode一样,是连接三个表和一个共同的TumblingEventTimeWindow,也就是说DataStream API的替代版本,无论如何由Flink SQL表示,也意味着从三个表连接所有事件,发生在同一个TumblingEventTimeWindow中 .

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))

似乎连接功能也在下面的Flink设计文档中提到:“事件时间翻滚窗口的Stream-Stream连接:加入两个流的元组,它们处于相同的翻滚事件时间窗口中”,我不知道是否Flink SQL已经实现了这种类型的Flink SQL连接功能 .

https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#

1 回答

  • 1

    很难对你的问题给出明确的答案,因为你需要的连接的语义不明确 . DataStream API的窗口化连接实现的语义与Table API / SQL的窗口连接不同 .

    在DataStream API上,您可以简单地定义另一个连接,如下所示:

    firstStream
      .join(secondStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...})
      .join(thirdStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...})
    

    由于Flink实现了标准SQL,您可以像往常一样定义三个表的连接:

    SELECT A.a, B.b, C.c
      FROM A, B, C
      WHERE A.x = B.x AND A.x = C.x AND
            A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
            A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE
    

    窗口范围( A.ts BETWEEN B.ts - X AND B.ts + Y) 可以根据需要定义 .

相关问题