首页 文章

KSQL - 确定何时加载表

提问于
浏览
4

如何确定KSQL何时将我的数据从Kafka主题完全加载到我的表中?

GOAL: 获取2个Kafka主题,加入它们并将结果写入新的Kafka主题 .

EXAMPLE:

我正在使用Ksql的Rest API发出以下命令 .

CREATE TABLE MyTable (A1 VARCHAR, A2 VARCHAR) WITH (kafka_topic='topicA', key='A1', value_format='json');
CREATE STREAM MyStream (B1 varchar, B2 varchar) WITH (kafka_topic='topicB', value_format='json');
CREATE STREAM MyDestination WITH (Kafka_topic='topicC', PARTITIONS = 1, value_format='json') AS SELECT a.A1 as A1, a.A2 as A2, b.B1 as B1, b.B2 as B2 FROM  MyStream b left join MyTable a on a.A1 = b.B1;

PROBLEM: topicC仅包含来自topicB的数据,并且所有连接的值都为null .

虽然我从create table命令收到SUCCESS状态,但似乎数据尚未完全加载到表中 . 因此,第3个命令的结果仅包含来自流的数据,并且不包括来自表的数据 . 如果我在执行join命令之前人为地延迟,那么结果主题将正确地具有来自两个主题的数据 . 如何确定何时加载表,并且可以安全地执行join命令?

2 回答

  • 0

    KSQL(和底层Kafka Streams)中的表具有时间维度,即随时间的演变 . 对于流表连接,每个流记录都与“正确”表格版本连接(即,表格按时间版本化) .

    在即将发布的CP 5.1版本中,您可以通过确保表主题的所有记录时间戳都小于流主题的记录时间戳来“预加载”表 . 这告诉KSQL,它需要首先处理表主题数据,但是在它开始加入之前相应地提前表timestamp-version .

    有关详细信息,请查看:https://www.confluent.io/resources/streams-tables-two-sides-same-coin

  • 2

    这确实是一个很好的问题 . 此时,只有在表完全加载后,KSQL才能自动执行流表连接 . 这确实是一个有用的功能 . 这里讨论一个更普遍和相关的问题:https://github.com/confluentinc/ksql/issues/1751

相关问题