首页 文章

如何使用 Python decode/deserialize Kafka Avro 字符串

提问于
浏览
0

我从 Python 中接收远程服务器 Kafka Avro 消息(使用 Confluent Kafka Python 库的消费者),它使用 json 字典表示点击流数据,其中包含用户代理,位置,URL 等字段。这是消息的样子:

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'

怎么解码呢?我尝试过 bson 解码,但字符串未被识别为 UTF-8,因为它是我猜的特定 Avro 编码。我发现https://github.com/verisign/python-confluent-schemaregistry但它只支持 Python 2.7. 理想情况下,我希望使用 Python 3.5 和 MongoDB 来处理数据并将其存储为我当前的基础架构。

1 回答

  • 1

    我认为 Avro 库只是为了阅读 Avro 文件,但它实际上解决了解码 Kafka 消息的问题,如下所示:我首先导入库并将模式文件作为参数,然后创建一个函数将消息解码为字典,我可以在消费者循环中使用。

    import io
    
    from confluent_kafka import Consumer, KafkaError
    from avro.io import DatumReader, BinaryDecoder
    import avro.schema
    
    schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
    reader = DatumReader(schema)
    
    def decode(msg_value):
        message_bytes = io.BytesIO(msg_value)
        decoder = BinaryDecoder(message_bytes)
        event_dict = reader.read(decoder)
        return event_dict
    
    c = Consumer()
    c.subscribe(topic)
    running = True
    while running:
        msg = c.poll()
        if not msg.error():
            msg_value = msg.value()
            event_dict = decode(msg_value)
            print(event_dict)
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
    

相关问题