A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始


kafka概念
D:由Apache公司开发的一款分布式消息中间件
E:处理实时数据,保证资源统一,高通量,低等待的

kafka应用场景
1.日志收集:收集各个分布式服务的日志进行管理,通过kafka以统一接口管理服务的方式开发给各个consumer
2.消息系统:解耦,缓存消息等
3.用户记录追踪:把用户浏览的web或者app的活动信息通过各个服务器发布到kafka的topic中进行实时监控分析

Kafka整体架构




1.Producer(生产者)---向队列发送消息,将一条记录添加到topic(队列组)的某个partition(分区)中
2.Consumer(消费者) ---从队列中消费消息,每个consumer都有对应的group(消费组)
3.Broker(中间人)  ---每个kafka实例都是一个broker ,每个broker里多个topic

4.topic(队列组) ---用于分类消息,本质是一个目录,里面有partition
5.partition(分区) ---一个topic里可有1-n分区,创建topic时可设置分区数,单机版的topic默认只有一个分区
6.offset(下标)---分区里的每条消息都被标上有序的数字,记录它的位置


7. 消费组要点:
①可以有多个消费组(consumer group)
②每个消费组里可有多个consumer,每个consumer没消费一个消息,offset+1
③同组consumer属于竞争关系,共享offset,同一消息在组内只能被消费一次,不同组之间互不影响,offset独立
④同组同时消费同一topic,实现负载均衡,不同组消费同一topic,可重复消费。

kafka mac安装
1. 安装java8扩展
brew cask install java8
2.安装kafka(会默认也安装kafka扩展zookeeper)
brew install kafka
3.启动zookeeper和kafka服务
brew services start zookeeperbrew services start kafka
4.创建topic(消息队列组)
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic-01#使用zookeeper来创建topic,端口默认2181,复制因子1个,topic中partitions 1个,指定topic名
5.查看创建的topic
kafka-topics --list --zookeeper localhost:2181

交互模式:
6. 发送一些消息
kafka-console-producer --broker-list localhost:9092 --topic mytopic-01
7. 消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic mytopic-01 --from-beginning

kafka配置
#配置文件路径:#zookeeper配置文件/usr/local/etc/kafka/zookeeper.propertie#kafka配置文件/usr/local/etc/kafka/server.properties#broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可#listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:listeners=PLAINTEXT://localhost:9092。并确保服务器的9092端口能够访问#zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可zookeeper.connect=localhost:2181

kafka-python
安装:pip install kafka-python
使用:
①生产者
import kafka#参数1:kafka服务'ip:port'或者服务列表 #参数asks:当acks=0时,producer成功写入消息时不等待其他服务器响应,吞吐量低,存在消息丢失风险;acks=1时,当集群master节点确认得到响应,如果错误会重发;acks=all或acks=-1时,等待集群参与复制的节点全部确认才会得到响应,消息最为安全,吞吐量大#参数partitioner分区器:默认使用DefaultPartitioner,随机分配分区producer=kafka.KafkaProducer(bootstrap_servers='127.0.0.1:9092',ack='all') for i in range(10):    msg='这是第%s条消息'%(i+1)    #参数1:topic,参数2:消息value,bytes类型,参数3:消息key,bytes类型,可不传    producer.send('mytopic-01',msg.encode('utf-8'),str(i).decode())     print(msg)
②消费者
import kafka#参数1:选择要订阅的topic,参数2:kafka服务的ip:port或列表,参数auto_offset_reset:重置偏移量,有两个取值,latest表示读取消息队列中最新的消息,另一个取值earliest表示读取最早的消息。consumer = kafka.KafkaConsumer("mytopic-01", bootstrap_servers=["localhost:9092"], auto_offset_reset='latest')#常用属性或方法:#1.获取该服务的所有topics,返回topic的set  print(consumer.topics())  #{'mytopic-01', 'mytopic-02'}#2.获取某一个topic的partitions,返回partitions的setprint(consumer.partitions_for_topic('mytopic-02'))   #{0, 1, 2}#3.返回当前订阅的topic的TopicPartition对象,返回类型:setprint(consumer.assignment())   #{TopicPartition(topic='mytopic-02', partition=1), TopicPartition(topic='mytopic-02', partition=2), TopicPartition(topic='mytopic-02', partition=0)}#4.订阅要消费的多个topicconsumer.subscribe(('mytopic-02','mytopic-01'))#5.自动消费:这是一个阻塞的过程,当生产者有消息传来的时候,就会读取消息,若是没有消息就会阻塞等待for msg in consumer:  #msg是ConsumerRecord对象,有topic,partition,offset,key,value等属性    key = msg.key.decode(encoding="utf-8")               #因为接收到的数据时bytes类型,因此需要解码    value = msg.value.decode(encoding="utf-8")    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))#6.手动消费:手动消费topic的消息,如果获取不到是{},如获取到{TopicPartition:ConsumerRecord},timeout_ms:轮询等待的毫秒数,默认0msg=consumer.poll(timeout_ms=5) #7.暂时挂起或释放(可选择不消费某一topic分区)consumer.pause(TopicPartition(topic='mytopic-02', partition=0))consumer.resume(TopicPartition(topic='mytopic-02', partition=0))


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马