最后更新:2020-05-22 10:07:35 手机定位技术交流文章

作者|丁伟
来源|中间件兴趣圈
问题现象
在首先收到项目反馈后,使用RocketMQ时会出现以下错误:
错误消息关键点:MQBrokerException:CODE:2DESC:2DESC:问题分析
首先,我们根据关键字:TIMEOUT_CLEAN_QUEUE来查询RocketMQ,以找出上面的错误何时会被抛出。根据全文搜索如下图所示:
这个方法是在代理快速脆弱中定义的,它的设计目的可以通过它的名字看到:代理端快速故障机制。
代理端快速故障示意图如下:
消息发送者向代理发送消息写请求。代理将在收到请求后首先将请求放入队列(SendThreadPoolQueue),默认容量为10000。
代理将使用一个特殊的线程池(SendMessageExecutor)从队列中获取任务并执行消息写请求。为了确保消息的顺序处理,这个线程池的默认线程数是1。
如果单个写数据由于垃圾收集和代理端的其他因素而抖动,则单个代理端的请求积压量太大,无法及时处理,这将大大延长客户端消息的发送时间。
想象一下,如果由于代理压力增加而需要500毫秒甚至超过1秒来写一条消息,并且队列中有5000条消息,那么消息发送方的默认超时时间是3秒。如果以这种速度,当轮到代理执行写请求时,客户端已经超时了,这不仅会导致大量无效处理,还会导致客户端发送超时。
因此,为了解决这个问题,RocketMQ引入了代理端快速故障机制,即每隔10毫秒启动一个调度线程来检查队列中的第一个队列节点。如果节点的排队时间超过200毫秒,它将取消队列中所有超过200毫秒的请求,并立即将故障返回给客户端,以便客户端可以尽快重试。由于代理部署在集群中,下一次重试可以发送给其他代理,这样可以保证在默认的3s时间内通过重试机制最大限度地发送消息,并且可以有效避免由于某个代理的高瞬时压力而导致的消息发送不可用,从而实现消息发送的高可用性。
从引入代理端快速故障机制的初衷来看,在快速故障后将启动重试。除非群集中的所有代理同时忙碌,否则消息将被成功发送。用户不会察觉到这个错误。那么用户为什么会感觉到呢?超时_清除_队列是否错误,并且代理没有重试?
为了解开这个谜团,源代码分析方法将被用来探索真相。其次,以同步消息传输为例,揭示其消息传输过程中的核心要点。
MQClient消息发送方将首先使用网络通道向代理发送请求,然后在收到请求结果后,将调用ProcessSendResponse方法来分析响应结果,如下图所示:
此处返回的代码是remote sysresponsescode . system _ busy。
从proccessSendResponse方法可以知道,如果代码是SYSTEM_BUSY,该方法将抛出MQBrokerException,响应代码是SYSTEM_BUSY,其错误描述是开头的错误消息。
然后,我们可以沿着方法的调用链接找到它的直接调用方:DefaultMQProducer的SendKernelImpl。我们将关注如果基础方法抛出一个MQBrokerException,该方法将如何处理。
关键代码如下图所示:
可以看出,在SendKernelImpl方法中,将首先捕获异常,首先执行注册的钩子函数,也就是说,即使执行失败,也将执行相应的消息发送post钩子函数,然后完整地抛出异常。
DefaultMQProducerImpl的SendDefaultImpl方法调用了SendKernelImpl方法。以下是其核心实现的屏幕截图:
从这里可以看出,RocketMQ消息发送高可用性设计的一个非常关键的点,重试机制,是通过在For循环中用Try Catch来包装SendKernelImpl方法来实现的,这可以确保该方法在抛出异常后可以继续重试。从上面可以看出,如果SYSTEM_BUSY抛出一个MQBrokerException,但只发现上面提到的错误代码被重试,因为如果不是上面提到的错误代码,异常将继续向外抛出,此时For循环将被中断,即不会被重试。
这里非常令人惊讶的是,即使是SYSTEM_ERROR也会再试一次,但它不包含SYSTEM_BUSY,这显然违背了快速故障的最初设计意图。因此,作者得出结论,这是RocketMQ的一个错误,省略了SYSTEM_BUSY。接下来,将会产生一个pr,添加一行代码,并添加SYSTEM_BUSY。
在这里分析问题时,问题应该很清楚。
解决方案
如果每个人都在网上搜索超时清除队列的解决方案,他们都会建议增加等待时间的值,比如将默认为200毫秒,设置为1000秒,等等。我以前反对它,因为我知道Broker会再试一次,但是现在我发现Broker不会再试一次,所以我现在认为如果BUG没有得到解决,适当地增加值可以有效地缓解它。
然而,这不是一个好的解决方案。我将在不久的将来向政府提交一份公共关系报告来解决这个问题。建议公司中的每个人都尽最大努力修改他们使用的版本并键入一个新的包,因为这违背了Broker快速失败的最初设计意图。
但是,在消息发送的业务方面,尽量实现消息本身的重试机制,即不要依赖RocketMQ本身提供的重试机制。由于网络等因素,消息发送不可能100%成功。建议每个人在发送消息时捕捉一个异常。如果发送失败,消息可以存储在数据库中,然后可以结合计时任务重试该消息,以确保消息不会最大程度地丢失。
本文由 在线网速测试 整理编辑,转载请注明出处。