我有一个包含10个分区的主题,使用以下配置通过JDBC sink连接器流式传输到MariaDB表:
{
"name":"sink-connector-mariadb",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"10",
"topics":"TOPIC_A",
"connection.url":"jdbc:mariadb://localhost:3306/kafka?user=myuser&password=mypass",
"auto.create":"false",
"auto.evolve":"true",
"table.name.format":"TABLE_A",
"pk.mode":"record_value",
"pk.fields":"ID",
"insert.mode":"upsert",
"transforms":"ExtractField",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"data"
}
}
然后创建一个视图 VW_TABLE_A
,它通过JDBC源连接器流回kafka:
{ "name": "source-connector-mariadb",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"10",
"connection.url":"jdbc:mariadb://localhost:3306/kafka?user=myuser&password=mypass",
"table.whitelist":"VW_TABLE_A",
"mode":"timestamp",
"timestamp.column.name":"ROWVERSION",
"validate.non.null":"false",
"topic.prefix":"source-",
"table.types":"VIEW",
"poll.interval.ms":"1000",
"consumer.group.id":"table-a"
}
}
最后,我有多个用Java编写的自定义使用者,它们使用消息并向RESTful Web服务发送一些请求 . 所有使用者属于同一组并从同一主题中读取消息 . 我的架构如下所示:
+-------------+ +--------------+
| | | |
| Kafka | --Streams TABLE_A---> | MariaDB |
| | | |
| | <-Streams VW_TABLE_A- | |
+-------------+ +--------------+
|
|
Consumers that send requests
|
v
+-------------+
| |
| RESTful |
| Service |
| |
+-------------|
如何保证发送到Web Service的消息按照接收器在第一步接收的顺序发送?
1 回答
我不确定手头的数据,但考虑用户数据 . 让我们说用户A,B,C . 我们不断更新上述用户,我们希望它们按顺序保留 .
用户A更新fname = Test_t1 .
用户A更新fname = Test_t2 .
用户B更新一些数据 .
用户C更新一些数据 .
现在我们希望按顺序处理数据 . 即用户A应具有fname = Test_t2 . 我们可以根据密钥计算kafka分区的分区号 . 让我们说对于用户数据,我们有消息key = UserID .
覆盖Partitioner类 . 这将始终根据键返回分区号 .
现在,如果您发送消息到'测试'主题,让我们说3个分区 . 用户A的所有消息将按顺序转到单个分区 .