一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

Redisson延迟队列的实现原理

时间:2026-06-16 09:05:56 编辑:袖梨 来源:一聚教程网

简介

上次发文是在5个月前,讲了一篇redisson分布式锁的实现原理,这次讲讲延迟队列的实现原理。

API的使用

blockingQueue = redissonClient.getBlockingQueue(name);
delayQueue = redissonClient.getDelayedQueue(blockingQueue);

我们可以看到是先获取了一个阻塞队列然后将其包装为一个延迟队列。

核心实现

一个延迟队列会在Redisson内部维护的channel和数据类型,外界无感知,它实际在内部维护了以下4个数据结构:

  • redisson_delay_queue_timeout:{name},sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:{name},list数据类型,核心过程用不上,后文会讨论他的作用
  • {name},list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:{name},是一个channel,用来通知客户端开启一个延迟任务

初始化过程

首先当调用redissonClient.getDelayedQueue(blockingQueue)时候,其实是new了一个RedissonDelayedQueue,我们看一下他的构造方法。

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
    super(codec, commandExecutor, name);
    // ... 初始化相关队列名称(channelName, queueName, timeoutSetName)...
    // 1. 创建QueueTransferTask子类实例(这里是匿名实现类,QueueTransferTask是抽象类)
    QueueTransferTask task = new (commandExecutor.getConnectionManager()) {
        @Override
        protected RFuture<Long> pushTaskAsync() {
            // 核心Lua脚本,负责转移到期任务
            return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                    "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " +
                    "if #expiredValues > 0 then " +
                      "for i, v in ipairs(expiredValues) do " +
                         "local randomId, value = struct.unpack('dLc0', v);" +
                         "redis.call('rpush', KEYS[1], value);" +  // 转移到目标队列
                         "redis.call('lrem', KEYS[3], 1, v);" +   // 从备份队列移除
                      "end; " +
                      "redis.call('zrem', KEYS[2], unpack(expiredValues));" + // 从延迟队列移除
                    "end; " +
                    // 返回下一个任务的到期时间
                    "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " +
                    "if v[1] ~= nil then " +
                       "return v[2]; " +
                    "end " +
                    "return nil;",
                    Arrays.asList(getRawName(), timeoutSetName, queueName),
                    System.currentTimeMillis(), 100);
        }
        
        @Override
        protected RTopic getTopic() {
            return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
        }
    };
    // 2. 启动任务
    queueTransferService.schedule(queueName, task);
}

可以看到客户端会创建一个延迟任务QueueTransferTask。这个延迟任务是redisson内部维护的,这个延迟任务会向Redis Server发送一段lua脚本,Redis执行lua脚本中的命令,并且是原子性的。这段lua脚本主要干了两件事:

  • 将到了延迟时间的任务从Zsetredisson_delay_queue_timeout:{name}中移除,存到List类型的{name}这个目标队列
  • 获取到Zsetredisson_delay_queue_timeout:{name}中目前最早到过期时间的延迟任务的到期时间戳,并返回给客户端。(这里埋下伏笔,思考为什么需要这个到期时间戳)

最后调用了schedule,我们再来看一下这个的源码。


public class QueueTransferService {
    private final Map<String, QueueTransferTask> tasks = new ConcurrentHashMap();
  	//...
    public void schedule(String name, QueueTransferTask task) {
        this.tasks.compute(name, (k, t) -> {
            if (t == null) {
                task.start();
                return task;
            } else {
                t.incUsage();
                return t;
            }
        });
    }
    //....
}

可以看到,执行逻辑是先从记录所有task的Map中获取是否有同名的,如果有就增加计数,说明该任务已经被初始化过监听器了直接跳过。如果没有就调用task的start,为task设置监听器。

在前文提到这个task其实是QueueTransferTask的子类,这里的start其实调用的是父类QueueTransferTask的start方法。

