发布时间:2019-09-23 17:04:04编辑:auto阅读(1766)
程序分为productor.py是发送消息端,consumer为消费消息端,
启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,
productor.py
#!/usr/bin/env python2.7
#_*_coding: utf-8 _*_
from kafka import KafkaProducer
kafka_host = '192.168.1.200' # kafka服务器地址
kafka_port = 9092 # kafka服务器的端口
producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
kafka_host = kafka_host,
kafka_port = kafka_port
)])
#简单for循环10次,发送10条消息
for i in range(1,10):
message_string = 'some message'.format(i)
#调用send方法,发送名字为'topic1'的topicid ,发送的消息为message_string
response = producer.send('topic1', message_string.encode('utf-8'))
print responseconsumer.py
#!/usr/bin/env python
#_*_coding: utf-8 _*_
import json
from kafka import *
kafka_host = '192.168.1.200' # kafka服务器地址
kafka_port = 9092 # kafka服务器端口
#消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id,
# 如果想一条消费多次消费,可以换一个group_id,会从头开始消费
consumer = KafkaConsumer(
'topic1',
group_id = 'my-group',
bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)]
)
for message in consumer:
#json读取kafka的消息
content = json.loads(message.value)
print content
上一篇: python3.x下 smtp发送htm
下一篇: 编写兼容 Python 2.x 和 3.
51277
50721
41324
38137
32598
29506
28356
23226
23192
21515
1590°
2309°
1921°
1862°
2188°
1904°
2595°
4344°
4192°
2986°