我正在使用Apache Kafka,我一直在尝试使用Kafka Streams功能 . 我想要实现的是非常简单的,至少在单词中,它可以通过常规的普通消费者/ 生产环境 者方法轻松实现:

  • 从动态主题列表中读取a

  • 对邮件进行一些处理

  • 将消息推送到另一个主题,该主题根据消息内容计算名称

最初我认为我可以创建一个自定义接收器或注入某种 endpoints 解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法 . 所以我挖掘代码并找到了生成的ProducerInterceptor类(引自JavaDoc):

一个插件接口,允许您在生成器发布到Kafka集群之前拦截(并可能改变)生成器接收的记录 .

它是onSend方法:

这是从KafkaProducer.send(ProducerRecord)和KafkaProducer.send(ProducerRecord,Callback)方法调用,然后键和值被序列化并分配了分区(如果未在ProducerRecord中指定分区) .

这对我来说似乎是完美的解决方案,因为我可以有效地返回一个新的ProducerRecord,其中包含我想要的主题名称 . 虽然显然有's a bug (I'已经在他们的JIRA上打开了一个问题:KAFKA-4691)并且当键和值已经被序列化时调用该方法 . 我认为此时不进行额外的反序列化是可以接受的 .

我向你提出的经验丰富,知识渊博的用户问题将是你的意见和建议,以及关于如何有效和优雅地实现它的任何建议 .

在此先感谢您的帮助/意见/建议/想法 .

Below are some code snippets of what I've tried:

public static void main(String[] args) throws Exception {

    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    StringDeserializer stringDeserializer = new StringDeserializer();
    StringSerializer stringSerializer = new StringSerializer();

    MyObjectSerializer myObjectSerializer = new MyObjectSerializer();

    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.addSource("SOURCE", stringDeserializer, myObjectSerializer, Pattern.compile("input-.*"));

    .addProcessor("PROCESS", MyCustomProcessor::new, "SOURCE");


    System.out.println("Starting PurchaseProcessor Example");
    KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
    streaming.start();
    System.out.println("Now started PurchaseProcessor Example");

}

private static Properties getProperties() {
    Properties props = new Properties();
    .....
    .....
    props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.test.kafka.streams.OutputTopicRouterInterceptor");

    return props;
}

OutputTopicRouterInterceptor onSend implementation:

@Override
public ProducerRecord<String, MyObject> onSend(ProducerRecord<String, MyObject> record) {
    MyObject obj = record.value();

    String topic = computeTopicName(obj);

    ProducerRecord<String, MyObject> newRecord = new ProducerRecord<String, MyObject>(topic, record.partition(), record.timestamp(), record.key(), obj);
    return newRecord;
}