首页 文章

使用TimeBasedPartitioner的Kafka Connect S3 Connector OutOfMemory错误

提问于
浏览
1

我正在使用Kafka Connect S3 Sink Connector 3.3.1将Kafka消息复制到S3,并且在处理延迟数据时出现OutOfMemory错误 .

我知道这看起来像一个很长的问题,但我尽力使其清楚易懂 . 我非常感谢你的帮助 .

高级信息

  • 连接器执行Kafka消息的简单字节到字节副本,并在字节数组的开头添加消息的长度(用于解压缩) .

  • 这是 CustomByteArrayFormat 类的作用(参见下面的配置)

  • 根据 Record 时间戳对数据进行分区和分区

  • CustomTimeBasedPartitioner 扩展 io.confluent.connect.storage.partitioner.TimeBasedPartitioner ,其唯一目的是覆盖 generatePartitionedPath 方法,将主题放在路径的末尾 .

  • Kafka Connect进程的总堆大小为24GB(仅一个节点)

  • 连接器每秒处理8,000到10,000条消息

  • 每条消息的大小接近1 KB

  • Kafka主题有32个分区

OutOfMemory错误的上下文

  • 这些错误仅在连接器停机几个小时并且必须赶上数据时才会发生

  • 重新打开连接器时,它开始赶上,但在OutOfMemory错误的情况下很快失败

可能但不完整的解释

  • 当发生OOM错误时,连接器的 timestamp.extractor 配置设置为 Record

  • 将此配置切换为 Wallclock (即Kafka Connect进程的时间)不要抛出OOM错误,并且可以处理所有后期数据,但不再正确地删除后期数据

  • 所有迟到的数据将在连接器重新打开的 YYYY/MM/dd/HH/mm/topic-name 中被删除

  • 所以我的猜测是,当连接器试图根据 Record 时间戳正确地存储数据时,它会执行太多并行读取导致OOM错误

  • "partition.duration.ms": "600000" 参数使连接器桶数据以每小时6个10分钟的路径生成( 2018/06/20/12/[00|10|20|30|40|50] ,2018-06-20,下午12点)

  • 因此,对于24h的后期数据,连接器必须在 24h * 6 = 144 不同的S3路径中输出数据 .

  • 每10分钟文件夹包含10,000条消息/秒* 600秒= 6,000,000条消息,大小为6 GB

  • 如果它确实并行读取,那将使864GB的数据进入内存

  • 我认为我必须正确配置一组给定的参数以避免那些OOM错误,但我不觉得我看到了大局

  • "flush.size": "100000" 暗示如果读取更多dans 100,000条消息,则应将它们提交给文件(从而释放内存)

  • 使用1KB的消息,这意味着每100MB提交一次

  • 但即使有144个并行读数,仍然只能提供14.4 GB,这小于24GB的堆大小

  • "flush.size" 是否在提交前读取 per partition 的记录数?或者 per connector's task

  • 我理解 "rotate.schedule.interval.ms": "600000" config的方式是,即使没有达到 flush.size 的100,000条消息,数据也将每10分钟提交一次 .

My main question would be what are the maths allowing me to plan for memory usage given:

  • 每秒的数字或记录

  • 记录的大小

  • 我读取的主题的Kafka分区数

  • 连接器任务的数量(如果这是相关的)

  • 每小时写入的桶数(此处为6,因为 "partition.duration.ms": "600000" 配置)

  • 要处理的延迟数据的最大小时数

配置

S3 Sink Connector配置

{
  "name": "xxxxxxx",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
}

Worker 配置

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

Edit

我忘了添加一个错误示例:

2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

1 回答

  • 2

    我终于能够理解堆大小使用在Kafka Connect S3 Connector中是如何工作的

    • S3 Connector会将每个Kafka分区的数据写入分区 paths

    • paths 分区的方式取决于 partitioner.class 参数;

    • 默认情况下,它是按时间戳记的,然后 partition.duration.ms 的值将确定每个分区的持续时间 paths .

    • S3 Connector将为每个Kafka分区分配一个 s3.part.size 字节的缓冲区(对于读取的所有主题)和每个分区 paths

    • 读取20个分区的示例, timestamp.extractor 设置为 Recordpartition.duration.ms 设置为1h, s3.part.size 设置为50 MB

    • 堆大小然后每小时需要等于 20 * 50 MB = 1 GB;

    • 但是, timestamp.extractor 被设置为 Record ,具有与之前小时相对应的时间戳的消息将在此较早的小时缓冲区中缓冲 . 因此,实际上,连接器将需要最小的内存,因为总是有迟到的事件,如果有迟到超过1小时的事件则更多;

    • 请注意,如果 timestamp.extractor 设置为 Wallclock ,则不是这样,因为就Kafka Connect而言,几乎不会出现延迟事件 .

    • 这些缓冲区在3个条件下被刷新(即保留内存)

    • rotate.schedule.interval.ms 时间过去了

    • 此刷新条件是 always 已触发 .

    • rotate.interval.ms 时间已过 in terms of timestamp.extractor time

    • 这意味着如果 timestamp.extractor 设置为 Record ,则 Record 时间的10分钟可以传递的时间少于或多于10分钟实际时间

    • 例如,处理延迟数据时,将在几秒钟内处理10分钟的数据,如果 rotate.interval.ms 设置为10分钟,则此条件将每秒触发(应该如此);

    • 相反,如果事件流中存在暂停,则在发现时间戳显示自上次触发条件后已超过 rotate.interval.ms 的事件之前,此条件不会触发 .

    • flush.size 消息的读取时间小于 min(rotate.schedule.interval.msrotate.interval.ms)

    • 对于 rotate.interval.ms ,如果没有足够的消息,这种情况可能永远不会触发 .

    • 因此,您至少需要计划 Kafka partitions * s3.part.size 堆大小

    • 如果您使用 Record 时间戳进行分区,则应将其乘以 max lateness in milliseconds / partition.duration.ms

    • 这是最糟糕的情况,您在所有分区和 max lateness in milliseconds 的所有范围内都有不断发生的事件 .

    • 当从Kafka读取时,S3连接器还将为每个分区缓冲 consumer.max.partition.fetch.bytes 个字节

    • 默认设置为2.1 MB .

    • 最后,您不应该考虑所有的堆大小都可用于缓冲Kafka消息,因为其中还有很多不同的对象

    • 一个安全的考虑因素是确保Kafka消息的缓冲不超过总可用堆大小的50% .

相关问题