我使用自定义Kafka连接器(用Java编写,使用Kafka Connect的Java API)从外部源提取数据并存储在主题中 . 我需要设置自定义分区策略 . 我了解通过设置partitioner.class property可以设置自定义partitioner . 但是,这个属性不使用 connect-standalone
脚本来运行我的连接器)来使用我写的自定义 Partitioner
吗?
我使用自定义Kafka连接器(用Java编写,使用Kafka Connect的Java API)从外部源提取数据并存储在主题中 . 我需要设置自定义分区策略 . 我了解通过设置partitioner.class property可以设置自定义partitioner . 但是,这个属性不使用 connect-standalone
脚本来运行我的连接器)来使用我写的自定义 Partitioner
吗?
1 回答
源连接器可以通过
SourceRecord
的partition
字段控制写入每个源记录的分区 . 如果这是您自己的连接器,这是最直接的 .但是,如果要更改源连接器分区每条记录的方式,可以使用单个消息转换(SMT)覆盖源记录的
partition
字段 . 您可能必须通过实现org.apache.kafka.connect.transforms.Transformation
并使用自己的分区逻辑来编写自定义SMT,但这实际上比编写自定义Kafka分区程序要容易一些 .例如,这是一个名义上的自定义转换,它显示了如何使用配置属性以及如何使用所需的分区号创建新的
SourceRecord
实例 . 样本不完整,因为它实际上没有任何真正的分区逻辑,但它应该是一个很好的起点 .ConfigDef
和AbstractConfig
功能非常有用,可以执行更多有趣的操作,包括使用自定义验证器和推荐器,以及具有依赖于其他属性的配置属性 . 如果您想了解更多相关信息,请查看一些现有的Kafka Connect连接器,它们也使用相同的框架 .最后一件事 . 运行Kafka Connect独立或分布式工作线程时,但确保将CLASSPATH环境变量设置为指向包含自定义SMT的JAR文件以及SMT所依赖的任何JAR文件(Kafka提供的除外) .
connect-standalone.sh
和connect-distributed.sh
命令会自动将Kafka JAR添加到类路径中 .