我正在使用Apache Kafka,我一直在尝试使用Kafka Streams功能 . 我想要实现的是非常简单的,至少在单词中,它可以通过常规的普通消费者/ 生产环境 者方法轻松实现:
-
从动态主题列表中读取a
-
对邮件进行一些处理
-
将消息推送到另一个主题,该主题根据消息内容计算名称
最初我认为我可以创建一个自定义接收器或注入某种 endpoints 解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法 . 所以我挖掘代码并找到了生成的ProducerInterceptor类(引自JavaDoc):
一个插件接口,允许您在生成器发布到Kafka集群之前拦截(并可能改变)生成器接收的记录 .
它是onSend方法:
这是从KafkaProducer.send(ProducerRecord)和KafkaProducer.send(ProducerRecord,Callback)方法调用,然后键和值被序列化并分配了分区(如果未在ProducerRecord中指定分区) .
这对我来说似乎是完美的解决方案,因为我可以有效地返回一个新的ProducerRecord,其中包含我想要的主题名称 . 虽然显然有's a bug (I'已经在他们的JIRA上打开了一个问题:KAFKA-4691)并且当键和值已经被序列化时调用该方法 . 我认为此时不进行额外的反序列化是可以接受的 .
我向你提出的经验丰富,知识渊博的用户问题将是你的意见和建议,以及关于如何有效和优雅地实现它的任何建议 .
在此先感谢您的帮助/意见/建议/想法 .
Below are some code snippets of what I've tried:
public static void main(String[] args) throws Exception {
StreamsConfig streamingConfig = new StreamsConfig(getProperties());
StringDeserializer stringDeserializer = new StringDeserializer();
StringSerializer stringSerializer = new StringSerializer();
MyObjectSerializer myObjectSerializer = new MyObjectSerializer();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, myObjectSerializer, Pattern.compile("input-.*"));
.addProcessor("PROCESS", MyCustomProcessor::new, "SOURCE");
System.out.println("Starting PurchaseProcessor Example");
KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
streaming.start();
System.out.println("Now started PurchaseProcessor Example");
}
private static Properties getProperties() {
Properties props = new Properties();
.....
.....
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.test.kafka.streams.OutputTopicRouterInterceptor");
return props;
}
OutputTopicRouterInterceptor onSend implementation:
@Override
public ProducerRecord<String, MyObject> onSend(ProducerRecord<String, MyObject> record) {
MyObject obj = record.value();
String topic = computeTopicName(obj);
ProducerRecord<String, MyObject> newRecord = new ProducerRecord<String, MyObject>(topic, record.partition(), record.timestamp(), record.key(), obj);
return newRecord;
}