public abstract class QueueTransferTask {
    //.....
    //开启这个延迟任务,初始化2个监听器
    public void start() {
        RTopic schedulerTopic = this.getTopic();
        this.statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            //当连接时候调用一次pushTask,后文会提及作用
            public void onSubscribe(String channel) {
                QueueTransferTask.this.pushTask();
            }
        });
        this.messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            //当从监听的channel中,监听到消息时候会调用此方法,这里后文会提及作用。
            public void onMessage(CharSequence channel, Long startTime) {
                QueueTransferTask.this.scheduleTask(startTime);
            }
        });
    }
    private void pushTask() {
        if (this.usage != 0) {
            RFuture<Long> startTimeFuture = this.pushTaskAsync();
            //这里res就是刚才返回的最早过期时间戳
            startTimeFuture.whenComplete((res, e) -> {
                //有异常就重试
                if (e != null) {
                    if (!this.serviceManager.isShuttingDown(e)) {
                        log.error(e.getMessage(), e);
                        this.scheduleTask(System.currentTimeMillis() + 5000L);
                    }
                } else {
                    if (res != null) {
                        //将时间戳传递给scheduleTask进行调度任务
                        this.scheduleTask(res);
                    }
                }
            });
        }
    }    //真正去规划做任务的逻辑
    private void scheduleTask(Long startTime) {
        if (this.usage != 0) {
            if (startTime != null) {
                TimeoutTask oldTimeout = (TimeoutTask)this.lastTimeout.get();
                if (oldTimeout != null) {
                    oldTimeout.getTask().cancel();
                }                long delay = startTime - System.currentTimeMillis();
                if (delay > 10L) {
                    //使用netty时间轮,安排任务
                    Timeout timeout = this.serviceManager.newTimeout(new TimerTask() {
                        public void run(Timeout timeout) throws Exception {
                            //执行刚才匿名实现的task的pushTaskAsync方法
                            QueueTransferTask.this.pushTask();
                            TimeoutTask currentTimeout = (TimeoutTask)QueueTransferTask.this.lastTimeout.get();
                            if (currentTimeout != null && currentTimeout.getTask() == timeout) {
                                QueueTransferTask.this.lastTimeout.compareAndSet(currentTimeout, (Object)null);
                            }                        }
                    }, delay, TimeUnit.MILLISECONDS);
                    this.lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout));
                } else {
                    this.pushTask();
                }            }
        }
    }}

start方法主要是设置了指定主题(主题名:redisson_delay_queue_channel:{name})两个发布订阅的监听器。

pushTask方法会调用pushTaskAsync方法,即执行一次前文提到的lua脚本。

  1. 当指定主题有新订阅时调用 pushTask() 方法

    • BaseStatusListener:这是一个连接状态监听器。当客户端与 Redis Server 成功建立订阅连接时,也就是说项目启动的时候会执行一次客户端延迟任务,它的 onSubscribe方法会被触发,并立即执行一次 pushTask()
    • 这是因为项目在重启时,由于没有客户端延迟任务的执行,可能会出现redisson_delay_queue_timeout:{name}队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。
  2. 当指定主题有新消息时调用 scheduleTask(startTime) 方法。它的作用是:

    • 计算延迟时间:接着,它用传入的 startTime(即下一个任务的到期时间戳)减去当前时间戳,得出需要延迟的时间 delay

    • 安排新任务:然后,根据计算出的延迟时间做出决策:

      • 如果延迟时间 > 10毫秒:说明任务还需要等待一段时间才到期。此时,它会利用 Netty 的 HashedWheelTimer(时间轮) 提交一个一次性的定时任务。这个定时任务设定的延迟时间就是刚刚计算出的 delay毫秒。时间轮到点后,会触发执行 pushTask()方法,从而进行任务转移 。
      • 如果延迟时间 <= 10毫秒:说明任务已经到期,或者即将在瞬间到期(10毫秒被认为是可立即执行的阈值)。此时,它会立即执行 pushTask()方法,而不是等待定时器,以保证任务能被尽快处理 。
    • 取消旧任务:首先,它会检查是否存在之前已经安排但尚未执行的定时任务(oldTimeout)。如果存在,就将其取消。这是为了确保总是执行最新的、最早到期的任务,避免旧的任务干扰调度 (这一点和channel有关,后文会解释)

