使用 RocketMQ 如何保证消息不丢失

参考答案

这个是面试官最喜欢问的问题,也是所有 MQ 都需要面对的一个共性问题。

大致的解决思路都是一致的,只是不同的 MQ 产品,有不同的解决方案。

分析这个问题,要从以下几个角度入手:

 

1   哪些环节会有丢消息的可能

我们考虑一个通用的MQ场景:

使用 RocketMQ 如何保证消息不丢失

 

第 1、2、4 个环节都是跨网络的,只要跨网络,就一定有丢消息的可能。

关于第 3 环节,通常 MQ 存盘时,都会先写入操作系统的缓存 page cache 中,再由操作系统异步的将消息写入硬盘。这个中间有个时间差,有可能会造成消息丢失。例如:服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。 这个是 MQ 场景都会面对的、通用的丢消息问题。

接下来,我们再来看看用 Rocket 如何解决这个问题。

 

2  RocketMQ 消息零丢失方案

2.1   生产者使用事务消息机制保证消息零丢失

RocketMQ 的事务消息机制,就是为了保证零丢失来设计的,已经经过了阿里的验证,非常靠谱。

我们以最常见的电商订单场景为例,来简单分析下事务消息机制如何保证消息不丢失,进一步确认这个事务消息到底是不是真靠谱。

先来看看下面这个流程图:

使用 RocketMQ 如何保证消息不丢失

我们逐个剖析。

问题一:为什么要发送个 half 消息?有什么用?

这个 half 消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。

这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下 RocketMQ 服务是否正常,并且通知 RocketMQ ,我马上就要发一个很重要的消息了,你做好准备。

问题二:half消息如果写入失败了怎么办?

如果没有 half 消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给 MQ 。这时候写入消息到 MQ ,如果失败就会非常尴尬了。

而 half 消息如果写入失败,我们就可以认为 MQ 的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待 MQ 服务正常后再进行补偿操作,等 MQ 服务正常后重新下单通知下游服务。

问题三:订单系统写数据库失败了怎么办?

这个问题,我们同样比较下“没有使用事务消息机制时会怎么办?”。

如果没有使用事务消息,我们只能判断下单失败,抛出了异常,那就不往 MQ 发消息了,至少不会对下游服务进行错误的通知。如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。

当然,也可以设计另外的补偿机制。例如:将订单数据缓存起来,再启动一个线程定时尝试往数据库写。

使用事务消息机制,就可以有一种更优雅的方案。

下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方,把订单消息先缓存起来(Redis、文本或者其他方式),然后给 RocketMQ 返回一个 UNKNOWN 状态。

这样, RocketMQ 就会过一段时间来回查事务状态。我们就可以在回查事务状态时,再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息,就不会因为数据库临时崩了而丢失。

问题四:half 消息写入成功后 RocketMQ 挂了怎么办?

在事务消息的处理机制中,未知状态的事务状态回查,是由 RocketMQ 的 Broker 主动发起的。

也就是说,如果出现了这种情况,那 RocketMQ 就不会回调到事务消息中,回查事务状态的服务。

这时,可以将订单一直标记为”新下单”的状态。等 RocketMQ 恢复之后,只要存储的消息没有丢失,RocketMQ 就会再次继续状态回查的流程。

问题五:下单成功后如何优雅的等待支付成功?

在订单场景下,通常会要求下单完成后,客户在一定时间内完成订单支付。支付完成之后,才会通知下游服务进行进一步的营销补偿。

如果不用事务消息,那通常会怎么办?

最简单的方式,是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。

那更进一步的方案是什么呢?

能否使用 RocketMQ 提供的延迟消息机制,往 MQ 发一个延迟 1 分钟的消息,消费到这个消息后去检查订单的支付状态。

如果订单已经支付,就往下游发送下单的通知。如果没有支付,就再发一个延迟1分钟的消息,最终在第十个消息时把订单回收。

这个方案只需要每次处理一个单独的订单消息,就不用对全部的订单表进行扫描了。

那如果使用上了事务消息呢?

我们可以用事务消息的状态,回查机制来替代定时的任务。

