A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

时间轮算法
时间轮是一种高效、低延迟的调度数据结构。其在Linux内核中广泛使用,是Linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景一般只需要用到原始时间轮就可以了。
代码案例
推荐使用Netty提供的HashedWheelTimer工具类来实现延迟任务。
引入依赖:
<dependency>      <groupId>io.netty</groupId>      <artifactId>netty-common</artifactId>      <version>4.1.23.Final</version></dependency>
红包过期队列信息:
/** * 红包过期队列信息 */public class RedPacketTimerTask implements TimerTask {    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");    /**     * 红包 ID     */    private final long redPacketId;    /**     * 创建时间戳     */    private final long timestamp;    public RedPacketTimerTask(long redPacketId) {        this.redPacketId = redPacketId;        this.timestamp = System.currentTimeMillis();    }    @Override    public void run(Timeout timeout) {        //异步处理任务        System.out.println(String.format("任务执行时间:%s,红包创建时间:%s,红包ID:%s",                LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), redPacketId));    }}
测试用例:
/** * 基于 netty 的时间轮算法 HashedWheelTimer 实现的延迟任务 */public class RedPacketHashedWheelTimer {    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");    public static void main(String[] args) throws Exception {        ThreadFactory factory = r -> {            Thread thread = new Thread(r);            thread.setDaemon(true);            thread.setName("RedPacketHashedWheelTimerWorker");            return thread;        };        /**         * @param tickDuration - 每tick一次的时间间隔         * @param unit - tickDuration 的时间单位         * @param ticksPerWheel - 时间轮中的槽数         * @param leakDetection - 检查内存溢出         */        Timer timer = new HashedWheelTimer(factory, 1,                                           TimeUnit.SECONDS, 100,true);        System.out.println(String.format("开始任务时间:%s",LocalDateTime.now().format(F)));        for(int i=1;i<10;i++){            TimerTask timerTask = new RedPacketTimerTask(i);            timer.newTimeout(timerTask, i, TimeUnit.SECONDS);        }        Thread.sleep(Integer.MAX_VALUE);    }}
打印任务执行日志:
开始任务时间:2020-02-12 15:22:23.404任务执行时间:2020-02-12 15:22:25.410,红包创建时间:2020-02-12 15:22:23.409,红包ID:1任务执行时间:2020-02-12 15:22:26.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:2任务执行时间:2020-02-12 15:22:27.424,红包创建时间:2020-02-12 15:22:23.414,红包ID:3任务执行时间:2020-02-12 15:22:28.410,红包创建时间:2020-02-12 15:22:23.414,红包ID:4任务执行时间:2020-02-12 15:22:29.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:5任务执行时间:2020-02-12 15:22:30.409,红包创建时间:2020-02-12 15:22:23.414,红包ID:6任务执行时间:2020-02-12 15:22:31.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:7任务执行时间:2020-02-12 15:22:32.409,红包创建时间:2020-02-12 15:22:23.414,红包ID:8任务执行时间:2020-02-12 15:22:33.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:9源码相关
其核心是workerThread线程,主要负责每过tickDuration时间就累加一次tick。同时也负责执行到期的timeout任务以及添加timeout任务到指定的wheel中。
构造方法:
public HashedWheelTimer(            ThreadFactory threadFactory,            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,            long maxPendingTimeouts) {        if (threadFactory == null) {            throw new NullPointerException("threadFactory");        }        if (unit == null) {            throw new NullPointerException("unit");        }        if (tickDuration <= 0) {            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);        }        if (ticksPerWheel <= 0) {            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);        }        // Normalize ticksPerWheel to power of two and initialize the wheel.        wheel = createWheel(ticksPerWheel);        mask = wheel.length - 1;        // Convert tickDuration to nanos.        this.tickDuration = unit.toNanos(tickDuration);        // Prevent overflow.        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {            throw new IllegalArgumentException(String.format(                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",                    tickDuration, Long.MAX_VALUE / wheel.length));        }        //这里-爪洼笔记        workerThread = threadFactory.newThread(worker);        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;        this.maxPendingTimeouts = maxPendingTimeouts;        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {            reportTooManyInstances();        }}
新增任务,创建即启动:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {        if (task == null) {            throw new NullPointerException("task");        }        if (unit == null) {            throw new NullPointerException("unit");        }        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {            pendingTimeouts.decrementAndGet();            throw new RejectedExecutionException("Number of pending timeouts ("                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "                + "timeouts (" + maxPendingTimeouts + ")");        }        //这里-爪洼笔记        start();        // Add the timeout to the timeout queue which will be processed on the next tick.        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;        // Guard against overflow.        if (delay > 0 && deadline < 0) {            deadline = Long.MAX_VALUE;        }        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);        timeouts.add(timeout);        return timeout;}
线程启动:
   /**     * Starts the background thread explicitly.  The background thread will     * start automatically on demand even if you did not call this method.     *     * @throws IllegalStateException if this timer has been     *                               {@linkplain #stop() stopped} already     */    public void start() {        switch (WORKER_STATE_UPDATER.get(this)) {            case WORKER_STATE_INIT:                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {                    workerThread.start();                }                break;            case WORKER_STATE_STARTED:                break;            case WORKER_STATE_SHUTDOWN:                throw new IllegalStateException("cannot be started once stopped");            default:                throw new Error("Invalid WorkerState");        }        // Wait until the startTime is initialized by the worker.        while (startTime == 0) {            try {                startTimeInitialized.await();            } catch (InterruptedException ignore) {                // Ignore - it will be ready very soon.            }        }    }
执行相关操作:
public void run() {            // Initialize the startTime.            startTime = System.nanoTime();            if (startTime == 0) {                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.                startTime = 1;            }            // Notify the other threads waiting for the initialization at start().            startTimeInitialized.countDown();            do {                final long deadline = waitForNextTick();                if (deadline > 0) {                    int idx = (int) (tick & mask);                    processCancelledTasks();                    HashedWheelBucket bucket =                            wheel[idx];                    transferTimeoutsToBuckets();                    bucket.expireTimeouts(deadline);                    tick++;                }            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);            // Fill the unprocessedTimeouts so we can return them from stop() method.            for (HashedWheelBucket bucket: wheel) {                bucket.clearTimeouts(unprocessedTimeouts);            }            for (;;) {                HashedWheelTimeout timeout = timeouts.poll();                if (timeout == null) {                    break;                }                if (!timeout.isCancelled()) {                    unprocessedTimeouts.add(timeout);                }            }            processCancelledTasks();}小结
以上方案并没有实现持久化和分布式,生产环境可根据实际业务需求选择使用。
源码
https://gitee.com/52itstyle/spring-boot-seckill

以上文章转载于网络


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马