A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 冷面钢铁侠 初级黑马   /  2019-9-28 18:34  /  1714 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

Kafka常用术语
Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求
Topic:主题,Kafka承载消息的逻辑容器,每条发布到Kafka的消息都有对应的逻辑容器,工作中多用于区分业务
Partition:分区,是物理概念,代表有序不变的消息序列,每个Topic由一个或多个Partion组成
Replica:副本,Kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为Leader和Follower,角色不同作用不同,副本是对Partition而言的,每个分区可配置多个副本来实现高可用
Record:消息,Kafka处理的对象
Offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值
Producer:生产者,向主题发送新消息的应用程序
Consumer:消费者,从主题订阅新消息的应用程序
Consumer Offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移
Consumer Group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源)
Reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程
下面用一张图展示上面提到的部分概念(用PPT画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)
消息生产流程
先来个KafkaProducer的小demo
public static void main(String[] args) throws ExecutionException, InterruptedException {        if (args.length != 2) {            throw new IllegalArgumentException("usage: com.ding.KafkaProducerDemo bootstrap-servers topic-name");        }        Properties props = new Properties();        // kafka服务器ip和端口,多个用逗号分割        props.put("bootstrap.servers", args[0]);        // 确认信号配置        // ack=0 代表producer端不需要等待确认信号,可用性最低        // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功        // 消息丢失        // ack=all leader需要等待所有follower成功备份,可用性最高        props.put("ack", "all");        // 重试次数        props.put("retries", 0);        // 批处理消息的大小,批处理可以增加吞吐量        props.put("batch.size", 16384);        // 延迟发送消息的时间        props.put("linger.ms", 1);        // 用来换出数据的内存大小        props.put("buffer.memory", 33554432);        // key 序列化方式        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // value 序列化方式        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // 创建KafkaProducer对象,创建时会启动Sender线程        Producer<String, String> producer = new KafkaProducer<>(props);        for (int i = 0; i < 100; i++) {            // 往RecordAccumulator中写消息            Future<RecordMetadata> result = producer.send(new ProducerRecord<>(args[1], Integer.toString(i), Integer.toString(i)));            RecordMetadata rm = result.get();            System.out.println("topic: " + rm.topic() + ", partition: " +  rm.partition() + ", offset: " + rm.offset());        }        producer.close();    }实例化
KafkaProducer构造方法主要是根据配置文件进行一些实例化操作
1.解析clientId,若没有配置则由是producer-递增的数字
2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)
3.解析key、value的序列化方式并实例化
4.解析并实例化拦截器
5.解析并实例化RecordAccumulator,主要用于存放消息(KafkaProducer主线程往RecordAccumulator中写消息,Sender线程从RecordAccumulator中读消息并发送到Kafka中)
6.解析Broker地址
7.创建一个Sender线程并启动
...this.sender = newSender(logContext, kafkaClient, this.metadata);this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();...消息发送流程
消息的发送入口是KafkaProducer.send方法,主要过程如下
KafkaProducer.sendKafkaProducer.doSend// 获取集群信息KafkaProducer.waitOnMetadata // key/value序列化key\value serialize// 分区KafkaProducer.partion// 创建TopciPartion对象,记录消息的topic和partion信息TopicPartition// 写入消息RecordAccumulator.applend// 唤醒Sender线程Sender.wakeupRecordAccumulator
RecordAccumulator是消息队列用于缓存消息,根据TopicPartition对消息分组
重点看下RecordAccumulator.applend追加消息的流程
// 记录进行applend的线程数appendsInProgress.incrementAndGet();// 根据TopicPartition获取或新建Deque双端队列Deque<ProducerBatch> dq = getOrCreateDeque(tp);...private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {    Deque<ProducerBatch> d = this.batches.get(tp);    if (d != null)        return d;    d = new ArrayDeque<>();    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);    if (previous == null)        return d;    else        return previous;}// 尝试将消息加入到缓冲区中// 加锁保证同一个TopicPartition写入有序synchronized (dq) {    if (closed)        throw new KafkaException("Producer closed while send in progress");    // 尝试写入    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);    if (appendResult != null)        return appendResult;}private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {    // 从双端队列的尾部取出ProducerBatch    ProducerBatch last = deque.peekLast();    if (last != null) {        // 取到了,尝试添加消息        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());        // 空间不够,返回null        if (future == null)            last.closeForRecordAppends();        else            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);    }    // 取不到返回null    return null;}public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {    // 空间不够,返回null    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {        return null;    } else {        // 真正添加消息        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);        ...        FutureRecordMetadata future = ...        // future和回调callback进行关联            thunks.add(new Thunk(callback, future));        ...        return future;    }}// 尝试applend失败(返回null),会走到这里。如果tryApplend成功直接返回了// 从BufferPool中申请内存空间,用于创建新的ProducerBatchbuffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {    // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加?    // 因为可能已经有其他线程创建了ProducerBatch或者之前的ProducerBatch已经被Sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);    if (appendResult != null) {        return appendResult;    }    // 尝试添加失败了,新建ProducerBatch    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));    dq.addLast(batch);    incomplete.add(batch);    // 将buffer置为null,避免在finally汇总释放空间    buffer = null;    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);}finally {

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马