
Why does my Kafka consumer keep receiving the same messages (same offsets)?


I have a SOAP web service that sends a Kafka request message and then waits for a Kafka response message (e.g. consumer.poll(10000)).

Each time the web service is called, it creates a new Kafka producer and a new Kafka consumer.

Every time I call the web service, the consumer receives the same messages (i.e. messages with the same offsets).

I am using Kafka 0.9 with auto-commit enabled and an auto-commit frequency of 100 milliseconds.

For each ConsumerRecord returned by the poll() method, I process it in its own Callable, e.g.

ConsumerRecords<String, String> records = consumer.poll(200);

for (ConsumerRecord<String, String> record : records) {
    final Handler handler = new Handler(record);
    executor.submit(handler);
}

Why do I keep receiving the same messages over and over?

UPDATE 0001

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = 
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

1 Answer

  • 3

    Based on the code you have shown, I think your problem is that the new consumer is single-threaded. If you poll once and then never poll again, auto.commit.offset is not going to do anything. Try putting your code in a while loop and see whether the offsets are committed when you call poll again.
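    A minimal sketch of the while loop suggested above, against the Kafka 0.9 consumer API. The topic name, thread-pool size, and Handler class are placeholders standing in for the asker's code, and StringDeserializer is used instead of the custom deserializers from the config dump. With enable.auto.commit=true, offsets from a previous poll are only committed during subsequent calls to poll(), which is why a single one-shot poll never commits anything:

    ```java
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "machine1:6667");
            props.put("group.id", "group-A.group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000");
            // Stand-ins for the custom deserializers shown in the question.
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            ExecutorService executor = Executors.newFixedThreadPool(4);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("responses")); // placeholder topic
                while (true) {
                    // Each poll() both fetches new records and gives the client a
                    // chance to perform the periodic background auto-commit of the
                    // offsets consumed in earlier polls.
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    for (ConsumerRecord<String, String> record : records) {
                        executor.submit(new Handler(record));
                    }
                }
            }
        }
    }
    ```

    If the web service really must stay one-shot (one consumer per request), an alternative is to call consumer.commitSync() explicitly after processing the records and before closing the consumer, so the offsets are committed without needing a second poll.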
