We are currently struggling with some difficulties regarding our Kafka setup and the consuming Java listeners.

Setup

We deploy three different components together as Spring Boot applications in a Tomcat container; let's call them Source, Processor, and Sink.

  • The Source is called via REST, ingests messages, and produces Kafka messages to topic A (which has 9 partitions).

  • The Processor consumes the messages from Kafka topic A, enriches and filters them, and finally produces Kafka messages to a second topic B (which also has 9 partitions).

  • The Sink consumes the messages from Kafka topic B and forwards them via REST calls to another system, which does not perform well.

We currently have only one Kafka broker.

Problem

When heavy REST traffic puts the first component, the Source, under high pressure (for example 1000 requests within 15 seconds), the downstream Processor handles all of the created messages, but the Sink fails to consume some of them, so the Kafka lag starts to grow (probably because the system the Sink sends its REST calls to is slow) and the consumers never process those messages. Yet the messages do exist in the topic (B, in this case)!

We tried to handle this situation by tuning the parameters session.timeout.ms and max.poll.records, but we never reached a lag of 0, i.e. the Sink never consumed all messages.
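For reference, the two parameters we tuned correspond to these plain consumer property keys. The values below are placeholders for illustration only; our experiments did not settle on final values:

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConsumerTuning {

    // Extra consumer properties we experimented with for the Sink.
    // The concrete values here are illustrative, not a recommendation.
    public static Map<String, Object> tuningProperties() {
        Map<String, Object> props = new HashMap<>();
        // Window for heartbeat-based failure detection; must lie within the
        // broker's group.min/max.session.timeout.ms bounds (6000..30000 here).
        props.put("session.timeout.ms", 30_000);
        // Cap on records returned per poll(); a smaller value limits how much
        // work the listener does between polls when each record is slow.
        props.put("max.poll.records", 50);
        return props;
    }
}
```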

In my understanding, the consumer should either process every message or at least log some error, but unfortunately neither happens.

Furthermore, we mocked the system called by the Sink so that it responds faster, and then the problem occurred less often.

Well, we managed to reproduce the problem in a simpler way and started to monitor the application with YourKit.
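The simulation boils down to blocking inside the listener method, which is also what the SLEEPING stack below shows (a hypothetical reduction; the real GenericTopicConsumer contains more logic):

```java
public class SlowSinkSimulation {

    // Reduced version of the Sink's listener method: the REST call to the
    // slow downstream system is replaced by a plain sleep, so the listener
    // thread shows up as SLEEPING in java.lang.Thread.sleep in a thread dump.
    public void process(String key, byte[] payload) {
        try {
            Thread.sleep(50); // stand-in for the slow REST call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```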

With the configuration below we get four instances of each of these two thread types:

  • org.springframework.kafka.KafkaListenerEndpointContainer#0-x-kafka-listener-x

  • org.springframework.kafka.KafkaListenerEndpointContainer#0-x-kafka-consumer-x

At some point, one or more of the Kafka listeners wait indefinitely; their threads stay in sun.misc.Unsafe.park(...) forever.

The question is: why do they do that?

In my understanding, even if the consumers are slow, all messages should eventually be consumed!

Thread dumps

org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1 State: SLEEPING tid: 23
java.lang.Thread.sleep(long) Thread.java
de.foobar.eventing.business.GenericTopicConsumer.process(String, byte[]) GenericTopicConsumer.java:60
sun.reflect.GeneratedMethodAccessor7.invoke(Object, Object[])
sun.reflect.DelegatingMethodAccessorImpl.invoke(Object, Object[]) DelegatingMethodAccessorImpl.java:43
java.lang.reflect.Method.invoke(Object, Object[]) Method.java:498
org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(Object[]) InvocableHandlerMethod.java:180
org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(Message, Object[]) InvocableHandlerMethod.java:112
org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(Message, Object[]) HandlerAdapter.java:48
org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(Object, Acknowledgment, Message) MessagingMessageListenerAdapter.java:174
org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(ConsumerRecord, Acknowledgment) RecordMessagingMessageListenerAdapter.java:72
org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(Object, Acknowledgment) RecordMessagingMessageListenerAdapter.java:47
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords) KafkaMessageListenerContainer.java:764
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords) KafkaMessageListenerContainer.java:708
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer$ListenerConsumer, ConsumerRecords) KafkaMessageListenerContainer.java:230
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run() KafkaMessageListenerContainer.java:975
java.util.concurrent.Executors$RunnableAdapter.call() Executors.java:511
java.util.concurrent.FutureTask.run() FutureTask.java:266
java.lang.Thread.run() Thread.java:745

org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-listener-2 State: WAITING tid: 22
sun.misc.Unsafe.park(boolean, long) Unsafe.java
java.util.concurrent.locks.LockSupport.parkNanos(Object, long) LockSupport.java:215
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) AbstractQueuedSynchronizer.java:2078
java.util.concurrent.LinkedBlockingQueue.poll(long, TimeUnit) LinkedBlockingQueue.java:467
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run() KafkaMessageListenerContainer.java:971
java.util.concurrent.Executors$RunnableAdapter.call() Executors.java:511
java.util.concurrent.FutureTask.run() FutureTask.java:266
java.lang.Thread.run() Thread.java:745

org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-listener-1 State: WAITING tid: 24
sun.misc.Unsafe.park(boolean, long) Unsafe.java
java.util.concurrent.locks.LockSupport.parkNanos(Object, long) LockSupport.java:215
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) AbstractQueuedSynchronizer.java:2078
java.util.concurrent.LinkedBlockingQueue.poll(long, TimeUnit) LinkedBlockingQueue.java:467
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run() KafkaMessageListenerContainer.java:971
java.util.concurrent.Executors$RunnableAdapter.call() Executors.java:511
java.util.concurrent.FutureTask.run() FutureTask.java:266
java.lang.Thread.run() Thread.java:745

org.springframework.kafka.KafkaListenerEndpointContainer#0-3-kafka-listener-1 State: WAITING tid: 25
sun.misc.Unsafe.park(boolean, long) Unsafe.java
java.util.concurrent.locks.LockSupport.parkNanos(Object, long) LockSupport.java:215
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) AbstractQueuedSynchronizer.java:2078
java.util.concurrent.LinkedBlockingQueue.poll(long, TimeUnit) LinkedBlockingQueue.java:467
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run() KafkaMessageListenerContainer.java:971
java.util.concurrent.Executors$RunnableAdapter.call() Executors.java:511
java.util.concurrent.FutureTask.run() FutureTask.java:266
java.lang.Thread.run() Thread.java:745
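The WAITING stacks are the listener threads polling the hand-off queue that spring-kafka 1.x places between a kafka-consumer-x thread and its kafka-listener-x thread. Stripped down to the bare pattern (a simplified sketch, not the actual KafkaMessageListenerContainer code), it looks like this:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Consumer-thread side: enqueue a fetched record for the listener thread.
    public void handOff(String record) {
        queue.offer(record);
    }

    // Listener-thread side: wait for the next record. While the queue is
    // empty, this parks in LinkedBlockingQueue.poll -> LockSupport.parkNanos
    // -> sun.misc.Unsafe.park, i.e. exactly the WAITING state in the dumps.
    public String awaitRecord() throws InterruptedException {
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }
}
```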

Configuration blueprint used by all three consumers

Spring Integration XML

<bean class="org.springframework.kafka.listener.config.ContainerProperties"
        id="containerProperties"
        c:topics="a_topic"
        p:ackMode="RECORD"
        p:ackOnError="false" />

<bean class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
        id="kafkaReceiverContainer"
        p:concurrency="4"
        c:consumerFactory-ref="consumerFactory"
        c:containerProperties-ref="containerProperties" />

<int-kafka:message-driven-channel-adapter
    listener-container="kafkaReceiverContainer"
    auto-startup="true"
    channel="kafkaInbound"
    phase="100"
    send-timeout="5000" />
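For comparison, a rough Java-configuration equivalent of the XML blueprint above (an untested sketch against the spring-kafka 1.2.x API; the message-driven channel adapter is left out):

```java
@Bean
public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(
        ConsumerFactory<String, byte[]> consumerFactory) {
    ContainerProperties containerProperties = new ContainerProperties("a_topic");
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    containerProperties.setAckOnError(false);
    ConcurrentMessageListenerContainer<String, byte[]> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
    container.setConcurrency(4); // matches p:concurrency="4"
    return container;
}
```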

Java configuration

@Bean
  public ConsumerFactory<String, byte[]> consumerFactory() {
    Map<String, Object> props = constructConsumerProperties();
    return new DefaultKafkaConsumerFactory<String, byte[]>(props);
  }
  private Map<String, Object> constructConsumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); // Kafka broker URLs. Doesn't need to be a complete list of brokers
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroupId); // Consumer Group ID. Must be unique for every app. Multiple instances must have same ID.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000); // Timeout to detect failures when using Kafka's group management facilities
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    if (sslEnabled) {
      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
      props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
      props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
    }
    return props;
  }

Versions used

  • Kafka client 0.11.0.0

  • Spring Kafka 1.2.2.RELEASE

  • Spring Integration Kafka 2.1.0.RELEASE

server.properties

auto.create.topics.enable=false
auto.leader.rebalance.enable=true
background.threads=10
broker.id=10001
broker.id.generation.enable=false
compression.type=producer
connections.max.idle.ms=600000
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.socket.timeout.ms=30000
default.replication.factor=1
delete.topic.enable=true
fetch.purgatory.purge.interval.requests=100
group.max.session.timeout.ms=30000
group.min.session.timeout.ms=6000
inter.broker.protocol.version=0.10.1.0
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=SSL://example-broker.biz:9092
log.cleaner.backoff.ms=15000
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.delete.retention.ms=86400000
log.cleaner.enable=true
log.cleaner.io.buffer.load.factor=0.9
log.cleaner.io.buffer.size=524288
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.threads=1
log.cleanup.policy=delete
log.dir=/srv/data/kafka/
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.message.format.version=0.10.1.0
log.preallocate=false
log.retention.bytes=-1
log.retention.check.interval.ms=300000
log.retention.hours=336
log.roll.hours=336
log.roll.jitter.hours=0
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
max.connections.per.ip=2147483647
message.max.bytes=1000012
metrics.num.samples=2
metrics.sample.window.ms=30000
min.insync.replicas=1
num.io.threads=8
num.network.threads=8
num.partitions=3
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=2880
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests=100
queued.max.requests=16
quota.consumer.default=9223372036854775807
quota.producer.default=9223372036854775807
quota.window.num=11
quota.window.size.seconds=1
replica.fetch.backoff.ms=1000
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
request.timeout.ms=30000
reserved.broker.max.id=100000
security.inter.broker.protocol=SSL
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
socket.send.buffer.bytes=1048576
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.key.password=...
ssl.keymanager.algorithm=SunX509
ssl.keystore.location=/srv/kafka/current/ssl/server.keystore.jks
ssl.keystore.password=...
ssl.keystore.type=JKS
ssl.protocol=TLS
ssl.trustmanager.algorithm=PKIX
ssl.truststore.location=/srv/kafka/current/ssl/server.truststore.jks
ssl.truststore.password=...
ssl.truststore.type=JKS
unclean.leader.election.enable=false
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.set.acl=false
zookeeper.sync.time.ms=2000