概述
业务中经常会遇到一些延迟执行的需求;通常想到的都是
但是系统中不一定集成了
那么用
简单的来说就是消费者生产了一个消息任务,塞到
redisson延时任务机制简述
生产者先将任务
这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);
这样做的目的:
消费者订阅队列,从
这样有一个好处就是不用一直
另外由于客户端都是用
捞一张图片

代码Demo
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {
@Resource
private RedissonClient redissonClient;
//延时队列map
private final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);
/**
* 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务
*/
@PostConstruct
public void reScheduleDelayedTasks() {
DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
for (DelayQueueEnum queueEnum : queueEnums) {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
}
}
@Override
public void afterPropertiesSet() {
// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer
DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
for (DelayQueueEnum queueEnum : queueEnums) {
DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());
if (delayQueueConsumer == null) {
throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");
}
// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());
//消费者初始化队列
RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
//set到map中方便获取
delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
// 订阅新元素的到来,调用的是takeAsync(),异步执行
rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
}
}
public RedissonClient getRedissonClient() {
return redissonClient;
}
public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
return delayQueueMap;
}
}
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DelayQueueUtil {
private static RedissonDelayQueueConfig redissonDelayQueueConfig;
@Resource
public void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {
DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;
}
private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
if(null == redissonDelayQueueConfig) return Collections.emptyMap();
return redissonDelayQueueConfig.getDelayQueueMap();
}
private static RedissonClient getRedissonClient() {
if(null == redissonDelayQueueConfig) return null;
return redissonDelayQueueConfig.getRedissonClient();
}
/**
* 添加延迟消息
*/
public static void addDelayMessage(DelayMessageDTO delayMessage) {
log.info("delayMessage={}", delayMessage);
Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");
delayMessage.setCreateTime(DateUtil.now());
if(null == delayMessage.getTimeUnit()){
delayMessage.setTimeUnit(TimeUnit.SECONDS);
}
RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
//移除相同的消息
rDelayedQueue.remove(delayMessage);
//添加消息
rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());
}
/**
* 移除指定队列中的消息
*/
public static void removeDelayMessage(DelayMessageDTO delayMessage) {
log.info("取消:delayMessage={}", delayMessage);
if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {
log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
return;
}
RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
rDelayedQueue.remove(delayMessage);
removeDelayQueue(delayMessage);
}
/**
* 从所有队列中删除消息
*/
public static void removeDelayQueue(DelayMessageDTO value) {
DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
for (DelayQueueEnum queueEnum : queueEnums) {
RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());
RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);
delayedQueue.remove(value);
}
}
}
参考了大佬的博文
https://lhalcyon.com/delay-task/index.html