RabbitMQ消息的可靠传输和防止消息丢失

news/2024/7/3 4:27:05

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String QUEUE_NAME = "myQueue";
    public static final String EXCHANGE_NAME = "myExchange";
    public static final String ROUTING_KEY = "myRoutingKey";

    @Bean
    public Queue myQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);
    }
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 设置发布确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("Message delivered successfully: " + correlationData);
                } else {
                    System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);
                }
            }
        });

        // 设置消息返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println("Returned Message: " + new String(message.getBody()) +
                    ", replyCode: " + replyCode + ", replyText: " + replyText +
                    ", exchange: " + exchange + ", routingKey: " + routingKey);
        });
    }

    public void sendMessage(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);
    }
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMessage(String message, Channel channel, Message message) throws Exception {
        try {
            // 处理消息
            System.out.println("Received Message: " + message);
            
            // 消息确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消费失败,重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: myQueue
          consumer:
            retry:
              max-attempts: 5
              backOffPolicy:
                initialInterval: 1000
                multiplier: 2.0
                maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。


http://lihuaxi.xjx100.cn/news/2215828.html

相关文章

Dell戴尔灵越Inspiron 16 Plus 7640/7630笔记本电脑原装Windows11下载,恢复出厂开箱状态预装OEM系统

灵越16P-7630系统包: 链接&#xff1a;https://pan.baidu.com/s/1Rve5_PF1VO8kAKnAQwP22g?pwdjyqq 提取码&#xff1a;jyqq 灵越16P-7640系统包: 链接&#xff1a;https://pan.baidu.com/s/1B8LeIEKM8IF1xbpMVjy3qg?pwdy9qj 提取码&#xff1a;y9qj 戴尔原装WIN11系…

监控易功能点:设备管理中的新增设备自动发现与配置

在现代化的IT环境中&#xff0c;随着业务的发展和技术的不断更新&#xff0c;企业需要管理的设备和系统日益增多。如何快速、准确地识别并配置这些新增设备&#xff0c;成为了企业运维团队面临的重要挑战。监控易凭借其强大的设备管理能力&#xff0c;特别是新增设备自动发现与…

注册中心理论学习

注册中心介绍 注册中心&#xff08;也称为服务注册中心或服务发现服务&#xff09;是微服务架构中的一个关键组件&#xff0c;它负责服务的注册与发现。在微服务体系中&#xff0c;服务实例的数量和位置是动态变化的&#xff0c;注册中心提供了一个集中的地方来存储这些信息&a…

adb简单使用命令

1. 查看当前连接的设备 adb devices 2. 文件路径 adb install apk 注意文件路径中不能有中文 3. adb shell ps Toplogcatlogcat可以结合greplogcat *:w 显示警告级别以上de 4. adb uninstall 软件名 1.软件名如何查看&#xff1f; 使用命令adb shell pm list packages会…

基于springboot实现火锅店管理系统项目【项目源码+论文说明】

基于springboot实现火锅店管理系统演示 摘要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装火锅店管理系统软件来…

植物大战僵尸杂交版全新版v2.1解决全屏问题

文章目录 &#x1f68b;一、植物大战僵尸杂交版❤️1. 游戏介绍&#x1f4a5;2. 如何下载《植物大战僵尸杂交版》 &#x1f680;二、解决最新2.1版的全屏问题&#x1f308;三、画质增强以及减少闪退 &#x1f68b;一、植物大战僵尸杂交版 《植物大战僵尸杂交版》是一款在原版《…

C++模板之模板成员函数不能偏特化

目录 1.引言 2.类模板成员函数的特化 2.1.没有函数特化的类模板 2.2.增加函数特化 3.“曲线救国”函数“偏特化” 3.1.函数重载实现“偏特化” 3.2.使用类型选择机制实现“偏特化” 4.总结 1.引言 C 泛型编程的资料在介绍类模板的特化和偏特化的时候&#xff0…

【在线OJ】vue分页+SpringBoot分页模板代码

一、Vue <template><div><el-table:data"user"style"width: 120%"><el-table-columnlabel"id"width"180"><template slot-scope"scope"><i class"el-icon-time"></i>&…