【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

作者:后端小肥肠

创作不易,未经允许禁止转载。

1. 前言

RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。

2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:

  • 用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由后台服务监听并处理发送邮件的任务,从而不会延迟用户的注册过程。
  • 订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步传递订单信息,确保处理流程的连续性和效率。

2.2 应用解耦

RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:

  • 微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。

2.3 流量削峰

在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:

  • 秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理能力逐步从队列中取出并处理这些请求,有效避免了系统崩溃。

2.4 通信与集成

RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:

  • 跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息传递中间件,实现这些系统的有效通信。

2.5 日志处理和应用监控

RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:

  • 集中式日志管理:通过RabbitMQ,各个系统和应用的日志可以被统一收集至一个中央处理位置,便于进行日志分析、监控和报警。

2.6 数据同步

RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:

  • 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。

  • 实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。

  • 缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。

通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。

3. 在项目中如何搭建稳定RabbitMQ架构体系

3.1. RabbitMQ安装

网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:

1. 环境准备,准备Cenos虚拟机,我的是7.x版本:

2. 拉取或解压RabbitMQ镜像:

3. 运行docker容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

4. 进入容器 :

 docker exec -it 容器id /bin/bash

5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。

3.2. 总体技术流程

本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:

上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。

基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:

生产者稳定架构:

1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。

2. 消息确认表创建。创建消息确认表message_confirmation,记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。

CREATE TABLE "public"."message_confirmation" (
  "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "status" int4,
  "create_time" timestamp(6),
  "update_time" timestamp(6),
  "message" varchar(255) COLLATE "pg_catalog"."default",
  CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_confirmation" 
  OWNER TO "postgres";

3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。

消费者稳定架构

1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。

2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。

CREATE TABLE "public"."message_error" (
  "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "error_log" text COLLATE "pg_catalog"."default",
  "create_time" timestamp(6),
  "update_time" timestamp(6),
  CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_error" 
  OWNER TO "postgres";

3.3. 实战讲解

3.3.1. 环境配置
3.3.1.1. 所需版本工具
3.3.1.2. pom依赖
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
       <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
</dependencies>
3.3.2. 生产者核心代码讲解

3.3.2.1. yml配置
server:
  port: 8873
spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
3.3.2.2. 编写回调函数
 @PostConstruct
    public void regCallback() {
        // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("cause:"+cause);
                // 如果ack为true代表消息已经收到
                String messageId = correlationData.getId();

                if (!ack) {
                    // 这里可能要进行其他的方式进行存储
                    log.error("MQ队列应答失败,messageId是:" + messageId);
                    return;
                }

                try {
                    MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);
                    messageConfirmation.setStatus(1);
                    int count=messageConfirmationMapper.updateById(messageConfirmation);
                    if (count == 1) {
                        log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
                    }
                } catch (Exception ex) {
                    log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());
                }
            }
        });
    }

上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。

3.3.2.3. 编写定时任务监听消息确认表
@Configuration
@EnableScheduling
@Slf4j
public class confirmMessageTaskService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConfirmationMapper messageConfirmationMapper;

    @Scheduled(cron = "0 */1 * * * ?")
    public void sendMessage(){
        // 把消息为0的状态消息重新查询出来,投递到MQ中。
        LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(MessageConfirmation::getStatus, 0);
        List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper)
                .stream()
                .collect(Collectors.toList());
        noConfirmMessages.forEach((noConfirmMessage)->{
            rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),
                    new CorrelationData(noConfirmMessage.getId()));
        });
    }
}

 上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。

3.3.2.4. 消息投递
    public void sendMessage(MessageConfirmation messageConfirmation) {
        messageConfirmationMapper.insert(messageConfirmation);
        rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),
                new CorrelationData(messageConfirmation.getId()));
    }

3.4. 消费者核心代码讲解

3.4.1. yml配置
server:
  port: 8872
spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    listener:
      simple:
        acknowledge-mode: manual
