首页 文章

Kafka的消息订单保证

提问于
浏览
0

我有一个包含10个分区的主题,使用以下配置通过JDBC sink连接器流式传输到MariaDB表:

{ 
    "name":"sink-connector-mariadb", 
    "config": { 
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
               "tasks.max":"10", 
               "topics":"TOPIC_A", 
               "connection.url":"jdbc:mariadb://localhost:3306/kafka?user=myuser&password=mypass", 
               "auto.create":"false", 
               "auto.evolve":"true", 
               "table.name.format":"TABLE_A", 
               "pk.mode":"record_value",
               "pk.fields":"ID",
               "insert.mode":"upsert", 
               "transforms":"ExtractField", 
               "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value", 
               "transforms.ExtractField.field":"data"
    }
}

然后创建一个视图 VW_TABLE_A ,它通过JDBC源连接器流回kafka:

{ "name": "source-connector-mariadb", 
      "config": { 
            "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", 
                  "tasks.max":"10", 
                  "connection.url":"jdbc:mariadb://localhost:3306/kafka?user=myuser&password=mypass", 
                  "table.whitelist":"VW_TABLE_A", 
                  "mode":"timestamp", 
                  "timestamp.column.name":"ROWVERSION", 
                  "validate.non.null":"false", 
                  "topic.prefix":"source-", 
                  "table.types":"VIEW", 
                  "poll.interval.ms":"1000", 
                  "consumer.group.id":"table-a" 
        }
    }

最后,我有多个用Java编写的自定义使用者,它们使用消息并向RESTful Web服务发送一些请求 . 所有使用者属于同一组并从同一主题中读取消息 . 我的架构如下所示:

+-------------+                         +--------------+
    |             |                         |              |
    |    Kafka    |  --Streams TABLE_A--->  |   MariaDB    |
    |             |                         |              |
    |             |  <-Streams VW_TABLE_A-  |              |
    +-------------+                         +--------------+
          |
          | 
Consumers that send requests
          |
          v
    +-------------+
    |             |
    |   RESTful   |
    |    Service  |
    |             |
    +-------------|

如何保证发送到Web Service的消息按照接收器在第一步接收的顺序发送?

1 回答

  • 0

    我不确定手头的数据,但考虑用户数据 . 让我们说用户A,B,C . 我们不断更新上述用户,我们希望它们按顺序保留 .

    • 用户A更新fname = Test_t1 .

    • 用户A更新fname = Test_t2 .

    • 用户B更新一些数据 .

    • 用户C更新一些数据 .

    现在我们希望按顺序处理数据 . 即用户A应具有fname = Test_t2 . 我们可以根据密钥计算kafka分区的分区号 . 让我们说对于用户数据,我们有消息key = UserID .

    public class UserPartitioner implements Partitioner {
    
        public UserPartitioner(){
        }
    
        public UserPartitioner (Properties props) {
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if(key!=null && cluster!=null)
                return  MsgUtil.genrateHash(key.toString()) % cluster.partitionCountForTopic(topic);
            return 0;
        }
    
        public void close() {
        }
    
    }
    

    覆盖Partitioner类 . 这将始终根据键返回分区号 .

    现在,如果您发送消息到'测试'主题,让我们说3个分区 . 用户A的所有消息将按顺序转到单个分区 .

相关问题