成功最有效的方法就是向有经验的人学习!

RabbitMQ 延迟队列的实现

1.为什么需要使用延迟队列?适用于什么场景?
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
这样类似的需求是我们经常会遇见的问题。最常用的方法是定期轮训数据库,设置状态。在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来。通过使用延迟队列来解决这种问题

2.使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的Time To Live(TTL)和Dead Letter Exchanges(DLX),利用两者的组合来实现延迟队列
简述一下:A.消息的TTL就是消息的存活时间,B.DLX是死信路由
实现原理:先发送一个消息到队列中,设置存活时间,超时后会转发到死信路由中,客户端消费死信路由中的消息,消息中包装好需要转发的队列名,再根据此队列名发送消息,这样间接中转的方式实现了延迟队列

4.具体代码实现,新建SpringBoot项目,添加 amqp 引用

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

5.在配置文件application.properties中配置好mq的连接地址

#rabbitmq
spring.rabbitmq.host=127.0.0.1   
spring.rabbitmq.port=5672  
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=ykh_vhosts

6.创建配置类,使用配置文件中的连接

/**
 * 读取application.properties中的连接配置
 */
@Configuration
public class RabbitMQConfiguration {

    private static Logger logger = Logger.getLogger("RabbitMQConfiguration");

    @Value("${spring.rabbitmq.host}")
    public String host;

    @Value("${spring.rabbitmq.port}")
    public int port;

    @Value("${spring.rabbitmq.username}")
    public String username;

    @Value("${spring.rabbitmq.password}")
    public String password;

    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        logger.info("Create ConnectionFactory bean ..");
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
}

7.创建一个常量类,定义队列名称

/**
 * Rabbit消息队列相关常量
 */
public final class MQConstant {

    private MQConstant(){}

    //exchange name
    public static final String DEFAULT_EXCHANGE = "ZyChange";

    //TTL QUEUE
    public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "zy.dead.letter.queue";

    //DLX repeat QUEUE 死信转发队列
    public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "zy.repeat.trade.queue";

    //Hello 测试消息队列名称
    public static final String HELLO_QUEUE_NAME = "HELLO";

}

8.创建一个队列配置类,作用是信道配置,队列配置,队列绑定

/**
 * 队列配置,所有配置@Bean的队列名称,由系统启动时创建队列,并绑定到Exchane上
 */
@Configuration
public class QueueConfiguration {

    //信道配置
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);
    }

    /*********************    业务队列定义与绑定 hello 测试    *****************/
    @Bean
    public Queue queue() {
        Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);
        return queue;
    }

    @Bean
    public Binding binding() {
        //队列绑定到exchange上,再绑定好路由键
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);
    }
    /*********************    业务队列定义与绑定 hello 测试    *****************/

    //下面是延迟队列的配置
    //转发队列
    @Bean
    public Queue repeatTradeQueue() {
        Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);
        return queue;
    }
    //绑定转发队列
    @Bean
    public Binding  drepeatTradeBinding() {
        return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
    }

    //死信队列  -- 消息在死信队列上堆积,消息超时时,会把消息转发到转发队列,转发队列根据消息内容再把转发到指定的队列上
    @Bean
    public Queue deadLetterQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
        Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);
        return queue;
    }
    //绑定死信队列
    @Bean
    public Binding  deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);
    }
}

9.创建消息生成接口和实现

public interface IMessageService {

    /**
     * 发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,String message);

    /**
     * 延迟发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     * @param times 延迟时间 单位毫秒
     */
    public void send(String queueName,String message,long times);
}

/**
 * 消息队列服务接口实现
 */
@Service("messageService")
public class MessageService implements IMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     */
    @Override
    public void send(String queueName, String message) {
        rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, message);
    }

    /**
     * 延迟发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     * @param times 延迟时间 单位毫秒
     */
    @Override
    public void send(String queueName, String message, long times) {
        //消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上
        //发送前,把消息进行封装,转发时应转发到指定 queueName 队列上
        DLXMessage dlxMessage = new DLXMessage(MQConstant.DEFAULT_EXCHANGE,queueName,message,times);
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(times + "");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSON.toJSONString(dlxMessage), processor);
    }
}

10.DLXMessage是一个消息封装对象,很关键,发送延迟队列时,先把消息存在此对象中,在加上目的地队列名称,然后再发到死信队列中,当消息超时时,转发到转发队列,添加对转发队列的监听,消费转发队列,获取需要延迟发送的信息,该信息就是DLXMessage对象,这样就拿到了目的地队列名称,然后再发送一次消息,就完成了延迟队列的发送

/**
 * rabbit 死信消息载体
 */
public class DLXMessage implements Serializable {

    private static final long serialVersionUID = 9956432152000L;
    private String exchange;
    private String queueName;
    private String content;
    private long times;

    public DLXMessage() {
        super();
    }

    public DLXMessage(String queueName, String content, long times) {
        super();
        this.queueName = queueName;
        this.content = content;
        this.times = times;
    }

    public DLXMessage(String exchange, String queueName, String content, long times) {
        super();
        this.exchange = exchange;
        this.queueName = queueName;
        this.content = content;
        this.times = times;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getExchange() {
        return exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public long getTimes() {
        return times;
    }

    public void setTimes(long times) {
        this.times = times;
    }
}

11.添加消息消费者监听,当有消息时进行消费

//监听hello队列,有消息时进行消费
@Component
@RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)
public class ReceiverMessage {

    @RabbitHandler
    public void process(String content) {
        System.out.println("接受时间:"+ System.currentTimeMillis());
        System.out.println("接受消息:" + content);
    }
}

//监听转发队列,有消息时,把消息转发到目标队列
@Component
@RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)
public class ReceiverDelayMessage {

    @Autowired
    private IMessageService messageService;

    @RabbitHandler
    public void process(String content) {
        //此时,才把消息发送到指定队列,而实现延迟功能
        DLXMessage message = JSON.parseObject(content, DLXMessage.class);
        messageService.send(message.getQueueName(), message.getContent());
    }

}

12.测试,启动项目,会执行发送消息代码

/**
 * 启动启动时执行
 */
@Component
public class SysInitLoad implements ApplicationRunner {

    @Autowired
    private IMessageService messageService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("发送时间:"+ System.currentTimeMillis());
        String message = "测试延迟消息";
        messageService.send(MQConstant.HELLO_QUEUE_NAME,message,6000);

        message = "测试普通消息";
        messageService.send(MQConstant.HELLO_QUEUE_NAME,message);
    }
}
赞(1) 打赏
未经允许不得转载:竹影清风阁 » RabbitMQ 延迟队列的实现
分享到

大佬们的评论 抢沙发

全新“一站式”建站,高质量、高售后的一条龙服务

微信 抖音 支付宝 百度 头条 快手全平台打通信息流

橙子建站.极速智能建站8折购买虚拟主机

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册