mybatis-plus:
  typeAliasesPackage: com.xfc.consumer.entities
  mapper-locations: classpath:mapper/*.xml
3.4.2. RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public FanoutExchange deadExchange() {
        return new FanoutExchange("dead_xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue deadXfcQueue() {
        return new Queue("dead.xfc.queue", true);
    }
    @Bean
    public Binding bindDeadXfc() {
        return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue xfcQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");
        return new Queue("xfc.queue", true, false, false, args);
    }

    @Bean
    public Binding bindXfc() {
        return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());
    }
}

上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。 

3.4.3. 队列消费
@Service
@Slf4j
public class XfcMqConsumer {
    @RabbitListener(queues = {"xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        MessageConfirmation messageConfirmation=null;
        try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
            
        } catch (Exception e) {
            e.printStackTrace();
            log.error("消息投放到死信队列"+e.getMessage(),e);
            channel.basicNack(tag,false,false);// 死信队列
        }
    }
}
3.4.4. 死信队列消费
@Service
@Slf4j
public class DeadMqConsumer {
    @Autowired
    MessageErrorMapper messageErrorMapper;
    @RabbitListener(queues = {"dead.xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        MessageConfirmation messageConfirmation=null;
        try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * 写入message_error
             */
            messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));
            channel.basicNack(tag,false,false);// 死信队列
        }
    }
}

3.5 效果测试

以上代码编写完成后需要进行架构效果测试,其步骤如下:

1. 消息投递测试

上图调用了消息投递接口。

在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。

2. 消费者正常消费测试

3. 消费异常测试

上图可看出消息消费异常投入到了死信队列。

在死信队列中依然消费失败。

消费失败后成功写入了日志表。

4. 结语

本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/592163.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阿里低代码引擎学习记录

官网 一、关于设计器 1、从设计器入手进行低代码开发 设计器就是我们用拖拉拽的方法&#xff0c;配合少量代码进行页面或者应用开发的在线工具。 阿里官方提供了以下八个不同类型的设计器Demo&#xff1a; 综合场景Demo&#xff08;各项能力相对完整&#xff0c;使用Fusion…

【前端项目——分页器】手写分页器实现(JS / React)

组件介绍 用了两种方式实现&#xff0c;注释详细~ 可能代码写的不够简洁&#xff0c;见谅&#x1f641; 1. 包含内容显示的分页器 网上看了很多实现&#xff0c;很多只有分页器部分&#xff0c;没和内容显示联动。 因此我增加了模拟content的显示&#xff0c;这里模拟了32条数…

Python数据分析案例44——基于模态分解和深度学习的电负荷量预测(VMD+BiGRU+注意力)

案例背景 承接之前的案例&#xff0c;说要做模态分解加神经网络的模型的&#xff0c;前面纯神经网络的缝合模型参考数据分析案例41和数据分析案例42。 虽然我自己基于各种循环神经网络做时间序列的预测已经做烂了.....但是还是会有很多刚读研究生或者是别的领域过来的小白来问…

Monorepo(单体仓库)与MultiRepo(多仓库): Monorepo 单体仓库开发策略与实践指南

&#x1f31f; 引言 在软件开发的浩瀚宇宙里&#xff0c;选择合适的代码管理方式是构建高效开发环境的关键一步。今天&#xff0c;我们将深入探讨两大策略——Monorepo&#xff08;单体仓库&#xff09;与MultiRepo&#xff08;多仓库&#xff09;&#xff0c;并通过使用现代化…

Vue3 + Vite + TypeScript + Element-Plus创建管理系统项目

官方文档 Vue3官网 Vite官方中文文档 创建项目 使用npm命令创建项目&#xff1a; npm create vitelatest输入项目名称&#xff1a; ? Project name:项目名选择vue&#xff1a; ? Select a framework: - Use arrow-keys. Return to submit.Vanilla > VueReactPrea…

【网站项目】木里风景文化管理平台

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

CSS精灵图、字体图标、HTML5新增属性、界面样式和网站 favicon 图标

精灵图 为什么要使用精灵图 一个网页中往往会应用很多小的背景图像作为修饰&#xff0c;当网页中的图像过多时&#xff0c;服务器就会频繁地接收和发送请求图片&#xff0c;造成服务器请求压力过大&#xff0c;这将大大降低页面的加载速度,因此&#xff0c;为了有效地减少服务…

JAVA基础|常用API-JDK8之前传统的日期,时间

一. Date &#xff08;一&#xff09;说明 代表的是日期和时间 &#xff08;二&#xff09;常用的用法 构造器说明public Date()创建一个Date对象&#xff0c;代表的是系统当前此刻日期时间public Date(long time)把时间毫秒值转换成Date日期对象 常见方法说明public long …

算法提高之潜水员

算法提高之潜水员 核心思想&#xff1a;二维01背包 两个容量v1v2注意状态计算时j和p可以<各自的v #include <iostream>#include <cstring>#include <algorithm>using namespace std;const int N 1010,M 80,K 22;int f[K][M];int k,V1,V2;int main(){ci…

FloodFill-----洪水灌溉算法(DFS例题详解)

