1.创建生产者:
from kafka import KafkaProducer
from kafka.errors import KafkaError
# Producer demo: publish one message to the "pic_collect" topic and report
# the outcome of the delivery.
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])
try:
    # send() hands back a future rather than the delivery result itself.
    future = producer.send("pic_collect", b'I am rito yan')
    try:
        # Wait (at most 10 seconds) for the result of the send.
        record_metadata = future.get(timeout=10)
    except KafkaError as e:
        # Delivery failed; report the error instead of crashing the script.
        print(e)
    else:
        print(record_metadata)
finally:
    # Always release the producer's resources, even if the send raised.
    producer.close()
2.创建消费者:
from kafka import KafkaConsumer
# Consumer demo: read the "pic_collect" topic as a member of the
# "pic_consumer" group and print every message received.
consumer = KafkaConsumer(
    "pic_collect",
    group_id="pic_consumer",
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])
try:
    # Iterating the consumer yields messages one at a time.
    # NOTE(review): with the default settings this loop presumably only ends
    # on an error or an interrupt -- confirm against the client configuration.
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
finally:
    # Always shut the consumer down so its resources are released even when
    # the loop is left through an exception or Ctrl-C.
    consumer.close()