首页 文章

如何提高从kafka读取的性能并使用kafka Stream Application转发到kafka

提问于
浏览
1

我有1.0.0 Kafka流API的Kafka流应用程序 . 我有单个经纪人0.10.2.0 kafka和单个主题与单个分区 . 除 生产环境 者request.timeout.ms之外,所有可配置参数都相同 . 我用5分钟配置了 生产环境 者request.timeout.ms来修复Kafka Streams program is throwing exceptions when producing问题 .

在我的流应用程序中,我阅读了Kafka的事件,处理它们并转发到同一个kafka的另一个主题 .

在计算统计数据后,我观察到处理占用了5%的时间,剩余95%的时间用于读写 .

即使我在Kafka有数千万个事件,有时Kafka民意调查返回单个数字的记录,有时Kafka民意调查返回了数千个记录 .

有时候上下文转发需要更多的时间来向kafka发送更少的记录,有时候上下文转发花费更少的时间来向kafka发送更多的记录 .

我尝试通过增加max.poll.records,poll.ms值来提高读取性能 . 但没有运气 .

如何在阅读和转发时提高性能? kafka民意调查和前锋如何运作?什么参数有助于提高性能?

以下是我的应用程序中几个重要的生成器配置参数

acks = 1
batch.size = 16384
buffer.memory = 33554432
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 240000
retries = 10
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
transaction.timeout.ms = 60000
transactional.id = null

以下是我的应用程序中几个重要的消费者配置参数

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 10000
metadata.max.age.ms = 300000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000

以下是我的应用程序中几个重要的流配置参数:

application.server =
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
key.serde = null
metadata.max.age.ms = 300000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 1000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

2 回答

  • 0

    您可以通过控制密钥和增加主题的分区数量来实现操作的并行性 .

    以上将增加Kafka流的数量并行处理 . 这可以通过增加Kafka流应用程序的线程数来处理

  • 0

    您可以在不同的线程中创建多个Kafka Consumer,并将其分配给同一个使用者组 . 他们将并行使用消息,不会丢失消息 .

    你如何发送消息?使用Kafka,您可以以Fire-and-Forget方式发送消息:它可以提高吞吐量 .

    producer.send(record);
    

    acks参数控制在 生产环境 者可以认为写入成功之前必须接收记录的分区副本数 .

    如果设置 ack=0 ,则在假定消息已成功发送之前, 生产环境 者不会等待代理的回复 . 但是,由于 生产环境 者不等待来自服务器的任何响应,它可以以网络支持的速度发送消息,因此可以使用此设置来实现非常高的吞吐量 .

相关问题