Python脚本消费kafka数据

发布时间:2019-08-30 09:29:51编辑:auto阅读(2304)

    kafka简介(摘自百度百科)
    
    一、简介:
    详见:https://blog.csdn.net/Beyond_F4/article/details/80310507
    
    二、安装
    详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689              
    
    三、按照官网的样例,先跑一个应用
    1、生产者:
    from kafka import KafkaProducer
    
    # Minimal producer demo: publish three short messages to the 'test' topic.
    # bootstrap_servers may list several brokers,
    # e.g. ['0.0.0.1:9092', '0.0.0.2:9092', '0.0.0.3:9092'].
    producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])

    try:
        for i in range(3):
            # Without a value_serializer, kafka-python only accepts bytes-like
            # payloads; encoding keeps this working on Python 3 and is a no-op
            # for these ASCII strings on Python 2.
            msg = ("msg%d" % i).encode('utf-8')
            producer.send('test', msg)
        # send() only queues the record; flush() waits until it is delivered.
        producer.flush()
    finally:
        # Always release the network resources, even if sending failed.
        producer.close()
    
    2、消费者(简单demo):
    from kafka import KafkaConsumer
    
    # Simple consumer demo: subscribe to the 'test' topic and print each record.
    consumer = KafkaConsumer(
        'test',
        bootstrap_servers=['172.21.10.136:9092'],
    )

    for message in consumer:
        # One line per record: topic, partition, offset, then key and value.
        record_fields = (message.topic, message.partition, message.offset,
                         message.key, message.value)
        print ("%s:%d:%d: key=%s value=%s" % record_fields)
    
    
    启动后,生产者可以正常发送消息,消费者可以正常消费到这些消息。
    
    3、消费者(消费群组)
    from kafka import KafkaConsumer
    
    # Consumer-group demo: same as the simple consumer, but this consumer joins
    # the group 'my-group' when subscribing to the 'test' topic.
    consumer = KafkaConsumer(
        'test',
        group_id='my-group',
        bootstrap_servers=['172.21.10.136:9092'],
    )

    for message in consumer:
        # Collect the printed fields first so the format call stays short.
        location = (message.topic, message.partition, message.offset)
        payload = (message.key, message.value)
        print ("%s:%d:%d: key=%s value=%s" % (location + payload))
                                              
    启动多个消费者(使用相同的 group_id)后,每条消息只有其中一个消费者可以消费到,满足要求;消费组可以横向扩展以提高处理能力。
    
    4、消费者(读取目前最早可读的消息)
    from kafka import KafkaConsumer
    
    # Earliest-offset demo: the consumer is created with
    # auto_offset_reset='earliest' and prints every record it receives.
    consumer = KafkaConsumer(
        'test',
        auto_offset_reset='earliest',
        bootstrap_servers=['172.21.10.136:9092'],
    )

    for message in consumer:
        # Same output layout as the other demos: topic:partition:offset: key value
        details = (
            message.topic,
            message.partition,
            message.offset,
            message.key,
            message.value,
        )
        print ("%s:%d:%d: key=%s value=%s" % details)
                                             
    auto_offset_reset:当没有可用的已提交偏移量(或偏移量已越界)时的重置策略;earliest 表示移到最早的可用消息,latest 表示移到最新的消息,默认为 latest。
    源码中对旧写法的兼容映射:{'smallest': 'earliest', 'largest': 'latest'}(smallest、largest 是已弃用的别名)
    
    5、消费者(手动设置偏移量)
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    # Manual-offset demo: inspect the consumer's metadata, then start reading
    # partition 0 of 'test' from offset 5.
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['172.21.10.136:9092'])

    # print(...) with one argument gives the same output on Python 2 and
    # Python 3, and matches the call style used by the other demos.
    print(consumer.partitions_for_topic("test"))  # partition ids of the 'test' topic
    print(consumer.topics())  # topics known to the consumer
    print(consumer.subscription())  # topics this consumer is subscribed to
    print(consumer.assignment())  # topic/partition pairs assigned to this consumer
    # Earliest offsets for the assigned partitions.
    print(consumer.beginning_offsets(consumer.assignment()))
    # Move the fetch position of partition 0 to offset 5.
    # NOTE(review): seek() expects the partition to be assigned already;
    # presumably the metadata calls above trigger that assignment - verify.
    consumer.seek(TopicPartition(topic=u'test', partition=0), 5)
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))
                                              
    6、消费者(订阅多个主题)
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    # Multi-topic demo: subscribe to two topics and print every record received.
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))  # topics to consume from
    # print(...) with one argument gives the same output on Python 2 and
    # Python 3, and matches the call style used by the other demos.
    print(consumer.topics())
    # position() is the offset of the next record this consumer will fetch from
    # the partition, not the latest offset stored in the topic.
    print(consumer.position(TopicPartition(topic=u'test', partition=0)))
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))
                                              
    7、消费者(手动拉取消息)
    from kafka import KafkaConsumer
    import time
    
    # Manual-poll demo: fetch batches explicitly with poll() instead of
    # iterating over the consumer.
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))
    while True:
        # Wait at most 5 ms for records; the result may be empty.
        msg = consumer.poll(timeout_ms=5)
        # print(...) with one argument gives the same output on Python 2 and 3.
        print(msg)
        time.sleep(1)  # throttle the loop to one poll per second
        
    8、消费者(消息挂起与恢复)
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time
    
    # Pause/resume demo: pause partition 0 of 'test', poll for ten rounds, then
    # resume the partition and keep polling.
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    # ('test') is just the string 'test', not a tuple; pass a real list of topics.
    consumer.subscribe(topics=['test'])
    # NOTE(review): the return value is unused; presumably this call is here to
    # load metadata so the partition is assigned before pause() - verify.
    consumer.topics()
    consumer.pause(TopicPartition(topic=u'test', partition=0))
    num = 0
    while True:
        # print(...) with one argument gives the same output on Python 2 and 3.
        print(num)
        print(consumer.paused())   # partitions that are currently paused
        msg = consumer.poll(timeout_ms=5)
        print(msg)
        time.sleep(2)
        num = num + 1
        if num == 10:
            # After ten rounds, let the paused partition deliver records again.
            print("resume...")
            consumer.resume(TopicPartition(topic=u'test', partition=0))
            print("resume......")
            
    pause 执行后,consumer 不能读取被挂起分区的消息,直到对该分区调用 resume 后才恢复读取。

    如果对您有帮助,记得给我点赞诺

    如果对您有帮助,记得给我点赞诺

关键字