在下单时,给 Broker 返回一个 UNKNOWN 的未知状态。而在状态回查的方法中去查询订单的支付状态,这样整个业务逻辑就会简单很多了。

我们只需要配置 RocketMQ 中的事务消息回查次数(默认15次)、事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

问题六:事务消息机制的作用

整体来说,在订单这个场景下,消息不丢失的问题,实际上转化成了下单这个业务、与下游服务的业务的分布式事务一致性问题。

事务一致性问题,一直以来都是一个非常复杂的问题。

RocketMQ 的事务消息机制,实际上只保证了整个事务消息的一半,保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。

但是,即便如此,目前来看,这也是分布式事务的一个很好的降级方案。

 

2.2   RocketMQ配置同步刷盘+Dledger主从架构保证MQ自身不会丢消息

同步刷盘

看了我们之前的分析,这个就很好理解了。

把 RocketMQ 的刷盘方式  flushDiskType 配置成同步刷盘,就能保证消息在刷盘过程中不会丢失。

Dledger的文件同步

使用 RocketMQ 如何保证消息不丢失

在使用 Dledger 技术搭建的 RocketMQ 集群中,Dledger 会通过两阶段提交的方式,保证文件在主从之间成功同步。

简单来说,数据同步会通过两个阶段:uncommitted阶段、commited阶段。

  • Leader Broker 上的 Dledger 收到一条数据后,会标记为 uncommitted 状态,通过自己的DledgerServer 组件,把这个 uncommitted 数据发给 Follower Broker 的 DledgerServer 组件。
  • 接着,Follower Broker 的 DledgerServer 收到 uncommitted 消息之后,必须返回一个 ack 给 Leader Broker 的 Dledger 。
  • 如果 Leader Broker 收到超过半数的 Follower Broker 返回的 ack ,就会把消息标记为 committed 状态。 Leader Broker 上的 DledgerServer ,就会发送 committed 消息给 Follower Broker 上的DledgerServer ,让他们把消息也标记为 committed 状态。

这样,就基于 Raft 协议完成了两阶段的数据同步。

2.3   消费者端不要使用异步消费机制

正常情况下,消费者端都是需要先处理本地事务,然后再给 MQ 一个 ACK 响应,这时 MQ 就会修改 Offset ,将消息标记为已消费,从而不再往其他消费者推送消息。

所以,在 Broker 的这种重新推送机制下,消息是不会在传输过程中丢失的。

但是,也会有下面这种情况,造成服务端消息丢失。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        new Thread(){
            public void run(){
                //处理业务逻辑
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            }
        };
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
复制代码

这种异步消费的方式,就有可能造成消息状态返回后,消费者本地业务逻辑处理失败,造成消息丢失的可能。

2.4  NameServer 挂了,如何保证消息不丢失?

这个是 RocketMQ 特有的问题。

在 RocketMQ 中,NameServer 扮演的是一个路由中心的角色,提供到 Broker 的路由功能。

路由中心这样的功能,在所有的 MQ 中都是需要的。

Kafka 是用 zookeeper 和一个作为 Controller的 Broker 一起来提供路由服务,整个功能相当复杂。而RabbitMQ 是则由每一个 Broker 来提供路由服务,RocketMQ 把这个路由中心单独抽取出来、并独立部署。 NameServer 集群中任意多的节点挂掉,都不会影响它提供的路由功能。

如果集群中所有的NameServer节点都挂了呢?

有很多人认为,在生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段时间。

我们做一下实验,当 NameServer 全部挂了后,生产者和消费者立即无法工作。那再回到我们的消息不丢失的问题,在这种情况下,RocketMQ 相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。

我们只能自己设计一个降级方案来处理这个问题。例如,在订单系统中,如果多次尝试发送RocketMQ 不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往 RocketMQ 发送。这样等 RocketMQ 的服务恢复后,就能第一时间把这些消息重新发送出去。

整个这套降级的机制,在大型互联网项目中,都是必须要有的。

 

以上,是消息队列 RocketMQ 面试题【使用 RocketMQ 如何保证消息不丢失】的参考答案。

输出,是最好的学习方法

欢迎在评论区留下你的问题、笔记或知识点补充~

—end—

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