回收伏笔,这也就解释了为什么要将这个最早快要到过期时间的时间戳返回来。

可以认为QueueTransferTask不是一个死板的闹钟到点了即使没有什么任务也会吵你,而是一个灵活的秘书,到达重要时刻就会提醒你参加重要活动。

比如上述所说的将最快过期的时间戳返回给客户端,客户端通过scheduleTask使用这个时间戳开启一个时间轮,让客户端阻塞到达这个时间戳,一旦到达这个时间戳说明redisson_delay_queue_timeout:{name}中上面说的最早到过期时间的任务已经到期了,于是客户端开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证redisson_delay_queue_timeout:{name}中到期的数据能及时放到目标队列中。

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。

发送延迟任务

生产者在提交任务的时候调用delayQueue.offer时候翻看源码最后会调用到offerAsync方法。

其实是将任务放入到了Zset类型的redisson_delay_queue_timeout:{name}中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳。这段lua脚本会将数据原子性的放入Zset的redisson_delay_queue_timeout:{name}并放一份一样的到List的redisson_delay_queue:{name},然后发送消息到channel中,带上延迟的时间戳。

@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
  	if (delay < 0) {
   		throw new IllegalArgumentException("Delay can't be negative");
  	}
  	long delayInMs = timeUnit.toMillis(delay);
  	long timeout = System.currentTimeMillis() + delayInMs;
  	long randomId = ThreadLocalRandom.current().nextLong();
  	return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
	//把 `timeout`(过期时间戳)、`randomId`(随机ID)和 `encode(e)`(任务真实数据)打包成了一个二进制的 `value`。
	//即使你添加了两个一模一样的任务(内容一样、时间一样),因为有 `randomId`,打包后的 `value` 也是不一样的。这就允许队列里存在重复的任务。
 	 "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
  	+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
	//存入redisson_delay_queue:{name},注意这里并不是存入结果List!!
	//用于保证 `RDelayedQueue` 这个对象本身有数据可查(比如你调用 `contains()` 或 `iterator()` 时,就是查这里)。
  	+ "redis.call('rpush', KEYS[3], value);"
	// 取出 ZSet 里排第一(最早要执行)的任务
  	+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
	//判断:刚才新加的这个任务(value),是不是就是那个排第一的(v[1])?
  	+ "if v[1] == value then "
	//如果是,说明新任务插队成功,成为了最早的任务!发送 PUBLISH 通知客户端:"嘿,最早的时间变了,快调整闹钟!"
  	+ "redis.call('publish', KEYS[4], ARGV[1]); "
  	+ "end;",
  	Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
  	timeout, randomId, encode(e));
}

为什么要发送这个channel?取消任务的伏笔回收。比如此时有一个任务是1小时后才需要执行,客户端会阻塞到1小时之后。那此时来了一个任务是10秒后执行,可是客户端已经在阻塞了咋办?所以就需要这个channel。前文提到一旦监听器监听到消息时候就会把这个channel带的时间戳发送给scheduleTask去执行。此时发现下一次的时间戳久于当前这个目标时间戳。那么就会取消掉这个任务。总结:就是它就是用于处理 “插队” 情况。如果没有这段代码:消费者会傻傻地睡到原定时间(比如10:00),导致那个新插入队 09:00 的新任务被推迟整整一小时才执行。有了这段代码,消费者能实时感知到“任务变了”,动态调整自己的睡眠时间,保证延迟任务的准时性。

至此,讲清楚了4个数据结构他们各自的本职工作。

总结

来看看整体的运行原理图

有错误欢迎随时指出。

热门栏目