目录 一.图像渲染&#xff1a; 代码详解&#xff1a; 二.岛屿数量&#xff1a; 代码详解&#xff1a; 三.岛屿的最大面积&#xff1a; 代码详解&#xff1a; 四.被围绕的区域&#xff1a; 代码详解&#xff1a; 五.太平洋大西洋水流问题&#xff1a; 代码详解&#x…

锂电池充放电方式曲线

作为一种“化学能-电能”相互转换的能量装置&#xff0c;锂电池在使用过程中必然会进行充电和放电&#xff0c;合理的充放电方式既能减轻锂电池的损伤程度&#xff0c;又能充分发挥锂电池的性能&#xff0c;具有重要的应用价值。 如《GB/T 31484-2015&#xff1a;电动汽车用动…

非对称齿轮的跨棒距算的对不对

前面有一期咱们聊了非对称齿轮《》&#xff0c;非对称齿轮的齿厚测量一般都为跨棒距。最近研究了下计算方法&#xff0c;对计算结果的正确性做了下验证。 在MATLAB中编制了相关的计算程序&#xff1a; 齿轮的模数4&#xff0c;左侧分度圆压力角25&#xff0c;右侧分度圆压力角…

Sqli-labs第一关到第四关

目录 一&#xff0c;了解PHP源代码 二&#xff0c;破解第一关 2.1在了解完源码之后&#xff0c;我们重点看一下 2.2破解这道题表中有几列 2.3查看表中哪一列有回显 2.4查询库&#xff0c;表&#xff0c;列信息 三&#xff0c;总结 前提&#xff1a; 之所以把1234关…

MySQL基础_1.MySQL概述

文章目录 一、关系型数据库和非关系型数据库1.1 关系型&#xff08;RDBMS&#xff09;1.2 非关系型&#xff08;非RDBMS&#xff09; 二、常用的基础语句2.1 查看表的创建信息2.2 编码问题 一、关系型数据库和非关系型数据库 1.1 关系型&#xff08;RDBMS&#xff09; 是最古…

都上3D数字孪生了,2D的WEB组态和大屏可视化未来的发展在哪里?趋势是基于页面嵌套、蓝图连线等新技术,与功能业务应用融合

首先回顾下组态工具的发展史&#xff1a; 回顾发展史&#xff0c;WEB组态终于可以搭建业务系统了&#xff01;&#xff08;页面嵌套 节点编辑 WEB组态 上位机 大屏可视化 无代码 0代码 iframe nodered 蓝图&#xff09;-CSDN博客文章浏览阅读624次&#xff0c;点赞12次&#x…

ThreeJS:纹理的颜色空间

色彩空间Color Space 在ThreeJS中&#xff0c;纹理的colorSpace属性用于定义文里的颜色空间。 颜色空间是一个用于描述颜色的数学模型&#xff0c;在现实生活中&#xff0c;人眼可以观察到无数种颜色&#xff0c;而颜色空间就是用来描述这些颜色的一个方法&#xff0c;不同的颜…

C语言-自定义类型:结构体,枚举,联合

目录 一、结构体1.1 结构体变量的定义和初始化1.2 结构体内存对齐1.3 修改默认对齐数1.4 结构体传参 二、位段2.1 什么是位段2.2 位段的内存分配2.3 位段的跨平台问题2.4 位段的应用 三、枚举3.1 枚举类型的定义3.2 枚举的优点 四、联合&#xff08;共用体&#xff09;4.1 联合…

c#数据库: 9.删除和添加新字段/数据更新

先把原来数据表的sexy字段删除,然后重新在添加字段sexy,如果添加成功,sexy列的随机内容会更新.原数据表如下: using System; using System.Collections.Generic; using System.Data; using System.Data.Common; using System.Data.SqlClient; using System.Linq; using System.…

Linux理解文件操作 文件描述符fd 理解重定向 dup2 缓冲区 C语言实现自己的shell

文章目录 前言一、文件相关概念与操作1.1 open()1.2 close()1.3 write()1.4 read()1.4 写入的时候先清空文件内容再写入1.5 追加&#xff08;a && a&#xff09; 二、文件描述符2.1 文件描述符 fd 0 1 2 的理解2.2 FILE结构体&#xff1a;的源代码 三、深入理解文件描述…

jupyter notebook 设置密码报错ModuleNotFoundError: No module named ‘notebook.auth‘

jupyter notebook 设置密码报错ModuleNotFoundError: No module named ‘notebook.auth‘ 原因是notebook新版本没有notebook.auth 直接输入以下命令即可设置密码 jupyter notebook password
最新文章