在上面的代码中,我们首先导入了KafkaProducer类,然后创建了一个生产者对象,并指定了Kafka集群的地址。接着,我们调用send()方法将消息发送到名为“test”的主题中。 三、消费者
在Kafka中,消费者负责从Kafka集群中消费消息。Python中使用Kafka-Python包可以轻松实现消费者功能。下面是一个消费者的示例代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message.value)
在上面的代码中,我们首先导入了KafkaConsumer类,然后创建了一个消费者对象,并指定了Kafka集群的地址和要消费的主题。接着,我们使用for循环遍历消费者返回的消息,并打印出消息的内容。 四、批量发送和批量消费
在实际应用中,我们通常需要批量发送和批量消费消息。Kafka-Python包提供了批量发送和批量消费的功能。下面是一个批量发送和批量消费消息的示例代码:
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(10):
message = 'Message {}'.format(i)
future = producer.send('test', bytes(message, 'utf-8'))
try:
record_metadata = future.get(timeout=10)
print('Message {} sent to partition {} with offset {}'.format(message, record_metadata.partition, record_metadata.offset))
except KafkaError as e:
print('Failed to send message {}: {}'.format(message, e))
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', max_poll_records=10)
while True:
messages = consumer.poll(timeout_ms=1000)
if not messages:
continue
for topic_partition, records in messages.items():
for record in records:
print(record.value.decode('utf-8'))