首页 文章

如何使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect中的jdbc接收器消耗?

提问于
浏览
0

我有使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗给另一个mysql的问题 .

因为debezium对kafka主题产生的模式和有效负载与kafka connect jdbc sink期望的模式不兼容 .

当jdbc接收器想要使用数据并在另一个mysql中创建记录时,我得到异常 .

我该如何解决这个问题?

1 回答

  • 5

    Debezium生成的消息结构确实与JDBC接收器预期的消息结构不同 . JDBC接收器期望消息中的每个字段对应于行中的字段,因此消息对应于行的"after"状态 . OTOH,Debezium MySQL connector执行变更数据捕获,这意味着它不仅仅包括行的最新状态 . 具体来说,连接器使用包含行的主键或唯一键列的键输出消息,并使用包含信封结构的消息值:

    • 该操作,例如是插入,更新还是删除

    • 行的状态 before 发生了更改(插入时为null)

    • 行的状态 after 发生了更改(删除时为null)

    • 特定于源的信息,包括服务器元数据,事务ID,数据库和表名,事件发生时的服务器时间戳以及有关发现事件的位置的详细信息等 .

    • 连接器生成事件的时间戳

    解决这种差异的最简单方法是使用Kafka 0.10.2.x(目前最新版本为0.10.2.1)和Kafka Connect的新版本Single Message Transforms (SMTs) . 每个Kafka Connect连接器都可以配置零个或多个SMT链,这些SMT可以在将消息写入Kafka之前转换源连接器的输出,或者在将Kafka作为输入传递到接收器连接器之前转换从Kafka读取的消息 . SMT故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代Kafka Streams或其他更强大的流处理系统,可以加入多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态 .

    如果您正在使用Kafka Streams进行任何类型的处理,那么您应该考虑在Kafka Streams应用程序中操作消息结构 . 如果没有,那么SMT是解决问题的好方法 . 实际上,有两种方法可以使用SMT来调整消息结构 .

    第一种选择是使用带有Debezium连接器的SMT来提取/保留行的“后”状态,并在将所有其他信息写入Kafka之前将其丢弃 . 当然,您将在Kafka主题中存储较少的信息,并丢弃一些可能在将来有 Value 的CDC信息 .

    第二个和IMO首选选项是将源连接器保持原样,并将所有CDC消息保留在Kafka主题中,但是然后使用带有接收器连接器的SMT来提取/保留行的“后”状态在将消息传递到JDBC接收器连接器之前丢弃所有其他信息 . 您可以使用Kafka Connect中包含的现有SMT之一,但您可以考虑编写自己的SMT来完全按照自己的意愿行事 .

相关问题