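I am trying to create a simple Kafka producer and consumer using the kafka-python package. My console producer and consumer are able to send and receive messages. Producer.py is also able to send messages, but Consumer.py has a problem: it hangs on the call to KafkaConsumer
and never gets past that call.
The Zookeeper and Kafka server configurations are mostly the defaults.
I uncommented the following line in server.properties: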
listeners=PLAINTEXT://localhost:9092
Zookeeper Start Command:
C:\kafka_2.12-1.0.0\kafka_2.12-1.0.0\bin\windows>zookeeper-server-start.bat .\..\..\config\zookeeper.properties
Server Start Command:
C:\kafka_2.12-1.0.0\kafka_2.12-1.0.0\bin\windows>kafka-server-start.bat .\..\..\config\server.properties
Topic Create Command:
C:\kafka_2.12-1.0.0\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --topic mytopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Producer.py
from kafka import KafkaProducer
if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0,10))
    for i in range(10):
        print("Sending" +str(i))
        producer.send('mytopic', key=str.encode('key_{}'.format(i)),
                      value=b'some_message_bytes')
Consumer.py
from kafka import KafkaConsumer, SimpleClient, SimpleConsumer, TopicPartition
import os
if __name__ == "__main__":
    print("Creating kafka Consumer")
    consumer = KafkaConsumer('mytopic',
                             bootstrap_servers=['localhost:9092'],
                             api_version=(0, 10)) ######## HUNG HERE ########
    print("Listening for messages")
    for message in consumer:
        print(message)
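To rule out basic connectivity from Python, a check along the following lines could be run before Consumer.py. This is only a sketch using the standard library: `listener_reachable` is a hypothetical helper name of my own, not part of kafka-python, and it only confirms that something accepts TCP connections on the address from the `listeners` setting above. It does not speak the Kafka protocol, so a `True` result would not by itself explain or fix the hang.

```python
import socket


def listener_reachable(host="localhost", port=9092, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper for illustration; host and port default to the
    listeners value from server.properties shown earlier.
    """
    try:
        # create_connection tries each address the host name resolves to
        # (for example both IPv4 and IPv6 entries for "localhost").
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    print("Broker listener reachable:", listener_reachable())
```

If this prints False while the console consumer still works, the difference between how the two clients resolve or reach `localhost:9092` would be the next thing to look at.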