1. 概述 本文主要分享 SkyWalking DataCarrier 异步处理库。 基于生产者消费者的模式,大体结构如下图: - 实际项目中,没有 Producer 这个类。所以本文提到的 Producer ,更多的是一种角色。
下面我们来看看整体的项目结构,如下图所示 : 2. bufferorg.skywalking.apm.commons.datacarrier.buffer 包,主要包含 Channels 、Buffer 两个类。Channels 是 Buffer 数组的封装。 2.1 Bufferorg.skywalking.apm.commons.datacarrier.buffer.Buffer ,缓存区。 Buffer 在保存数据时,把 buffer 作为一个 “环“,使用 index 记录最后存储的位置,不断向下,循环存储到 buffer 中。通过这样的方式,带来良好的存储性能,避免扩容问题。But ,存储会存在冲突的问题:buffer 写入位置,暂未被消费,已经存在值。此时,根据不同的 BufferStrategy 进行处理。整体流程见 #save(data) 方法。 2.2 Channelsorg.skywalking.apm.commons.datacarrier.buffer.Channels ,内嵌多个 Buffer 的通道。 Channels 在保存数据时,相比 Buffer ,从 buffer 变成了多 buffer ,因此需要先选一个buffer 。通过使用不同的 IDataPartitioner 实现类,进行 Buffer 的选择。当缓冲策略为BufferStrategy.IF_POSSIBLE 时,根据 IDataPartitioner 定义的重试次数,进行多次保存数据直到成功。整体流程见 #save(data) 方法。 3. partitionIDataPartitioner 目前有两个子类实现: 4. consumerorg.skywalking.apm.commons.datacarrier.consumer 包,主要包含 ConsumerPool 、ConsumerThread 、IConsumer 三个类。 - ConsumerThread 使用 IConsumer ,消费数据
- ConsumerPool 是 ConsumerThread 的线程池封装
4.1 IConsumerorg.skywalking.apm.commons.datacarrier.consumer.IConsumer ,消费者接口。定义了如下方法: 4.2 ConsumerThreadorg.skywalking.apm.commons.datacarrier.consumer.ConsumerThread ,继承 java.lang.Thread ,消费线程。 - 第 78 至 88 行:不断消费,直到线程关闭( #shutdown() )。
- 第 80 行:调用 #consume() 方法,批量消费数据。
- 第 82 至 87 行:当未消费到数据,说明 dataSources 为空,等待 20 ms ,避免 CPU 空跑。
- 第 93 行:当线程关闭,调用 #consume() 方法,消费完 dataSources 剩余的数据。
- 第 95 行:调用 IConsumer#onExit() 方法,处理当消费结束。
- 第 107 至 117 行:从 dataSources 中,获取要消费的数据。
- 第 120 至 126 行:当有数据可消费时,调用 IConsumer#consume(List<T>) 方法。当消费发生异常时,调用 IConsumer#onError(List<T>, Throwable) 方法。
- 第 127 行:返回是否有消费数据。
4.3 ConsumerPoolorg.skywalking.apm.commons.datacarrier.consumer.ConsumerPool ,消费者池,提供了对 Channels 启动指定数量的 ConsumerThread 进行消费。 #begin() 方法,启动 ConsumerPool ,进行数据消费。代码如下: - 第 97 至 99 行:正在运行中,直接返回。
- 第 101 行:获得锁。
- 第 104 行:调用 #allocateBuffer2Thread() 方法,将 channels 的多个 Buffer ,分配给consumerThreads 的多个 ConsumerThread。
- 第 107 至 109 行:启动每个 ConsumerThread ,开始消费。
- 第 112 行:标记正在运行中。
- 第 114 行:释放锁。
- 第 168 行:获得锁。
- 第 169 行:标记不在运行中。
- 第 170 至 172 行:关闭每个 ConsumerThread ,结束消费。
- 第 174 行:释放锁。
4. DataCarrierorg.skywalking.apm.commons.datacarrier.DataCarrier ,DataCarrier 异步处理库的入口程序。通过创建 DataCarrier 对象,使用生产者消费者的模式,执行异步执行逻辑。 channels 属性,数据通道。在构造方法中,我们可以看到默认使用 SimpleRollingPartitioner 作为数据分区分配者,使用 BufferStrategy.BLOCKING 作为缓冲策略。 - bufferSize 方法参数,缓冲区大小。
设置消费者和消费线程数量: 生产消息 关闭消费
|