A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 陈泽 中级黑马   /  2019-10-30 10:34  /  1317 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

kafka特性:
  通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  高吞吐量[2] :即使是非常普通的硬件Kafka也可以支持每秒数百万[2] 的消息
  支持通过Kafka服务器和消费机集群来分区消息
  支持Hadoop并行数据加载
术语:
  Broker
  Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
  每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
  Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
  负责发布消息到Kafka broker
Consumer
  消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
  每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
一、安装
在pypi.python.org有很多关于操作kafka的组件,我们选择weight最高的kafka 1.3.5
1、有internet网的情况下执行如下命令安装:
     
[Python] 纯文本查看 复制代码
pip install kafka[/size][/font][/color][/align]easy_install kafka

2、无internet网的情况下把源码下载下来,上传到需要安装的主机
     压缩包:kafka-x.x.x.tar.gz
     解压: tar xvf kafka-x.x.x.tar.gz
    执行安装命令: cd kafka-x.x.x
    python setup.py install
    如安装报依赖错误,需要把依赖的组件也下载下来,然后进行安装,同样的方法,不赘述!
二、按照官网的样例,先跑一个应用
1、生产者:
[Python] 纯文本查看 复制代码
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):
msg = "msg%d " % i
producer.send('test', msg)
producer.close()

2、消费者(简单demo):

[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s " % (message.topic, message.partition,
message.offset, message.key,
message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s " % (message.topic, message.partition,
message.offset, message.key,
message.value))

启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s " % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}

5、消费者(手动设置偏移量)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])

print consumer.partitions_for_topic("test ") #获取test主题的分区信息
print consumer.topics() #获取主题列表
print consumer.subscription() #获取当前消费者订阅的主题
print consumer.assignment() #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费
for message in consumer:
print ("%s:%d:%d: key=%s value=%s " % (message.topic, message.partition,
message.offset, message.key,
message.value))

6、消费者(订阅多个主题)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0')) #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s " % (message.topic, message.partition,
message.offset, message.key,
message.value))

7、消费者(手动拉取消息)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #从kafka获取消息
print msg
time.sleep(1)

8、消费者(消息挂起与恢复)
[Python] 纯文本查看 复制代码
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print num
print consumer.paused() #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume... "
consumer.resume(TopicPartition(topic=u'test', partition=0))
print "resume...... "
pause执行后,consumer不能读取,直到调用resume后恢复。















0 个回复

您需要登录后才可以回帖 登录 | 加入黑马