我是apache kafka的新手 . 我正在尝试使用node.js实现字数计数示例usig kafka streaming . 我正在使用https://www.npmjs.com/package/kafka-streams库 . 我的项目设置如下:
我将使用apache kafka提供的producer-pref-test在一个主题上生成1,00,000条消息 . 我将创建一个消费者,他将使用在主题上创建的此消息流 . 我想执行窗口操作 . 我将10秒(窗口时间)的单词计数结果发送到其他主题或只是打印到控制台 .
我已经检查了https://github.com/nodefluent/kafka-streams/tree/master/examples给出的示例,但是,window.js示例不起作用并给出了一些错误 .
我试图通过自己实现字数统计和窗口操作,但是它没有在窗口中向我发送聚合计数值 .
我的代码:
const {KafkaStreams} = require("kafka-streams");
const config = {
//zkConStr: "localhost:2181/",
kafkaHost: "localhost:9092",
groupId: "kafka-streams-test",
clientName: "kafka-streams-test-name",
workerPerPartition: 1,
options: {
sessionTimeout: 8000,
protocol: ["roundrobin"],
fromOffset: "earliest", //latest
fetchMaxBytes: 1024 * 100,
fetchMinBytes: 1,
fetchMaxWaitMs: 10,
heartbeatInterval: 250,
retryMinTimeout: 250,
autoCommit: true,
autoCommitIntervalMs: 1000,
requireAcks: 0,
//ackTimeoutMs: 100,
//partitionerType: 3
}
};
const kafkaStreams = new KafkaStreams(config);
const consumeStream = kafkaStreams.getKStream("loadTopic6");
const windowPeriod = 10 * 1000; // 10 seconds
const from = Date.now();
const to = Date.now() + windowPeriod;
const {stream, abort} = consumeStream.window(from, to);
stream
.map(keyValueMapperEtl)
.countByKey("key", "count")
.map(kv => kv.key + " " + kv.count)
.tap(kv => console.log(kv))
.to("output-topic");
//start the stream
consumeStream.start();
//setTimeout(abort,50000000000);
function keyValueMapperEtl(message){
return {
key: message.value,
value: undefined // not required
};
}
我对所提供的演示做了一些修改,但它仍然无法正常工作 .
请帮助我,我正在做什么错,我错过了什么 .
谢谢