我是开发kafka-streams应用程序的新手 . 我的流处理器用于根据输入json消息中的用户密钥值对json消息进行排序 .
Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"}
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"}
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"}
我在这里读到Dynamically connecting a Kafka input stream to multiple output streams没有动态解决方案 .
在我的用例中,我知道输入流所需的用户键和输出主题 . 因此,我正在编写特定于每个用户的单独处理器应用程序,其中每个处理器应用程
所有不同的流处理器应用程序从kafka中的相同json输入主题读取,但是如果满足预设的用户条件,则每个仅将消息写入特定用户的输出主题 .
public class SwitchStream extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
HashMap<String, String> message = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
message = mapper.readValue(value, HashMap.class);
} catch (IOException e){}
// User condition UserID = 1
if(message.get("UserID").equals("1")) {
context().forward(key, value);
context().commit();
}
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", "INPUT_TOPIC");
builder.addProcessor("Process", SwitchStream::new, "Source");
builder.addSink("Sink", "OUTPUT_TOPIC", "Process");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
问题1:如果使用低级处理器API,是否可以使用高级流DSL轻松实现相同的功能? (我承认我发现它更难理解并遵循高级流DSL的其他在线示例)
问题2:输入json主题是以20K-25K EPS的高速率输入 . 我的处理器应用程序似乎无法跟上此输入流的步伐 . 我已经尝试过部署每个进程的多个实例,但结果与我希望它们的位置相差无几 . 理想情况下,每个处理器实例应该能够处理3-5K EPS .
有没有办法改善我的处理器逻辑或使用高级流DSL编写相同的处理器逻辑?会有所作为吗?
1 回答
您可以通过
filter()
在高级DSL中执行此操作(您有效地实现了过滤器,因为如果它是userID==1
,您只返回一条消息) . 您可以使用KStream#branch()
来概括此过滤器模式(有关详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations) . 另请阅读JavaDocs:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams关于表现 . 单个实例应该能够处理10K记录 . 没有任何进一步的信息很难说出问题可能是什么 . 我建议在Kafka用户列表中询问(参见http://kafka.apache.org/contact)