安装kafka库
pip3 install kafka-python
代码
from kafka import KafkaProducer,KafkaConsumer,KafkaAdminClient
import json
import time
# Name of the Kafka topic shared by the consumer, producer and admin snippets.
topic = 'number'
# Broker address list handed to every client as ``bootstrap_servers``.
bootstrap_servers = ['127.0.0.1:9092']
消费者
# Consumer group this reader joins. The original snippet passed an undefined
# ``group_id`` name (NameError); the value below is a placeholder -- replace
# it with the group name your deployment uses.
group_id = 'number-group'

# Subscribe to ``topic`` on the configured brokers as a member of ``group_id``.
consumer = KafkaConsumer(
    topic,
    group_id=group_id,
    bootstrap_servers=bootstrap_servers,
)

# Iterating the consumer yields records as they arrive. With kafka-python's
# default ``consumer_timeout_ms`` this loop blocks waiting for new messages,
# so run this snippet in its own process.
for message in consumer:
    # No value_deserializer is configured, so ``message.value`` is printed as
    # received (presumably raw bytes -- confirm against your kafka-python
    # version). The one-tuple keeps %-formatting safe for any value type.
    print("value=%s" % (message.value,))
生产者
# Producer connected to the same brokers as the consumer snippet.
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Publish 100 UTF-8 encoded JSON documents: {"number": 0} .. {"number": 99}.
for i in range(100):
    payload = json.dumps({'number': i}).encode('utf-8')
    producer.send(topic, payload)

# send() only queues records for asynchronous delivery; flush once after the
# loop so every queued record is actually sent before the script moves on.
producer.flush()
topic增加分区
from kafka.admin import NewPartitions
# Admin connection used to change topic metadata on the cluster.
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Map each topic name to its desired *total* partition count (here: 5).
topic_partitions = {topic: NewPartitions(total_count=5)}

# Ask the cluster to grow ``topic`` to the requested number of partitions.
admin_client.create_partitions(topic_partitions)
列出所有消费者组
from kafka import KafkaAdminClient
def list_consumer_groups(broker):
    """Return the IDs of all consumer groups reported by the cluster.

    Args:
        broker: Bootstrap server address (or list of addresses); passed
            unchanged to ``KafkaAdminClient(bootstrap_servers=...)``.

    Returns:
        A list containing the first element of every entry returned by
        ``KafkaAdminClient.list_consumer_groups()``, i.e. the group ID.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=broker)
    try:
        # Each entry is a (group_id, protocol_type) tuple; keep only the ID.
        return [group[0] for group in admin_client.list_consumer_groups()]
    finally:
        # Always release the client's broker connections, even when the
        # listing call raises.
        admin_client.close()
# Broker to query for the consumer-group listing.
broker_address = "localhost:9092"

# Fetch the group IDs first, then print the resulting list.
group_ids = list_consumer_groups(broker_address)
print(group_ids)