首页 文章

不能使用kafka for python

提问于
浏览
0

python:2.6.6

kafka-python:1.4.3

i had run the kafka producer,but it always tips me this error:

Traceback(最近一次调用最后一次):文件“KafkaOperation.py”,第11行,来自kafka import KafkaProducer文件“/usr/lib/python2.6/site-packages/kafka/init.py”,第21行,in来自kafka.consumer从Kafka.consumer.group导入KafkaConsumer文件“/usr/lib/python2 .6 / site-packages / kafka / consumer / group.py“,第13行,来自kafka.consumer.fetcher导入Fetcher文件”/usr/lib/python2.6/site-packages/kafka/consumer/fetcher.py “,第19行,来自kafka.record import MemoryRecords File”/usr/lib/python2.6/site-packages/kafka/record/init.py“,第1行,来自kafka.record.memory_records import MemoryRecords File” /usr/lib/python2.6/site-packages/kafka/record/memory_records.py“,第27行,来自kafka.record.default_records import DefaultRecordBatch,DefaultRecordBatchBuilder File”/usr/lib/python2.6/site-packages /kafka/record/default_records.py“,第338行,类DefaultRecordBatchBuilder(DefaultRe) cordBase,ABCRecordBatchBuilder):文件“/usr/lib/python2.6/site-packages/kafka/record/default_records.py”,第378行,在DefaultRecordBatchBuilder中byte_like =(bytes,bytearray,memoryview),NameError:name'memoryview'没有定义

the codes these:

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

class Kafka_producer():

    def __init__(self, kafkahost,kafkaport, kafkatopic):
        self.kafkatopic = kafkatopic
        service_host = kafkahost+":"+kafkaport
        self.producer = KafkaProducer(bootstrap_servers=service_host)

    def sendjsondata(self, params):
        try:
            # parmas_message = json.dumps(params)
            producer = self.producer
            futur = producer.send(self.kafkatopic, params.encode('utf-8'))
            res = futur.get(timeout=60)
            producer.flush()
            producer.close()
        except KafkaError as e:
            print e

if __name__ == '__main__':
    # test = {
    #     "test":"testtets"
    # }
    # Kafka_producer("http://10.25.245.192","9092","nori-log").sendjsondata(test)
    producer = KafkaProducer(bootstrap_servers='10.25.245.192:9092')
    for _ in range(100):
        producer.send('nori-log', {"test":"test_content"})

And i had exchange the toronto version to 2.2.1

1 回答

相关问题