首页 文章

Kafka消费者解压缩gz文件流并阅读

提问于
浏览
1

Kafka 生产环境 者正在发送.gz文件,但无法在消费者端解压缩和读取文件 . 获取错误为“IOError:不是gzip压缩文件”

制片人 - bin / kafka-console-producer.sh --broker-list localhost:9092 --topic Airport <〜/ Downloads / stocks.json.gz

消费者 -

import sys 
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)

try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print (content)
except KeyboardInterrupt:
    sys.exit()

消费者的错误 -

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file

1 回答

  • 0

    Kafka不是用于发送大量有效载荷/消息 . 您应该将其视为分布式消息总线,它为您提供分布式系统的所有权限 .

    由于以下原因,Kafka限制了可以发送的消息的大小

    • 巨大的消息增加了代理中的内存压力 .

    • 大邮件会降低代理的速度并处理它们非常昂贵 .

    解:

    • 您可以很好地使用基于参考的消息传递,您可以将巨大消息的位置发送给消费者,而不是按原样发送大量数据 . 这将允许您使用外部数据存储的功能,并减少Kafka Brokers的压力 .

    • 您还可以对数据进行分块并将其内联发送并在接收器处重新组装 .

    使用批量大小:

    batch.size 以总字节数而不是消息数来度量批量大小 . 它控制在向Kafka代理发送消息之前要收集的数据字节数 . 在不超出可用内存的情况下将其设置得尽可能高 . 默认值为 16384 .

    如果增加缓冲区的大小,它可能永远不会满 . Producer最终根据其他触发器发送信息,例如逗留时间(以毫秒为单位) . 虽然您可以通过将缓冲区批处理大小设置得过高来减少内存使用量,但这不会影响延迟 .

    如果您的制作人一直在发送,您可能正在获得最佳吞吐量 . 如果 生产环境 者经常闲置,您可能没有编写足够的数据来保证当前的资源分配 .

    因为,您的数据是 gzip ,您可以使用 Reference Based Messaging .

    而不是使用提取大小和消息最大字节大小(不能覆盖所有文件大小)将文件存储在分布式文件系统(如NFS / HDFS / S3)上,并将引用发送给使用者 . 消费者可以选择位置并解压缩数据 .

相关问题